概述
RDD的缓存(持久化)
Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果 希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。
/**
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用
*/
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
存储时是有默认级别的 StorageLevel.MEMORY_ONLY (默认就是存到内存中)
useDisk: Boolean,useMemory: Boolean,useOffHeap: Boolean, deserialized:Boolean,replication: Int =1 决定了下面参数的传入方式
是否是用磁盘 是否使用内存 是否使用堆外内存 是否反序列化 副本的个数
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
}
ps:MEMORY_AND_DISK 先存储到内存中内存存储满了在存到磁盘
MEMORY_ONLY 内存只能存储内存大小的数据,超出的部分将不会再存储
因为内存中只存了一部分,少了一部分数据,这部分数据被加载时它会重新计算
堆外内存: 堆外内存是相对于对内内存而言,堆内内存是由JVM管理的,在平时java中创建对象都处于堆内内存,并且它是遵守JVM的内存管理规则(GC垃圾回收机制),那么堆外内存就是存在于JVM管控之外的一块内存,它不受JVM的管控约束缓存容易丢失,或者存储在内存的数据由于内存存储不足可能会被删掉.RDD的缓存容错机制保证了即缓存丢失也能保证正确的的计算出内容,通过RDD的一些列转换,丢失的数据会被重算,由于RDD的各个Partition是独立存在,因此只需要计算丢失部分的数据即可,并不需要计算全部的Partition
案例练习: 学科案例cache缓存(重要)
/*缓存*/
import java.net.URL
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SubjectDemo2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SubjectDemo").setMaster("local")
val sc = new SparkContext(conf)
//0.添加学科信息
val subjects =
Array("http://java.learn.com","http://ui.learn.com",
"http://bigdata.learn.com","http://h5.learn.com","http://android.learn.com")
// 1.对数据进行切分
val tuples: RDD[(String, Int)] = sc.textFile("C:\Users\Administrator\Desktop\subjectaccess\access.txt").map(line => {
val fields: Array[String] = line.split("t")
//取出url
val url = fields(1)
(url, 1)
})
//将相同url进行聚合,得到了各个学科的访问量
/*
缓存使用的场景:通常会将后期常用的数据进行缓存
特别是发生shuffle后的数据,因为shuffle过程的代价太大,所以经常在shuffle后进行缓存
cache默认是缓存到内存中,cache是transformation函数,所以需要一个action算子触发
*/
val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_).cache()
//因为学科信息已经存储到Array中
for(subject <- subjects){
//对学科信息进行过滤
val filtered: RDD[(String, Int)] = sumed.filter(_._1.startsWith(subject))
val res: Array[(String, Int)] = filtered.sortBy(_._2,false).take(3)
println(res.toList)
}
sc.stop()
}
}
checkpoint检查点机制(重要)
Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。
案例:
/*
检查点,类似于快照,chekpoint的作用就是将DAG中比较重要的数据做一个检查点,将结果存储到一个高可用的地方
*/
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkDemo").setMaster("local")
val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs://hadoop01:8020/ck")
val rdd =
sc.textFile("hdfs://hadoop01:8020/word.txt").flatMap(_.split("
")).map((_,1)).reduceByKey(_+_)
//检查点的触发一定要使用个action算子
rdd.checkpoint()
rdd.saveAsTextFile("hdfs://hadoop01:8020/out10")
println(rdd.getCheckpointFile) //查看存储的位置
/**
查看是否可以设置检查点
rdd.isCheckpointed
这个方法在shell中可以使用 但是代码中不好用
*/
}
RDD的检查点机制
cache 和 checkpoint 的区别
缓存把 RDD 计算出来然后放在内存中,但是RDD 的依赖链(相当于数据库中的redo 日志), 也不能丢掉, 当某个点某个 executor 宕了,上面cache 的RDD就会丢掉, 需要通过 依赖链重放计算出来, 不同的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存储,所以依赖链就可以丢掉了,就斩断了依赖链, 是通过复制实现的高容错。
[外链图片转存失败(img-YrlYFWoc-1565837141936)(./image/图片12.png)]
ps:当使用了checkpoint后,数据被保存到HDFS,此RDD的依赖关系也会丢掉,因为数据已经持久化到HDFS,所以不需要重新计算,无论是cache或checkpoint缓存了数据,若数据丢失是自动恢复的,checkpoint的数据需要手动清除而cache的数据是自动清除的(任务结束)
cache和persist的区别
cache源码中调用的是persist,persist更加底层的持久化,cache默认持久化等级是内存且不能修改,而persist是可以修改持久化等级,cache和persist的使用是有规则的必须在transformation或者textfile等创建一个rdd之后,直接连续调用cache()或者persist()才可以,如果先创建一个rdd,再单独另起一行执行cache()或者persist(),是没有用的,而且会报错,大量的文件会丢失
什么时候使用cache或checkpoint
1.某步骤计算特别耗时
2.计算链条特别长
3.发生shuffle之后
建议使用cache或是persist模式因为,不需要创建存储位置,并且默认存储到内存中计算速度快,而checkpoint需要手动创建存储位置和手动删除数据.若数据量非常庞大建议改用chechpoint.
最后
以上就是踏实悟空为你收集整理的Spark Core 总结1-RDD的缓存、checkpoint的全部内容,希望文章能够帮你解决Spark Core 总结1-RDD的缓存、checkpoint所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复