概述
package com.cn.demo_outputformat; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> { @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataOutputStream fsDataOutputStream01 = fs.create(new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\good_Comment\1.txt")); FSDataOutputStream fsDataOutputStream02 = fs.create(new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\bad_Comment\1.txt")); MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream01,fsDataOutputStream02); return myRecordWriter; } }
package com.cn.demo_outputformat; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; public class MyRecordWriter extends RecordWriter<Text, NullWritable> { private FSDataOutputStream fsDataOutputStream01; private FSDataOutputStream fsDataOutputStream02; public MyRecordWriter() { } //带参构造方法接收流 public MyRecordWriter(FSDataOutputStream fsDataOutputStream01,FSDataOutputStream fsDataOutputStream02) { this.fsDataOutputStream01 = fsDataOutputStream01; this.fsDataOutputStream02 = fsDataOutputStream02; } @Override public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException { //K2 V2判断好评差评 String[] splits = text.toString().split("t"); if(Integer.parseInt(splits[9])<=1){ //利用流的写出输出文件 fsDataOutputStream01.write(text.toString().getBytes()); fsDataOutputStream01.write("rn".getBytes()); }else{ //利用流的写出输出文件 fsDataOutputStream02.write(text.toString().getBytes()); fsDataOutputStream01.write("rn".getBytes()); } //关闭流 IOUtils.closeStream(fsDataOutputStream01); IOUtils.closeStream(fsDataOutputStream02); } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { } }
--------------------------------------------主程序--------------------------------------
job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setOutputFormatClass(MyOutPutFormat.class); MyOutPutFormat.setOutputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\output"));
最后
以上就是单薄山水为你收集整理的自定义OutputFormat的全部内容,希望文章能够帮你解决自定义OutputFormat所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复