我是靠谱客的博主 老迟到巨人,最近开发中收集的这篇文章主要介绍MR自定义FileOutputFormat,进行输出自定义文件名,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

说明: 本文所涉及的JavaSE版本为1.8.0, IDEA版本为IntelliJ IDEA Community Edition 2018.3.5 x64。

需求:结合以前的MR案例(学生成绩二次排序),自定义FileOutputFormat和RecoreWriter,对其输出采用自定义文件名。

输入数据:

代码:

 

  1. map代码:
    package com.gcs.SelfOutputFile;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class SecSortMap  extends Mapper<LongWritable, Text, SecSortWritable, NullWritable> {
    
        SecSortWritable ssw = new SecSortWritable();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String str = value.toString();
            String[] strArr = str.split(" ");
            ssw.setName(strArr[0]);
            ssw.setCourse(strArr[1]);
            ssw.setScore(Integer.parseInt(strArr[2]));
            context.write(ssw, NullWritable.get());
        }
    }
    

     

  2. Reduce代码:
    package com.gcs.SelfOutputFile;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class SecSortReduce  extends Reducer<SecSortWritable, NullWritable, SecSortWritable, NullWritable> {
    
        @Override
        protected void reduce(SecSortWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
    

     

  3. 自定义序列化类:
    package com.gcs.SelfOutputFile;
    
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class SecSortWritable implements WritableComparable<SecSortWritable> {
    
        private  String name;
        private  String course;
        private  int    score;
    
        public  SecSortWritable(){
    
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public void setCourse(String course) {
            this.course = course;
        }
    
    
        public void setScore(int score) {
            this.score = score;
        }
        public String getName() {
            return name;
        }
    
        public int compareTo(SecSortWritable o) {
            if (this.name.hashCode() > o.name.hashCode()){
                return -1;
            }else if (this.name.hashCode() < o.name.hashCode()){
                return  1;
            }else{
                return this.score > o.score ? -1 : 1;
            }
        }
    
        public void write(DataOutput dataOutput) throws IOException {
    
            dataOutput.writeChars(name);
            dataOutput.writeChar('');  //写String需要增加''作为结束符; writeUTF
            dataOutput.writeChars(course);
            dataOutput.writeChar('');
            dataOutput.writeInt(score);
        }
    
        public void readFields(DataInput dataInput) throws IOException {
    
            String str = "";
            char c = '';
            while ((c = dataInput.readChar()) != '') {
                str += c;
            }
    
            this.name = str;
    
            c = '';
            str = "";
            while ((c = dataInput.readChar()) != '') {
                str += c;
            }
    
            this.course = str;
            this.score = dataInput.readInt();
        }
    
        @Override
        public  String toString(){
            return this.name + "t" + this.course + "t" + this.score;
        }
    }
    

     

  4. 自定义FileOutputFormat
    package com.gcs.SelfOutputFile;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MyOutputFormat extends FileOutputFormat<SecSortWritable, NullWritable> {
        public RecordWriter<SecSortWritable, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            RecordWriter rw = new MyRecordWriter(taskAttemptContext);
            return rw;
        }
    }
    

     

  5. 自定义RecordWriter
    package com.gcs.SelfOutputFile;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MyRecordWriter extends RecordWriter<SecSortWritable, NullWritable> {
    
        FileSystem fs = null;
        FSDataOutputStream littleBaiFile = null;
        FSDataOutputStream othersFile = null;
        public  MyRecordWriter(TaskAttemptContext context){
            Path outputPath = FileOutputFormat.getOutputPath(context);
            try{
                //获取文件系统;
                fs = FileSystem.get(context.getConfiguration());
                littleBaiFile = fs.create(new Path(outputPath.toString() + "\littleBaiFile.txt"));
                othersFile = fs.create(new Path(outputPath.toString() + "\othersFile.txt"));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        public void write(SecSortWritable key, NullWritable nullWritable) throws IOException, InterruptedException {
            String str = key.toString() + "rn";
            byte[] bytes = str.getBytes();
            if (key.getName().equals("小白")){
                littleBaiFile.write(bytes);
            }else {
                othersFile.write(bytes);
            }
        }
    
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            if (littleBaiFile != null)
            {
                littleBaiFile.flush();
                littleBaiFile.close();
            }
    
            if (othersFile != null)
            {
                othersFile.flush();
                othersFile.close();
            }
        }
    }
    

     

  6. 主程序
package com.gcs.SelfOutputFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SecSortRun {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[]{"D:\data.txt","D:\studata"};
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(SecSortRun.class);

        job.setMapperClass(SecSortMap.class);
        job.setMapOutputKeyClass(SecSortWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(SecSortReduce.class);
        job.setOutputValueClass(SecSortWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        
        // 指定输出格式类
        job.setOutputFormatClass(MyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

 

输出:

最后

以上就是老迟到巨人为你收集整理的MR自定义FileOutputFormat,进行输出自定义文件名的全部内容,希望文章能够帮你解决MR自定义FileOutputFormat,进行输出自定义文件名所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部