package com.citic.guoan.test
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.hive.HiveContext
import java.io.{File, Serializable}
import java.util.Properties
import com.mysql.jdbc.JDBC4MysqlSQLXML
object LoadDB extends Serializable {
private var sparkConf: SparkConf = null
private var javaSparkContext: JavaSparkContext = null
private var hiveContext: HiveContext = null
private var sqlContext: SQLContext = null
def main(args: Array[String]): Unit = {
initSparkContext()
initSQLContext()
initHiveContext()
System.out.println(" ---------------------- start hive2db ------------------------")
hive2db(args(0), args(1))
System.out.println(" ---------------------- finish hive2db ------------------------")
// System.out.println(""" ---------------------- start db2db ------------------------""")
// db2db()
// System.out.println(" ---------------------- finish db2db ------------------------")
System.exit(0)
}
/*
* Create the SparkContext
* */
def initSparkContext(): Unit = {
// val warehouseLocation = System.getProperty("user.dir")
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
// sparkConf = new SparkConf().setAppName("from-to-mysql").
// set("spark.sql.warehouse.dir", warehouseLocation).
// setMaster("yarn-client")
val spark = SparkSession
.builder()
.appName("LoadDB")
.config("spark.some.config.option", "some-value")
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("spark.sql.shuffle.partitions", 1) // TODO: this controls the parallelism of shuffle read tasks; the default of 200 is on the small side for many workloads. With multiple partitions the grouped/sorted data would come back in shuffled order, so use a single partition for now and refine later.
.enableHiveSupport()
.getOrCreate()
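// Only the underlying SparkContext is kept; the HiveContext and SQLContext below are built on top of it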
javaSparkContext = new JavaSparkContext(spark.sparkContext)
}
/*
* Create the HiveContext
* Used to read data from Hive
* */
def initHiveContext(): Unit = {
hiveContext = new HiveContext(javaSparkContext)
}
/*
* Create the SQLContext
* Used to read and write data in MySQL
* */
def initSQLContext(): Unit = {
sqlContext = new SQLContext(javaSparkContext)
}
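// Note: HiveContext and SQLContext are deprecated since Spark 2.0; the SparkSession built in initSparkContext already covers both Hive queries and JDBC reads/writes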
/*
* Use Spark SQL to read data from Hive, then write it into the corresponding MySQL table.
* */
def hive2db(mysqlTable: String, where: String): Unit = {
val url = "jdbc:mysql://10.254.15.61:3306/test?characterEncoding=UTF-8"
val props = new Properties
props.put("user", "root")
props.put("password", "123456")
val query =
s"""
|select * from hive.$mysqlTable $where
""".stripMargin
val rows = hiveContext.sql(query)
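// Append the query result to the MySQL table of the same name over JDBC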
rows.write.mode(SaveMode.Append).jdbc(url, mysqlTable, props)
}
/*
* Use Spark SQL to read data from the DB, process it, and write it back to the DB.
* */
def db2db(): Unit = {
val url = "jdbc:mysql://10.254.15.61:3306/test?characterEncoding=UTF-8"
val fromTable = "accounts"
val toTable = "accountsPart"
val props = new Properties
props.put("user", "root")
props.put("password", "123456")
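// Read the whole source table over JDBC and append its rows to the target table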
val rows = sqlContext.read.jdbc(url, fromTable, props)
rows.write.mode(SaveMode.Append).jdbc(url, toTable, props)
}
}
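As a side note, on Spark 2.x the whole hive2db flow can be driven by the SparkSession alone, without the deprecated HiveContext/SQLContext. The following is only a minimal sketch under that assumption, reusing the JDBC URL, credentials, and query shape from the code above (the table name and where clause still come from args); it is not a tested drop-in replacement for this job.

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object LoadDBSessionOnly {
  def main(args: Array[String]): Unit = {
    // One SparkSession with Hive support replaces SparkConf/JavaSparkContext/HiveContext/SQLContext
    val spark = SparkSession.builder()
      .appName("LoadDB")
      .enableHiveSupport()
      .getOrCreate()

    val url = "jdbc:mysql://10.254.15.61:3306/test?characterEncoding=UTF-8"
    val props = new Properties
    props.put("user", "root")
    props.put("password", "123456")

    val mysqlTable = args(0) // target MySQL table, same name as the Hive table
    val where = args(1)      // e.g. a "where ..." filter on the partition column

    // spark.sql replaces hiveContext.sql; the resulting DataFrame is appended over JDBC
    spark.sql(s"select * from hive.$mysqlTable $where")
      .write.mode(SaveMode.Append)
      .jdbc(url, mysqlTable, props)

    spark.stop()
  }
}

Either way, the MySQL JDBC driver still has to be on the driver and executor classpath, for example by passing the connector jar to spark-submit with --jars.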