我是靠谱客的博主 甜甜柜子,最近开发中收集的这篇文章主要介绍spark与hive,mysql交互,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

spark读取hive表的数据处理后存到mysql

●agg返回DF类型 括号里接收的是列 所以可以在括号中给列起别名
○直接写count返回的是df 无法给列起别名
●join 所要查询的数据放在leftjoin左边
●注意方法的返回值 确定返回类型是df还是其他类型
●当遇到联查列重复时,对应的df(列名)
●join的写法
○df1.join(df2,Seq(列名),"left")
○rdf1.join(df2,df1(列名)===df2(列名),"left")

package expandword
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, date_format, desc, rank, sum}
object four {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("rigion")
.enableHiveSupport()
.getOrCreate()
//使用hive的ods库
spark.sql("use shenyunhang")
val ord=spark.table("orders")
val cus=spark.table("customer")
val nat=spark.table("nation")
val reg=spark.table("region ")
val rs=ord
.join(cus,Seq("custkey"),joinType = "left")
.join(nat,cus.col("nationkey")===nat.col("nationkey"))
.join(reg,nat.col("regionkey")===reg.col("regionkey"))
.select(
nat("name").as("nname"),
reg("name").as("rname"),
ord("totalprice"),
(date_format(ord("orderdate"),"yyyyMM")).as("times")
)
.groupBy(col("nname"),col("rname"),col("times"))
.agg(
sum("totalprice").as("sum")
)
.orderBy("sum")
.select(
col("nname"),
col("rname"),
col("sum"),
rank() over(Window.orderBy(desc("sum")))
)
rs.show()
//落地
rs.coalesce(1).write
.format("jdbc")
.mode(SaveMode.Overwrite)
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
.option("dbtable","ORDERS")
.option("user","job023_group4")
.option("password","job023@TL")
.save()
}
}

插入mysql的增量数据到hive、动态分区

需要注意的是在我们从MySQl拿到数据动态分区插入到Hive中时,是需要配置的。
●开启动态分区参数设置(还有其他配置,这里用这两个就可以)
○hive.exec.dynamic.partition=true
■开启动态分区功能(默认 true ,开启)
○hive.exec.dynamic.partition.mode=nonstrict
■设置为非严格模式(动态分区的模式,默认 strict ,表示必须指定至少一个分区为静态分区, nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
●在load完后,使用.where可以对加载出来的数据进行筛选
●在这里因为动态分区有格式要求,所以用hive中自带的date_format()方法,进行格式转换
●增量的概念就是每次插入的时候,插进去的数据hive中原来没有的数据,而不是overrite全部重新加载到里边,在这里用到的是在mysql中查到的数据创建一个视图,然后再hive中sql对这个视图和hive中数据的表进行left join查询,最后只取null的数据

package expandword
import org.apache.spark.sql.{SaveMode, SparkSession}
object MysqlToHive2 {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession
.builder()
.appName("reda mysql to hive limit")
//
.master("local[*]")
.enableHiveSupport()
.config("hive.exec.dynamic.partition","true")
.config("hive.exec.dynamic.partition.mode","nonstrict")
.getOrCreate()
//加载mysql的数据
val df = sparkSession.read
.format("jdbc")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("url","jdbc:mysql://59.111.104.241:9999/job023_group4")
.option("dbtable","ORDERS")
.option("user","job023_group4")
.option("password","job023@TL")
.load()
.where(
"orderdate>='1997-12-1'")
.createOrReplaceTempView("mysql_orders")
//使用hive中的数据
sparkSession.sql("use shenyunhang")
sparkSession.sql(
"""
|insert into table orders1 partition(times)
|select t1.*,date_format(t1.orderdate,'yyyyMMdd') times
|from
|
mysql_orders t1
|left join
|
orders1 t2
|on
|
t1.orderkey = t2.orderkey
|where t2.orderkey is null
|""".stripMargin)
}
}

最后

以上就是甜甜柜子为你收集整理的spark与hive,mysql交互的全部内容,希望文章能够帮你解决spark与hive,mysql交互所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(88)

评论列表共有 0 条评论

立即
投稿
返回
顶部