概述
使用Hadoop进行数据处理时,有时候需要将计算结果根据不同的条件存入不同的分区。
比如:计算顾客是否回头购买的时候,会将回头了的顾客数据和未回头的顾客数据分别存入不同的表。
可以使用MultipleOutputs实现。
需要在reduce类中定义MultipleOutputs,并且重写Reducer的setup()方法和cleanup()方法。具体实例如下
public static class PeriodReduce extends Reducer<TextPair,Text,NullWritable,Text>{
private MultipleOutputs<NullWritable, Text> mos;
protected void setup(Context context)throws IOException,InterruptedException{
mos = new MultipleOutputs<NullWritable, Text>(context);
}
protected void cleanup(Context context)throws IOException,InterruptedException{
mos.close();
}
public void reduce(TextPair key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
Iterator<Text> it = values.iterator();
ArrayList<String> custInfo = null;
String last ="notexist";
String ret = "notexist";
while (it.hasNext()){
String line = it.next().toString();
//排序后,先到达reduce的数据是上个周期的
if(key.getId()==0){
custInfo = new ArrayList<String>();
//将上个周期的购买情况存放
custInfo.add(line);
last = "exist";
}else if(key.getId()==1){
ret = "exist";
//如果回头,则加上上个周期信息放入return目录下
if("exist".equals(last)&&custInfo.size()>0){
for(String str:custInfo){
mos.write(NullWritable.get(), new Text(line+"