Overview
import java.sql.Connection

import scala.collection.mutable.ArrayBuffer

object JdbcTemplateUtil extends Serializable {

  /**
   * Execute a single SQL statement.
   * @param conn   JDBC connection supplied by the caller (not closed here)
   * @param sql    SQL statement with '?' placeholders
   * @param params positional parameter values; may be null when the SQL has no placeholders
   */
  def executeSql(conn: Connection, sql: String, params: Array[String]): Unit = {
    try {
      val ps = conn.prepareStatement(sql)
      if (params != null) {
        for (i <- params.indices)
          ps.setString(i + 1, params(i))
      }
      ps.executeUpdate()
      ps.close()
    } catch {
      case e: Exception => println(">>>Execute Sql Exception..." + e)
    }
  }

  /**
   * Execute a batch of SQL statements in a single transaction.
   * @param sql       SQL statement with '?' placeholders
   * @param paramList one Array of positional parameter values per row
   */
  def executeBatchSql(conn: Connection, sql: String, paramList: ArrayBuffer[Array[String]]): Unit = {
    try {
      val ps = conn.prepareStatement(sql)
      conn.setAutoCommit(false)
      for (params: Array[String] <- paramList) {
        if (params != null) {
          for (i <- params.indices) ps.setString(i + 1, params(i))
          ps.addBatch()
        }
      }
      ps.executeBatch()
      conn.commit()
      ps.close()
      // Note: unlike executeSql, this method closes the connection after the batch commits.
      conn.close()
    } catch {
      case e: Exception => println(">>>Execute Batch Sql Exception..." + e)
    }
  }
}
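A minimal usage sketch of JdbcTemplateUtil (not part of the original article): the JDBC URL, credentials, table and column names below are placeholders only.

import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer

object JdbcTemplateUtilDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings -- replace with real values.
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")

    // Single statement with positional parameters.
    JdbcTemplateUtil.executeSql(conn,
      "INSERT INTO demo_table (id, name) VALUES (?, ?)", Array("1", "tom"))

    // Batch write in one transaction; note that executeBatchSql also closes the connection.
    val batch = ArrayBuffer(Array("2", "jerry"), Array("3", "spike"))
    JdbcTemplateUtil.executeBatchSql(conn,
      "INSERT INTO demo_table (id, name) VALUES (?, ?)", batch)
  }
}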
import java.text.SimpleDateFormat
import java.util.Calendar

object DateTimeUtil extends Serializable {

  /**
   * Return the local calendar date shifted by `delta` days.
   * @param delta     day offset relative to today (e.g. -1 for yesterday)
   * @param separator string placed between year, month and day (may be empty)
   * @return the formatted date string, "yyyy<sep>MM<sep>dd"
   */
  def dateDelta(delta: Int, separator: String): String = {
    val sdf = new SimpleDateFormat("yyyy" + separator + "MM" + separator + "dd")
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, delta)
    val date = sdf.format(cal.getTime)
    date
  }
}
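For example, assuming the job runs on 2024-06-02 (a date chosen only for illustration):

DateTimeUtil.dateDelta(-1, "-")   // "2024-06-01" -- yesterday with '-' separators
DateTimeUtil.dateDelta(-1, "")    // "20240601"   -- the compact form used by main() further below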
When writing data out over JDBC, Spark offers the following save modes (a short example follows the list); note:
SaveMode.ErrorIfExists (the default): if the target table already exists in the database, an exception is thrown and the data is not written;
SaveMode.Append: if the table already exists, rows are appended to it; if it does not exist, the table is created first and the data is then inserted;
SaveMode.Overwrite: the existing table and all of its data are dropped, the table is recreated, and the new data is inserted;
SaveMode.Ignore: if the table does not exist, it is created and the data is written; if the table does exist, the write is silently skipped without raising an error.
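As a quick illustration (a sketch, not from the original article), the save mode is chosen on the DataFrameWriter; df is assumed to be an existing DataFrame, and the URL, credentials and table name are placeholders:

import org.apache.spark.sql.SaveMode

df.write.format("jdbc")
  .mode(SaveMode.Append)          // or ErrorIfExists / Overwrite / Ignore
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("user", "user")
  .option("password", "password")
  .option("dbtable", "demo_table")
  .save()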
When Spark SQL reads a Hive table and the result is written to MySQL, how do we avoid primary key conflicts?
1. Delete the current date's rows from the MySQL table first, then write. Note that the primary keys within the batch being written must themselves be unique.
2. Resolve conflicts with MySQL's ON DUPLICATE KEY UPDATE clause, performing the JDBC write with a PreparedStatement (a sketch follows below). This, however, means giving up Spark's built-in "JDBC To Other Databases" data source.
See the official documentation: http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
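A sketch of approach 2, reusing the JdbcTemplateUtil shown earlier. This is only an outline under assumptions: df, url, user and password are taken to be in scope, and the table demo_table with columns ID, STAT_DATE and SN is hypothetical; the article itself does not take this path.

import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer

// MySQL upsert: insert the row, or update the non-key columns when the primary key already exists.
val upsertSql =
  """INSERT INTO demo_table (ID, STAT_DATE, SN)
    |VALUES (?, ?, ?)
    |ON DUPLICATE KEY UPDATE STAT_DATE = VALUES(STAT_DATE), SN = VALUES(SN)""".stripMargin

// Write each partition with a PreparedStatement batch on the executors.
df.rdd.foreachPartition { rows =>
  val conn = DriverManager.getConnection(url, user, password)
  val batch = ArrayBuffer[Array[String]]()
  rows.foreach { row =>
    batch += Array(row.getAs[String]("ID"), row.getAs[String]("STAT_DATE"), row.getAs[String]("SN"))
  }
  // executeBatchSql commits the batch and closes the connection.
  JdbcTemplateUtil.executeBatchSql(conn, upsertSql, batch)
}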
The code below takes the first approach: SaveMode.Append, with a DELETE of the current date's rows before the current batch is written to MySQL.
import java.sql.{Connection, DriverManager}

import com.berg.commons.enums.PropertyEnum
import com.berg.commons.utils._
import org.apache.log4j.Logger
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.commons.codec.digest.DigestUtils

case class GisOmsAddressFilterDO(ID: String, STAT_DATE: String, SN: String, ADDRESS: String, AK: String, RESULT: Int, CITY_CODE: Int, REQUEST_DATE_TIME: String)

object GisOmsAddressFilterStorage extends Serializable {

  val appName: String = this.getClass.getSimpleName.replace("$", "")
  val logger: Logger = Logger.getLogger(appName)

  private[GisOmsAddressFilterStorage] val dbName: String = "xxx"
  private[GisOmsAddressFilterStorage] val dbTableName: String = "xxx"
  private[GisOmsAddressFilterStorage] val sourceTableName = "xxx"
  private[GisOmsAddressFilterStorage] val url = PropertyUtil.getPropertyValue(PropertyEnum.DATABASE_DEV.getValue(), "berg.mysql.driverURL")
  private[GisOmsAddressFilterStorage] val user = PropertyUtil.getPropertyValue(PropertyEnum.DATABASE_DEV.getValue(), "berg.mysql.user")
  private[GisOmsAddressFilterStorage] val password = PropertyUtil.getPropertyValue(PropertyEnum.DATABASE_DEV.getValue(), "berg.mysql.password")

  val dlr = "$"

  /**
   * Read one day's data from the Hive source table and write it to MySQL.
   * @param spark  active SparkSession
   * @param incDay partition date to load
   */
  def saveGisOmsAddressData(spark: SparkSession, incDay: String): Unit = {
    val sparkSql =
      s"""
         | select
         |   *
         | from $sourceTableName
         | where inc_day = '$incDay'
      """.stripMargin
    logger.error(">>>>>>Execute Hive Sql: " + sparkSql)

    val df = spark.sql(sparkSql).toDF("sn", "address", "ak", "result", "cityCode", "dateTime", "statDate")

    import spark.implicits._
    // Map each row to the target DO; the ID is an MD5 of statDate + sn, so reruns of the same day produce the same keys.
    val jdbcDF = df.rdd.map(row => {
      val sn = row.getAs[String]("sn")
      val address = row.getAs[String]("address")
      val ak = row.getAs[String]("ak")
      val result = row.getAs[String]("result").toInt
      val cityCode = row.getAs[String]("cityCode").toInt
      val dateTime = row.getAs[String]("dateTime")
      val statDate = row.getAs[String]("statDate")
      val id = DigestUtils.md5Hex(statDate.concat(sn))
      GisOmsAddressFilterDO(id, statDate, sn, address, ak, result, cityCode, dateTime)
    }).toDF()

    try {
      // Delete any rows already stored for this date, then append the current batch.
      val delSql = s"DELETE FROM $dbTableName WHERE STAT_DATE = '$incDay'"
      logger.error(">>>>>>Execute Del Sql: " + delSql)
      val conn: Connection = DriverManager.getConnection(url, user, password)
      JdbcTemplateUtil.executeSql(conn, delSql, null)

      jdbcDF.write.format("jdbc").mode(SaveMode.Append)
        .option("url", url)
        .option("user", user)
        .option("password", password)
        .option("dbtable", dbTableName)
        .save()
      logger.error(">>>>>>Save OK!!!")
    } catch {
      case e: Exception => logger.error(">>>OP DataBase Exception: " + e)
    }
    spark.stop()
  }

  def start(incDay: String): Unit = {
    val spark = SparkSession.builder().config(ConfigUtil.getSparkConf(appName)).enableHiveSupport().getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    saveGisOmsAddressData(spark, incDay)
  }

  def main(args: Array[String]): Unit = {
    // Process yesterday's partition (no separator in the date string).
    val incDay: String = DateTimeUtil.dateDelta(-1, "")
    logger.info(incDay)
    start(incDay)
  }
}
Further reading: https://blog.csdn.net/u011622631/article/details/84572022