I'm 健忘草莓, a blogger at 靠谱客. This article, collected during recent development work, walks through a hands-on example of custom Partition (partitioning) in MapReduce; I found it quite useful and am sharing it here as a reference.

Overview

Requirement: starting from the phone-number flow statistics, write the results for different provinces into different output files, i.e. partition the records by the first three digits of the phone number. Numbers beginning with 136, 137, 138 and 139 each go to their own partition, and every other prefix goes to a fifth partition. This takes a custom Partitioner plus a matching number of ReduceTasks set in the driver.
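The FlowBean class used throughout the code below is not shown in the original post. The following is a minimal sketch, assuming a standard Hadoop Writable that carries upstream traffic, downstream traffic and their sum, matching the set/getUpflow/getDownflow calls made by the Mapper and Reducer; the field names and the toString format are my own assumptions.

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Minimal Writable carrying upstream/downstream traffic (sketch, not from the original post)
public class FlowBean implements Writable {
    private long upflow;
    private long downflow;
    private long sumflow;

    // Hadoop instantiates Writables by reflection, so a no-argument constructor is required
    public FlowBean() {
    }

    public void set(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    // serialization order must match the deserialization order below
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    // controls how each record is rendered in the output files
    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }
}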

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// the key/value types here are the Mapper's output key/value types
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // take the first three digits of the phone number
        String num = key.toString().substring(0, 3);
        // default partition for every prefix other than 136/137/138/139
        int partition = 4;
        // decide which province (partition) the prefix belongs to
        if ("136".equals(num)) {
            partition = 0;
        } else if ("137".equals(num)) {
            partition = 1;
        } else if ("138".equals(num)) {
            partition = 2;
        } else if ("139".equals(num)) {
            partition = 3;
        }

        return partition;
    }
}
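The value returned by getPartition decides which ReduceTask, and therefore which output file, each record ends up in: 136 maps to partition 0, 137 to 1, 138 to 2, 139 to 3, and every other prefix falls into the default partition 4. A quick standalone check (hypothetical, not part of the original post) could look like this:

import org.apache.hadoop.io.Text;

// hypothetical standalone check of the prefix -> partition mapping
public class ProvincePartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        FlowBean dummy = new FlowBean();
        System.out.println(partitioner.getPartition(new Text("13612345678"), dummy, 5)); // prints 0
        System.out.println(partitioner.getPartition(new Text("13912345678"), dummy, 5)); // prints 3
        System.out.println(partitioner.getPartition(new Text("15012345678"), dummy, 5)); // prints 4 (default)
    }
}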

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowcountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // read one line
        String line = value.toString();
        // split the tab-separated fields
        String[] fields = line.split("\t");
        // the second field is the phone number; counting from the end,
        // the third-from-last field is the upstream traffic and the
        // second-from-last field is the downstream traffic
        k.set(fields[1]);
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long downflow = Long.parseLong(fields[fields.length - 2]);
        v.set(upflow, downflow);
        context.write(k, v);
    }
}
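For reference, the Mapper above expects each input line to be tab-separated, with the phone number in the second field and the upstream and downstream traffic in the third-from-last and second-from-last fields. A line might look like the following (the concrete values, and every column not read by the code, are made up for illustration):

1	13812345678	192.168.1.2	example.com	1116	954	200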

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowcountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sumupflow = 0;
        long sumdownflow = 0;
        // accumulate the upstream and downstream traffic for this phone number
        for (FlowBean value : values) {
            sumupflow += value.getUpflow();
            sumdownflow += value.getDownflow();
        }
        v.set(sumupflow, sumdownflow);
        context.write(key, v);
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // local test paths; remove this line when passing real command-line arguments
        args = new String[] {"f:/input", "f:/output413"};
        // get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // set the jar
        job.setJarByClass(FlowcountDriver.class);
        // wire up the mapper and reducer
        job.setMapperClass(FlowcountMapper.class);
        job.setReducerClass(FlowcountReducer.class);
        // key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // use the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // and set the number of ReduceTasks to match the number of partitions
        job.setNumReduceTasks(5);
        // input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
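With job.setNumReduceTasks(5) matching the five partitions returned by ProvincePartitioner, the job writes five output files, part-r-00000 through part-r-00004, one per partition. Setting the ReduceTask count to 1 would effectively ignore the partitioner and produce a single file, while a count greater than 1 but smaller than 5 would make the job fail with an "Illegal partition" error for keys that map to a missing partition. To run the packaged jar on a cluster instead of the hard-coded local paths, something along these lines would be used (the jar name and HDFS paths below are placeholders):

hadoop jar flowcount.jar FlowcountDriver /input /output413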

Finally

That is all of this hands-on Partition example as collected and organized by 健忘草莓; hopefully it helps with the custom-partitioning problems you run into in your own development.
