Overview

This hands-on example shows how to control which reduce task (and therefore which output file) each record lands in by plugging a custom Partitioner into a flow-count MapReduce job: phone numbers starting with 136, 137, 138, and 139 each get their own partition, and all other numbers fall into a fifth one.
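All of the classes below reference a FlowBean class that the original post does not show. Here is a minimal sketch, assuming the bean only needs to carry the up/down flow counters as a Hadoop Writable; the set, getUpflow, and getDownflow methods are inferred from how the mapper and reducer use it:

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upflow;
    private long downflow;

    // Hadoop needs a no-arg constructor to deserialize the bean
    public FlowBean() {
    }

    public void set(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read back in the same order they were written
        upflow = in.readLong();
        downflow = in.readLong();
    }

    @Override
    public String toString() {
        // Determines how each record is rendered in the text output files
        return upflow + "\t" + downflow + "\t" + (upflow + downflow);
    }
}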
With the bean in place, the custom Partitioner routes each record to a partition based on the first three digits of the phone number:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// The key/value types here are the mapper's output types
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // Take the first three digits of the phone number
        String num = key.toString().substring(0, 3);

        // Default partition for any prefix not matched below
        int partition = 4;

        // Decide which province (partition) the record belongs to
        if ("136".equals(num)) {
            partition = 0;
        } else if ("137".equals(num)) {
            partition = 1;
        } else if ("138".equals(num)) {
            partition = 2;
        } else if ("139".equals(num)) {
            partition = 3;
        }
        return partition;
    }
}
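For contrast, Hadoop's default HashPartitioner spreads keys across reduce tasks by hash code rather than by business rule. A sketch of the equivalent logic (the class name HashLikePartitioner is just for illustration):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// What the job would do without setPartitionerClass: hash the key,
// mask off the sign bit, and take it modulo the number of reduce tasks
public class HashLikePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}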
The mapper pulls the phone number and the up/down flow counters out of each tab-separated input line:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowcountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();

        // Split the line on tabs
        String[] fields = line.split("\t");

        // fields[1] is the phone number; the flow counters are indexed from
        // the end because the number of middle fields can vary per line
        k.set(fields[1]);
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long downflow = Long.parseLong(fields[fields.length - 2]);
        v.set(upflow, downflow);
        context.write(k, v);
    }
}
The reducer totals the up and down flow for each phone number:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowcountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sumupflow = 0;
        long sumdownflow = 0;

        // Accumulate the flow counters across all records for this key
        for (FlowBean value : values) {
            sumupflow += value.getUpflow();
            sumdownflow += value.getDownflow();
        }
        v.set(sumupflow, sumdownflow);
        context.write(key, v);
    }
}
Finally, the driver wires everything together. Note that the number of reduce tasks is set to match the five partitions the partitioner can return:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for testing; drop this line to use real arguments
        args = new String[] {"f:/input", "f:/output413"};

        // Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Set the jar by locating the driver class
        job.setJarByClass(FlowcountDriver.class);

        // Wire up the mapper and reducer
        job.setMapperClass(FlowcountMapper.class);
        job.setReducerClass(FlowcountReducer.class);

        // Key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Plug in the custom partitioner
        job.setPartitionerClass(ProvincePartitioner.class);

        // One reduce task per partition
        job.setNumReduceTasks(5);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and exit with its status
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
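With five reduce tasks, the output directory will contain five files, part-r-00000 through part-r-00004: prefixes 136 through 139 land in files 0 through 3, and everything else in file 4. The count matters: with more reduce tasks than partitions, the extra files simply come out empty; with between 2 and 4, the job fails at runtime because getPartition can return a partition number with no matching reduce task; and with exactly 1, all records go to a single file and the custom partitioner is never consulted.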
Conclusion

That covers the full hands-on Partitioner example; hopefully it helps with the partitioning problems you ran into.