概述
MR之自定义分区
- 前言
- 一、如何自定义分区?
- 二、业务需求
- 二、自定义分区
- 1.CusPartition类
- 2.CusParMapper类
- 3.CusParReduce类
- 四、注意点
前言
对于MR而言,数据输出的时候是HashPartitioner分区器来进行数据的分区输出,对于这类的分区器,在某些业务情况下不满足要求,这个时候就需要自定义分区器来满足需求。
一、如何自定义分区?
需求自定义分区,则需要定义一个class类,需要继承Partitioner类,
重写getPartition放法,再getPartition方法里面实现业务需求。
二、业务需求
1、有如下电话号码的txt文本
13512312341
13612312342
13712312343
13812312344
13612341235
13712341236
13812341237
13823315644
13623345635
13723345636
13823345637
2、需求:需要将相同号码段的手机号码放到同一个文件中,比如135开头的一个文件,136开头的文件
二、自定义分区
1.CusPartition类
package com.hadoop.mapreduce.customPartition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author LengQing
* @date 2020/5/3 - 13:06
*/
public class CusPartition extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text key, LongWritable value, int numPartitions) {
int partition = 3;
// 获取key的前三位 135 -》0 136 -》1 137 -》2 138 -》 3
String index = key.toString().substring(0, 3);
if ("135".equals(index)){
partition = 0 ;
} else if ("136".equals(index)){
partition = 1;
} else if ("137".equals(index)){
partition = 2;
}
return partition;
}
}
2.CusParMapper类
package com.hadoop.mapreduce.customPartition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author LengQing
* @date 2020/5/3 - 13:05
*/
public class CusParMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private Text outputKey = new Text();
private LongWritable outputValue = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 数据字段:电话号码 用户名称 用户地址
String[] values = value.toString().split(" ");
this.outputKey.set(values[0]);
context.write(this.outputKey,this.outputValue);
}
}
3.CusParReduce类
package com.hadoop.mapreduce.customPartition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author LengQing
* @date 2020/5/3 - 13:05
*/
public class CusParReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
private LongWritable outputValue = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
this.outputValue.set(sum);
context.write(key, this.outputValue);
}
}
4.CusParDriver类
package com.hadoop.mapreduce.customPartition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author LengQing
* @date 2020/5/3 - 13:04
* 需求:按照电话号码将不同电话数据分到不同的文件中
* 例如:135开头 文件1
* 136开头 文件2
* 137开头 文件3
* 138开头 文件4
*/
public class CusParDriver extends Configured implements Tool {
private Configuration conf = new Configuration();
@Override
public int run(String[] args) throws Exception {
// 1 实例化Job
Job job = Job.getInstance(conf, "cusPartition");
job.setJarByClass(CusParDriver.class);
//TODO 1、Input阶段 数据格式:[90,(hadoop hive spark....)]
Path inputPath = new Path(args[0]);
FileInputFormat.setInputPaths(job, inputPath);
//TODO 2、map阶段 数据格式:[(hadoop,1),(hive1),(spark,1)....]
job.setMapperClass(CusParMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//TODO 3、shuffle 数据格式:[(hadoop,[1,1]),(hive,[1,1])....]
job.setPartitionerClass(CusPartition.class);
//job.setSortComparatorClass(null);
job.setNumReduceTasks(4);
//TODO 4、reduce阶段 数据格式:[(hadoop,2),(hive,2)....]
job.setReducerClass(CusParReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//TODO 5、output阶段 数据格式:[(hadoop,2),(hive,2)....]
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) {
try {
int status = ToolRunner.run(new CusParDriver(), args);
System.exit(status);
} catch (Exception e) {
e.printStackTrace();
}
}
}
四、注意点
自定义分区后,我对用的输出了4个分区,135、136、137、138四个分区,对用的会输出4个文件,在设置分区数的时候需要大于等于输出的分区数,如果小于,那么对应得程序会报错,大于则多出得会以空文件补充。
最后
以上就是精明发带为你收集整理的MR之自定义分区前言一、如何自定义分区?二、业务需求二、自定义分区的全部内容,希望文章能够帮你解决MR之自定义分区前言一、如何自定义分区?二、业务需求二、自定义分区所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复