概述
解决Spark数据倾斜
美团Spark性能优化
1、先用sample(false,0,x)采用key,找出倾斜的key
2、将数据集拆分成倾斜部分和不倾斜部分
3、不倾斜部分走正常路线
4、倾斜部分前面加上前缀
5、重分区 => 聚合 => 去掉前缀 => 聚合(保证数据准确)
6、如果是大表join大表,其中一个表有数据倾斜,就需要用膨胀法,将倾斜部分的key加上一个0-n的前缀,一条数据膨胀成n条,然后将另一个表的这部分key也加上相应的前缀,然后单独对这部分数据进行一次双重聚合,与不倾斜的数据进行union操作,完成聚合。
7、空值看作是特殊的key,空值多了一样用3的方法去解决。
Spark的分区
概念:
分区是RDD内部并行计算的一个计算单元,是RDD数据集的逻辑分片,分区的格式决定并行计算的粒度,分区的个数决定任务的个数。
spark分区器有三个
HashPartitioner:将key的哈希值/分区数量进行分区,HashPartitioner确定分区的方式:
partition = key.hashCode () % numPartitions
RangePartitioner:范围分区器,按照字典顺序或数字大小排序后/分区数量来分区
自定义分区器
通过实现get分区总数方法和get分区数方法,指定自定义规则的key分区方式;
使用自定义分区器创建的RDD进行复杂的聚合或join操作效率更高。
//只需要继承Partitioner,重写两个方法
class MyPartitioner(val num: Int) extends Partitioner {
//这里定义partitioner个数
override def numPartitions: Int = ???
//这里定义分区规则
override def getPartition(key: Any): Int = ???
}
3、并行度
spark作业的最大并行度=excutor个数*每个excutor的cpu core数
但spark的当前并行度取决于task数,而task数=分区数。
分区数可以通过spark.default.parallelism设置默认分区数,也可以在使用算子时显示地指定分区器和分区数量。
spark官方推荐设置分区数为最大并行度的2-3倍,这样可以保证提前计算的线程立刻被后面的task使用,并且每个task处理的数据量会更少。
最后
以上就是勤奋鼠标为你收集整理的Spark解决数据倾斜和Spark分区)的全部内容,希望文章能够帮你解决Spark解决数据倾斜和Spark分区)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复