I'm a big-data beginner. While using MapReduce for real business work at my company, one job needed a custom data type, and when I packaged and ran it I hit a NullPointerException. It took me a long time to track down the root cause, so I'm writing it up in this post as a record and for future reference.
Scenario: read data from one HBase table (activity_statistics), process it, and write the results into another HBase table (activity_scores). The mapper's output uses the custom type UserActivityScore.
The solution first: a custom type must implement the WritableComparable interface; besides overriding the write and readFields methods, it also needs a no-argument constructor that initializes the type's fields. I had declared the fields but never initialized them, which caused the NullPointerException and cost me a lot of time. The full code is below.
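Before diving into the project code, here is the fix boiled down to a minimal sketch. DemoWritable and its two fields are illustrative names, not part of this project; the point is the no-argument constructor that initializes every String field before write() can touch it.

public class DemoWritable implements org.apache.hadoop.io.WritableComparable<DemoWritable> {

    private String field1;
    private String field2;

    // No-argument constructor: the framework creates instances reflectively, and
    // every String field must be initialized here, otherwise write() may call
    // writeUTF(null) and throw NullPointerException.
    public DemoWritable() {
        this.field1 = "";
        this.field2 = "";
    }

    @Override
    public void write(java.io.DataOutput out) throws java.io.IOException {
        out.writeUTF(field1);
        out.writeUTF(field2);
    }

    @Override
    public void readFields(java.io.DataInput in) throws java.io.IOException {
        this.field1 = in.readUTF();
        this.field2 = in.readUTF();
    }

    @Override
    public int compareTo(DemoWritable o) {
        return field1.compareTo(o.field1);
    }
}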
GradeRunner.java
package com.dxyun.dxdp.activity.scores.runner;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import com.dxyun.dxdp.activity.scores.mapper.GradeMapper;
import com.dxyun.dxdp.activity.scores.reducer.GradeReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GradeRunner implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(conf, "activity_grade");
        job.setJarByClass(GradeRunner.class);

        // args[0]: the HBase table to read from; the mapper output key/value types
        // are Text and the custom type UserActivityScore
        TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), GradeMapper.class,
                Text.class, UserActivityScore.class, job);
        // args[1]: the HBase table to write to
        TableMapReduceUtil.initTableReducerJob(args[1], GradeReducer.class, job);

        boolean flag = job.waitForCompletion(true);
        return flag ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        // ...... MapReduce parameter configuration omitted here
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {
        try {
            // run() already returns 0 on success and 1 on failure
            int flag = ToolRunner.run(new GradeRunner(), args);
            System.exit(flag);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
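The body of setConf is elided above. Purely as a hypothetical sketch of the kind of configuration that usually goes there (the ZooKeeper quorum and client port below are placeholder values, not this project's real settings), it might look like:

@Override
public void setConf(Configuration conf) {
    // Hypothetical illustration of the elided configuration;
    // the quorum address and port are placeholders.
    conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    this.conf = conf;
}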
UserActivityScore.java

package com.dxyun.dxdp.activity.scores.entity;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UserActivityScore implements WritableComparable<UserActivityScore> {

    private String userID;
    private String gmf_sj;
    private String gmf_email;
    private String scorePerYear;
    private String scorePerMonth;
    private String scorePerHourPeriod;
    private String scoreLast7days;
    private String scoreLast30days;
    private String scoreLast1year;

    // No-argument constructor
    public UserActivityScore() {
        // Field initialization -- this is mandatory. Without it the fields stay null,
        // write() serializes null and throws NullPointerException; this is exactly
        // the mistake that cost me so much time.
        this.userID = "";
        this.gmf_sj = "";
        this.gmf_email = "";
        this.scorePerYear = "";
        this.scorePerMonth = "";
        this.scorePerHourPeriod = "";
        this.scoreLast7days = "";
        this.scoreLast30days = "";
        this.scoreLast1year = "";
    }

    public String getUserID() { return userID; }
    public String getGmf_sj() { return gmf_sj; }
    public String getGmf_email() { return gmf_email; }
    public String getScorePerYear() { return scorePerYear; }
    public String getScorePerMonth() { return scorePerMonth; }
    public String getScorePerHourPeriod() { return scorePerHourPeriod; }
    public String getScoreLast7days() { return scoreLast7days; }
    public String getScoreLast30days() { return scoreLast30days; }
    public String getScoreLast1year() { return scoreLast1year; }

    public void setUserID(String userID) { this.userID = userID; }
    public void setGmf_sj(String gmf_sj) { this.gmf_sj = gmf_sj; }
    public void setGmf_email(String gmf_email) { this.gmf_email = gmf_email; }
    public void setScorePerYear(String scorePerYear) { this.scorePerYear = scorePerYear; }
    public void setScorePerMonth(String scorePerMonth) { this.scorePerMonth = scorePerMonth; }
    public void setScorePerHourPeriod(String scorePerHourPeriod) { this.scorePerHourPeriod = scorePerHourPeriod; }
    public void setScoreLast7days(String scoreLast7days) { this.scoreLast7days = scoreLast7days; }
    public void setScoreLast30days(String scoreLast30days) { this.scoreLast30days = scoreLast30days; }
    public void setScoreLast1year(String scoreLast1year) { this.scoreLast1year = scoreLast1year; }

    @Override
    public int compareTo(UserActivityScore o) {
        // The type is only used as a map output value, so ordering is not needed here.
        return 0;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userID);
        out.writeUTF(gmf_sj);
        out.writeUTF(gmf_email);
        out.writeUTF(scorePerYear);
        out.writeUTF(scorePerMonth);
        out.writeUTF(scorePerHourPeriod);
        out.writeUTF(scoreLast7days);
        out.writeUTF(scoreLast30days);
        out.writeUTF(scoreLast1year);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.userID = in.readUTF();
        this.gmf_sj = in.readUTF();
        this.gmf_email = in.readUTF();
        this.scorePerYear = in.readUTF();
        this.scorePerMonth = in.readUTF();
        this.scorePerHourPeriod = in.readUTF();
        this.scoreLast7days = in.readUTF();
        this.scoreLast30days = in.readUTF();
        this.scoreLast1year = in.readUTF();
    }
}
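Initializing the fields in the no-argument constructor is the fix described above. As a complementary, defensive option (just a sketch, not part of the original class), write() itself can guard against null, so a field that was never set still serializes as an empty string:

// Defensive variant (sketch): never hand a null String to writeUTF.
private static String nullToEmpty(String s) {
    return s == null ? "" : s;
}

@Override
public void write(DataOutput out) throws IOException {
    out.writeUTF(nullToEmpty(userID));
    out.writeUTF(nullToEmpty(gmf_sj));
    out.writeUTF(nullToEmpty(gmf_email));
    // ... the remaining fields are written the same way
}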
GradeMapper.java
package com.dxyun.dxdp.activity.scores.mapper;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class GradeMapper extends TableMapper<Text, UserActivityScore> {

    @Override
    protected void map(ImmutableBytesWritable rowkey, Result value, Context context)
            throws IOException, InterruptedException {

        String userID = Bytes.toString(value.getValue("user_info".getBytes(), "userID".getBytes()));
        String gmf_sj = Bytes.toString(value.getValue("user_info".getBytes(), "gmf_sj".getBytes()));
        String gmf_email = Bytes.toString(value.getValue("user_info".getBytes(), "gmf_email".getBytes()));
        String invoiceNumPerYear = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumPerYear".getBytes()));
        String invoiceNumPerMonth = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumPerMonth".getBytes()));
        String invoiceNumPerHourPeriod = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumPerHourPeriod".getBytes()));
        String invoiceNumLast7days = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumLast7days".getBytes()));
        String invoiceNumLast30days = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumLast30days".getBytes()));
        String invoiceNumLast1year = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumLast1year".getBytes()));

        // Compute the scores
        String scorePerYear = getScores(invoiceNumPerYear);
        String scorePerMonth = getScores(invoiceNumPerMonth);
        String scorePerHourPeriod = getScores(invoiceNumPerHourPeriod);
        String scoreLast7days = getScoreByNum(invoiceNumLast7days);
        String scoreLast30days = getScoreByNum(invoiceNumLast30days);
        String scoreLast1year = getScoreByNum(invoiceNumLast1year);

        UserActivityScore userActivityScore = new UserActivityScore();
        if (null != userID && !"".equals(userID)) {
            userActivityScore.setUserID(userID);
        }
        if (null != gmf_sj && !"".equals(gmf_sj)) {
            userActivityScore.setGmf_sj(gmf_sj);
        }
        if (null != gmf_email && !"".equals(gmf_email)) {
            userActivityScore.setGmf_email(gmf_email);
        }
        if (null != scorePerYear && !"".equals(scorePerYear)) {
            userActivityScore.setScorePerYear(scorePerYear);
        }
        if (null != scorePerMonth && !"".equals(scorePerMonth)) {
            userActivityScore.setScorePerMonth(scorePerMonth);
        }
        if (null != scorePerHourPeriod && !"".equals(scorePerHourPeriod)) {
            userActivityScore.setScorePerHourPeriod(scorePerHourPeriod);
        }
        if (null != scoreLast7days && !"".equals(scoreLast7days)) {
            userActivityScore.setScoreLast7days(scoreLast7days);
        }
        if (null != scoreLast30days && !"".equals(scoreLast30days)) {
            userActivityScore.setScoreLast30days(scoreLast30days);
        }
        if (null != scoreLast1year && !"".equals(scoreLast1year)) {
            userActivityScore.setScoreLast1year(scoreLast1year);
        }

        // Emit the rowkey as the key and the custom type userActivityScore as the value
        context.write(new Text(Bytes.toString(rowkey.get())), userActivityScore);
    }

    public String getScores(String str) {
        // ...... business-specific logic omitted
        return result;
    }

    public String getScoreByNum(String invoiceNum) {
        // ...... business-specific logic omitted
        return invoiceNum;
    }
}
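A small aside on the mapper: the repeated null != x && !"".equals(x) guards could be folded into one helper. hasText below is a name introduced here for illustration, not something in the original code:

// Sketch of a helper that centralizes the null/empty check used above.
private static boolean hasText(String s) {
    return s != null && !s.isEmpty();
}

// Usage, for example:
// if (hasText(userID)) { userActivityScore.setUserID(userID); }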
GradeReducer.java

package com.dxyun.dxdp.activity.scores.reducer;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GradeReducer extends TableReducer<Text, UserActivityScore, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<UserActivityScore> values, Context context)
            throws IOException, InterruptedException {

        int index = 0;
        String userID = "";
        String gmf_sj = "";
        String gmf_email = "";
        String scorePerYear = "";
        String scorePerMonth = "";
        String scorePerHourPeriod = "";
        String scoreLast7days = "";
        String scoreLast30days = "";
        String scoreLast1year = "";

        // .... business code that extracts the mapper-stage results, omitted

        Put put = new Put(key.toString().getBytes());
        if (null != userID && !"".equals(userID)) {
            put.addColumn("user_info".getBytes(), "userID".getBytes(), userID.getBytes());
        }
        if (null != gmf_sj && !"".equals(gmf_sj)) {
            put.addColumn("user_info".getBytes(), "gmf_sj".getBytes(), gmf_sj.getBytes());
        }
        if (null != gmf_email && !"".equals(gmf_email)) {
            put.addColumn("user_info".getBytes(), "gmf_email".getBytes(), gmf_email.getBytes());
        }
        if (null != scorePerYear && !"".equals(scorePerYear)) {
            put.addColumn("user_info".getBytes(), "scorePerYear".getBytes(), scorePerYear.getBytes());
        }
        if (null != scorePerMonth && !"".equals(scorePerMonth)) {
            put.addColumn("user_info".getBytes(), "scorePerMonth".getBytes(), scorePerMonth.getBytes());
        }
        if (null != scorePerHourPeriod && !"".equals(scorePerHourPeriod)) {
            put.addColumn("user_info".getBytes(), "scorePerHourPeriod".getBytes(), scorePerHourPeriod.getBytes());
        }
        if (null != scoreLast7days && !"".equals(scoreLast7days)) {
            put.addColumn("user_info".getBytes(), "scoreLast7days".getBytes(), scoreLast7days.getBytes());
        }
        if (null != scoreLast30days && !"".equals(scoreLast30days)) {
            put.addColumn("user_info".getBytes(), "scoreLast30days".getBytes(), scoreLast30days.getBytes());
        }
        if (null != scoreLast1year && !"".equals(scoreLast1year)) {
            put.addColumn("user_info".getBytes(), "scoreLast1year".getBytes(), scoreLast1year.getBytes());
        }

        context.write(null, put);
    }
}
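The step that pulls the mapper output out of values is omitted above as business code. Purely as an illustration of the shape it might take (this loop is hypothetical, not the author's implementation):

// Hypothetical illustration: copy the fields of each UserActivityScore
// emitted by the mapper for this row key into the local variables above.
for (UserActivityScore v : values) {
    userID = v.getUserID();
    gmf_sj = v.getGmf_sj();
    gmf_email = v.getGmf_email();
    scorePerYear = v.getScorePerYear();
    scorePerMonth = v.getScorePerMonth();
    scorePerHourPeriod = v.getScorePerHourPeriod();
    scoreLast7days = v.getScoreLast7days();
    scoreLast30days = v.getScoreLast30days();
    scoreLast1year = v.getScoreLast1year();
}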
Appendix: the error reported when the fields are not initialized:
17/10/19 14:10:52 INFO mapreduce.Job:  map 0% reduce 0%
17/10/19 14:11:00 INFO mapreduce.Job: Task Id : attempt_1507929502667_0059_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
    at com.dxyun.dxdp.activity.scores.entity.UserActivityScore.write(UserActivityScore.java:108)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1164)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:721)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.dxyun.dxdp.activity.scores.mapper.GradeMapper.map(GradeMapper.java:66)
    at com.dxyun.dxdp.activity.scores.mapper.GradeMapper.map(GradeMapper.java:17)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
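The trace shows the NullPointerException coming from DataOutputStream.writeUTF, called from UserActivityScore.write during map-output serialization: one of the String fields was still null because it had never been initialized or set. writeUTF rejects null, which a tiny experiment (names are illustrative) reproduces:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class WriteUtfNullDemo {
    public static void main(String[] args) throws Exception {
        DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
        String field = null;   // an uninitialized String field in a Writable is exactly this
        out.writeUTF(field);   // throws java.lang.NullPointerException, same as the trace above
    }
}

In short: give every custom Writable a no-argument constructor and initialize its fields there.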