Overview
1. The New Entry Point: SparkSession
In older versions, Spark SQL provided two SQL query entry points: SQLContext, for queries using Spark's own SQL engine, and HiveContext, for queries against Hive. SparkSession is the newer, unified entry point for SQL queries. It is essentially a combination of SQLContext and HiveContext, so any API available on SQLContext or HiveContext can also be used on SparkSession. SparkSession wraps a SparkContext internally, so the actual computation is still carried out by the SparkContext.
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
SparkSession.builder is used to create a SparkSession.
The import spark.implicits._ statement brings into scope the implicit conversions that turn RDDs (and local collections) into DataFrames and Datasets, so that methods such as toDF and toDS become available.
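For example, once the implicits are in scope, a local collection (or an RDD) can be turned into a DataFrame with toDF. A minimal sketch; the data and column names here are only illustrative:
import spark.implicits._
// toDF becomes available on Seqs and RDDs after the import
val demoDF = Seq(("Andy", 32), ("Justin", 19)).toDF("name", "age")
demoDF.show()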
If you need Hive support, create the SparkSession as follows:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.enableHiveSupport()
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
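With Hive support enabled, spark.sql can also query tables registered in the Hive metastore. A minimal sketch; the table name src is only an example and assumes such a table exists:
// Query an existing Hive table through the SparkSession
spark.sql("SELECT * FROM src LIMIT 10").show()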
2. Creating DataFrames
In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. A DataFrame can be created in three ways: by converting an existing RDD, by querying a Hive table, or by reading from one of Spark's data sources.
2.1 Creating from a Spark data source
1) Create a directory on HDFS (run from the /opt/module/hadoop-2.7.2 directory):
hadoop fs -mkdir -p /user/luomk/source/
2) Upload the local file to HDFS:
hadoop fs -copyFromLocal /opt/module/spark-2.1.1-bin-hadoop2.7/source/employees.json /user/luomk/source/
3) Read the file from HDFS:
val peopleDF = spark.read.json("source/employees.json")
or
val peopleDF = spark.read.format("json").load("source/employees.json")
4) Display and query the data:
peopleDF.show()
peopleDF.createOrReplaceTempView("employees")
spark.sql("SELECT * FROM employees").show()
spark.sql("SELECT * FROM employees WHERE salary >= 4000").show()
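The same queries can also be expressed with the DataFrame DSL instead of SQL. A minimal sketch based on the peopleDF created above; the $-column syntax requires import spark.implicits._:
import spark.implicits._
// Equivalent DSL-style queries
peopleDF.select($"name", $"salary").show()
peopleDF.filter($"salary" >= 4000).show()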
2.2 Converting from an RDD
hadoop fs -copyFromLocal /opt/module/spark-2.1.1-bin-hadoop2.7/source/people.txt /user/luomk/source/
val peopleRdd = sc.textFile("source/people.txt")
val peopleDF3 = peopleRdd.map(_.split(",")).map(paras => (paras(0),paras(1).trim().toInt)).toDF("name","age")
scala> peopleDF3.show()
2.3 Creating from Hive is covered in the data sources chapter
3. Creating Datasets
A Dataset is a strongly typed collection of data, so the corresponding type information must be provided.
// 1. Create a Dataset from a case class
scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
// 2. Create a Dataset from a Seq of primitive values
scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(2, 3, 4)
// 3. Create a Dataset by reading a JSON file and mapping it to a typed class
// (employees.json has name and salary columns, so the case class must match that schema)
scala> case class Employee(name: String, salary: Long)
scala> val path = "source/employees.json"
path: String = source/employees.json
val employeeDS = spark.read.json(path).as[Employee]
employeeDS.show()
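Because a Dataset is strongly typed, its elements can be processed with ordinary Scala functions, and field access is checked at compile time. A minimal sketch using the caseClassDS defined above (assumes import spark.implicits._ is in scope):
// Typed transformations on Dataset[Person]
caseClassDS.filter(p => p.age > 30).map(p => p.name).show()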
4. Converting among RDD, DataFrame, and Dataset
4.1 Converting between RDD and DataFrame
4.1.1 RDD -> DataFrame
(1) Convert the RDD to tuples, then call toDF
rdd.map{x =>
val pa = x.split(",");
(pa(0).trim, pa(1).trim)
}.toDF("name", "age")
(2) By reflection
A case class is required:
case class Person(name:String,age:String)
rdd.map{x=>
val pa = x.split(",");
Person(pa(0).trim, pa(1).trim)
}.toDF
(3) Programmatically (when the columns are not known in advance and the schema has to be generated at runtime)
val peopleRDD = spark.sparkContext.textFile("source/people.txt")
// The schema is encoded in a string; in practice it would be generated dynamically
val schemaString = "name age"
import org.apache.spark.sql.types._
// Generate the schema (an Array[StructField]) from the schema string
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
// val fields = schemaString.split(" ").map(fieldName => fieldName match { case "name" => StructField(fieldName, StringType, nullable = true); case "age" => StructField(fieldName, IntegerType, nullable = true) })
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
import org.apache.spark.sql._
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
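Once the schema has been applied, the resulting DataFrame behaves like any other and can be registered as a temporary view. A minimal sketch; the view name people is only illustrative:
// Use the programmatically built DataFrame
peopleDF.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people").show()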
4.1.2 DataFrame -> RDD
dataframe.rdd
How to read the result:
1、df.map(_.getString(0)).collect
2、df.map(_.getAs[String]("name")).collect
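To make the RDD route explicit: dataframe.rdd returns an RDD[Row], and each Row's fields can be read by position or by name. A minimal sketch based on the peopleDF built above:
// dataframe.rdd yields RDD[Row]
val rowRdd = peopleDF.rdd
rowRdd.map(row => row.getAs[String]("name")).collect().foreach(println)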
4.2 Converting between RDD and Dataset
4.2.1 RDD -> Dataset
A case class is required
case class Person(name:String,age:String)
rdd.map{x=>
val pa = x.split(",");
Person(pa(0).trim, pa(1).trim)
}.toDS  // note: each record is converted to a Person object
4.2.2 Dataset -> RDD
ds.rdd
How to read: access the object's fields directly
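A minimal sketch, assuming a Dataset[Person] named ds built as above:
// ds.rdd yields RDD[Person]; read the case class fields directly
ds.rdd.map(p => (p.name, p.age)).collect().foreach(println)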
4.3 Converting between DataFrame and Dataset
4.3.1 DataFrame -> Dataset
A case class is required (its field names must match the DataFrame's column names)
case class Person(name:String,age:String)
df.as[Person]
4.3.2 Dataset -> DataFrame
ds.toDF
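Putting both directions together, a minimal sketch assuming a DataFrame df whose columns match the Person fields:
case class Person(name: String, age: String)
val ds  = df.as[Person]   // DataFrame -> Dataset[Person]
val df2 = ds.toDF()       // Dataset[Person] -> DataFrame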
4.4 Example
4.4.1 Preparing the data
The raw data in people.txt looks like this:
Michael, 29
Andy, 30
Justin, 20
Upload the raw data to HDFS:
hadoop fs -copyFromLocal /tmp/20190109-190008-c32c/people.txt /
scala> val peopleRdd = sc.textFile("/people.txt")
peopleRdd: org.apache.spark.rdd.RDD[String] = /people.txt MapPartitionsRDD[13] at textFile at <console>:24
scala> peopleRdd.collect
res3: Array[String] = Array(Michael, 29, Andy, 30, Justin, 20)
4.4.2 Converting between RDD and DataFrame
• RDD -> DataFrame
Note that the RDD only gains the toDF and toDS methods after import spark.implicits._
// (1) Convert the RDD to tuples, then call toDF
scala> val peopleDF1 = peopleRdd.map{x => val pa = x.split(","); (pa(0).trim ,pa(1).trim)}.toDF("name","age")
scala> peopleDF1.show
Another way:
scala> val peopleDF3 = peopleRdd.map(_.split(",")).map(paras => (paras(0),paras(1).trim().toInt)).toDF("name","age")
scala> peopleDF3.show()
// (2) By reflection
case class Person(name:String,age:String)
peopleRdd.collect
peopleRdd.map{x=>val pa = x.split(",");Person(pa(0).trim, pa(1).trim)}.toDF
// (3) Programmatically (when the columns are not known in advance and the schema has to be generated at runtime)
val peopleRDD = spark.sparkContext.textFile("source/people.txt")
// The schema is encoded in a string; in practice it would be generated dynamically
val schemaString = "name age"
import org.apache.spark.sql.types._
// Generate the schema (an Array[StructField]) from the schema string
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
// val fields = schemaString.split(" ").map(fieldName => fieldName match { case "name" => StructField(fieldName, StringType, nullable = true); case "age" => StructField(fieldName, IntegerType, nullable = true) })
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
import org.apache.spark.sql._
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
• DataFrame -> RDD
peopleDF1.rdd
How to read the result:
This returns an RDD[Row]; to get values out of a Row, use getString(0) or getAs[String]("name")
For example:
peopleDF1.map(_.getString(0)).collect
peopleDF1.map(_.getAs[String]("name")).collect
4.4.3 Converting between RDD and Dataset
• RDD -> Dataset
A case class is required
case class Person(name:String,age:String)
val peopleDS = peopleRdd.map{x => val pa = x.split(","); Person(pa(0).trim, pa(1).trim)}.toDS
Note that each record is converted to a Person object
• Dataset -> RDD
peopleDS.rdd
How to read: access the object's fields directly
4.4.4 Converting between DataFrame and Dataset
• DataFrame -> Dataset
A case class is required (its field names must match the DataFrame's column names)
case class Person(name:String,age:String)
peopleDF1.as[Person]
• Dataset -> DataFrame
ds.toDF
For example: peopleDS.toDF
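To summarize the whole round trip, a minimal sketch assuming the peopleRdd and Person case class from this example (and import spark.implicits._ in scope):
case class Person(name: String, age: String)
val df        = peopleRdd.map { x => val pa = x.split(","); Person(pa(0).trim, pa(1).trim) }.toDF()  // RDD -> DataFrame
val ds        = df.as[Person]   // DataFrame -> Dataset[Person]
val backToRdd = ds.rdd          // Dataset -> RDD[Person]
val backToDf  = ds.toDF()       // Dataset -> DataFrame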