概述
1、transformation和Action算子的介绍。
transformation操作会针对已有的RDD创建一个新的RDD;
而action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且把结果返回给Driver程序。
transformation的特点就是lazy(懒)特性。lazy特性指的是,如果一个Spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行。也就是说,transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当执行了Action操作,那么所有的transformation才会执行。Spark通过这个lazy特性,来进行底层的Spark应用执行的优化,避免产生过多中间结果。
action操作的执行,会触发一个spark job的执行,从而触发这个action之前所有的transformation的执行。
2、RDD持久化详解:
不使用持久化的执行过程:在Spark应用程序中,如果对某个RDD,后面进行多次transformation或者是action操作,那么,可能每次都要重新计算一个RDD,那么就会反复消耗大量的时间,从而大大降低了spark应用的整体性能。
使用RDD的执行过程:一个RDD,在要求执行多次操作的时候,只有在执行第一次计算时,才会进行计算,但是此后对这个RDD所做的操作,都是其针对缓存的!第一次操作完,会把操作的结果缓存在内存中,那么我们就不需要多次计算同一个RDD,从而在很多场景下,可以大幅度的提升我们spark应用程序的性能,官方文档说,合理使用RDD持久化机制,甚至可以提升spark应用程序的性能十倍。
Spark的持久化原理:
要持久化一个RDD,只要调用期cache()或者是persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存到每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。
cache()和persist()的主要区别在于,cache()是persisit()的一种简化方式,cache()底层就是调用的persisit()的无参版本,同时就是调用persisit(MEMEORY_ONLY),将数据持久化到内存中。如果需要从内存中清除缓存,那么可以使用unpersisit()方法。
Spark自己也会在shuffle操作时,进行数据的持久化,比如写入磁盘,主要是为了再借点失败时,避免需要重新计算整个过程。
下面是一个RDD的体现:
package spark.stady.two;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.awt.*;
/**
* ClassName Persisit
* Description RDD的缓存机制
* @author Vincent
* @version 1.0
* @data 2019/1/21 11:24
*/
public class Persisit {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Persisit")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//cache缓存 : 或者persisit()的使用,是有规则的,
//必须在transformation或者textfile等创建一个RDD之后,直接连续调用,cache()或者persisit()才可以
//如果先创建一个RDD,然后单独另起一行执行cache()或者persist()方法,是没有用的
//而且,会报错,大量的文件丢失。
JavaRDD<String> lines = sc.textFile("C://Users//SYG//Desktop//spark.txt")
.cache();
//.persist(StorageLevel.MEMORY_AND_DISK()); persist的使用方式。
// 默认的cache()使用的MEMORY_ONLY这个持久化的策略。
long beginTime = System.currentTimeMillis();
long count = lines.count();
System.out.println(count);
long endTime = System.currentTimeMillis();
System.out.println("cost " + (endTime - beginTime) + " millineseconds;");
beginTime = System.currentTimeMillis();
count = lines.count();
System.out.println(count);
endTime = System.currentTimeMillis();
System.out.println("cost " + (endTime - beginTime) + " millineseconds;");
sc.close();
}
}
输出结果:
///这是对一个80MB文本文件的一个操作;
//不加cache()的操作:
第一次输出:483345 cost 645 millineseconds;
第二次输出:483345 cost 374 millineseconds;
//加cache()的操作:
第一次输出:483345 cost 1174 millineseconds;
第二次输出:483345 cost 47 millineseconds;
明显第二次比第一次要快上很多,这就是RDD的缓存机制。
RDD的持久化策略:
MEMORY_ONLY 级别:以非序列化的java对象的方式持久化在JVM内存中,如果内存无法完全存储RDD所有的partition,那么那些没有持久化的partition就会在下一次使用到的时候,重新被计算。
MEMORY_AND_DISK级别:同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用到这些partition的时候,从磁盘中读取。RY_AND_DISK级别:同上,但是当某些partition无法存储在内存中时,会持久化到磁盘中。下次需要使用到这些partition的时候,从磁盘中读取。
MEMORY_ONLY_SER级别:同MEMORY_ONLY,但是会使用java序列化方式,将java对象序列化以后持久化,可以减少内存的开销,但是在使用的时候需要反序列化,因此会增大CPU的开销
MEMORY_AND_DISK_SER级别:同MEMORY_AND_DSK。但是使用序列化方式持久化java对象。
DISK_ONLY级别:使用非序列化java对象的方式持久化,完全存储到磁盘中。
MEMORY_ONLY_2 等等:如果是尾部加2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次就行计算,只需要使用备份数据即可。
使用选择建议:
1、优先使用MEMORY_ONLY级别,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存的速度更快,而且没有序列化的操作,因此不需要使用CPU进行反序列化的操作。
2、如果MEMORY_ONLY无法缓存下来所有的数据的话,那么就是使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存的操作还是非常快的,只是需要消耗一些CPU。
3、如果需要进行快速的失败恢复,那么就选择后缀为_2的策略。进行数据的备份,这样失败时,就不需要在从新计算了。
4、能不使用DISK相关的策略。就不使用,有的时候,从磁盘读取还不如从新计算一遍来的更快。
最后
以上就是彪壮唇膏为你收集整理的Spark学习第二天----Transformation和Action算子的学习的全部内容,希望文章能够帮你解决Spark学习第二天----Transformation和Action算子的学习所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复