我是靠谱客的博主 自然未来,最近开发中收集的这篇文章主要介绍Spark基本知识01 二、RDD算子,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、RDD概念以及五大特性

1.概念:

   RDD是Resilient Distributed Dataset(弹性分布式数据集)的简称。

       实际上是一个数据的描述,它记录了要分析的数据所存的节点和块的大小和副本的数量等等元数据信息,当然还包括要计算的一些逻辑

      分布式的数据集,是一种数据描述,不可变,可分区,可并行计算的集合。

2.弹性的体现:

     自动的进行内存和磁盘数据存储的切换 ​

     基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错)

   ​ Task如果失败会自动进行特定次数的重试(默认4次)

​     Stage如果失败会自动进行特定次数的重试(可以只运行计算失败的阶段),只计算失败的数据分片

3.五大特性:

(1)一组分片:一个数据集可以分成多个分片,每个分片都会被一个计算任务处理,并决定并行计算的粒度。

(2)一个计算每个分区的函数:Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

(3)RDD之间的依赖关系:RDD的每次转化都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算

(4)一个Partitioner:是指RDD的分区函数。一个是基于哈希的HashPatitioner,另一个是基于范围的RangePartitioner。只有对K-V类型的RDD才会有Partitioner,非K-V的RDD的Partitioner的值是None.

(5)一个列表:用于存储每个Partitioner的优先位置,方便进行任务调度。

 二、RDD算子

1.transformation、action算子

transformation:只记录计算过程

action:当碰到了action算子后,会将前面的transformation算子进行执行,将executor中的计算结果汇总到driver端,进行展示

 

2.一些常用的算子:

package scalaBase.day8
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object e1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("test1")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2),("vivi",4)))
val rdd3: RDD[(String, Int)] = rdd1.union(rdd2)
//交集
println("union: "+rdd3.collect().toBuffer)
val is: RDD[(String, Int)] = rdd1.intersection(rdd2)
println("intersection: "+is.collect().toBuffer)
val rdd4: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
println("jion: "+rdd4.collect().toBuffer)
val rdd5: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
println("fullOuterJion: "+rdd5.collect().toBuffer)
val rdd6: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
println("leftOuterJion: "+rdd6.collect().toBuffer)
val dis: RDD[(String, Int)] = rdd3.distinct()
println("distinct: "+dis.collect().toBuffer)
val dc: Long = dis.count()
println("count: "+dc)
val sample: RDD[(String, Int)] = rdd3.sample(true,0.3)
println("sample: "+sample.collect().toBuffer)
val foreach: Unit = rdd1.foreach(_._2+"个苹果")
println("foreach: "+foreach)
val foreach2: Unit = rdd1.foreach(x=>(x._1,x._2+"个苹果"))
println("foreach2: "+foreach2)
println("foreach2 后的rdd1:"+rdd1.collect().toBuffer)
val map: RDD[(String, String)] = rdd1.map(x=>(x._1,x._2+"个香蕉"))
println("map: "+map.collect().toBuffer)
println("map后的rdd1:"+rdd1.collect().toBuffer)
val rdd7: RDD[(String, Iterable[(String, Int)])] = rdd3.groupBy(_._1)
println("groupBy: "+rdd7.collect().toBuffer)
val rdd8: RDD[(String, Iterable[Int])] = rdd3.groupByKey()
println("groupByKey: "+rdd8.collect().toBuffer)
val rdd9: RDD[(String, Int)] = rdd3.groupByKey().mapValues(_.size)
println("mapValues: "+rdd9.collect().toBuffer)
val rdd10: RDD[(String, Int)] = rdd3.reduceByKey(_+_)
println("reduceByKey: "+rdd10.collect().toBuffer)
val rdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
println("cogroup: "+rdd.collect().toBuffer)
sc.stop()
}
}

结果如下:

union: ArrayBuffer((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2), (vivi,4))
intersection: ArrayBuffer((tom,1))
jion: ArrayBuffer((tom,(1,1)), (jerry,(3,2)))
fullOuterJion: ArrayBuffer((vivi,(None,Some(4))), (tom,(Some(1),Some(1))), (jerry,(Some(3),Some(2))), (shuke,(None,Some(2))), (kitty,(Some(2),None)))
leftOuterJion: ArrayBuffer((tom,(1,Some(1))), (jerry,(3,Some(2))), (kitty,(2,None)))
distinct: ArrayBuffer((jerry,3), (jerry,2), (shuke,2), (tom,1), (vivi,4), (kitty,2))
count: 6
sample: ArrayBuffer((vivi,4))
foreach: ()
foreach2: ()
foreach2 后的rdd1:ArrayBuffer((tom,1), (jerry,3), (kitty,2))
map: ArrayBuffer((tom,1个香蕉), (jerry,3个香蕉), (kitty,2个香蕉))
map后的rdd1:ArrayBuffer((tom,1), (jerry,3), (kitty,2))
groupBy: ArrayBuffer((vivi,CompactBuffer((vivi,4))), (tom,CompactBuffer((tom,1), (tom,1))), (jerry,CompactBuffer((jerry,3), (jerry,2))), (shuke,CompactBuffer((shuke,2))), (kitty,CompactBuffer((kitty,2))))
groupByKey: ArrayBuffer((vivi,CompactBuffer(4)), (tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))
mapValues: ArrayBuffer((vivi,1), (tom,2), (jerry,2), (shuke,1), (kitty,1))
reduceByKey: ArrayBuffer((vivi,4), (tom,2), (jerry,5), (shuke,2), (kitty,2))
cogroup: ArrayBuffer((vivi,(CompactBuffer(),CompactBuffer(4))), (tom,(CompactBuffer(1),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))

 

3.sample算子:

有三个参数:

withReplacement:Boolean 表示抽出的数都是否放回,true为有放回的抽样
fraction:Double抽样比例比如,3代表30%,该值不准确,有浮动    
seed:Long=utils.random.nextLong 指定随机数全成的种子,该参数款认不传

 val sample: RDD[(String, Int)] = rdd3.sample(true,0.3)

 

4、另外一些算子


val conf: SparkConf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("myaggregate")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
val r1: collection.Map[String, Long] = rdd1.countByKey
println(r1)
val rdd2 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
val r2: RDD[(String, Int)] = rdd2.filterByRange("a", "d")
println( r2.collect.toBuffer)
val rdd3 = sc.parallelize(List((1,"a b"),(2,"c"),(3,"d e")))
println(rdd3.mapValues(_.split(" ")).collect().toBuffer)
println(rdd3.flatMapValues(_.split(" ")).collect().toBuffer)
val fold: RDD[(String, Int)] = rdd1.foldByKey(0)(_+_)
println(fold.collect().toBuffer)
val rdd4: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),2)
val unit: Unit = rdd4.foreachPartition(x=>println(x.length))
rdd4.foreachPartition(x => println(x.reduce(_ + _)))
val rdd5: RDD[String] = sc.parallelize(List("tom","lucy","vivi","tomato"),3)
val r5: RDD[(Int, String)] = rdd5.keyBy(x=>x.length)
println(r5.collect().toBuffer)
val rdd = sc.parallelize(List(("a", 1), ("b", 2),("a",3),("c",5)))
val map: collection.Map[String, Int] = rdd.collectAsMap
println(map)//map中key值必须唯一

运行结果:

Map(a -> 1, b -> 2, c -> 2)
ArrayBuffer((c,3), (d,4), (c,2), (a,1))
ArrayBuffer((1,[Ljava.lang.String;@4a5905d9), (2,[Ljava.lang.String;@1a3e5f23), (3,[Ljava.lang.String;@6293e39e))
ArrayBuffer((1,a), (1,b), (2,c), (3,d), (3,e))
ArrayBuffer((a,1), (b,4), (c,3))
5
5
40
15
ArrayBuffer((3,tom), (4,lucy), (4,vivi), (6,tomato))
Map(b -> 2, a -> 3, c -> 5)

其中flatMapValues

最后

以上就是自然未来为你收集整理的Spark基本知识01 二、RDD算子的全部内容,希望文章能够帮你解决Spark基本知识01 二、RDD算子所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部