Overview
Hadoop already ships many Writable classes in the org.apache.hadoop.io package; the full inheritance hierarchy can be seen in the diagram at https://www.cnblogs.com/wuyudong/p/hadoop-writable.html.
There are also classes such as VectorWritable and IntPairWritable that are packaged in Mahout; if you need them, search for and download Mahout yourself, or ask in the comments below. They are not covered here.
Clearly, though, these built-in types alone often cannot satisfy real-world needs. The natural solution is to define a custom Writable and use it as the key or value passed between the mapper and the reducer.
- Custom data type as a Value
Let me start with the simpler case, passing the custom type as a value. Here is an example first:
package mySimijoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class Student implements Writable {

    private Text name = new Text();
    private IntWritable id = new IntWritable();

    // The no-argument constructor is required: Hadoop creates instances via reflection.
    public Student() {
    }

    public Student(Text name, IntWritable id) {
        this.name = name;
        this.id = id;
    }

    public IntWritable getId() {
        return id;
    }

    public Text getName() {
        return name;
    }

    public void setId(IntWritable id) {
        this.id = id;
    }

    public void setName(Text name) {
        this.name = name;
    }

    // Deserialize the fields in exactly the same order they are written below.
    @Override
    public void readFields(DataInput in) throws IOException {
        name.readFields(in);
        id.readFields(in);
    }

    // Serialize the fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        name.write(out);
        id.write(out);
    }

    @Override
    public String toString() {
        return name + " " + id + " ";
    }
}
Simply create a new Java class and have it implement the Writable interface, with whatever member variables you need. Then be sure to write a default (no-argument) constructor. Even if you already have a constructor like
public Student(Text name, IntWritable id) {
    this.name = name;
    this.id = id;
}
you still need to keep the no-argument constructor, because Hadoop instantiates Writable objects via reflection using that constructor and then fills them in with readFields. Without it you can run into errors that look completely "inexplicable" (only "can", but Hadoop really does feel like black magic sometimes...).
Second, if you use MapWritable, you absolutely must override toString() yourself, otherwise the output will be an unreadable jumble of letters and digits. That topic is covered in another post: https://blog.csdn.net/u013700358/article/details/80786263.
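Before wiring the class into a job, it can be worth sanity-checking the serialization with a quick round trip. The following is just a small test sketch of my own (not part of the original join code): it writes a Student out with write and reads it back into a fresh instance with readFields.
package mySimijoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class StudentRoundTrip {
    public static void main(String[] args) throws Exception {
        Student original = new Student(new Text("Alice"), new IntWritable(1001));

        // Serialize with write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance created with the no-arg constructor,
        // which is essentially what the framework does via reflection.
        Student copy = new Student();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // expected output: "Alice 1001 "
    }
}
If the printed values match the originals, write and readFields are consistent with each other.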
Once the custom data type is defined, you can simply change the value type in the Mapper and Reducer to Student. Note that whenever you make this change, you must also update the corresponding classes set in main:
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Student.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Student.class);
so that the value classes here match the new type; otherwise you will run into a type mismatch error.
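For reference, a Mapper that emits Student as its value might look roughly like the following sketch. The StudentMapper name and the assumed input layout (one "name id" pair per line) are mine, for illustration only:
package mySimijoin;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StudentMapper extends Mapper<LongWritable, Text, Text, Student> {

    private final Text outKey = new Text();
    private final Student outValue = new Student();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed input layout: one "name id" pair per line, e.g. "Alice 1001"
        String[] fields = value.toString().trim().split("\\s+");
        if (fields.length < 2) {
            return; // skip malformed lines
        }
        outKey.set(fields[0]);
        outValue.setName(new Text(fields[0]));
        outValue.setId(new IntWritable(Integer.parseInt(fields[1])));
        context.write(outKey, outValue);
    }
}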
- Custom data type as a Key
Next, using a custom data type as the key. One thing to be clear about up front: if you just follow the approach above (implementing only Writable), you will get an exception like this:
2018-06-23 18:38:55,001 WARN [org.apache.hadoop.mapred.MapTask] - Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.ClassCastException: class test.Relation
at java.lang.Class.asSubclass(Unknown Source)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
2018-06-23 18:38:55,026 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local1779626067_0001
java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class test.Relation
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class test.Relation
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: class test.Relation
at java.lang.Class.asSubclass(Unknown Source)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:887)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1004)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
I won't try to explain the full mechanics (I'm still fairly new to Hadoop and don't want to mislead anyone), but the stack trace itself points at the cause: MapReduce has to sort map output by key, and when it looks up the key comparator, JobConf.getOutputKeyComparator casts the map output key class to WritableComparable, hence the ClassCastException. So when a custom type is used as the key, it is not enough to implement Writable; it must implement the WritableComparable interface, as in the following example:
package test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class Relation implements WritableComparable<Relation> {

    private Text attr1 = new Text(), attr2 = new Text();
    private IntWritable len1 = new IntWritable(), len2 = new IntWritable();

    // The no-argument constructor is required: Hadoop creates key instances via reflection.
    public Relation() {
    }

    public Relation(Text attr1, IntWritable len1, Text attr2, IntWritable len2) {
        this.attr1 = attr1;
        this.len1 = len1;
        this.attr2 = attr2;
        this.len2 = len2;
    }

    public Text getAttr1() {
        return attr1;
    }

    public Text getAttr2() {
        return attr2;
    }

    public IntWritable getLen1() {
        return len1;
    }

    public IntWritable getLen2() {
        return len2;
    }

    public void setAttr1(Text attr1) {
        this.attr1 = attr1;
    }

    public void setAttr2(Text attr2) {
        this.attr2 = attr2;
    }

    public void setLen1(IntWritable len1) {
        this.len1 = len1;
    }

    public void setLen2(IntWritable len2) {
        this.len2 = len2;
    }

    // Deserialize the fields in exactly the same order they are written below.
    @Override
    public void readFields(DataInput in) throws IOException {
        attr1.readFields(in);
        len1.readFields(in);
        attr2.readFields(in);
        len2.readFields(in);
    }

    // Serialize the fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        attr1.write(out);
        len1.write(out);
        attr2.write(out);
        len2.write(out);
    }

    @Override
    public String toString() {
        return attr1 + " " + len1 + " " + attr2 + " " + len2 + " ";
    }

    // Map output keys are sorted and grouped by this method before reduce:
    // compare attr1 first, then attr2 on a tie.
    @Override
    public int compareTo(Relation o) {
        int fir = attr1.toString().compareTo(o.getAttr1().toString());
        int snd = attr2.toString().compareTo(o.getAttr2().toString());
        if (fir != 0) {
            return fir;
        } else if (snd != 0) {
            return snd;
        } else {
            return 0;
        }
    }
}
Two points about this:
1. In readFields and write, the fields must be read and written in the same fixed order; otherwise data written from a Text field may be read back into an IntWritable, causing unnecessary confusion.
2. Different ways of writing compareTo lead to different reduce results, because this method determines how keys are sorted and grouped before reduce. With the implementation above, I found that a relation that originally appeared exactly once could come out of reduce with a different count. If that is not what you want, you can change it to:
public int compareTo(Relation o) {
    int fir = attr1.toString().compareTo(o.getAttr1().toString());
    int snd = attr2.toString().compareTo(o.getAttr2().toString());
    if (fir != 0) {
        return 1;
    } else if (snd != 0) {
        return -1;
    } else {
        return 0;
    }
}
This at least keeps the count of each relation unchanged; whether it is really what you want is something to weigh and adjust for yourself.
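Be aware, though, that this second variant is not symmetric: a.compareTo(b) and b.compareTo(a) can both return 1. If what you actually want is a deterministic total order in which two Relation objects compare equal only when all four fields match, something along these lines (a drop-in replacement for the method above, my own suggestion rather than the original code) should do:
// Compare field by field; fall through to the next field only on a tie.
@Override
public int compareTo(Relation o) {
    int cmp = attr1.compareTo(o.getAttr1());
    if (cmp != 0) return cmp;
    cmp = len1.compareTo(o.getLen1());
    if (cmp != 0) return cmp;
    cmp = attr2.compareTo(o.getAttr2());
    if (cmp != 0) return cmp;
    return len2.compareTo(o.getLen2());
}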
With the class built this way, it can be used as the key passed between the mapper and the reducer; as before, the corresponding key classes in main need to be updated to match.
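For completeness, a minimal driver sketch for the key case might look like the following; the RelationJoinDriver name, the commented-out Mapper/Reducer classes, the Text value type, and the argument-based paths are all placeholders for illustration:
package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RelationJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "relation join");
        job.setJarByClass(RelationJoinDriver.class);

        // Hypothetical Mapper/Reducer classes that use Relation as the key:
        // job.setMapperClass(RelationMapper.class);
        // job.setReducerClass(RelationReducer.class);

        // The key class is now the custom WritableComparable.
        job.setMapOutputKeyClass(Relation.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Relation.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}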