我是靠谱客的博主 友好鸵鸟,最近开发中收集的这篇文章主要介绍Spark-Join优化之Broadcast,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

适用场景

  • 进行join中至少有一个RDD的数据量比较少(比如几百M,或者1-2G)
  • 因为,每个Executor的内存中,都会驻留一份广播变量的全量数据

Broadcast与map进行join代码示例

创建RDD

val list1 = List((jame,23), (wade,3), (kobe,24))
val list2 = List((jame,cave), (wade,bulls), (kobe,lakers))
val rdd1 = sc.makeRDD(list1)
val rdd2 = sc.makeRDD(list2)

传统的join

// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
// 结果如下
scala> rdd1.join(rdd2).collect
res27: Array[(String, (Int, String))] = Array((kobe,(24,lakers)), (wade,(3,bulls)), (jame,(23,cave)))

使用Broadcast+map的join操作

// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量
val rdd2Data = rdd2.collect()
val rdd2Bc = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
def function(tuple: (String,Int)): (String,(Int,String)) ={
for(value <- rdd2Bc.value){
if(value._1.equals(tuple._1))
return (tuple._1,(tuple._2,value._2.toString))
}
(tuple._1,(tuple._2,null))
}
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(function(_))
//结果如下,达到了与传统join相同的效果
scala> rdd1.map(function(_)).collect
res31: Array[(String, (Int, String))] = Array((jame,(23,cave)), (wade,(3,bulls)), (kobe,(24,lakers)))

 

转载于:https://www.cnblogs.com/0xcafedaddy/p/7613200.html

最后

以上就是友好鸵鸟为你收集整理的Spark-Join优化之Broadcast的全部内容,希望文章能够帮你解决Spark-Join优化之Broadcast所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部