
Spark 2.4.0 Programming Guide – Spark Data Sources

More resources

  • github: https://github.com/opensourceteams/spark-scala-maven-2.4.0

Video

  • Spark 2.4.0 Programming Guide – Spark Data Sources (bilibili video): https://www.bilibili.com/video/av38193405/?p=5
width="800" height="500" src="//player.bilibili.com/player.html?aid=38193405&cid=68636905&page=5" scrolling="no" border="0" allowfullscreen="true">

Prerequisites

  • Java installed (java 1.8.0_191)
  • Scala installed (scala 2.11.12)
  • Hadoop installed (Hadoop 2.9.2)
  • Hive installed (apache-hive-3.1.1-bin)
  • Spark installed (spark-2.4.0-bin-hadoop2.7)

Topics covered

  • Reading and writing parquet, orc, csv, json, text, and avro files (a short Avro sketch follows this list)
  • Running spark.sql directly on files
  • Reading and writing with bucketBy and partitionBy
  • Merging DataFrames (parquet schema merging)
  • JDBC (MySQL) read and write operations
  • Hive operations (create/drop database; create, insert, show, truncate, drop table)
  • Official docs: http://spark.apache.org/docs/2.4.0/sql-data-sources.html
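
The examples below do not revisit Avro, so here is a minimal sketch for reference, assuming Spark 2.4.0's external spark-avro module (org.apache.spark:spark-avro_2.11:2.4.0) is on the classpath; the output path is illustrative and usersDF refers to the DataFrame loaded in the parquet examples below.

// Write a DataFrame as Avro, then read it back (path is illustrative).
usersDF.write.format("avro").save("hdfs://standalone.com:9000/home/liuwen/output/avro/users.avro")
spark.read.format("avro").load("hdfs://standalone.com:9000/home/liuwen/output/avro/users.avro").show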

Generic Load/Save (parquet)

Reading a parquet file

  • Read the parquet file users.parquet
  • users.parquet is a binary format; opening it directly shows hexadecimal data
spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet").show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

Saving a parquet file

  • Read the parquet file users.parquet
  • users.parquet is a binary format; opening it directly shows hexadecimal data
  • The saved data ends up in the directory namesAndFavColors.parquet
val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.select("name", "favorite_color").write.save("hdfs://standalone.com:9000/home/liuwen/data/parquest/namesAndFavColors.parquet")

spark.read.load("hdfs://m0:9000/home/liuwen/data/parquest/namesAndFavColors.parquet").show
//+------+--------------+
//|  name|favorite_color|
//+------+--------------+
//|Alyssa|          null|
//|   Ben|           red|
//+------+--------------+

Load/Save (json)

Reading a JSON file

  • Read the JSON file people.json
  • people.json stores data in JSON format
  • Note: a JSON file repeats the field names on every line, which wastes space, so JSON is not recommended for storage; consider the default parquet format instead
spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/data/json/people.json").show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+

Saving a JSON file

  • Read the JSON file people.json
  • people.json stores data in JSON format
val ds = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/data/json/people.json")
ds.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+

// Save the data in JSON format to HDFS
ds.select("name", "age").write.format("json").save("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json")

// Read the saved data back
spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json").show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+
  • Inspect the saved files on HDFS
hdfs dfs -ls -R hdfs://standalone.com:9000/home/liuwen/output/json
// drwxr-xr-x   - liuwen supergroup   0 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json
// -rw-r--r--   1 liuwen supergroup   0 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/_SUCCESS
// -rw-r--r--   1 liuwen supergroup  71 2018-12-18 17:44 hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/part-00000-6690fee8-33d3-413c-8364-927f02593ff2-c000.json

hdfs dfs -cat hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges.json/*
// The data is in namesAndAges.json/part-00000-6690fee8-33d3-413c-8364-927f02593ff2-c000.json
// {"name":"Michael"}
// {"name":"Andy","age":30}
// {"name":"Justin","age":19}
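
As the listing shows, the output path is a directory with one part file per partition plus a _SUCCESS marker. For small results a single file can be produced by coalescing before the write; a minimal sketch (the output path is illustrative):

// Hypothetical variant: write a single part file for small outputs.
ds.select("name", "age").coalesce(1).write.format("json")
  .save("hdfs://standalone.com:9000/home/liuwen/output/json/namesAndAges_single.json")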

Load/Save (csv)

Reading a CSV file

val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("hdfs://m0:9000/home/liuwen/data/csv/people.csv")
//peopleDFCsv: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

peopleDFCsv.show
//+-----+---+---------+
//| name|age|      job|
//+-----+---+---------+
//|Jorge| 30|Developer|
//|  Bob| 32|Developer|
//+-----+---+---------+

Writing a CSV file

// Save the data in CSV format to HDFS
peopleDFCsv.select("name", "age").write.format("csv").save("hdfs://standalone.com:9000/home/liuwen/output/csv/people.csv")

// Read the saved data back; no header row was written, so with header=true
// the first data row (Jorge,30) is treated as the header
spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("hdfs://standalone.com:9000//home/liuwen/output/csv/people.csv")
  .show
//+-----+---+
//|Jorge| 30|
//+-----+---+
//|  Bob| 32|
//+-----+---+
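
Because save() above writes no header row, reading the result with header=true turns the first data row into the header. A minimal sketch of writing the header explicitly (the output path is illustrative):

// Write the CSV with a header row so it can be read back with header=true.
peopleDFCsv.select("name", "age").write
  .option("header", "true")
  .format("csv")
  .save("hdfs://standalone.com:9000/home/liuwen/output/csv/people_with_header.csv")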
  • Inspect the CSV files on HDFS
hdfs dfs -ls -R hdfs://m0:9000/home/liuwen/output/csv/
//drwxr-xr-x   - liuwen supergroup   0 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv
//-rw-r--r--   1 liuwen supergroup   0 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv/_SUCCESS
//-rw-r--r--   1 liuwen supergroup  16 2018-12-18 18:04 hdfs://m0:9000/home/liuwen/output/csv/people.csv/part-00000-d6ad5563-5908-4c0e-8e6f-f13cd0ff445e-c000.csv

hdfs dfs -text hdfs://m0:9000/home/liuwen/output/csv/people.csv/part-00000-d6ad5563-5908-4c0e-8e6f-f13cd0ff445e-c000.csv
//Jorge,30
//Bob,32

Load/Save (orc)

Writing an ORC file

val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .save("hdfs://standalone.com:9000/home/liuwen/output/orc/users_with_options.orc")

Reading an ORC file

spark.read.format("orc").load("hdfs://standalone.com:9000/home/liuwen/output/orc/users_with_options.orc").show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

Running SQL directly on files

  • Run SQL directly on a file, without loading it into a DataFrame first
val sqlDF = spark.sql("SELECT * FROM parquet.`hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet`")
sqlDF.show
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

saveAsTable

  • Save the data as a Hive table
val sqlDF = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/employ.json")
sqlDF.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+

sqlDF.write.saveAsTable("people_bucketed")
val sqlDF2 = spark.sql("select * from people_bucketed")
  • Read the data back from the Hive table
val sqlDF = spark.sql("select * from people_bucketed")

bucketBy

  • Save the data as a Hive table, bucketed with bucketBy
val sqlDF = spark.read.format("json").load("hdfs://standalone.com:9000/home/liuwen/output/json/employ.json")
sqlDF.show
//+----+-------+
//| age|   name|
//+----+-------+
//|null|Michael|
//|  30|   Andy|
//|  19| Justin|
//+----+-------+

// sortBy requires the sort column to exist in the data; employ.json is assumed to contain a salary field
sqlDF.write.bucketBy(42, "name").sortBy("salary")
  .saveAsTable("people_bucketed3")

val sqlDF2 = spark.sql("select * from people_bucketed3")
sqlDF2.show
  • Read the data back from the Hive table
val sqlDF = spark.sql("select * from people_bucketed3")

partitionBy

  • Save the data partitioned by a column with partitionBy
val spark = sparkSession(true)   // sparkSession(true): helper in the accompanying project that builds a SparkSession

val usersDF = spark.read.load("hdfs://standalone.com:9000/home/liuwen/data/parquest/users.parquet")
usersDF.show()
//+------+--------------+----------------+
//|  name|favorite_color|favorite_numbers|
//+------+--------------+----------------+
//|Alyssa|          null|  [3, 9, 15, 20]|
//|   Ben|           red|              []|
//+------+--------------+----------------+

// Saved on HDFS under hdfs://standalone.com:9000/user/liuwen/namesPartByColor.parquet
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
  • Read the saved partitioned data back
val sqlDF = spark.sql("SELECT * FROM parquet.`namesPartByColor.parquet`")

Merging DataFrames (schema merging)

  • To merge two datasets, write each DataFrame to files under the same base directory (as separate partitions), then read that directory back with schema merging enabled, as shown below
import spark.implicits._

val df1 = Seq(1, 2, 3, 5).map(x => (x, x * x)).toDF("a", "b")
val df2 = Seq(10, 20, 30, 50).map(x => (x, x * x)).toDF("a", "b")

df1.write.parquet("data/test_table/key=1")
df1.show()
// +---+---+
// |  a|  b|
// +---+---+
// |  1|  1|
// |  2|  4|
// |  3|  9|
// |  5| 25|
// +---+---+

df2.write.parquet("data/test_table/key=2")
df2.show()
// +---+----+
// |  a|   b|
// +---+----+
// | 10| 100|
// | 20| 400|
// | 30| 900|
// | 50|2500|
// +---+----+

val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// root
//  |-- a: integer (nullable = true)
//  |-- b: integer (nullable = true)
//  |-- key: integer (nullable = true)

mergedDF.show()
// +---+----+---+
// |  a|   b|key|
// +---+----+---+
// | 10| 100|  2|
// | 20| 400|  2|
// | 30| 900|  2|
// | 50|2500|  2|
// |  1|   1|  1|
// |  2|   4|  1|
// |  3|   9|  1|
// |  5|  25|  1|
// +---+----+---+
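
Schema merging is off by default because it is relatively expensive. Besides the per-read option used above, it can be enabled globally through a SQL configuration; a minimal sketch:

// Enable parquet schema merging for all parquet reads in this session.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
val mergedDF2 = spark.read.parquet("data/test_table")
mergedDF2.printSchema()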

MySQL (JDBC)

  • Read data from MySQL
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", "admin")
connectionProperties.put("password", "000000")

val jdbcDF = spark.read.jdbc("jdbc:mysql://mysql.com:3306/test", "test.test2", connectionProperties)
jdbcDF.show()
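
By default this JDBC read uses a single connection and a single partition. For larger tables the read can be split across executors with the standard JDBC partitioning options; a minimal sketch, assuming test.test2 has a numeric id column (the column name and bounds are assumptions):

// Hypothetical parallel read: 4 partitions over the assumed numeric column "id".
val jdbcParallelDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://mysql.com:3306/test")
  .option("dbtable", "test.test2")
  .option("user", "admin")
  .option("password", "000000")
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartitions", "4")
  .load()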
  • Write data to MySQL
import java.util.Properties
import org.apache.spark.sql.SaveMode

val connectionProperties = new Properties()
connectionProperties.put("user", "admin")
connectionProperties.put("password", "000000")

val jdbcDF = spark.read.jdbc("jdbc:mysql://macbookmysql.com:3306/test", "test.test", connectionProperties)
jdbcDF.show()

jdbcDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://macbookmysql.com:3306/test", "test.test3", connectionProperties)

spark hive

  • Spark supports Hive SQL syntax: the statements run inside Spark, and Hive data is exposed as DataFrames so Spark operators can work on it
import java.io.File
import org.apache.spark.sql.SparkSession

val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .master("local")
  // .master("spark://standalone.com:7077")
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.sql

sql("CREATE DATABASE IF NOT EXISTS test_tmp")
sql("USE test_tmp")
sql("CREATE TABLE IF NOT EXISTS student(name VARCHAR(64), age INT)")
sql("INSERT INTO TABLE student VALUES ('小王', 35), ('小李', 50)")
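
To illustrate the bullet above (Hive data becomes a DataFrame that Spark operators can work on), a minimal sketch continuing from the session created in the code block:

import spark.implicits._

// Query the Hive table back into a DataFrame and apply ordinary Spark operators.
val studentDF = sql("SELECT name, age FROM student")
studentDF.filter($"age" > 40).show()
// Expected to show only rows with age > 40, e.g. ('小李', 50).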

end
