我是靠谱客的博主 跳跃魔镜,最近开发中收集的这篇文章主要介绍【MapReduce】基础案例 ---- 自定义Partitioner分区 ( 按手机号分区 )自定义Partitioner分区,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述


文章目录

  • 自定义Partitioner分区
    • ▪ 自定义分区基本步骤
    • ▪ 案例
      • 需求分析
      • 代码实现
        • PhoneBean封装类
        • ProvincePartitioner分区类
        • Mapper阶段
        • Reducer阶段
        • Driver阶段
      • ★ 直接利用统计好的结果


自定义Partitioner分区

▪ 自定义分区基本步骤

在这里插入图片描述

返回顶部


▪ 案例

需求分析

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

(1)输入数据
在这里插入图片描述
(2)期望输出数据

手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

在这里插入图片描述

返回顶部


代码实现

PhoneBean封装类

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneBean implements Writable {
    private String ip;     // ip
    private long upFlow;   // 上行流量
    private long downFlow; // 下行流量
    private long sumFlow;  // 总流量
    public PhoneBean() {
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(ip);
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
          ip = dataInput.readUTF();
          upFlow = dataInput.readLong();
          downFlow = dataInput.readLong();
          sumFlow = dataInput.readLong();
    }
    @Override
    public String toString() {
        // 方便后续切割
        return ip + "t" +upFlow + "t" + downFlow + "t" + sumFlow;
    }
    public String getIp() {
        return ip;
    }
    public void setIp(String ip) {
        this.ip = ip;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
     public void set(String ip1,long upFlow1,long downFlow1){
        ip = ip1;
        upFlow = upFlow1  ;
        downFlow = downFlow1;
        sumFlow = upFlow1 + downFlow1;
    }
}

返回顶部


ProvincePartitioner分区类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner <Text,PhoneBean> {
    @Override
    public int getPartition(Text text, PhoneBean phoneBean, int numPartitions) {
        // key 是手机号
        // value 是户主信息

        // 1.获取手机号前三位
        String phoneNum = text.toString().substring(0,3);
        // 2.定义分区数 注意:分区数必须从0开始
        int partition = 4;
        if ("136".equals(phoneNum)){
            partition = 0;
        } else if ("137".equals(phoneNum)){
            partition = 1;
        } else if ("138".equals(phoneNum)){
            partition = 2;
        }else if ("139".equals(phoneNum)){
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}

返回顶部


Mapper阶段

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class PhoneMapper extends Mapper<LongWritable, Text,Text,PhoneBean> {
    Text k = new Text();
    PhoneBean v = new PhoneBean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. 读取一行数据
        String line = value.toString();
        // 2. 拆分
        String[] words = line.split("t");
        // 3. 封装对象
        k.set(words[1]);
        String ip = words[2];
        long upFlow = Long.parseLong(words[words.length-3]);
        long dowmFlow = Long.parseLong(words[words.length-2]);
        v.setIp(ip);
        v.setUpFlow(upFlow);
        v.setDownFlow(dowmFlow);
        // 4.写出
        context.write(k,v);
    }
}

返回顶部


Reducer阶段

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class PhoneReducer extends Reducer<Text,PhoneBean,Text,PhoneBean> {
    String ip = "";
    long sum_upFlow = 0;
    long sum_downFlow = 0;
    PhoneBean v = new PhoneBean();
    @Override
    protected void reduce(Text key, Iterable<PhoneBean> values, Context context) throws IOException, InterruptedException {
        // 1.累加求和
        for (PhoneBean phoneBean:values){
            ip = phoneBean.getIp();
            sum_upFlow += phoneBean.getUpFlow();
            sum_downFlow += phoneBean.getDownFlow();
        }
        v.set(ip,sum_upFlow,sum_downFlow);
        // 2.写出
        context.write(key,v);
        ip = "";
        sum_upFlow = 0;
        sum_downFlow = 0;
    }
}

返回顶部


Driver阶段

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneDriver {
    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try{
            // 获取job对象
            job = Job.getInstance(conf);
            // 配置
            job.setMapperClass(PhoneMapper.class);
            job.setReducerClass(PhoneReducer.class);
            job.setJarByClass(PhoneDriver.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(PhoneBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(PhoneBean.class);
            // 指定自定义数据分区
            job.setPartitionerClass(ProvincePartitioner.class);
            // 同时指定相应数量的reduce task
            job.setNumReduceTasks(5);
            // 设置输入输出路径
            FileInputFormat.setInputPaths(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\第三章_MR框架原理\Partition分区\dataset\phone_data  .txt"));
            FileOutputFormat.setOutputPath(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\第三章_MR框架原理\Partition分区\output1\"));
            // 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result?0:1);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

在这里插入图片描述
返回顶部


★ 直接利用统计好的结果

  • 在上面的处理过程中,我们通过创建Bean类,处理了统计总流量的同时进行了分区处理。接下来我们也可以进行两次job。第一次处理统计流量,第二次处理统计好的结果,执行分区操作。

分区类

public class P extends Partitioner <Text,Text>{

    @Override
    public int getPartition(Text text, Text text2, int numPartitions) {
        // key 是手机号
        // value 是流量信息

        // 1.获取手机号前三位
        String phoneNum = text.toString().substring(0,3);
        // 2.定义分区数 注意:分区数必须从0开始
        int partition = 4;
        if ("136".equals(phoneNum)){
            partition = 0;
        } else if ("137".equals(phoneNum)){
            partition = 1;
        } else if ("138".equals(phoneNum)){
            partition = 2;
        }else if ("139".equals(phoneNum)){
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}

Mapper阶段

  • 按照制表符拆分,注意这里有的时候分隔符要注意:尽量把制表符和空格都试一遍,有时候文本格式不明显,会报错提示类似数组越界,其实就是没有拆分对。
  • 这里处理的是已经统计好总流量的数据,在分区的时候是按照key来进行的,所以我们要把电话号码封装到key中,使用split的重载方法,可以对数据按照“t”只进行一次分割这里的2其实也可以认为是分割后行程的块数。(string.split()方法对空字符的处理)
public class PM  extends Mapper<LongWritable, Text,Text,Text> {
    Text k = new Text();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         // 13470253144	180	180	360
           // 1.读取一行数据
        String line = value.toString();
        // 2.拆分
        String[] fields = line.split("t",2);
        k.set(fields[0]);
        v.set(fields[1]);
        // 3.写出
        context.write(k,v);
    }
}

Reducer阶段

  • Mapper阶段已经对数据进行了拆分,封装了key,value,这里只需要读取写出即可。
public class PR extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
          for (Text value:values){
              context.write(key,value);
          }
    }
}

Driver阶段

  • 设置分区配置:分区关联的类,ReduceTask数目
    job.setPartitionerClass(P.class);
    job.setNumReduceTasks(5);
    由于分区数决定了生成多少个ReduceTask来处理对应分区的数据,所以在job中要设置ReduceTask的数目。
  • 由于思路的一点改动,所以这里的文件输入路径应当改为已经处理好的总流量统计数据。
public class PD {
    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // 获取job
            job = Job.getInstance(conf);
            // 配置
            job.setMapperClass(PM.class);
            job.setReducerClass(PR.class);
            job.setJarByClass(PD.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // 设置分区配置
            job.setPartitionerClass(P.class);
            job.setNumReduceTasks(5);
            // 输入、出路径
            FileInputFormat.setInputPaths(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\第二章_Hadoop序列化\output\part-r-00000"));
            FileOutputFormat.setOutputPath(job,new Path("G:\Projects\IdeaProject-C\MapReduce\src\main\java\第七章_MR扩展案例\自定义分区\output"));
            // 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0:1);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

在这里插入图片描述

返回顶部


最后

以上就是跳跃魔镜为你收集整理的【MapReduce】基础案例 ---- 自定义Partitioner分区 ( 按手机号分区 )自定义Partitioner分区的全部内容,希望文章能够帮你解决【MapReduce】基础案例 ---- 自定义Partitioner分区 ( 按手机号分区 )自定义Partitioner分区所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(68)

评论列表共有 0 条评论

立即
投稿
返回
顶部