spark读取hive表的数据处理后存到mysql
●agg返回DF类型 括号里接收的是列 所以可以在括号中给列起别名
○直接写count返回的是df 无法给列起别名
●join 所要查询的数据放在leftjoin左边
●注意方法的返回值 确定返回类型是df还是其他类型
●当遇到联查列重复时,对应的df(列名)
●join的写法
○df1.join(df2,Seq(列名),"left")
○rdf1.join(df2,df1(列名)===df2(列名),"left")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51package expandword import org.apache.spark.sql.SparkSession import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{col, date_format, desc, rank, sum} object four { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("rigion") .enableHiveSupport() .getOrCreate() //使用hive的ods库 spark.sql("use shenyunhang") val ord=spark.table("orders") val cus=spark.table("customer") val nat=spark.table("nation") val reg=spark.table("region ") val rs=ord .join(cus,Seq("custkey"),joinType = "left") .join(nat,cus.col("nationkey")===nat.col("nationkey")) .join(reg,nat.col("regionkey")===reg.col("regionkey")) .select( nat("name").as("nname"), reg("name").as("rname"), ord("totalprice"), (date_format(ord("orderdate"),"yyyyMM")).as("times") ) .groupBy(col("nname"),col("rname"),col("times")) .agg( sum("totalprice").as("sum") ) .orderBy("sum") .select( col("nname"), col("rname"), col("sum"), rank() over(Window.orderBy(desc("sum"))) ) rs.show() //落地 rs.coalesce(1).write .format("jdbc") .mode(SaveMode.Overwrite) .option("driver","com.mysql.cj.jdbc.Driver") .option("url","jdbc:mysql://59.111.104.241:9999/job023_group4") .option("dbtable","ORDERS") .option("user","job023_group4") .option("password","job023@TL") .save() } }
插入mysql的增量数据到hive、动态分区
需要注意的是在我们从MySQl拿到数据动态分区插入到Hive中时,是需要配置的。
●开启动态分区参数设置(还有其他配置,这里用这两个就可以)
○hive.exec.dynamic.partition=true
■开启动态分区功能(默认 true ,开启)
○hive.exec.dynamic.partition.mode=nonstrict
■设置为非严格模式(动态分区的模式,默认 strict ,表示必须指定至少一个分区为静态分区, nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
●在load完后,使用.where可以对加载出来的数据进行筛选
●在这里因为动态分区有格式要求,所以用hive中自带的date_format()方法,进行格式转换
●增量的概念就是每次插入的时候,插进去的数据hive中原来没有的数据,而不是overrite全部重新加载到里边,在这里用到的是在mysql中查到的数据创建一个视图,然后再hive中sql对这个视图和hive中数据的表进行left join查询,最后只取null的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44package expandword import org.apache.spark.sql.{SaveMode, SparkSession} object MysqlToHive2 { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("reda mysql to hive limit") // .master("local[*]") .enableHiveSupport() .config("hive.exec.dynamic.partition","true") .config("hive.exec.dynamic.partition.mode","nonstrict") .getOrCreate() //加载mysql的数据 val df = sparkSession.read .format("jdbc") .option("driver","com.mysql.cj.jdbc.Driver") .option("url","jdbc:mysql://59.111.104.241:9999/job023_group4") .option("dbtable","ORDERS") .option("user","job023_group4") .option("password","job023@TL") .load() .where( "orderdate>='1997-12-1'") .createOrReplaceTempView("mysql_orders") //使用hive中的数据 sparkSession.sql("use shenyunhang") sparkSession.sql( """ |insert into table orders1 partition(times) |select t1.*,date_format(t1.orderdate,'yyyyMMdd') times |from | mysql_orders t1 |left join | orders1 t2 |on | t1.orderkey = t2.orderkey |where t2.orderkey is null |""".stripMargin) } }
最后
以上就是甜甜柜子最近收集整理的关于spark与hive,mysql交互的全部内容,更多相关spark与hive内容请搜索靠谱客的其他文章。
发表评论 取消回复