我是靠谱客的博主 单薄山水,最近开发中收集的这篇文章主要介绍自定义OutputFormat,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

package com.cn.demo_outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataOutputStream fsDataOutputStream01 = fs.create(new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\good_Comment\1.txt"));
        FSDataOutputStream fsDataOutputStream02 = fs.create(new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\bad_Comment\1.txt"));

        MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream01,fsDataOutputStream02);
        return myRecordWriter;
    }
}

package com.cn.demo_outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream fsDataOutputStream01;
    private FSDataOutputStream fsDataOutputStream02;
    public MyRecordWriter() {
    }
    
    //带参构造方法接收流
    public MyRecordWriter(FSDataOutputStream fsDataOutputStream01,FSDataOutputStream fsDataOutputStream02) {
        this.fsDataOutputStream01 = fsDataOutputStream01;
        this.fsDataOutputStream02 = fsDataOutputStream02;
    }
    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        //K2 V2判断好评差评
        String[] splits = text.toString().split("t");
        if(Integer.parseInt(splits[9])<=1){
            //利用流的写出输出文件
            fsDataOutputStream01.write(text.toString().getBytes());
            fsDataOutputStream01.write("rn".getBytes());
        }else{
            //利用流的写出输出文件
            fsDataOutputStream02.write(text.toString().getBytes());
            fsDataOutputStream01.write("rn".getBytes());
        }
        
        //关闭流
        IOUtils.closeStream(fsDataOutputStream01);
        IOUtils.closeStream(fsDataOutputStream02);

    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

    }
}

--------------------------------------------主程序--------------------------------------

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

job.setOutputFormatClass(MyOutPutFormat.class);
MyOutPutFormat.setOutputPath(job,new Path("file:///D:\dsj\baishi课件\hadoop\5、大数据离线第五天\5、大数据离线第五天\自定义outputformat\output"));

最后

以上就是单薄山水为你收集整理的自定义OutputFormat的全部内容,希望文章能够帮你解决自定义OutputFormat所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部