概述
1、shuffle操作原理:
在Spark中,数据通常不会跨分区分布,以满足特定操作的需要。在计算期间,单
个任务将对单个分区进行操作——因此,要组织单个reduceByKey 的计算任务要执行
的所有数据,Spark需要执行一个all-to-all操作。它必须从所有分区中读取所有
键的所有值,然后将所有分区的值放在一起计算每个键的最终结果——这称为shuffle。
Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark用于重新分发数据
的机制,以便跨分区对数据进行不同的分组。这通常涉及跨执行程序和机器复制数据,
使shuffle成为一项复杂而昂贵的操作。在Spark Core中,Shuffle是划分宽窄依赖
依据Stage的依据
宽依赖:一对多 (有shuffle操作)
窄依赖:一对一 或者多对一
2、 Shuffle操作问题解决
2.1 数据倾斜原理
在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task
来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜
(在实际生产中去null值是必须的)
2.2 数据倾斜问题发现与解决
通过Spark Web UI来查看当前运行的stage各个task分配的数据量,从而进一步确定
是不是task分配的数据不均匀导致了数据倾斜。
知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出
来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle
类算子。
通过countByKey查看各个key的分布。
2.3 数据倾斜解决方案
2.3.1 过滤少数导致倾斜的key
2.3.2 提高shuffle操作的并行度
2.3.3 局部聚合和全局聚合
案例<一>:采样倾斜key并分拆join操作(join的两表都很大,但仅一个RDD的几个key的数据量过大)
方案实现思路:
对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
而另外两个普通的RDD就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
案例<二>使用随机前缀和扩容RDD进行join(RDD中有大量的key导致数据倾斜)
方案实现思路:
将含有较多倾斜key的RDD扩大多倍,与相对分布均匀的RDD配一个随机数。
4 spark shuffle参数调优
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的
BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入
buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:
如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),
从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,
进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer
缓冲决定了每次能够拉取多少数据。
调优建议:
如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),
从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中
发现,合理调节该参数,性能会有1%~5%的提升。
spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数
据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以
重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行
失败。
调优建议:
对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次)
,以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中
发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度
提升稳定性。
spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:
建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.memoryFraction
默认值:0.2
参数说明:
该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,
默认是20%。
调优建议:
在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议
调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合
过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
spark.shuffle.manager
默认值:sort
参数说明:
该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、
sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是
Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与
sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:
由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该
排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不
需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的
HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意
的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:
当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于
这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按
照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的
所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:
当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数
调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,
map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产
生大量的磁盘文件,因此shuffle write性能有待提高。
spark.shuffle.consolidateFiles
默认值:false
参数说明:
如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启
consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read
task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:
如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可
以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,
同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的
SortShuffleManager要高出10%~30%。
最后
以上就是神勇鸡翅为你收集整理的shuffle原理 及优化策略的全部内容,希望文章能够帮你解决shuffle原理 及优化策略所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复