写入mysql和hive的方式
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57package com.ithhs.spark import java.util.Properties import org.apache.spark.sql.functions.{coalesce, from_unixtime, to_date, unix_timestamp} import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession} object JdbcDemo { def main(args: Array[String]): Unit = { //创建执行环境 val spark: SparkSession = SparkSession.builder() //hive元数据 .config("hive.metastore.uris", "thrift://192.168.80.81:9083") //hive存储路径 .config("spark.sql.warehouse.dir", "hdfs://192.168.80.81:9000/user/hive/warehouse") //执行本地 .master("local") //名称 .appName("data") //必须加enableHiveSupport(),这样才可以往hive数据表写数据 .enableHiveSupport() .getOrCreate() // 连接mysql方式1 建议使用该方式 val properties = new Properties() properties.put("user", "root") //用户名 properties.put("password", "password") // 密码 val jdbc_url = "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8"//连接数据库 val df1: DataFrame = spark.read.jdbc(jdbc_url, "(select * from test) tmp", properties) //jdbc //连接mysql方式2 不建议使用 val df2: DataFrame = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8")//连接数据库 //.option("dbtable", "(select a.*,b.cid,b.cname from province a left join city b on a.pid=b.pid) tmp") .option("dbtable", "(select * from tmail) tmp")//sql语句,也可以是表名 .option("user", "root")//用户 .option("password", "password")//密码 .load() df2.show() //连接mysql方式3 val mapJdbc = Map( "url" -> "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8", //连接数据库 "user" -> "root", //用户 "password" -> "password", //密码 "dbtable" -> "(select * from user)as a" //按要求写sql语句 ) //读取mysql数据 val df3: DataFrame = spark.read.format("jdbc") .options(mapJdbc).load() df3.show() //写入hive数据库的操作,Overwrite覆盖 Append追加,ErrorIfExists存在保报错,Ignore:如果存在就忽略 df3.write.mode(SaveMode.Append).format("hive").saveAsTable("web.users")// //动态分区 //因为要做动态分区, 所以要先设定partition参数 //由于default是false, 需要额外下指令打开这个开关 spark sql ("set hive.exec.dynamic.partition=true") spark sql ("set hive.exec.dynamic.partition.mode=nostrick") //指定分区字段到分区表中 df3.write.mode(SaveMode.Append).format("hive").partitionBy("bt").saveAsTable("gdcmxy.users") } }
最后
以上就是高大河马最近收集整理的关于spark写入mysql和hive的方式写入mysql和hive的方式的全部内容,更多相关spark写入mysql和hive内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复