概述
1、cache()和persist()
cache()和persist()注意问题:
1.cache()和persist()持久化单位是partition,cache()和persist()是懒执行算子,需要action算子触发执行。
2.对一个RDD使用cache或者persist之后可以赋值给一个变量,下次直接使用这个变量就是使用的持久化的数据。也可以直接对RDD进行cache或者persist不赋值给一个变量
3.如果采用第二种方式赋值给变量的话,后面不能紧跟action算子。
4.cache()和persist()的数据在当前application执行完成之后会自动清除。
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("cacheAndPersist")
conf.setMaster("local")
val sc = new SparkContext(conf)
var lines = sc.textFile("./data/persistData.txt")
//
lines.cache()
lines = lines.persist(StorageLevel.MEMORY_ONLY)
val startTime1 = System.currentTimeMillis()
val count1 = lines.count()
val endTime1 = System.currentTimeMillis()
println("count1 = "+count1+",time = "+(endTime1-startTime1)+"ms")
val startTime2 = System.currentTimeMillis()
val count2 = lines.count()
val endTime2 = System.currentTimeMillis()
println("count2 = "+count2+",time = "+(endTime2-startTime2)+"ms")
sc.stop()
}
2、checkpoint
当RDD的lineage比较长,计算较为复杂时,可以使用checkpint对RDD进行持久化,checkpoint将数据直接持久化到磁盘中
checkpoint执行流程:
1.当spark job 执行完之后会从后往前回溯,对进行checkpoint RDD进行标记
2.回溯完成之后,Spark框架会启动一个job重新计算checkpointRDD的数据
3.计算完成之后,将计算的结果直接持久化到指定的checkpoint目录中,切断RDD之间的依赖关系。
优化:对RDD进行checkpoint之前先对RDD进行cache()下,这样第三步就不用重新从头计算当前checkpointRDD的数据
def main(args: Array[String]): Unit = {
val conf =new SparkConf()
conf.setAppName("test")
conf.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("./data/words")
val lines2 = sc.textFile("./data/words")
sc.setCheckpointDir("./data/checkpoint")
lines.checkpoint()
lines2.checkpoint()
val count = lines.count()
val count2 = lines2.count()
println(lines2.getCheckpointFile)
sc.stop()
}
最后
以上就是怕孤单花生为你收集整理的spark之缓存的全部内容,希望文章能够帮你解决spark之缓存所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复