概述
前言
MapReduce框架让我们可以专注于算法逻辑,而不需要去关注代码实现
但如果有需求,我们几乎可以自定义MR流程中的全部组件,如下
- 大框架
Mapper
,Reducer
,Runner
,还有常用到的自定义类Bean
- 分组逻辑
GroupingComparator
,分区逻辑Partitioner
,Map端预聚合逻辑Combiner
- 输入流
InputFormat
,输出流OutputFormat
自定义这些类,可以加深对MR的理解
此篇文章主要介绍第2点中提到的组件,这些自定义组件常用于调优
第1点:https://blog.csdn.net/IAmListening/article/details/89791248
第3点:https://blog.csdn.net/IAmListening/article/details/89792588
GroupingComparator
重点归纳
- 将自定义类作为key时,会发现compareTo方法和分组联系紧密
最简单的例子:compareTo的结果是0,就意味着一定属于同一组
GroupingComparator也正是继承了WritableComparator类 - 数据进入Reduce端之前,会先根据key进行排序,然后才开始分组;进行分组的时候,只会比较相邻的元素.
也就是说,如果排序规则和分组规则不统一,是没有办法正确分组的.
比如说,排序结果的key-value是1-w
,2-q
,3-w
.这时要依据value分组
首先,1-w
会和2-q
比较,结果不是同组,不进行聚合;然后,2-q
和3-w
比较,依然不是同组;3-w
是最后一对key-value,所以此时比较结束.
这样的话,1-w
是没办法和3-w
分到同一组的.也就是说,这样的分组规则是无效的
public class MooGrouping extends WritableComparator {
// 和WritableComparator接口不同,该类无法通过泛型来指定要比较的是MooBean.class.因此只能通过构造函数来指定
protected MooGrouping(){
super(MooBean.class, true);
}
@Override
// 分组时只需要比较名字,因为最终要按照名字聚合
public int compare(WritableComparable a, WritableComparable b) {
System.out.println("开始分组");
MooBean m1 = (MooBean)a;
MooBean m2 = (MooBean)b;
return m1.getName().compareTo(m2.getName());
}
}
Partitioner
重点归纳
- 分区逻辑应该尽量避免数据倾斜,常用hashcode来决定分到哪个区
- 如果将hashcode直接强转为int,可能会发生数据溢出从而出现负数
可以将强转后的数据和Integer.MAX_VALUE
进行&
运算
// 两个泛型分别对应map端输出的key和value
public class MooPartitioner extends Partitioner<MooBean, MooBean> {
@Override
public int getPartition(MooBean mooBean, MooBean mooBean2, int i) {
System.out.println("开始分区");
return mooBean.getName().hashCode() % i;
}
}
Combiner
重点归纳
-
数据在Map端被格式化为key和value,然后输出到内存.key和value在内存中以键值对的形式存储,而且会按照键的自然顺序自动排序.
需要注意的是,如果key或value是自定义对象.该对象经过Mapper,被输出到内存中时,内存中记录的并不是该对象的地址,而是该对象的属性值
拓展来说,在MR流程中的很多地方,当前对象实际上只是代表当前的属性值.属性不同,也可能使用的是同一对象 -
只有键相同的键值对,才能在combiner上聚合,因为这个聚合在分组规则(GroupingComparator)之前
Combiner位于Map端,此处先进行聚合可以减轻Reduce端的负担
但Combiner的使用一定要十分谨慎,因为Combiner是否会被调用是不确定的
也就是说,Combiner的使用与否必须不能影响到业务逻辑
public class MooCombiner extends Reducer<MooBean, MooBean, MooBean, MooBean> {
@Override
protected void reduce(MooBean key, Iterable<MooBean> values, Context context) throws IOException, InterruptedException {
System.out.println("combiner运行啦");
int sum = 0;
// 计算总成绩
for (MooBean value : values){
sum = sum + value.getScore();
}
// 分别赋值
MooBean mooBean = new MooBean();
mooBean.setName(key.getName());
mooBean.setCourse(key.getCourse());
mooBean.setScore(sum);
// 传递给Reducer
context.write(key, mooBean);
}
}
最后
以上就是现实微笑为你收集整理的用代码来理解MapReduce2_分组分区和combin优化的全部内容,希望文章能够帮你解决用代码来理解MapReduce2_分组分区和combin优化所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复