概述
首先要说明RDD的算子一共分为两种一种为行动算子一种为transformations算子。
依赖
RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下图所示,依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
缓存
如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
CheckPoint
虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
RDD编程
RDD的创建:在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。
1、从集合创建:主要提供了两个函数:parallelize和makeRDD
2、由外部存储系统的数据集创建包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,比如:sc.textFile(“”url“”)具体含义后边讲解
3、从其他RDD转换
RDD的转换(面试开发重点)API
首先其整体分为Value型和Key、Value型
Value型:
函数 | 参数说明 | 举例 |
---|---|---|
map | 略,在Scala一文中已经说过 | … |
mapPartitions(func) | 是对每一个分区进行一次map,假设有N个元素M个分区,那么map会执行N次而mapPatition则执行M次,因此其func的类型为Iterator【T】 | mapPartition |
mapPartitionsWithIndex(func) | 这个就是指定某个索引值执行func,就是比上一个多了个可以指定某个分区进行func计算 | mapPartitionsWithIndex(func) |
flatMap(func) | … | … |
glom | 将每个分区变成数组形成新的RDD类型RDD[Array【T】 | glom |
groupBy(func) | 将函数返回值相同的放入一个iterator中 | groupBy |
filter(func) | … | … |
sample(withReplacement, fraction, seed) | 抽样,第一参数为true表示放回的抽取,反之不放回抽取,第二个参数为抽样抽几个,第三个参数为随机数种子 | sample |
distinct([numTasks])) | 对Rdd进行去重操作,可以传参数来指定有几个线程来完成 | rdd.distinct(2) |
coalesce(numPartitions) | 缩减分区,由于大数据集过滤后,提高小数据执行效率 | rdd.coalesce(3) |
repartition(numPartitions) | 根据分区数,重新通过网络随机洗牌所有数据 | 他和上面的coalesce的区别 |
sortBy(func,[ascending], [numTasks]) | 将rdd用func进行排序,第二个为升序降序,第三个并行数 | sortBy |
pipe(command, [envVars]) | 管道,针对每个分区,都执行一个shell角标,返回输出的RDD | pipe |
举例:
1、mapPartition:
var sc=new SparkContext();
var rdd=sc.parallelize(Array(1,2,3,4))
rdd.mapPartition(x=>x.map(_*2))
2、mapPartitionsWithIndex(func):
创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD
var sc=new SparkContext();
var rdd=sc.parallelize(Array(1,2,3,4))
rdd.mapPartitionsWithIndex((i,its)=>its.map((_,i)))
map()和mapPartition()的区别:
- map():每次处理一条数据。
- mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
- 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
3、glom
创建一个4个分区的RDD,并将每个分区的数据放到一个数组
var sc=new SparkContext();
var rdd=sc.parallelize(1 to 16,4)
rdd.glom().collect()
4、groupBy
需求:创建一个RDD,按照元素模以2的值进行分组。
var sc=new SparkContext();
var rdd=sc.parallelize(1 to 16,4)
rdd.groupBy(i=>i%2)
5、sample
需求:创建一个RDD(1-10),从中选择放回和不放回抽样
var sc=new SparkContext();
var rdd=sc.parallelize(1 to 16,4)
rdd.sample(true,0.4,2)
6、coalesce和repartition的区别
- coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
- repartition实际上是调用的coalesce,默认是进行shuffle的。源码如下:
7、sortBy:
按照与3余数的大小排序
var sc=new SparkContext();
var rdd=sc.parallelize(1 to 16,4)
rdd.sortBy(x=>x%3).collect()
输出结果为:res12: Array[Int] = Array(3, 4, 1, 2)
8、pipe
编写一个脚本,使用管道将脚本作用于RDD上。编写一个脚本,使用管道将脚本作用于RDD上。
编写一个脚本
Shell脚本
#!/bin/sh
echo "AA"
while read LINE; do
echo ">>>"${LINE}
val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd.pipe("/opt/module/spark/pipe.sh").collect()
输出结果:
Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
双Value类型交互
函数 | 说明 | 举例 |
---|---|---|
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD | union |
subtract (otherDataset) | 求差的一个函数 | subtract |
intersection(otherDataset) | 求交集 | intersection(otherDataset) |
cartesian(otherDataset) | 求笛卡尔积两个RDD的 | … |
zip(otherDataset) | 拉链,将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。 | zip |
1、union
创建两个RDD,求并集
//创建第一个RDD
val rdd1 = sc.parallelize(1 to 5)
//创建第二个RDD
val rdd2 = sc.parallelize(5 to 10)
//计算两个RDD的并集
val rdd3 = rdd1.union(rdd2)
rdd3的结果为:Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
2、subtract
计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来
val rdd = sc.parallelize(3 to 8)
val rdd1 = sc.parallelize(1 to 5)
rdd.subtract(rdd1).collect()
输出结果为: Array(8, 6, 7)
3、intersection(otherDataset)
val rdd1 = sc.parallelize(1 to 7)
val rdd2 = sc.parallelize(5 to 10)
val rdd3 = rdd1.intersection(rdd2)
rdd3.collect()
输出结果为:Array(5, 6, 7)
4、zip
val rdd1 = sc.parallelize(Array(1,2,3),3)
val rdd2 = sc.parallelize(Array("a","b","c"),3)
rdd1.zip(rdd2).collect
输出结果为:Array((1,a), (2,b), (3,c))
Key-Value类型
函数 | 参数说明 | 举例 |
---|---|---|
partitionBy | 对RDD进行重新分区,如果原有的RDD和现有的RDD是一致的话就不进行分区,否则生成shuffleRDD,即会产生shuffle | partitionBy |
groupByKey | 也是对每个key进行操作生成的序列(key,迭代器) | groupByKey |
reduceByKey(func, [numTasks]) | func(x,y)其中x,y就是相同的key所对应的两个value比如(1,x),(1,y) | reduceByKey |
aggregateByKey | zeroValue:给每一个分区中的每一个key一个初始值;seqOp:函数用于在每一个分区中用初始值逐步迭代value,可以理解为就是对每个分区执行的操作;)combOp:函数用于合并每个分区中的结果。 | aggregateByKey |
foldByKey | 是上面的简化版其中第二个参数和第三个参数一致了 | foldByKey |
combineByKey[C] | 其实他和aggregateByKey十分类似,只不过他的第一个参数可以对初始值进行改造 | combineByKey |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD | sortByKey |
mapValues | 只针对于value进行操作 | mapvalue |
join(otherDataset, [numTasks]) | 比如原始结果为a1(1,5),a2(1,9)那么进行join就变成了(1,(5,9)) | join |
cogroup(otherDataset, [numTasks]) | 其实比较类似于上面函数,只不过返回的不是元组而是迭代器 | cogroup |
1、partitionBy
需求:创建一个4个分区的RDD,对其重新分区
val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
2、groupByKey
创建一个pairRDD,将相同key对应值聚合到一个sequence中,并计算相同key对应值的相加结果。
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val group = wordPairsRDD .groupByKey()
group.collect()
输出结果为: Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
val res2=group.map((k,v)=>(k,v.sum))
res2.collect()
输出结果:Array[(String, Int)] = Array((two,2), (one,1), (three,3))
3、reduceByKey
创建一个pairRDD,计算相同key对应值的相加结果
object Text {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))
val unit = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
unit.reduceByKey((x,y)=>x+y).foreach(println)
}
}
输出结果:
(b,3)
(a,5)
(c,18)
reduceByKey和groupByKey的区别:
reduceByKey会在shuffle前有combine(预聚合)操作,而groupByKey则直接shuffle,所以推荐使用reduceByKey。
4、aggregateByKey
创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
输出结果为:res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
可以将第一个参数为初始值,第二个参数为对每一个partition的相同的key进行一次func的操作,第三个参数则是下一个stage对上面所有相同的key进行一次func2操作。
5、foldByKey
创建一个pairRDD,计算相同key对应值的相加结果
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
val agg = rdd.foldByKey(0)(_+_)
6、combineByKey
用combineByKey求平均值:
object Text {
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))
val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
var ll= input.combineByKey(score=>(score,1),
(x:(Int,Int),v)=>(x._1+v,x._2+1),
(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).foreach(println)
// ll.map((k,v)=>v._1/v._2.toDouble) 求平均值
}
}
输出结果为:
(a,(274,3))
(b,(286,3))
7、sortByKey
创建一个pairRDD,按照key的正序和倒序进行排序
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd.sortByKey(true).collect()
rdd.sortByKey(true).collect()
输出结果:
res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
8、mapvalue
创建一个pairRDD,并将value添加字符串"|||"
val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3.mapValues(_+"|||").collect()
输出结果:
res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
9、join
需求:创建两个pairRDD,并将key相同的数据聚合到一个元组。
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd.join(rdd1).collect()
输出结果为:
Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
10、cogroup
建两个pairRDD,并将key相同的数据聚合到一个迭代器
val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd.cogroup(rdd1).collect()
输出结果为:
Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
Action算子
函数名称 | 参数解析 | 案例 |
---|---|---|
collect | 在驱动程序中以数组的形式返回所有元素 | collect |
count() | 返回rdd中元素的个数 | count |
first() | 返回RDD中的第一个元素 | first |
take(n) | 返回RDD中前n个元素组成的数组 | take() |
takeOrdered | 返回RDD中排序过后的前n个 | |
saveAsTextFile(path) | 将RDD的元素以textFile的格式保存到HDFS或者其他的文件系统中,对每个元素spark会调用toString方法,将它转换成文本 | … |
saveAsSequenceFile(path) | 将数据集中的数据已hadoop的sequencefile的格式保存在指定的目录 | … |
saveAsObjectFile(path) | 将数据集中的数据序列化成对象,存储到文件 | … |
foreach(func) | 在数据集的每一个元素上运行func | foreach |
1、collect
val rdd = sc.parallelize(1 to 10)
rdd.collect()
最终结果为:
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
2、count
val rdd = sc.parallelize(1 to 10)
rdd.count
输出结果为:
res1: Long = 10
3、first
val rdd = sc.parallelize(1 to 10)
rdd.first
输出结果为:
res2: Int = 1
4、take
val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd.take(3)
输出结果为:
Array[Int] = Array(2, 5, 4)
5、takeOrdered
val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd.takeOrdered(3)
输出结果为:
Array[Int] = Array(2, 3, 4)
6、foreach
var rdd = sc.makeRDD(1 to 5,2)
rdd.foreach(println(_))
输出结果:
3
4
5
1
2
RDD中函数和属性传递的问题:
因为我们写的对RDD的操作实际上在Driver上但是真正执行计算的是executor,所以这就涉及了跨进程通信,需要序列化,下面看几个例子:
1、首先我们创建一个类:
class Search(s:String){
//过滤出包含字符串的数据
def isMatch(s: String): Boolean = {
s.contains(query)
}
//过滤出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
//过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
}
创建spark主程序:
object SeriTest {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.创建一个RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.创建一个Search对象
val search = new Search()
//4.运用第一个过滤函数并打印结果
val match1: RDD[String] = search.getMatche1(rdd)
match1.collect().foreach(println)
}
}
执行之后:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
问题说明:
//过滤出包含字符串的RDD
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
在这个方法中调用了isMatch方法实际上是this.isMatch方法但是this方法就是Search对象并不能序列化所以导致了这个问题。
解决方法就是将Search对象进行序列化
class Search() extends Serializable{...}
传递一个属性:
创建Spark主程序
object TransmitTest {
def main(args: Array[String]): Unit = {
//1.初始化配置信息及SparkContext
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
//2.创建一个RDD
val rdd: RDD[String] = sc.parallelize(Array("hadoop", "spark", "hive", "atguigu"))
//3.创建一个Search对象
val search = new Search()
//4.运用第一个过滤函数并打印结果
val match1: RDD[String] = search.getMatche2(rdd)
match1.collect().foreach(println)
}
}
运行程序
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.filter(RDD.scala:386)
at com.atguigu.Search.getMatche1(SeriTest.scala:39)
at com.atguigu.SeriTest$.main(SeriTest.scala:18)
at com.atguigu.SeriTest.main(SeriTest.scala)
Caused by: java.io.NotSerializableException: com.atguigu.Search
问题说明:
/过滤出包含字符串的RDD
def getMatche2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
在这个方法中调用了query字段,其定义在Search这个类的字段,实际上调用的是this.query,this就是search这个类对象,但是其不能序列化所以报错了
解决方法:
1、将类序列化
2、因为query从属于search类,我们可以将这个query传给一个局部变量(局部变量类型可以序列化)就将search序列化问题变成了局部变量序列化问题了,而我们给定的局部变量又可以序列化。
RDD依赖关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女
宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle,总结:宽依赖我们形象的比喻为超生
RDD缓存
RDD可以通过persist或者cache两种方式将前面的计算结果缓存,默认情况下persist会将缓存你放在JVM堆中。但是并不是调用了这两个方法就立即缓存,而是要等到行动算子。
通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
例子:
val rdd = sc.makeRDD(Array("atguigu"))
val nocache = rdd.map(_.toString+System.currentTimeMillis)
nocache.collect
结果为:res0: Array[String] = Array(atguigu1538978275359)
nocache.collect
res1: Array[String] = Array(atguigu1538978282416)
nocache.collect
res2: Array[String] = Array(atguigu1538978283199)
可以看到每次的时间戳都是不一样的所以其是重新计算的
将RDD转换为携带当前时间戳并做缓存
val cache = rdd.map(_.toString+System.currentTimeMillis).cache
结果为:
cache.collect
res3: Array[String] = Array(atguigu1538978435705)
scala> cache.collect
res4: Array[String] = Array(atguigu1538978435705)
scala> cache.collect
res5: Array[String] = Array(atguigu1538978435705)
就可以看到多次打印的结果一致
RDD的checkPoint
在spark中除了持久化操作以外,还提供了一种检查点机制,检查点实际上就是将血缘关系以二进制的方式存储到disk中,之后RDD就会切断血缘,从而减小了容错成本,因为不用在从新计算了而是从检查点开始。我们可以通过sparkContext.setCheckPointDir()来设置目录。当然对某个RDD进行了检查点并不会马上执行而是要等行动算子。
例子:
sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")
val rdd = sc.parallelize(Array("atguigu"))
val ch = rdd.map(_+System.currentTimeMillis)
ch.collect
res56: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res57: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res58: Array[String] = Array(atguigu1538981860504)
键值对RDD分区器
Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
注意:
(1)只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
- Hash分区
- Ranger分区:
HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
- 自定义分区:要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。
举例:需求:将相同后缀的数据写入相同的文件,通过将相同后缀的数据分区到相同的分区并保存输出来实现。
val data = sc.parallelize(Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6)))
定义一个自定义分区类
class CustomerPartitioner(numParts:Int) extends org.apache.spark.Partitioner{
//覆盖分区数
override def numPartitions: Int = numParts
//覆盖分区号获取函数
override def getPartition(key: Any): Int = {
val ckey: String = key.toString
ckey.substring(ckey.length-1).toInt%numParts
}
}
val par = data.partitionBy(new CustomerPartitioner(2))
数据读取与保存
spark的数据保存可以从这两个维度来看:一个是保存的文件格式:text、json、csv等另一个是保存的文件系统:本地系统、hdfs、hbase还是数据库?
文件类数据读取与保存
1.text文件
读取:
sc.textFile("hdfs://hadoop102:9000/fruit.txt")
写入:
sc.saveAsTextFile("fruat")
- Json文件
如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。
import scala.util.parsing.json.JSON
val json = sc.textFile("/people.json")
val result = json.map(JSON.parseFull)
result.collect
3.Sequence文件
SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[ keyClass, valueClass]。
val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq.collect
result:
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))
4.对象文件:
对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v] 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。
val rdd = sc.parallelize(Array(1,2,3,4))
rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
val objFile = sc.objectFile[Int]("file:///opt/module/spark/objectFile")
objFile.collect
result:
res19: Array[Int] = Array(1, 2, 3, 4)
4.2文件系统类数据读取与保存
1.HDFS
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.
1)输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
2)键类型: 指定[K,V]键值对中K的类型
3)值类型: 指定[K,V]键值对中V的类型
4)分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits
注意:其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值。
1.在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
2.如果用Spark从Hadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了
2.MySQL数据库连接
Mysql读取:
package com.atguigu
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD {
def main(args: Array[String]): Unit = {
//1.创建spark配置信息
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.创建SparkContext
val sc = new SparkContext(sparkConf)
//3.定义连接mysql的参数
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop102:3306/rdd"
val userName = "root"
val passWd = "000000"
//创建JdbcRDD
val rdd = new JdbcRDD(sc, () => {
Class.forName(driver)
DriverManager.getConnection(url, userName, passWd)
},
"select * from `rddtable` where `id`>=?;",
1,
10,
1,
r => (r.getInt(1), r.getString(2))
)
//打印最后结果
println(rdd.count())
rdd.foreach(println)
sc.stop()
}
}
Mysql写入:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
val sc = new SparkContext(sparkConf)
val data = sc.parallelize(List("Female", "Male","Female"))
data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit = {
Class.forName ("com.mysql.jdbc.Driver").newInstance()
val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000")
iterator.foreach(data => {
val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
ps.setString(1, data)
ps.executeUpdate()
})
}
3.HBase数据库
略,因为这个数据库的和HBASE的我们可以用sparkSql来进行操作
累加器
累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
自定义累加器:
举例:
package com.atguigu.spark
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {
private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
override def isZero: Boolean = {
_logArray.isEmpty
}
override def reset(): Unit = {
_logArray.clear()
}
override def add(v: String): Unit = {
_logArray.add(v)
}
override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
other match {
case o: LogAccumulator => _logArray.addAll(o.value)
}
}
override def value: java.util.Set[String] = {
java.util.Collections.unmodifiableSet(_logArray)
}
override def copy():org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
val newAcc = new LogAccumulator()
_logArray.synchronized{
newAcc._logArray.addAll(_logArray)
}
newAcc
}
}
// 过滤掉带字母的
object LogAccumulator {
def main(args: Array[String]) {
val conf=new SparkConf().setAppName("LogAccumulator")
val sc=new SparkContext(conf)
val accum = new LogAccumulator
sc.register(accum, "logAccum")
val sum = sc.parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2).filter(line => {
val pattern = """^-?(d+)"""
val flag = line.matches(pattern)
if (!flag) {
accum.add(line)
}
flag
}).map(_.toInt).reduce(_ + _)
println("sum: " + sum)
for (v <- accum.value) print(v + "")
println()
sc.stop()
}
}
广播变量(调优策略)
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。 在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。
使用广播变量的过程如下:
(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。 任何可序列化的类型都可以这么实现。
(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
最后
以上就是甜蜜冷风为你收集整理的Spark算子API解析RDD中函数和属性传递的问题:RDD依赖关系RDD缓存RDD的checkPoint键值对RDD分区器数据读取与保存累加器的全部内容,希望文章能够帮你解决Spark算子API解析RDD中函数和属性传递的问题:RDD依赖关系RDD缓存RDD的checkPoint键值对RDD分区器数据读取与保存累加器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复