概述
MapReduce运行步骤:
Map Side
- 从磁盘读取数据并分区
输入文件拆分成多个逻辑inputSplits,并把每一个inputSplits分别发送给一个单独的Mapper Task - 进行map操作
运行自定义的map业务过程 - 输出数据到缓冲区
map输出的结果并不是直接写入磁盘中,而是会先存储在一个预定义的buffer中(buffer容量够大)。buffer容量不够的话会溢写到磁盘。这会消耗一部分IO资源,影响运行时间。 - 分区、排序分组
对map输出的数据进行分区,按照key进行排序和分组 - Combiner归约(可选)
相当于本地端的reduce过程 - 合并写入磁盘
对map的最终数据进行merge之后输出到磁盘中等待shuffle进程
inputSplits:用来传给每个单独map的数据,inputSplits存储的不是数据本身,而是一个分片长度和一个记录数据位置的数组
分片与block的关系:block大小由配置决定,split大小与block大小无绝对关系。
map程序在读splits时是按文件内容一行一行读的,即使一个splits跨越了两个block。
如果split分界点位于一行的某个字节中,那么这行就已经在上一个split中被读,故可以跳过这行重新分片。
Reduce Side
- 从map端复制数据
- 对数据进行合并
- 对数据进行排序
- 进行Reduce操作
- 输出到磁盘
1、2两步为shuffle过程
调优
最简单的调优—设置Combiner
Combiner在Map端提前进行了一次Reduce处理,实现本地key的聚合,对map输出的key排序,value进行迭代。
这可减少Map Task中间输出的结果,从而减少各个Reduce Task的远程拷贝时间,最终表现为Map Task和Reduce Task执行时间缩短。
选择合理的Writable类型
为应用程序处理的数据选择合适的Writable类型可大大提升性能。
Map side tuning
InputFormat
这是map阶段的第一步,从磁盘读取数据并切片,每个分片由一个map task处理
当输入的是海量的小文件的时候,会启动大量的map task,效率及其之慢,有效的解决方式是使用CombineInputFormat自定义分片策略对小文件合并处理,从而减少map task的数量,减少map过程使用的时间。
Buffer
map side中将结果输出到磁盘之前的一个处理方式,通过对其进行设置的话可以减少map任务的IO开销,从而提高性能。
当map产生的数据非常大的时候,如果默认的buffer大小不够看,势必引起非常多次的spill,产生大量的磁盘开销。这时候可以把io.sort.mb调大,那么map在整个计算过程中spill的次数就势必会降低。
Merge
该阶段是在map产生溢写后,对spill进行处理的过程,通过对其进行配置也可以达到优化IO开销的过程。
map产生spill之后必须将这些spill进行合并,这个过程叫做merge
merge过程是并行度处理spill的,每次并行多少个spill是由参数io.sort.factor指定的,默认为10个。
Combine
我们知道在map side设置了Combiner,则会根据设定的函数对map输出的数据进行一次类Reduce的预处理,但是Conbine发生的阶段可能是在merge之前,也可能是在merge之后。
例如:产生的spill非常多,虽然我们可以通过merge阶段的io.sort.factor进行优化配置,但是在此之前我们也可以通过先执行combine对结果进行处理之后再对数据进行merge。这样一来,到merge阶段的数据量将会进一步减少,IO开销也会被降到最低。
输出中间数据到磁盘
map side的最后一个步骤,在这个步骤也可以通过压缩选项的配置来得到任务的优化。
通常老说,想要达到比较平衡的CPU和磁盘压缩比,LzoCodec比较合适,但也要取决于job的具体情况。
Map Side tuning总结
从上面几点来看,map的瓶颈都是在频繁的IO操作造成的,所有的优化也都是在针对IO进行的,而优化的瓶颈又很大程度上被机器的配置等外部因素所限制
Reduce side tuning
shuffle
由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的
为了优化reduce的执行时间,hadoop中等第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据
在这个shuffle过程中,由于map的数量通常是很多个的,而每个map中又都有可能包含每个reduce所需要的数据
所以对于每个reduce来说,去各个map中拿数据也是并行的,可以通过mapred.reduce.parallel.copies这个参数来调整,默认为5
当map数量很多的时候,就可以适当调大这个值,减少shuffle过程使用的时间
还有一种情况是:reduce从map中拿数据的时候,有可能因为中间结果丢失、网络等其他原因导致map任务失败
而reduce不会因为map失败就永无止境的等待下去,它会尝试去别的地方获得自己的数据(这段时间失败的map可能会被重跑)
所以设置reduce获取数据的超时时间可以避免一些因为网络不好导致无法获得数据的情况
mapred.reduce.copy.backoff,默认300s
一般情况下不用调整这个值,因为生产环境的网络都是很流畅的
Merge
reduce是并行将map结果下载到本地,所以也是需要merge的,下载下来的数据也是存入一个buffer而不是马上存入磁盘,所以我们同样可以控制这个值来减少IO开销
Reduce
之前说过Reduce端的buffer,默认情况下,数据达到阈值的时候会溢写到磁盘,然后Reduce会从磁盘中获得到所有的数据
也就是说,buffer和Reduce是没有直接关系的,中间多了个写磁盘->读磁盘的过程,有了这个弊端,可以通过参数来配置,使得buffer中的一部分数据可以直接输送到Reduce,从而减少IO开销:mapred.job.reduce.input.buffer.percent,默认为0.0
当这个值大于0的时候,会保留指定比例的内存读buffer中的数据直接拿给Reduce使用
Reduce side tuning 总结
和map阶段差不多,Reduce节点的调优也是主要集中在加大内存使用量,较少IO,增大并行数。
MapReduce 调优总结
一个原则:
减少数据的传输量
尽量使用内存
减少磁盘IO的次数
增大任务并行数
在此附上MapReduce工作流程图,标红部分为可优化部分。(绝对原创)
最后
以上就是会撒娇斑马为你收集整理的MapReduce原理及性能调优的全部内容,希望文章能够帮你解决MapReduce原理及性能调优所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复