package com.cn.demo_outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataOutputStream fsDataOutputStream01 = fs.create(new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\good_Comment\1.txt"));
FSDataOutputStream fsDataOutputStream02 = fs.create(new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\bad_Comment\1.txt"));
MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream01,fsDataOutputStream02);
return myRecordWriter;
}
}
package com.cn.demo_outputformat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class MyRecordWriter extends RecordWriter<Text, NullWritable> {

    // Stream for records whose rating column (index 9) is <= 1 ("good" comments).
    private FSDataOutputStream goodStream;
    // Stream for all other records ("bad" comments).
    private FSDataOutputStream badStream;

    public MyRecordWriter() {
    }

    // Parameterized constructor receiving the two already-open output streams.
    public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
        this.goodStream = goodStream;
        this.badStream = badStream;
    }

    /**
     * Routes a single record to the good/bad output file based on the 10th
     * tab-separated field (index 9) of the line.
     *
     * @param text         the full input line (K2); tab-separated, field 9 must parse as an int
     * @param nullWritable unused value (V2)
     * @throws IOException if writing to either stream fails
     */
    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // BUGFIX: split on a TAB character ("\t") — the original split("t")
        // split on the letter 't', scrambling the fields.
        String[] fields = text.toString().split("\t");
        // Specify UTF-8 explicitly instead of relying on the platform charset.
        byte[] line = text.toString().getBytes(StandardCharsets.UTF_8);
        byte[] newline = "\r\n".getBytes(StandardCharsets.UTF_8);
        if (Integer.parseInt(fields[9]) <= 1) {
            goodStream.write(line);
            goodStream.write(newline);
        } else {
            badStream.write(line);
            // BUGFIX: the original wrote this newline to the *good* stream,
            // corrupting both output files.
            badStream.write(newline);
        }
        // BUGFIX: do NOT close the streams here — write() is called once per
        // record, and closing after the first record makes every subsequent
        // write fail. The streams are closed once, in close().
    }

    /**
     * Closes both output streams after the task has written all records.
     */
    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(goodStream);
        IOUtils.closeStream(badStream);
    }
}
// -------------------- 主程序 (driver / main program configuration) --------------------
// Driver configuration: declare the output K/V types and plug in the custom format.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(MyOutPutFormat.class);
// BUGFIX: backslashes escaped as "\\" — the unescaped original does not compile.
// NOTE(review): this path only receives FileOutputFormat's _SUCCESS marker; the
// actual record files are hard-coded inside MyOutPutFormat.getRecordWriter.
MyOutPutFormat.setOutputPath(job, new Path(
        "file:///D:\\dsj\\baishi课件\\hadoop\\5、大数据离线第五天\\5、大数据离线第五天\\自定义outputformat\\output"));
最后
以上就是单薄山水最近收集整理的关于自定义OutputFormat的全部内容,更多相关自定义OutputFormat内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复