概述
几年前,包括最近,我看了各种书籍、教程、官网。但是真正能够把RDD、DataFrame、DataSet解释得清楚一点的、论据多一点少之又少,甚至有的人号称Spark专家,但在这一块根本说不清楚。还有国内的一些书籍,小猴真的想问一声:Are you OK?书名别再叫精通xxx技术了,请改名为 xxx技术从入门到放弃。这样可以有效避免耽误别人学习,不好吗?
大家都在告诉我们结论,但其实,小猴作为一名长期混迹于开源社区、并仍在一线大数据开发的技术人,深谙技术文化之一:
To experience | 去经历
这是我要提倡的技术文化之一。之前有人把Experience译为体验,但在小猴的技术世界里,Experience更多的是自己去经历,而不能跟团去旅游一样,那样你只能是一个外包而已,想要做到卓越,就得去经历。技术,只有去经历才会有成长。
目录
- 目录
- RDD、DataFrame、DataSet介绍
- RDD
- DataFrame
- Apache Spark 2.0 统一API
- DataSet
- Youtube视频分析案例
- 数据集
- Maven开发环境准备
- RDD开发
- DataFrame开发
- DataSet开发
- 对比RDD和DataSet的API
- 对比RDD、DataFrame、DataSet
RDD、DataFrame、DataSet介绍
我们每天都在基于框架开发,对于我们来说,一套易于使用的API太重要了。对于Spark来说,有三套API。
image-20210201000858671
分别是:
-
RDD
-
DataFrame
-
DataSet
三套的API,开发人员就要学三套。不过,从Spark 2.2开始,DataFrame和DataSet的API已经统一了。而编写Spark程序的时候,RDD已经慢慢退出我们的视野了。
但Spark既然提供三套API,我们到底什么时候用RDD、什么时候用DataFrame、或者DataSet呢?我们先来了解下这几套API。
RDD
RDD的概念
-
RDD是Spark 1.1版本开始引入的。
-
RDD是Spark的基本数据结构。
-
RDD是Spark的弹性分布式数据集,它是不可变的(Immutable)。
-
RDD所描述的数据分布在集群的各个节点中,基于RDD提供了很多的转换的并行处理操作。
-
RDD具备容错性,在任何节点上出现了故障,RDD是能够进行容错恢复的。
-
**RDD专注的是How!**就是如何处理数据,都由我们自己来去各种算子来实现。
什么时候使用RDD?
-
应该避免使用RDD!
RDD的短板
-
集群间通信都需要将JVM中的对象进行序列化和反序列化,RDD开销较大
-
频繁创建和销毁对象会增加GC,GC的性能开销较大
image-20210201231436680
Spark 2.0开始,RDD不再是一等公民
从Apache Spark 2.0开始,RDD已经被降级为二等公民,RDD已经被弃用了。而且,我们一会就会发现,DataFrame/DataSet是可以和RDD相互转换的,DataFrame和DataSet也是建立在RDD上。
DataFrame
DataFrame概念
-
DataFrame是从Spark 1.3版本开始引入的。
-
通过DataFrame可以简化Spark程序的开发,让Spark处理结构化数据变得更简单。DataFrame可以使用SQL的方式来处理数据。例如:业务分析人员可以基于编写Spark SQL来进行数据开发,而不仅仅是Spark开发人员。
-
DataFrame和RDD有一些共同点,也是不可变的分布式数据集。但与RDD不一样的是,DataFrame是有schema的,有点类似于关系型数据库中的表,每一行的数据都是一样的,因为。有了schema,这也表明了DataFrame是比RDD提供更高层次的抽象。
-
DataFrame支持各种数据格式的读取和写入,例如:CSV、JSON、AVRO、HDFS、Hive表。
-
DataFrame使用Catalyst进行优化。
-
DataFrame专注的是What!,而不是How!
DataFrame的优点
-
因为DataFrame是有统一的schema的,所以序列化和反序列无需存储schema。这样节省了一定的空间。
-
DataFrame存储在off-heap(堆外内存)中,由操作系统直接管理(RDD是JVM管理),可以将数据直接序列化为二进制存入off-heap中。操作数据也是直接操作off-heap。
DataFrane的短板
-
DataFrame不是类型安全的
-
API也不是面向对象的
Apache Spark 2.0 统一API
从Spark 2.0开始,DataFrame和DataSet的API合并在一起,实现了跨库统一成为一套API。这样,开发人员的学习成本就降低了。只需要学习一个High Level的、类型安全的DataSet API就可以了。——这对于Spark开发人员来说,是一件好事。
上图我们可以看到,从Spark 2.0开始,Dataset提供了两组不同特性的API:
-
非类型安全
-
类型安全
其中非类型安全就是DataSet[Row],我们可以对Row中的字段取别名。这不就是DataFrame吗?而类型安全就是JVM对象的集合,类型就是scala的样例类,或者是Java的实体类。
有Spark 2.0源码为证:
package object sql {
// ...
type DataFrame = Dataset[Row]
}
https://github.com/IloveZiHan/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/package.scala
也就是说,每当我们用导DataFrame其实就是在使用Dataset。
针对Python或者R,不提供类型安全的DataSet,只能基于DataFrame API开发。
什么时候使用DataFrame
DataSet
-
DataSet是从Spark 1.6版本开始引入的。
-
DataSet具有RDD和DataFrame的优点,既提供了更有效率的处理、以及类型安全的API。
-
DataSet API都是基于Lambda函数、以及JVM对象来进行开发,所以在编译期间就可以快速检测到错误,节省开发时间和成本。
-
DataSet使用起来很像,但它的执行效率、空间资源效率都要比RDD高很多。可以很方便地使用DataSet处理结构化、和非结构数据。
DataSet API的优点
image-20210201231136055
-
DataSet结合了RDD和DataFrame的优点。
-
当序列化数据时,Encoder生成的字节码可以直接与堆交互,实现对数据按需访问,而无需反序列化整个对象。
类型安全
写过Java或者C#的同学都会知道,一旦在代码中类型使用不当,编译都编译不过去。日常开发中,我们更多地是使用泛型。因为一旦我们使用非类型安全的类型,软件的维护周期一长,如果集合中放入了一些不合适的类型,就会出现严重的故障。这也是为什么Java、C#还有C++都要去支持泛型的原因。
在Spark中也会有类型安全的问题。而且,一旦在运行时出现类型安全问题,会影响整个大规模计算作业。这种作业的错误排除难度,要比单机故障排查起来更复杂。如果在运行时期间就能发现问题,这很美好啊。
DataFrame中编写SQL进行数据处理分析,在编译时是不做检查的,只有在Spark程序运行起来,才会检测到问题。
SQL | DataFrame | Dataset | |
---|---|---|---|
语法错误 | 运行时 | 编译时 | 编译时 |
解析错误 | 运行时 | 运行时 | 编译时 |
对结构化和半结构化数据的High Level抽象
例如:我们有一个较大的网站流量日志JSON数据集,可以很容易的使用DataSet[WebLog]来处理,强类型操作可以让处理起来更加简单。
以RDD更易用的API
DataSet引入了更丰富的、更容易使用的API操作。这些操作是基于High Level抽象的,而且基于实体类的操作,例如:进行groupBy、agg、select、sum、avg、filter等操作会容易很多。
性能优化
使用DataFrame和DataSet API在性能和空间使用率上都有大幅地提升。
-
DataFrame和DataSet API是基于Spark SQL引擎之上构建的,会使用Catalyst生成优化后的逻辑和物理执行计划。尤其是无类型的DataSet[Row](DataFrame),它的速度更快,很适合交互式查询。
-
由于Spark能够理解DataSet中的JVM对象类型,所以Spark会将将JVM对象映射为Tungsten的内部内存方式存储。而Tungsten编码器可以让JVM对象更有效地进行序列化和反序列化,生成更紧凑、更有效率的字节码。
通过上图可以看到,DataSet的空间存储效率是RDD的4倍。RDD要使用60GB的空间,而DataSet只需要使用不到15GB就可以了。
Youtube视频分析案例
数据集
去Kaggle下载youtube地址:
https://www.kaggle.com/datasnaek/youtube-new?select=USvideos.csv
每个字段的含义都有说明。
Maven开发环境准备
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12</scala.version>
<spark.version>3.0.1</spark.version>
</properties>
<repositories>
<repository>
<id>central</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>central</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.3</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
</build>
RDD开发
/**
* Spark RDD处理示例
*/
object RddAnalysis {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDD Process").setMaster("local[*]")
val sc = new SparkContext(conf)
// 读取本地文件创建RDD
val youtubeVideosRDD = {
sc.textFile("""E: 5.git_projectdatasetyoutube""")
}
// 统计不同分类Youtube视频的喜欢人数、不喜欢人数
// 1. 添加行号
// 创建计数器
val rownumAcc = sc.longAccumulator("rownum")
// 带上行号
youtubeVideosRDD.map(line => {
rownumAcc.add(1)
rownumAcc.value -> line
})
// 过滤掉第一行
.filter(_._1 != 1)
// 去除行号
.map(_._2)
// 过滤掉非法的数据
.filter(line => {
val fields = line.split("