概述
这节课继续讲spark-sql
上节课讲了什么是SQL(引出了它的特征,在分布式下解析的过程)、什么是hive在大数据里面(spark-sql也是趋向于和hive整合的),上节课讲的是理论,这节课从代码开始。
在讲spark-core的时候有一个上下文的类是spark context可以维护创建RDD,提交作业的过程,sql靠的包装外挂的东西把它丰富起来了,尤其dataSet对上层RDD的包裹。dataSet最终要转换会RDD才能触发作业。
sql 字符串 -> dataset 对rdd的一个包装(优化器)->只有RDD才能触发DAGScheduler
sql是字符串,必须有一个载体,最终要转换成对象。这个对象用的不是RDD而是DataSet来映射的那些语句和逻辑,最终还是要转换成Rdd,只有Rdd才能触发DAGScheduler。sql里的关键字,所谓的表,操作,函数要转换成操作符,有的是函数,有的需要一种模型来承载,是简单的RDD不能支起的,所以起了一个dataSet,是对RDD简单模型的包装,里面融入了优化器。
变化:
现在是dataset,而在spark context里没有dataset,所以来了spark session,通过builder() 构造器,再往下可以放入config() 把曾经的conf放在这个位置,也可以直接点appName()点master()追加,enableHiveSupport() 开启这个选项是spark-sql on hive,只有开启这个才支持DDL创建table的操作,没开启spark只有catalog没有metastore元数据存储,不支持DDL创建,只能数据加载过来之后编程dataframe进行操作,最后getOrCreate()得到一个sparkSession,未来主要用来对着session去操作sql的。
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val session: SparkSession = SparkSession
.builder()
.config(conf)
//.appName("test")
//.master("local[*]")
//.enableHiveSupport() //开启这个选项 spark-sql on hive,只有开启这个才支持DDL创建table的操作,没开启spark只有catalog
.getOrCreate()
直接面向数据的时候怎么写SQL明白了之后,这时候更期望的是别每次都手工加载数据创建schema复杂的过程,希望全公司所有人都在hive的meta store上,下次再用spark sql的时候就直接写sql,什么都不干了。这是过度的过程。这时候可以体会到catalog的重要性,是sparksql对接的位置。一定要明白,meta store是存的,catalog是编目、目录的意思。
这个时候以前的spark context哪里去了,还可有session.sparkContext拿回以前的sc,再去走以前rdd的api操作,都可以。未来拿sc主要做一个事情,sc.setLogLevel(“ERROR”),设置一下提示级别,先不用他去得到rdd
val sc: SparkContext = session.sparkContext
sc.setLogLevel("ERROR")
感受一下session
思考:为什么要提出session的概念?session在Java里是一个会话
以session为主的操作演示,准备一个文件,里面是一些数据,每一行都是规整的json字符串
{"name":"zhangsan","age":20}
{"name":"lisi"}
{"name":"wangwu","age":18}
{"name":"a","age":21}
{"name":"zhangsan1","age":20}
. . . . . . .
可以通过session.read,现在没有hive和元数据,只能面向纯数据了,json文件,如果把json文件放在代码里,最终写出sql来,session.read.json("/in/json"),这时候返回值是一个DataFrame,DataFrame是什么,点进去看是一个类型,它就和别名一样,本质是DataSet只不过里面元素的类型是ROW类型。
//以session为主的操作演示
//DataFrame Dataset[Row]
val df: DataFrame = session.read.json("in/json")
DataFrame/DataSet也是数据集,和之前的rdd数据集有什么区别?
以前rdd上一定要调用一个foreach/count/collect之类的,dataframe有一个show方法,就相当于数据库当中如果它是一张表,想显示里面的东西,那么show(),还有printSchema()是打印表里除数据之外的列头/表格
df.show()
df.printSchema()
输出结果(省略部分表格内容,实际输出为显示20行):
+----+---------+
| age| name|
+----+---------+
| 20| zhangsan|
|null| lisi|
| 18| wangwu|
| 21| a|
| 20|zhangsan1|
+----+---------+
only showing top 20 rows
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
能够出现这一现象,因为最开始读的是一个json,而且这个方法名的json是spark框架帮我们实现的,可以读json格式的文件,它针对这笔数据的时候,会发现键值对的,{“name”:“zhangsan”,“age”:20},左边这个name就是下面的schema信息,右边的age描述的也是列名,只有"zhangsan"和20才是value,是数据信息。且根据写的时候如果带了双引号就是string类型,如果裸露的数值就是long类型,都帮我们做完了,得到最终的结果。并给我们返回了一个DataFrame对象,这个对象可以正确显示它的内容以及打印schema。数据内容和schema是两部分,这件事情变得很简单了。
注意在pom.xml里要加一个依赖,之前是只对core做开发,写rdd就可以了,现在要追加一个spark-sql
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
但现在只是快速的找到感觉,这个东西和rdd写的不一样了,但是要把上节课的理论知识带回来,如果这是一个spark-sql的话,以SQL为中心,能想到的是,如果给你一个MySQL数据库,想做什么事情?
这个数据库里有database的概念,有tables的概念,一系列的元数据管理,现在在没有开启spark on hive 的时候有没有这些东西。要展示谁?
session.catalog,要知道编目里有哪些映射指向,通过 listDatabases() 可以得到当前spark-sql里有哪些数据库的结果集。
同样的, listTables() 可以的到编目里有哪些表,可以想到一个结果,catalog就是一个新的对象,现在不会有表。因为没有meta store,没有开启hive的存储,所以没有存储层,没有持久化。
session.catalog.listFunctions() 还可以得到当前围绕这个sql引擎有哪些函数,无论是系统函数,标准函数,UDF,UDAF,functions.show(999,true) true代表会把字段展开不缩小,可以看到结果。
val databases: Dataset[Database] = session.catalog.listDatabases()
databases.show()
val tables: Dataset[Table] = session.catalog.listTables()
tables.show()
val functions: Dataset[catalog.Function] = session.catalog.listFunctions()
functions.show(10,true)
输出结果:
+-------+----------------+--------------------+
| name| description| locationUri|
+-------+----------------+--------------------+
|default|default database|file:/E:/softs/id...|
+-------+----------------+--------------------+
在数据库的时候有这么一个打印, 在我们的项目目录下,
可以看出来创建了一个目录spark-warehouse但是没有任何用,
因为不能执行DDL也没有相应的东西
没有开启enableHiveSupport() ,catalog是不会做持久化的
未来开启后会持久化到spark-warehouse可能在项目下,也可能在hive里面
这是后话
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+
+----+--------+-----------+--------------------+-----------+
|name|database|description| className|isTemporary|
+----+--------+-----------+--------------------+-----------+
| !| null| null|org.apache.spark....| true|
| %| null| null|org.apache.spark....| true|
| &| null| null|org.apache.spark....| true|
| *| null| null|org.apache.spark....| true|
| +| null| null|org.apache.spark....| true|
| -| null| null|org.apache.spark....| true|
| /| null| null|org.apache.spark....| true|
| <| null| null|org.apache.spark....| true|
| <=| null| null|org.apache.spark....| true|
| <=>| null| null|org.apache.spark....| true|
+----+--------+-----------+--------------------+-----------+
only showing top 10 rows
之前得到的df.show() df.printSchema()都是api级别的操作,如果想写sql怎么办,通过df身上有一个createTempView的方法,临时视图的,并不代表要创建表格,如果是createTable就要创建物理表了,物理表的元数据要持久化一系列操作,在1.6的时候有争议,register table后续版本,因为在catalog里是没有storage的,所以就改成了view视图。
df.createTempView(“ooxx”) 向session里去注册一个表名,把这个数据集注册成一个逻辑名称到编目录catalog里去,未来写sql的时候就可以通过ooxx这个名字找到这个数据集,未来可以得到数据。
df.createTempView("ooxx") //这一过程是df通过session 向catalog中注册表名
因为是通过session注册的,最终写sql实在这里写:
session.sql(“select name from ooxx”)
里面可以写什么sql语句,比如最简单的select * form ooxx,这里的ooxx有没有是取决于catalog中有没有注册过或者catalog能不能再去调用hive的meta store查找到这个表名。
这时候只是传入了一个sql给到它身上,和rdd一样,并没有被执行,会返回一个DataFrame,和rdd迭代的过程一样,这个sql算子也是一个转换算子,会把你的sql语句转换成一个结果集。最终是要通过action算子show()来打印一下查询的结果
val frame: DataFrame = session.sql("select name from ooxx")
frame.show()
输出结果(省略部分结果):
+---------+
| name|
+---------+
| zhangsan|
| lisi|
| wangwu|
| a|
|zhangsan1|
+---------+
only showing top 20 rows
之前catalog里是空的,但是只要注册过之后再去验证
session.catalog.listTables().show()
输出结果:
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|ooxx| null| null|TEMPORARY| true|
+----+--------+-----------+---------+-----------+
出现注册了一个ooxx表的类型是TEMPORARY临时的,即使显示持久化也没有能力持久化出去。
肯定有人不习惯又有地方写sql,又有地方写api,这时候可以大胆的去想,手里通过session开启了hive的支持,如果对一些数据都打开了DataFrame,都注册成表了,或者通过hive的metastore拿到了元数据之后,可以完全未来只想纯写sql,api都不想使用了。
假设df.createTempView(“ooxx”)代表了已经对上面的50个文件都注册成了不同的表,会有50张表,这时候可以做什么事情?发展成什么样子就是我们想要的了?
先导入一个包,Scala标准输入:import scala.io.StdIn._
做一件事情:readLine(“input your sql:”) 并把用户输入的内容返回成一个sql语句。
不硬编码写死上面的sql语句了,来自于用户动态的输入,关键用户输入的表名有没有的问题,刚才说了假设注册了50张表,这里可以有50张表的查询,查询完结果之后,它是懒执行的,只有到action算子的时候整个作业才能跑通。这个死循环会阻塞等待用户的输入,输入完给它开始执行并打印,回头又开始等待用户的输入。
import scala.io.StdIn._
while (true) {
val sql: String = readLine("input your sql: ")
session.sql(sql).show()
}
输出结果:
input your sql: select * from ooxx limit 3
+----+--------+
| age| name|
+----+--------+
| 20|zhangsan|
|null| lisi|
| 18| wangwu|
+----+--------+
input your sql: (等待用户继续输入)
这个过程有点像MySQL了,spark-sql里有一套自己的服务,它也有它的客户端服务可以达到这个效果,可以大胆的扩展一下,你的到这个sql可以通过readline标准输入,如果把这个方法换成一个底层通信,别的人通过tcp/socket发送一个字符串过来,你收到之后再执行是不是也可以。约等于一个服务器客户端模式了。
大体上有一个感觉,spark-sql是比较完整的,虽然它现在自己没有持久化层,但是有一个编目,可以临时的对元数据有一个内存的管理。sql该有的东西它都有,而且也是可以通过一种方式让你只写sql不干别的事情,但是如果想纯写sql要现在catalog中准备出那些表的元数据。只不过元数据怎么准备出来的,要不手工的将元数据进行转换注册成表,或者是开启hive支持,直接可以写sql因为收到的sql 的表来自于catalog对hive的咨询,hive有什么表就可以写什么表。
继续展示细节:
dataframe和rdd是什么关系
创建一个preson.txt文件
zhangsan 18
lisi 22
wangwu 99
首先不碰sparksession,因为展现出的太好用太简单了,从自己构建的过程,session里有createDataFrame一系列的方法,不是通过它的某一个方法直接得到的,而是要满足自己构建DataFrame,可能一辈子不会用它,讲完这个过程就对DataFrame有一定的了解。
createDataFrame()可以传的参数有哪些 ?
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema, needsConversion = true)
}
可以来自一个ROW类型的RDD加上一个schema就是StructType,通过参数都可以参悟到DataFrame由什么组成,由RDD且是ROW类型的数据集(RDD是数据集存数据的)再加上schema(结构化Type,数据的元数据,这个RDD里有哪些列,每个列叫啥类型和上节课讲的理论是一样的)
def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
val attributeSeq: Seq[AttributeReference] = getSchema(beanClass)
val className = beanClass.getName
val rowRdd = rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
}
Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
}
还有是通过RDD它不是ROW类型的,这个RDD也是存数据的,但还要给出另外一个东西,是beanClass,做Java编程知道,Java中所有的属性都是可以提取出来的,给出一个类的类型就知道里面有哪些属性了。如果这个RDD里的所有的数据都是这个类型的对象的话,其实在这一步就是在提取schema,也是走的上面类似的东西。换言之就是数据+元数据就可以封装成DataFrame
先说最原始的,先准备一个RDD,再准备一个元数据,通过createDataFrame得到一系列查询.
如果想调用这个方法,需要先得到两个东西:数据+元数据 == DataFrame
1.数据 RDD[ROW]
//1 数据 RDD[ROW]
val rdd: RDD[String] = sc.textFile("in/person.txt")
//得到最原始的RDD,但它的类型还是String类型,要把RDD转成ROW类型
//怎么去转换它?而且String类型存的是每一行有名字和数值两个东西,最终要匹配成两个字段
val rddRow: RDD[Row] = rdd.map(line => {
val strs: Array[String] = line.split(" ")
//可以把一行字符串转换成一个Tuple2元素返回 spark语法用小括号取下标
//先把数据准备成这个样子(strs(0),strs(1).toInt) 再去转换
(strs(0), strs(1).toInt)
}).map(
//现在是个tuple2的类型,要转换成ROW的类型 怎么做? 最终返回值就是一个Row的类型了
//Seq的作用就是把几个元素整成了一个sequence序列,这个集合里它会认为第一个是第一列,第二个是第二列
//Row就是行的意思,这一行就有多个列不同的组成,所有肯定是每个元素是独立的,但整体是一个集合
x => Row.apply(Seq(x._1, x._2)) //错误的,等于列里是一个Seq,这个Row只有一个元素,是一个sequence,应该是有两个列,一个列是String,一个列Int类型的
//正确写法:x => Row.apply(x._1, x._2) 或者 x => Row.fromSeq(Seq(x._1, x._2))
)
2.元数据 StructType
def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
StructType.apply()在应用的时候需要一个fields,它的类型是 Seq[StructField]又是一个序列,因为一个表里有很多的列,name,age,… 每一个列是一个StructField,整张表有多个列是一个sequence序列/集合,可以用它的api去逆推这件事情,想得到StructType但是要先去得到StructField,想得到列的话StructField.apply()看需要哪些东西:
case class StructField(
name: String, //一个列自己的名字
dataType: DataType, //这个列未来是什么类型,建表的时候也要给出名称和数据类型
nullable: Boolean = true, //是否可以为空
metadata: Metadata = Metadata.empty)
DataTypes类中的数据类型:
public class DataTypes {
public static final DataType StringType = StringType$.MODULE$;
public static final DataType BinaryType = BinaryType$.MODULE$;
public static final DataType BooleanType = BooleanType$.MODULE$;
public static final DataType DateType = DateType$.MODULE$;
public static final DataType TimestampType = TimestampType$.MODULE$;
public static final DataType CalendarIntervalType = CalendarIntervalType$.MODULE$;
public static final DataType DoubleType = DoubleType$.MODULE$;
public static final DataType FloatType = FloatType$.MODULE$;
public static final DataType ByteType = ByteType$.MODULE$;
public static final DataType IntegerType = IntegerType$.MODULE$;
public static final DataType LongType = LongType$.MODULE$;
public static final DataType ShortType = ShortType$.MODULE$;
public static final DataType NullType = NullType$.MODULE$;
//省略类中的方法
}
// 2.元数据 StructType
val fields = Array(
StructField.apply("name", DataTypes.StringType, true),
StructField.apply("age", DataTypes.IntegerType, false)
)
//未来要放一个sequence或array,准备一个Array里面放的就是这两个元素
//Array里的顺序要与row和数据的顺序都是一致的
val schema: StructType = StructType.apply(fields)
根据上面的代码操作得到了数据row,得到的元数据schema,下一步可以创建dataframe了
val dataFrame: DataFrame = session.createDataFrame(rddRow,schema)
通过以上代码流程知道了DataFrame是什么,DataFrame倾向于是有表头,有列,有元数据,且里面还保存了一批所有列对应行的数据,换言之,DataFrame就是一张表。
既然DataFrame是一张表,所以可以show(),可以printSchema(),也可以createTempView(“person”)注册到session里面,既然可以注册进去,就可以session.sql()去查询person这张表。
dataFrame.show()
dataFrame.printSchema()
dataFrame.createTempView("person")
session.sql("select * from person").show()
输出结果:
+--------+---+
| name|age|
+--------+---+
|zhangsan| 18|
| lisi| 22|
| wangwu| 99|
+--------+---+
发生报错:
ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 3)
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.collection.immutable.
c
o
l
o
n
colon
coloncolon is not a valid external type for schema of string
原因是在生成Row的时候调用的apply方法传入的参数不对:
val rddRow: RDD[Row] = rdd.map(line => {
val strs: Array[String] = line.split(" ")
(strs(0), strs(1).toInt)
}).map(x => Row.apply(Seq(x._1, x._2)))
正确的使用方法:
x => Row.fromSeq(Seq(x._1, x._2))
x => Row.apply(x._1, x._2)
apply和fromSeq用法的区别:
/*This method can be used to construct a [[Row]] with the given values.*/
def apply(values: Any*): Row = new GenericRow(values.toArray)
/*This method can be used to construct a [[Row]] from a [[Seq]] of values.*/
def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray)
这个例子中告诉我们的是什么?
最终的api都这么去写,关键前面是怎么来的,json的时候直接得到了,现在通过SparkContext度Rdd转成Row类型,加上一个StructType得到一个DataFrame最终再来执行。整合出两种特征一相加也能得到DataFrame。(enableHiveSupport还没有开启,使用DDL语句创建会报错。)
session.sql("create table student11 (name string,age int)")
session.catalog.listTables().show()
session.sql("select * from student11")
输出结果:
+---------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+---------+--------+-----------+---------+-----------+
|student11| default| null| MANAGED| false|
| ooxx| null| null|TEMPORARY| true|
| person| null| null|TEMPORARY| true|
+---------+--------+-----------+---------+-----------+
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Hive support is required to select over the following tables:
`default`.`student11`
spark里为什么没有自己来实现metastore元数据存储,而实现了一个catalog,因为它想用hive的meta store,没必要关起门造轮子了,所有的DDL解析以及HQL语法规则都借用的hive的meta store,catalog只是一个被动的只做编目、目录,不做DDL语法解析,元数据存储,所有它不叫作meta store
但以上数据+元数据获取DataFrame的方式已经很少用了,spark session出来之后,很多高阶的东西可以直接拿到DataFrame或者DataSet两种操作方式。
数据+元数据 == dataframe 就是一张表
第一种方式:row类型的rdd+structType
第二种方式:bean类型的rdd+JavaBean
第1.1版本:动态封装:
Row自身是一个类型,代表的是很多列,每个列要标识出准确类型,否则会报类型不匹配(非常重要!!)
//模拟用户给出的schema,完全根据这个schema和数据做映射,得到一个DataFrame
val userSchema = Array(
"name string",
"age int"
)
rdd.map(_.split(" "))其实已经切割出了每一行是2个元素,或是N个元素,切割完是一个数组,比如 [ zhangsan 18 0 ] ,一个数组有N个元素,每一个元素要扔给一个方法,这个方法最终是能将每个元素转换成自己定义的类型,这每一个元素是怎么加工出来的?介绍一个方法:
// 1. row rdd
//Row自身是一个类型,代表的是很多列,每个列要标识出准确类型,否则会报类型不匹配(非常重要!!)
//在这个位置趋向于把列的数量和列属性的类型动态起来
//Row还有一个方法fromSeq 来自一个集合,这个集合里给出了很多values
//整出一个集合来,集合里元素的类型都给封装好,再给扔进去就ok了!
rdd.map(_.split(" "))
.map(x=>x.zipWithIndex) //可以让集合每一个元素拼上正确顺序下标
//可以得到这样一个数据集 [ (zhangsan,0) (18,1) (0,2) ] 希望对每一个元组进行操作
.map(x=>x.map(toDataType(_))) //定义工具方法toDataType() 进行类型转换
.map(x=>Row.fromSeq(x)) //又得到一个Row类型的Rdd,这一过程是动态灵活的
先把数据集准备成一个映射,zhangsan映射name,0映射string,准备好了映射之后,定义一个工具方法
def toDataType(vv:(String,Int))={
userSchema(vv._2).split(" ")(1) match {
case "string" =>vv._1.toString
case "int" =>vv._1.toInt
}
}
数据的元数据应该有几个列,且每个列应该是神马类型,从用户给出的字符串数组可以得到,且里面有名称有类型。其实定义StructType就是把用户给出的东西翻转一下,用户给出的字符串其实就是一个数据集,字符串数组。
// 2. structType 封装
def getDataType(v:String)={
v match {
case "string" =>DataTypes.StringType
case "int" =>DataTypes.IntegerType
}
}
val userfields: Array[StructField] = userSchema.map(_.split(" "))
.map(x => StructField.apply(x(0), getDataType(x(1))))
val userschema: StructType = StructType.apply(fields)
val userDF: DataFrame = session.createDataFrame(rowRdd,userschema)
userDF.show()
userDF.printSchema()
输出结果:
+--------+---+
| name|age|
+--------+---+
|zhangsan| 18|
| lisi| 22|
| wangwu| 99|
+--------+---+
root
|-- name: string (nullable = false)
|-- age: integer (nullable = false)
这些方式都已经不用了,但是可以有温度的知道dataFrame就是元数据+数据,通过代码的方式找到感觉,但是这种代码的编程方式都已经淘汰了,在1.6的时候是这么去玩的。除了session创建还有JavaBean的方式:
1.首先定义一个JavaBean。但是是Scala的bean不是Java类,Scala里也有对象的概念,两个标签@BeanProperty提供get和set方法。
class Person {
@BeanProperty
var name :String=""
@BeanProperty
var agg:Int = 0
}
2.如何完成JavaBean的形式:
//第一步得到的是RDD[Person]不是RDD[ROW]类型了
val rddBean: RDD[Person] = rdd.map(_.split(" ")).map(arr => {
val p = new Person
p.setName(arr(0))
p.setAgg(arr(1).toInt)
p //进来的全是字符串,输出的是对象类型
})
//classOf[Person] 取person这个类型,说白了取person的属性
val beandf: DataFrame = session.createDataFrame(rddBean,classOf[Person])
beandf.show()
beandf.printSchema()
输出结果:(列的顺序是按照首字母排序的,但是类型是对的,符合定义的)
+---+--------+
|agg| name|
+---+--------+
| 18|zhangsan|
| 22| lisi|
| 99| wangwu|
+---+--------+
root
|-- agg: integer (nullable = false)
|-- name: string (nullable = true)
这不重要,开始是把new一个Person对象放在匿名函数里面,如果把创建Person对象放在外面,没有报错,因为这里有一个知识点叫做闭包,把对象闭到匿名函数里面去了,这是说得通的,整体语法也没有问题,执行的时候会不会有问题?
val p = new Person
val rddBean: RDD[Person] = rdd.map(_.split(" ")).map(arr => {
p.setName(arr(0))
p.setAgg(arr(1).toInt)
p //进来的全是字符串,输出的是对象类型
})
报错:Task not serializable我们的任务不能被序列化。
为什么没有被序列化?
按照曾经的编程习惯,new了一个对象,对象new在了方法外,方法里处理每一行的时候,使用的是相同的对象,如果每一次都使用相同对象会有一个问题,这个对象前后传递的时候会有数据覆盖/引用重复的问题,但这个东西答法很值钱:
1.无论MR还是spark都属于pipeline,也就是iterator迭代器,一次内存飞过一条数据,hasNext() next()迭代一条数据,这句话转换一下思路:
这一条记录完成了读取,计算,到序列化的过程,第二条记录再飞进来的时候,这一个对象new在里面和外面是没有区别的。
2.它是分布式计算,计算逻辑由driver端要序列化,发送给其他jvm的executor中执行,函数里所有的东西是否支持序列化,它必然要发生一次序列化的过程,要发送过去。
所以这时候要怎么解决这个问题?
让Person继承序列化就可以了。class Person extends Serializable就没有问题了。
还可以怎么做:
如果只是读取一个文件,文件里没有描述的话,先是一个抽象的最大的类型DataSet,它是对Rdd最完整的包装, 就把它想成是一个Rdd,一行一行的,但是它的api和Rdd是有些差异的。很多sql里的东西,rdd没有它是有的,多了一些可以和SQL绑定的东西, DataSet是Rdd的扩展,Rdd只是纯面向collection的,DataSet可以面向collection,也可以SQL领域语言的映射, 所以SQL这种字符串到逻辑执行才能被打通,如果没有它映射的话,SQL写一个select是啥,DataSet完成了一些方法,可以和SQL的关键字勾上,同时DataSet的方法最终转成rdd的某种包装方式,承上启下的东西。
一定要记住一句话:
spark的dataset既可以按collection类似于rdd的方法操作,也可以按SQL领域语言定义的方式操作
val ds01: Dataset[String] = session.read.textFile("in/person.txt")
val person: Dataset[(String, Int)] = ds01.map(line => {
val strs: Array[String] = line.split(" ")
(strs(0), strs(1).toInt)
})(Encoders.tuple(Encoders.STRING, Encoders.scalaInt))
person
val cperson: DataFrame = person.toDF("name","age")
cperson.show()
cperson.printSchema()
DataSet如果想把数据由非结构变成结构话的化,最好是转成tuple级别,比如进来的是一行字符串,首先切割它,得到的是一个数组,返回 (strs(0), strs(1).toInt) ,但这时候用DataSet数据类型发生变化的时候,要使用编码器。尽量让数据在内存里是字节数组不成为对象,充分利用钨丝计划,堆内堆外都可以放了。只要是常见类型,通过 import session.implicits._隐式转换,就能够完成。
这时候会有schema么?读取的是一个文本文件,不带描述,没有name,没有age的定义,类型也是强转的,缺了schema,变成了_1 _2的参数名,怎么得到名称?person.toDF(“name”,“age”) 再打印即可。
编码器:Scala对类型转换是非常严格的
(Encoders.tuple(Encoders.STRING, Encoders.scalaInt)) 这里不可以写Integer
一个场景:
现在导入的是纯文本文件,不带自描述的,且是string类型,纯文本是不被大数据待见的,既损耗存储空间,又损耗算力去拆解它,一般数据来了都要先做一个快速加工的过程,必须转结构化之后才能进行后续的计算。
但是在参与转结构化有多种方式:
1.现在的这种写法,开启隐式转换,都能够自动匹配 由spark完成 转结构化再存数据
2.hive数仓的时候也有一个正则的可以拆解,把纯文本转成结构化存到数仓的表里
几个名词:
接数:源数据,趋向于不要破坏,不删除,留存历史 都是转换操作,转成另一个副本,parquet csv 或者json的格式 整体的过程叫做ETL过程
ETL:无论从哪里把数据接过来了,然后转换成一批中间状态,ETL到中间态,还不能拿来直接用,还要进行一些加工,做一些SQL的加工才能变成结果态,中间态是最贴近数据源的,比如数据源4个列,中间态还是4个列,不做任何列的筛检,最多做一些数据质量的检查,数据里面是不是不能为空的有空了,或者是不是有重复数据,断片了,得到中间态 所有的计算应该发生在中间态
中间态:对于中间态有一些要求,起码要做到,文件格式类型是parquet csv 或者json 哪种类型计算速度快,选哪种类型,以后续的计算场景需要为准。 还要加一个东西,中间态里要融入分区和分桶,中间态的数据要衡量后续计算的特征。 要做出分区分桶的选择,一切以后续计算成本为考量。分析后面数据计算特征,比如说这批数据是流水,那么身份证号一定是所有SQL中最关心的那一列,身份证号就可以分区分桶了。如果这批数据是增量的拉链表,日期列非常敏感,每天都会往里面放数据,就是分区表,按时间分区,未来计算的时候,可以让起步加载的数据量变少。分桶是会让计算过程中的shuffle,移动数据量变少,极大的优化未来计算的IO 的,一个是加载IO ,一个是shuffle IO。计算是一件不容易的事情,起码要先把文本文件转成一个什么格式去存,存完之后,按照中间态的文件再去计算,融入分区分桶,文件格式类型才能让后续变得更快。
最后
以上就是俭朴黑米为你收集整理的2020.11.17(datafram到dataset开发)的全部内容,希望文章能够帮你解决2020.11.17(datafram到dataset开发)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复