Overview
I am a big-data beginner. While using MapReduce for one of our company's real business jobs, one job needed a custom type. When I packaged and ran it, it failed with a NullPointerException, and it took me a long time to find the root cause, so I am recording it in this post for future reference.
Scenario: read data from one HBase table (activity_statistics), process it, and write the result to another HBase table (activity_scores). The mapper-stage output uses the custom type UserActivityScore.
The fix, up front: a custom type must implement the WritableComparable interface. Besides overriding the write and readFields methods, it must also have a no-argument constructor that initializes the type's fields. I had declared the fields but never initialized them, which is exactly what caused the NullPointerException and cost me a lot of time. The code follows.
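To make the root cause concrete before the full code: Hadoop serializes the mapper output value by calling write(DataOutput), and DataOutputStream.writeUTF throws a NullPointerException when it is handed a null String. A minimal, self-contained demo (the class name is mine, not from the project):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal demonstration of the trap: writeUTF(null) throws NullPointerException,
// which is what happens when a custom Writable's String field is never initialized.
public class WriteUtfNullDemo {
    public static void main(String[] args) throws IOException {
        String uninitializedField = null; // stands in for a field the no-arg constructor forgot to set
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            out.writeUTF("");                  // fine: an empty string serializes normally
            out.writeUTF(uninitializedField);  // throws java.lang.NullPointerException
        }
    }
}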
GradeRunner.java
package com.dxyun.dxdp.activity.scores.runner;
import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import com.dxyun.dxdp.activity.scores.mapper.GradeMapper;
import com.dxyun.dxdp.activity.scores.reducer.GradeReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class GradeRunner implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(conf, "activity_grade");
        job.setJarByClass(GradeRunner.class);
        // args[0]: the HBase table to read from;
        // the mapper's output key and value types are Text and the custom type UserActivityScore
        TableMapReduceUtil.initTableMapperJob(args[0], new Scan(), GradeMapper.class, Text.class, UserActivityScore.class, job);
        // args[1]: the HBase table to write to
        TableMapReduceUtil.initTableReducerJob(args[1], GradeReducer.class, job);
        boolean flag = job.waitForCompletion(true);
        return flag ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        // ...... MapReduce parameter configuration omitted here (a hypothetical sketch follows this class)
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) {
        try {
            int flag = ToolRunner.run(new GradeRunner(), args);
            System.exit(flag);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
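The setConf body above omits the actual MapReduce and HBase parameter configuration. Purely as a hypothetical sketch (the property value is a placeholder, not the project's real setting), an HBase-backed job typically layers the HBase configuration on top of the incoming Hadoop one:

// Hypothetical sketch of what setConf might contain; the original post omits the real configuration.
@Override
public void setConf(Configuration conf) {
    // Layer hbase-default.xml / hbase-site.xml on top of the incoming Hadoop configuration.
    Configuration hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create(conf);
    // Placeholder ZooKeeper quorum; replace with the cluster's actual hosts.
    hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
    this.conf = hbaseConf;
}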
UserActivityScore.java

package com.dxyun.dxdp.activity.scores.entity;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UserActivityScore implements WritableComparable<UserActivityScore> {

    private String userID;
    private String gmf_sj;
    private String gmf_email;
    private String scorePerYear;
    private String scorePerMonth;
    private String scorePerHourPeriod;
    private String scoreLast7days;
    private String scoreLast30days;
    private String scoreLast1year;

    // No-argument constructor
    public UserActivityScore() {
        // Field initialization is required; without it the job fails with a NullPointerException.
        // I learned this the hard way.
        this.userID = "";
        this.gmf_sj = "";
        this.gmf_email = "";
        this.scorePerYear = "";
        this.scorePerMonth = "";
        this.scorePerHourPeriod = "";
        this.scoreLast7days = "";
        this.scoreLast30days = "";
        this.scoreLast1year = "";
    }

    public String getUserID() { return userID; }
    public String getGmf_sj() { return gmf_sj; }
    public String getGmf_email() { return gmf_email; }
    public String getScorePerYear() { return scorePerYear; }
    public String getScorePerMonth() { return scorePerMonth; }
    public String getScorePerHourPeriod() { return scorePerHourPeriod; }
    public String getScoreLast7days() { return scoreLast7days; }
    public String getScoreLast30days() { return scoreLast30days; }
    public String getScoreLast1year() { return scoreLast1year; }

    public void setUserID(String userID) { this.userID = userID; }
    public void setGmf_sj(String gmf_sj) { this.gmf_sj = gmf_sj; }
    public void setGmf_email(String gmf_email) { this.gmf_email = gmf_email; }
    public void setScorePerYear(String scorePerYear) { this.scorePerYear = scorePerYear; }
    public void setScorePerMonth(String scorePerMonth) { this.scorePerMonth = scorePerMonth; }
    public void setScorePerHourPeriod(String scorePerHourPeriod) { this.scorePerHourPeriod = scorePerHourPeriod; }
    public void setScoreLast7days(String scoreLast7days) { this.scoreLast7days = scoreLast7days; }
    public void setScoreLast30days(String scoreLast30days) { this.scoreLast30days = scoreLast30days; }
    public void setScoreLast1year(String scoreLast1year) { this.scoreLast1year = scoreLast1year; }

    @Override
    public int compareTo(UserActivityScore o) {
        return 0;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userID);
        out.writeUTF(gmf_sj);
        out.writeUTF(gmf_email);
        out.writeUTF(scorePerYear);
        out.writeUTF(scorePerMonth);
        out.writeUTF(scorePerHourPeriod);
        out.writeUTF(scoreLast7days);
        out.writeUTF(scoreLast30days);
        out.writeUTF(scoreLast1year);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.userID = in.readUTF();
        this.gmf_sj = in.readUTF();
        this.gmf_email = in.readUTF();
        this.scorePerYear = in.readUTF();
        this.scorePerMonth = in.readUTF();
        this.scorePerHourPeriod = in.readUTF();
        this.scoreLast7days = in.readUTF();
        this.scoreLast30days = in.readUTF();
        this.scoreLast1year = in.readUTF();
    }
}
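Initializing every field in the no-argument constructor is the fix this post is about; another option is to make the serialization itself tolerate nulls. The small class below is only an illustrative example I am adding (it is not part of the project); it guards each writeUTF call so a field that was never set is written as an empty string instead of triggering a NullPointerException:

package com.dxyun.dxdp.activity.scores.entity; // hypothetical package, chosen only for the example

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Illustrative example, not part of the original project: a Writable whose serialization
// tolerates null fields, so a missed initialization degrades to "" instead of an NPE.
public class NullSafeScoreExample implements Writable {

    private String userID;          // deliberately left uninitialized
    private String scoreLast7days;  // deliberately left uninitialized

    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(nullToEmpty(userID));          // writeUTF(null) would throw NullPointerException
        out.writeUTF(nullToEmpty(scoreLast7days));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.userID = in.readUTF();
        this.scoreLast7days = in.readUTF();
    }
}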
GradeMapper.java
package com.dxyun.dxdp.activity.scores.mapper;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class GradeMapper extends TableMapper<Text, UserActivityScore> {

    @Override
    protected void map(ImmutableBytesWritable rowkey, Result value, Context context) throws IOException, InterruptedException {
        String userID = Bytes.toString(value.getValue("user_info".getBytes(), "userID".getBytes()));
        String gmf_sj = Bytes.toString(value.getValue("user_info".getBytes(), "gmf_sj".getBytes()));
        String gmf_email = Bytes.toString(value.getValue("user_info".getBytes(), "gmf_email".getBytes()));
        String invoiceNumPerYear = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumPerYear".getBytes()));
        String invoiceNumPerMonth = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumPerMonth".getBytes()));
        String invoiceNumPerHourPeriod = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumPerHourPeriod".getBytes()));
        String invoiceNumLast7days = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumLast7days".getBytes()));
        String invoiceNumLast30days = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumLast30days".getBytes()));
        String invoiceNumLast1year = Bytes.toString(value.getValue("user_info".getBytes(), "invoiceNumLast1year".getBytes()));

        // Compute the scores
        String scorePerYear = getScores(invoiceNumPerYear);
        String scorePerMonth = getScores(invoiceNumPerMonth);
        String scorePerHourPeriod = getScores(invoiceNumPerHourPeriod);
        String scoreLast7days = getScoreByNum(invoiceNumLast7days);
        String scoreLast30days = getScoreByNum(invoiceNumLast30days);
        String scoreLast1year = getScoreByNum(invoiceNumLast1year);

        UserActivityScore userActivityScore = new UserActivityScore();
        if (null != userID && !(("").equals(userID))) {
            userActivityScore.setUserID(userID);
        }
        if (null != gmf_sj && !(("").equals(gmf_sj))) {
            userActivityScore.setGmf_sj(gmf_sj);
        }
        if (null != gmf_email && !(("").equals(gmf_email))) {
            userActivityScore.setGmf_email(gmf_email);
        }
        if (null != scorePerYear && !(("").equals(scorePerYear))) {
            userActivityScore.setScorePerYear(scorePerYear);
        }
        if (null != scorePerMonth && !(("").equals(scorePerMonth))) {
            userActivityScore.setScorePerMonth(scorePerMonth);
        }
        if (null != scorePerHourPeriod && !(("").equals(scorePerHourPeriod))) {
            userActivityScore.setScorePerHourPeriod(scorePerHourPeriod);
        }
        if (null != scoreLast7days && !(("").equals(scoreLast7days))) {
            userActivityScore.setScoreLast7days(scoreLast7days);
        }
        if (null != scoreLast30days && !(("").equals(scoreLast30days))) {
            userActivityScore.setScoreLast30days(scoreLast30days);
        }
        if (null != scoreLast1year && !(("").equals(scoreLast1year))) {
            userActivityScore.setScoreLast1year(scoreLast1year);
        }

        // Emit the rowkey as the key and the custom userActivityScore as the value
        context.write(new Text(Bytes.toString(rowkey.get())), userActivityScore);
    }

    public String getScores(String str) {
        // ...... business-specific logic, omitted
        return result;
    }

    public String getScoreByNum(String invoiceNum) {
        // ...... business-specific logic, omitted
        return invoiceNum;
    }
}

GradeReducer.java
package com.dxyun.dxdp.activity.scores.reducer;

import com.dxyun.dxdp.activity.scores.entity.UserActivityScore;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GradeReducer extends TableReducer<Text, UserActivityScore, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<UserActivityScore> values, Context context) throws IOException, InterruptedException {
        int index = 0;
        String userID = "";
        String gmf_sj = "";
        String gmf_email = "";
        String scorePerYear = "";
        String scorePerMonth = "";
        String scorePerHourPeriod = "";
        String scoreLast7days = "";
        String scoreLast30days = "";
        String scoreLast1year = "";

        // .... business code that collects the mapper-stage results, omitted (a hypothetical sketch follows this class)

        Put put = new Put(key.toString().getBytes());
        if (null != userID && !(("").equals(userID))) {
            put.addColumn("user_info".getBytes(), "userID".getBytes(), userID.getBytes());
        }
        if (null != gmf_sj && !(("").equals(gmf_sj))) {
            put.addColumn("user_info".getBytes(), "gmf_sj".getBytes(), gmf_sj.getBytes());
        }
        if (null != gmf_email && !(("").equals(gmf_email))) {
            put.addColumn("user_info".getBytes(), "gmf_email".getBytes(), gmf_email.getBytes());
        }
        if (null != scorePerYear && !(("").equals(scorePerYear))) {
            put.addColumn("user_info".getBytes(), "scorePerYear".getBytes(), scorePerYear.getBytes());
        }
        if (null != scorePerMonth && !(("").equals(scorePerMonth))) {
            put.addColumn("user_info".getBytes(), "scorePerMonth".getBytes(), scorePerMonth.getBytes());
        }
        if (null != scorePerHourPeriod && !(("").equals(scorePerHourPeriod))) {
            put.addColumn("user_info".getBytes(), "scorePerHourPeriod".getBytes(), scorePerHourPeriod.getBytes());
        }
        if (null != scoreLast7days && !(("").equals(scoreLast7days))) {
            put.addColumn("user_info".getBytes(), "scoreLast7days".getBytes(), scoreLast7days.getBytes());
        }
        if (null != scoreLast30days && !(("").equals(scoreLast30days))) {
            put.addColumn("user_info".getBytes(), "scoreLast30days".getBytes(), scoreLast30days.getBytes());
        }
        if (null != scoreLast1year && !(("").equals(scoreLast1year))) {
            put.addColumn("user_info".getBytes(), "scoreLast1year".getBytes(), scoreLast1year.getBytes());
        }
        context.write(null, put);
    }
}
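The reducer above elides the code that pulls the mapper's results out of the values iterable. Purely as an illustration (this is my assumption, not the original logic, and it assumes each rowkey carries exactly one UserActivityScore), the omitted part could look roughly like this fragment inside reduce():

// Hypothetical fragment for the omitted aggregation step inside reduce().
// Assumes at most one UserActivityScore per key; the real business logic is not shown in the post.
for (UserActivityScore score : values) {
    userID = score.getUserID();
    gmf_sj = score.getGmf_sj();
    gmf_email = score.getGmf_email();
    scorePerYear = score.getScorePerYear();
    scorePerMonth = score.getScorePerMonth();
    scorePerHourPeriod = score.getScorePerHourPeriod();
    scoreLast7days = score.getScoreLast7days();
    scoreLast30days = score.getScoreLast30days();
    scoreLast1year = score.getScoreLast1year();
    index++;
}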
Appendix: the error reported when the fields are not initialized:
17/10/19 14:10:52 INFO mapreduce.Job:  map 0% reduce 0%
17/10/19 14:11:00 INFO mapreduce.Job: Task Id : attempt_1507929502667_0059_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
    at com.dxyun.dxdp.activity.scores.entity.UserActivityScore.write(UserActivityScore.java:108)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1164)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:721)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.dxyun.dxdp.activity.scores.mapper.GradeMapper.map(GradeMapper.java:66)
    at com.dxyun.dxdp.activity.scores.mapper.GradeMapper.map(GradeMapper.java:17)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)