我是靠谱客的博主 鲤鱼橘子,最近开发中收集的这篇文章主要介绍MapReducer——分区(7),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

MR的分区:根据Map的输出<key2  value2>进行分区
(*)默认情况下,MR的输出只有一个分区(一个分区就是一个文件)
(*)自定义分区:按照员工的部门号进行分区

MyPartitionMapper.java
package com.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
//                                                                 k2 部门号  v2 员工
public class MyPartitionMapper extends Mapper<LongWritable, Text, IntWritable,Emp> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // 数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        //分词
        String[] words = data.split(",");

        //生成员工对象
        Emp emp = new Emp();
        emp.setEmpno(Integer.parseInt(words[0]));
        emp.setEname(words[1]);
        emp.setJob(words[2]);
        emp.setMgr(Integer.parseInt(words[3]));
        emp.setHiredate(words[4]);
        emp.setSal(Integer.parseInt(words[5]));
        emp.setComm(Integer.parseInt(words[6]));
        emp.setDeptno(Integer.parseInt(words[7]));

        //输出员工对象  k2:部门号  v2:员工对象
        context.write(new IntWritable(emp.getDeptno()), emp);
    }
}
MyPartitionReducer.java
package com.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

//就是同一个部门的员工
public class MyPartitionReducer extends Reducer<IntWritable, Emp, IntWritable, Emp> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Emp> v3,Context context) throws IOException, InterruptedException {
        // 直接输出
        for(Emp e:v3){
            context.write(k3, e);
        }
    }
}
MyPartitioner.java
package com.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
//自定义的分区规则:按照部门号进行分区            k2 部门号  v2  员工对象
public class MyPartitioner extends Partitioner<IntWritable,Emp> {
    /**
     *
     * @param intWritable k2
     * @param emp v2
     * @param numTask  分区的个数
     * @return
     */
    @Override
    public int getPartition(IntWritable intWritable, Emp emp, int numTask) {
        // 建立我们的分区规则
        // 得到该员工的部门号
        int deptNo = emp.getDeptno();
        if(deptNo == 10){
            // 放入一号分区
            return 1 % numTask;
        }else if(deptNo == 20){
            // 放入二号分区
            return 2 % numTask;
        }else {
            // 30号部门,放入零号分区
            return 3 % numTask;
        }
    }
}
MyPartitionerMain.java
package com.partition;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyPartitionerMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MyPartitionerMain.class);

        job.setMapperClass(MyPartitionMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); //k2 是部门号
        job.setMapOutputValueClass(Emp.class);  // v2输出就是员工对象

        //加入分区规则
        job.setPartitionerClass(MyPartitioner.class);
        //指定分区的个数
        job.setNumReduceTasks(3);

        job.setReducerClass(MyPartitionReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Emp.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

运行结果:

 

最后

以上就是鲤鱼橘子为你收集整理的MapReducer——分区(7)的全部内容,希望文章能够帮你解决MapReducer——分区(7)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部