概述
分区 –数据发送给哪个reduce
分箱 –结果输出到哪个目录
mapreduce分区/Partition
mapreduce默认使用HashPartitioner进行分区。
源码如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
从上面的源码可以看出,
1.首先根据key取hash值,相同key的hash值相同,所以key相同的键会被发送到相同的reduce。
2.然后与最大值相与(&),得到一个正整数
3.求余,余数即是该《key,value》将被发送到的reduce
HashPartitioner能够将数据均衡的发送到reduce,这已经能够满足大部分程序的需求。
一.但有时,由于发生数据倾斜
导致过多的《key,value》被发送到同一reduce,个别reduce运行极慢,严重拖后了程序运行速度。
二.还有时需要对输出的结果进行分文件保存
一个简单的例子就是分地域,分性别等等条件,将相同类别的数据输出到同一文件
重写自定义分区可以解决上述问题:
1.数据倾斜
1.1 新建类NewPartition,继承HashPartitioner
1.2重写getPartition方法
1.3 对key进行随机打乱操作,这里进行了两种操作,先把时间作为随机数和key拼接在一起,然后通过MD5Util对新key进行加密操作,这样再得出的hash值基本唯一
/**
* 重写自定义分区
* @author gaojunyu
*/
public class NewPartition extends HashPartitioner<Text, Text>{
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String partitionKey = MD5Util.textToMD5L32(key + TimeUtil.getCurDatetime());
return (partitionKey.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
2.分文件输出
/**
* 重写自定义分区
* @author gaojunyu
*/
public class NewPartition extends HashPartitioner<Text, Text>{
@Override
public int getPartition(Text key, Text value, int numReduceTasks) {
String partitionKey = MD5Util.textToMD5L32(key + TimeUtil.getCurDatetime());
//分性别输出
if ("male".equals(key.toString())) {
return 0;
}else if ("female".equals(key.toString())) {
return 1;
}
//分地域
if ("beijing".equals(key.toString())) {
return 0;
}else if ("shanghai".equals(key.toString())) {
return 1;
}else if ("guangzhou".equals(key.toString())) {
return 2;
}else if ("shenzhen".equals(key.toString())) {
return 3;
}
}
//返回值是reduce的编号
}
重写完HashPartitioner方法之后,需要在主程序中配置一下即可使用。
job.setPartitionerClass(NewPartition.class); //设置使用自己的partitioner
mapreduce分箱/MultipleOutput
分箱就是多路径输出,上面分区之后指定内容被发送到指定reduce,一个reduce产生一个part-r-xxxx的文件,尽管处理后的内容已经分文件输出,但是根据文件名字并不能直观的找到相应内容,尤其是文件相对较多时。
设置多路径输出
1.仅指定名字输出
2.多路径,指定名字输出
//main方法中
FileOutputFormat.setOutputPath(job, new Path(output));
MultipleOutputs.addNamedOutput(job, "id", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "ip", TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "final", TextOutputFormat.class, Text.class, Text.class);
id,ip为输出的文件名字
//在reduce中初始化多路径输出
public static class CombinerReduce extends Reducer<Text, Text, Text, Text> {
// 多路径输出声明
private MultipleOutputs<Text, Text> mos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs<Text, Text>(context);// 初始化mos
super.setup(context);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
super.cleanup(context);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//省略
//1.多路径及名字输出
//MultipleOutputs.write(String namedOutput, Text key, Text value, String baseOutputPath)
mos.write("id", new Text(id), new Text(isHigh), "id/id");
mos.write("ip", new Text(id), new Text(isHigh), "ip/ip");
//2.文件名输出
//.MultipleOutputs.write(String namedOutput, Text key, Text value)
mos.write("final", key, new Text(maxTs) );
}
}
如图:
最后
以上就是机智眼睛为你收集整理的mapreduce自定义分区及分箱的全部内容,希望文章能够帮你解决mapreduce自定义分区及分箱所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复