概述
写入mysql和hive的方式
package com.ithhs.spark
import java.util.Properties
import org.apache.spark.sql.functions.{coalesce, from_unixtime, to_date, unix_timestamp}
import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
object JdbcDemo {
def main(args: Array[String]): Unit = {
//创建执行环境
val spark: SparkSession = SparkSession.builder()
//hive元数据
.config("hive.metastore.uris", "thrift://192.168.80.81:9083")
//hive存储路径
.config("spark.sql.warehouse.dir", "hdfs://192.168.80.81:9000/user/hive/warehouse")
//执行本地
.master("local")
//名称
.appName("data")
//必须加enableHiveSupport(),这样才可以往hive数据表写数据
.enableHiveSupport()
.getOrCreate()
// 连接mysql方式1 建议使用该方式
val properties = new Properties()
properties.put("user", "root") //用户名
properties.put("password", "password") // 密码
val jdbc_url = "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8"//连接数据库
val df1: DataFrame = spark.read.jdbc(jdbc_url, "(select * from test) tmp", properties) //jdbc
//连接mysql方式2 不建议使用
val df2: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8")//连接数据库
//.option("dbtable", "(select a.*,b.cid,b.cname from province a left join city b on a.pid=b.pid) tmp")
.option("dbtable", "(select * from tmail) tmp")//sql语句,也可以是表名
.option("user", "root")//用户
.option("password", "password")//密码
.load()
df2.show()
//连接mysql方式3
val mapJdbc = Map(
"url" -> "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8", //连接数据库
"user" -> "root", //用户
"password" -> "password", //密码
"dbtable" -> "(select * from user)as a" //按要求写sql语句
)
//读取mysql数据
val df3: DataFrame = spark.read.format("jdbc")
.options(mapJdbc).load()
df3.show()
//写入hive数据库的操作,Overwrite覆盖 Append追加,ErrorIfExists存在保报错,Ignore:如果存在就忽略
df3.write.mode(SaveMode.Append).format("hive").saveAsTable("web.users")//
//动态分区
//因为要做动态分区, 所以要先设定partition参数
//由于default是false, 需要额外下指令打开这个开关
spark sql ("set hive.exec.dynamic.partition=true")
spark sql ("set hive.exec.dynamic.partition.mode=nostrick")
//指定分区字段到分区表中
df3.write.mode(SaveMode.Append).format("hive").partitionBy("bt").saveAsTable("gdcmxy.users")
}
}
最后
以上就是高大河马为你收集整理的spark写入mysql和hive的方式写入mysql和hive的方式的全部内容,希望文章能够帮你解决spark写入mysql和hive的方式写入mysql和hive的方式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复