概述
文章目录
- 前言
- 源码
前言
上一篇查看了常见ByKey类算子的调用关系,本篇讨论JOIN类的
每种算子编程手法还是一直一样,分为三种版本:简单的、指定分区个数的、指定分区类型的。
源码
JOIN的左外、右外、全连接这些和数据库里的概念对应,应该不需要过多解释。所以本文就只看看cogroup
,而cogroup
只需要把单个其他表的情况看明白就好了。
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}
另外两个还是老手法。
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, new HashPartitioner(numPartitions))
}//指定分区个数的调用指定分区类型的
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
为了更好理解,还是看个例子好一些
import org.apache.spark.{SparkConf, SparkContext}
object CogroupDemo {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("CogroupDemo")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.parallelize(Array((1,"阿里巴巴"),(1,"腾讯"),(2,"普华永道"),(3,"碧桂园"),(4,"宝洁"),(4,"联合利华")))
val rdd2 = sc.parallelize(Array((1,"互联网"),(3,"地产"),(4,"快消"),(5,"教育")))
val rdd3 = rdd1.cogroup(rdd2).collect()
rdd3.foreach(println)
}
}
结果如下,可以看到根据key(id)将两个rdd的值聚合到了一个value集合中,但是这个value集合依旧分为两个CompactBuffer
(4,(CompactBuffer(宝洁, 联合利华),CompactBuffer(快消)))
(1,(CompactBuffer(阿里巴巴, 腾讯),CompactBuffer(互联网)))
(5,(CompactBuffer(),CompactBuffer(教育)))
(2,(CompactBuffer(普华永道),CompactBuffer()))
(3,(CompactBuffer(碧桂园),CompactBuffer(地产)))
在案例中特意留了些空的值,在各种连接的时候就可以根据是非为空来判断是否需要这条数据。
最后推荐看看HashJoin的原理,会对Spark的Join理解更深刻。
最后
以上就是勤劳冥王星为你收集整理的Spark源码学习之KV-RDD的常见算子(2)前言源码的全部内容,希望文章能够帮你解决Spark源码学习之KV-RDD的常见算子(2)前言源码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复