概述
参考链接:
(1)Spark创建DataFrame的三种方法
https://blog.csdn.net/martin_liang/article/details/79748503
(2)Spark创建DataFrame的几种方式
https://blog.csdn.net/shirukai/article/details/81085642
与关系数据库的表(Table)一样,DataFrame是Spark中对带模式(schema)行列数据的抽象。DateFrame广泛应用于使用SQL处理大数据的各种场景。
创建DataFrame有很多种方法,比如从本地List创建、从RDD创建或者从源数据创建。
创建DataFrame的几种方式
目标:生成如下的DataFrame数据
+----+---+-----------+
|name|age| phone|
+----+---+-----------+
|ming| 20|15552211521|
|hong| 19|13287994007|
| zhi| 21|15552211523|
+----+---+-----------+
创建DataFrame的方法
(1)Spark中使用toDF函数创建DataFrame
通过导入(importing)Spark sql implicits, 就可以将本地序列(seq), 数组或者RDD转为DataFrame。只要这些数据的内容能指定数据类型即可。
注意:如果直接用toDF()而不指定列名字,那么默认列名为"_1", “_2”, …
#通过case class + toDF创建DataFrame的示例
import sqlContext.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Define the schema using a case class.
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// 使用 sqlContext 执行 sql 语句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
(2)Spark中使用createDataFrame函数创建DataFrame
在SqlContext中使用createDataFrame也可以创建DataFrame。跟toDF一样,这里创建DataFrame的数据形态也可以是本地数组或者RDD。
#通过row+schema创建示例
import org.apache.spark.sql.types._
val schema = StructType(List(
StructField("integer_column", IntegerType, nullable = false),
StructField("string_column", StringType, nullable = true),
StructField("date_column", DateType, nullable = true)
))
val rdd = sc.parallelize(Seq(
Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
))
val df = sqlContext.createDataFrame(rdd, schema)
(3)通过文件直接创建DataFrame
详见8种创建方式,这里只列出最为常见的文件创建方式
#使用parquet文件创建
val df = sqlContext.read.parquet("hdfs:/path/to/file") #hdfs:/path/to/file是一个目录
#使用json文件创建
val df = spark.read.json("people.json")
#使用csv文件创建
//首先初始化一个SparkSession对象
val spark = org.apache.spark.sql.SparkSession.builder.master("local").appName("Spark CSV Reader").getOrCreate;
//然后使用SparkSessions对象加载CSV成为DataFrame
val df = spark.read.format("com.databricks.spark.csv")
.option("header", "true") //reading the headers
.option("mode", "DROPMALFORMED")
.load("csv/file/path"); //.csv("csv/file/path")属于spark 2.0 api
df.show()
8种创建例子
第一种:通过Seq生成
第二种:读取Json文件生成
第三种:读取csv文件生成
第四种:通过Json格式的RDD生成(弃用)
第五种:通过Json格式的DataSet生成
第六种: 通过csv格式的DataSet生成
第七种:动态创建schema
第八种:通过jdbc创建
第一种:通过Seq生成
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
val df = spark.createDataFrame(Seq(("ming", 20, 15552211521L),("hong", 19, 13287994007L),("zhi", 21, 15552211523L))).toDF("name", "age", "phone")
df.show()
第二种:读取Json文件生成
val dfJson = spark.read.format("json").load("/usr/local/spark/spark-data/student.json")
dfJson.show()
第三种:读取csv文件生成
val dfCsv = spark.read.format("csv").option("header", true).load("/usr/local/spark/spark-data/students.csv")
dfCsv.show()
第四种:通过Json格式的RDD生成(弃用)
import spark.implicits._
val sc = spark.sparkContext
val jsonRDD = sc.makeRDD(Array(
"{"name":"ming","age":20,"phone":15552211521}",
"{"name":"hong", "age":19,"phone":13287994007}",
"{"name":"zhi", "age":21,"phone":15552211523}"
))
val jsonRddDf = spark.read.json(jsonRDD)
jsonRddDf.show()
第五种:通过Json格式的DataSet生成
val jsonDataSet = spark.createDataset(Array(
"{"name":"ming","age":20,"phone":15552211521}",
"{"name":"hong", "age":19,"phone":13287994007}",
"{"name":"zhi", "age":21,"phone":15552211523}"
))
val jsonDataSetDf = spark.read.json(jsonDataSet)
jsonDataSetDf.show()
第六种:通过csv格式的DataSet生成
val scvDataSet = spark.createDataset(Array("ming,20,15552211521","hong,19,13287994007","zhi,21,15552211523"))
spark.read.csv(scvDataSet).toDF("name","age","phone").show()
第七种:动态创建schema
val schema = StructType(List(StructField("name", StringType, true),StructField("age", IntegerType, true),StructField("phone", LongType, true)))
val dataList = new util.ArrayList[Row]()
dataList.add(Row("ming",20,15552211521L))
dataList.add(Row("hong",19,13287994007L))
dataList.add(Row("zhi",21,15552211523L))
spark.createDataFrame(dataList,schema).show()
第八种:通过jdbc创建
//jdbc创建:读取数据库(mysql)
val options = new util.HashMap[String,String]()
options.put("url", "jdbc:mysql://localhost:3306/sparktest")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password","hadoop")
options.put("dbtable","employee")
spark.read.format("jdbc").options(options).load().show()
完整代码
package com.hadoop.spark
import java.util
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
/**
* 创建DataFrame的几种方式
*/
object CreateDataFrameTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local").getOrCreate()
//第一种:通过Seq生成
val df = spark.createDataFrame(Seq(("ming", 20, 15552211521L),("hong", 19, 13287994007L),("zhi", 21, 15552211523L))).toDF("name", "age", "phone")
df.show()
//第二种:通过读取Json文件生成
val dfJson = spark.read.format("json").load("/usr/local/spark/spark-data/student.json")
dfJson.show()
//第三种:通过读取Csv文件生成
val dfCsv = spark.read.format("csv").option("header", true).load("/usr/local/spark/spark-data/students.csv")
dfCsv.show()
//第四种:通过Json格式的RDD生成(弃用)
val sc = spark.sparkContext
val jsonRDD = sc.makeRDD(Array(
"{"name":"ming","age":20,"phone":15552211521}",
"{"name":"hong", "age":19,"phone":13287994007}",
"{"name":"zhi", "age":21,"phone":15552211523}"
))
val jsonRddDf = spark.read.json(jsonRDD)
jsonRddDf.show()
//第五种:通过Json格式的DataSet生成
val jsonDataSet = spark.createDataset(Array(
"{"name":"ming","age":20,"phone":15552211521}",
"{"name":"hong", "age":19,"phone":13287994007}",
"{"name":"zhi", "age":21,"phone":15552211523}"
))
val jsonDataSetDf = spark.read.json(jsonDataSet)
jsonDataSetDf.show()
//第六种: 通过csv格式的DataSet生成
val scvDataSet = spark.createDataset(Array("ming,20,15552211521","hong,19,13287994007","zhi,21,15552211523"))
spark.read.csv(scvDataSet).toDF("name","age","phone").show()
//第七种:动态创建schema
val schema = StructType(List(StructField("name", StringType, true),StructField("age", IntegerType, true),StructField("phone", LongType, true)))
val dataList = new util.ArrayList[Row]()
dataList.add(Row("ming",20,15552211521L))
dataList.add(Row("hong",19,13287994007L))
dataList.add(Row("zhi",21,15552211523L))
spark.createDataFrame(dataList,schema).show()
//第八种:读取数据库(mysql)
val options = new util.HashMap[String,String]()
options.put("url", "jdbc:mysql://localhost:3306/sparktest")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password","hadoop")
options.put("dbtable","employee")
spark.read.format("jdbc").options(options).load().show()
}
}
最后
以上就是靓丽鞋垫为你收集整理的大数据管理技术 Spark创建DataFrame的全部内容,希望文章能够帮你解决大数据管理技术 Spark创建DataFrame所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复