我是靠谱客的博主 矮小糖豆,这篇文章主要介绍scala hive数据到mysql 含分区.,现在分享给大家,希望可以做个参考。

package com.citic.guoan.test

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.hive.HiveContext
import java.io.{File, Serializable}
import java.util.Properties

import com.mysql.jdbc.JDBC4MysqlSQLXML


object LoadDB extends Serializable {
  private var sparkConf: SparkConf = null
  private var javaSparkContext: JavaSparkContext = null
  private var hiveContext: HiveContext = null
  private var sqlContext: SQLContext = null

  def main(args: Array[String]): Unit = {
    initSparkContext()
    initSQLContext()
    initHiveContext()
    System.out.println(" ---------------------- start hive2db ------------------------")
    hive2db(args(0),args(1))
    System.out.println(" ---------------------- finish hive2db ------------------------")
//    System.out.println(""" ---------------------- start db2db ------------------------""")
//    db2db()
//    System.out.println(" ---------------------- finish db2db ------------------------")
    System.exit(0)
  }


  /*
      *   创建sparkContext
      * */
  def initSparkContext(): Unit = {
//    val warehouseLocation = System.getProperty("user.dir")
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath
//    sparkConf = new SparkConf().setAppName("from-to-mysql").
//      set("spark.sql.warehouse.dir", warehouseLocation).
//      setMaster("yarn-client")

    val spark = SparkSession
      .builder()
      .appName("LoadDB")
      .config("spark.some.config.option", "some-value")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .config("spark.sql.shuffle.partitions", 1) //TODO 该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小. 当前分组排序的数据多分区,数据会打乱,临时先用1个分区. 后续完善
      .enableHiveSupport()
      .getOrCreate()


    javaSparkContext =  new JavaSparkContext(spark.sparkContext)
  }

  /*
      *   创建hiveContext
      *   用于读取Hive中的数据
      * */
  def initHiveContext(): Unit = {
    hiveContext = new HiveContext(javaSparkContext)
  }

  /*
      *   创建sqlContext
      *   用于读写MySQL中的数据
      * */
  def initSQLContext(): Unit = {
    sqlContext = new SQLContext(javaSparkContext)
  }

  /*
      *   使用spark-sql从hive中读取数据, 然后写入mysql对应表.
      * */
  def hive2db(mysqlTable: String ,where:String): Unit = {
    val url = "jdbc:mysql://10.254.15.61:3306/test?characterEncoding=UTF-8"
    val props = new Properties
    props.put("user", "root")
    props.put("password", "123456")
    val query =
      s"""
         |select * from hive.$mysqlTable  $where
       """.stripMargin
    val rows = hiveContext.sql(query)

    rows.write.mode(SaveMode.Append).jdbc(url, mysqlTable, props)
  }

  /*
      *   使用spark-sql从db中读取数据, 处理后再回写到db
      * */
  def db2db(): Unit = {
    val url = "jdbc:mysql://10.254.15.61:3306/test?characterEncoding=UTF-8"
    val fromTable = "accounts"
    val toTable = "accountsPart"
    val props = new Properties
    props.put("user", "root")
    props.put("password", "123456")
    val rows = sqlContext.read.jdbc(url, fromTable, props)
    rows.write.mode(SaveMode.Append).jdbc(url, toTable, props)
  }
}

最后

以上就是矮小糖豆最近收集整理的关于scala hive数据到mysql 含分区.的全部内容,更多相关scala内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部