Importing Large Volumes of MySQL Data into Hive with Time-Based Partitioning (Spark, Scala)
Overview
Note: the code covers the partitioned import from MySQL into Hive as well as writing results from Hive back to MySQL, implemented in Scala.
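Before the code, a minimal build sketch, assuming an sbt project (the versions shown are only illustrative, not from the original setup): the program needs Spark SQL, Spark's Hive support, and the MySQL JDBC driver on the classpath.

// build.sbt (illustrative versions)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.8",
  "org.apache.spark" %% "spark-hive" % "2.4.8",
  "mysql" % "mysql-connector-java" % "5.1.49"
)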
package datasource
import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
object DataBaseRead {
def main(args: Array[String]) = {
val url = "jdbc:mysql://localhost:3306/test"
val tablename = "stu1"
val user = "root"
val password = "root"
val column = "createTime" //分区字段
val coltype = "date" //分区类型
val partition = "50" //分区个数
// val frame: DataFrame = getSpark.sql("select * from hehe")
// saveASMysqlTable(frame,"result","append")
ReadSql(url, tablename, user, password, column, coltype, partition)
}
// Read a MySQL table into Hive, partitioned on the given column
def ReadSql(url: String, tablename: String, user: String, password: String, column: String, coltype: String, partition: String) = {
// val conf = new SparkConf().setAppName("bigdata test").setMaster("local[*]")
// val spark = SparkSession.builder().config(conf).getOrCreate()
val spark: SparkSession = getSpark
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
prop.put("url", url)
prop.put("dbtable", tablename)
prop.put("user", user)
prop.put("password", password)
// If the partition column is a long (numeric) type, the JDBC read needs extra parameters: the column name, its minimum and maximum values, and the number of partitions
if (coltype.toLowerCase() == "long") {
val ab = LongTypeConn("com.mysql.jdbc.Driver", url, user, password, column, tablename)
val lowerNum = ab(0)
val upperNum = ab(1)
val longFrame = spark.read.jdbc(
prop.getProperty("url"),
prop.getProperty("dbtable"),
column, lowerNum, upperNum,
partition.toInt, prop
)
//longFrame.write.mode(SaveMode.Overwrite).json("D:\out")
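// A rough sketch of how Spark splits this read (the bounds here are made up,
// not taken from the data): with lowerBound = 0, upperBound = 1000 and
// numPartitions = 4, Spark issues roughly one query per stride:
//   WHERE column < 250 OR column IS NULL
//   WHERE column >= 250 AND column < 500
//   WHERE column >= 500 AND column < 750
//   WHERE column >= 750
// The bounds only shape the split; rows outside [lowerBound, upperBound] still
// land in the first and last partitions. This branch only builds the DataFrame;
// persisting it (for example the commented-out JSON write above) is left to the caller.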
}
// If the partition column is a date/time type, the read takes an extra parameter instead: an array of custom time-range predicates
else if (coltype.toLowerCase() == "date") {
val arr2 = DateTypeConn("com.mysql.jdbc.Driver", url, user, password, column, tablename, partition.toInt)
val strings: Array[String] = arr2.toArray
// Print the generated partition predicates for inspection
strings.foreach(println)
// Build the DataFrame: one JDBC partition per predicate string
val dateFrame = spark.read.jdbc(
prop.getProperty("url"),
prop.getProperty("dbtable"),
strings, prop)
dateFrame.createTempView("temp")
// Derive a Hive CREATE TABLE statement from the DataFrame schema:
// dtypes yields (columnName, sparkType) pairs, which are concatenated and
// then rewritten into Hive column definitions.
val dtypes: Array[(String, String)] = dateFrame.dtypes
var arr = ""
for (elem <- dtypes) {
arr += elem
}
val ctr = "create table if not exists " + tablename + "(" + "\n\t"
val str: String = arr.replace("IntegerType", "int").replace("StringType", "string")
.replace("DoubleType", "double").replace("DateType", "date").replace("LongType", "bigint").replace("TimestampType", "string")
.replace(",", " ").replace(")", ",\n\t").replace("(", "").dropRight(3)
val end = "\n\t)"
val fengen = ","
val fenge = "\nrow format delimited fields terminated by '" + fengen + "'"
val sparkSql = ctr + str + end + fenge
println(sparkSql)
spark.sql(sparkSql)
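// For a hypothetical source table with columns (id INT, name VARCHAR, createTime TIMESTAMP),
// the generated statement above would look roughly like:
//   create table if not exists stu1(
//     id int,
//     name string,
//     createTime string
//     )
//   row format delimited fields terminated by ','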
spark.sql("insert into table "+tablename+" select * from temp")
spark.close()
// dateFrame.write.mode(SaveMode.Overwrite).json("C:\Users\86186\Desktop\out")
}
/*
If the partition column is a Long type (for example an auto-increment id), fetch the column's minimum and maximum values and let Spark split that range into the configured number of partitions.
*/
def LongTypeConn(driver: String, url: String, user: String, password: String, column: String, tablename: String): ArrayBuffer[Long] = {
var conn: Connection = null
val array = new ArrayBuffer[Long]()
try {
Class.forName(driver)
conn = DriverManager.getConnection(url, user, password)
val stat = conn.createStatement()
val rs = stat.executeQuery("select min(" + column + ") as minNum,max(" + column + ") as maxNum from " + tablename)
while (rs.next()) {
val minNum = rs.getLong("minNum")
val maxNum = rs.getLong("maxNum")
array.append(minNum)
array.append(maxNum)
}
array
} catch {
case e: Exception =>
e.printStackTrace()
array
} finally {
// Close the connection whether or not the query succeeded
if (conn != null) conn.close()
}
}
}
/*
If the partition column is a date/time type, split the table's overall time range into consecutive intervals and return them as an array of WHERE-clause predicates.
*/
def DateTypeConn(driver: String, url: String, user: String, password: String, column: String, tablename: String, partition: Int):ArrayBuffer[String] ={
var conn: Connection = null
val array = new ArrayBuffer[String]()
val resArray = ArrayBuffer[(String, String)]()
val lastArray = ArrayBuffer[String]()
try{
Class.forName(driver)
conn = DriverManager.getConnection(url,user,password)
val stat = conn.createStatement()
val rs = stat.executeQuery("select min(" +column +") as minNum,max(" + column + ") as maxNum from " + tablename)
while (rs.next()){
val minNum = rs.getString("minNum")
val maxNum = rs.getString("maxNum")
array.append(minNum)
array.append(maxNum)
}
/*
Date values come in many formats, so in practice the parsing has to match the actual data. Two common formats are handled here: "yyyy-MM-dd HH:mm:ss" and "yyyyMMdd HH:mm:ss".
*/
if(array(0).contains("-")){
val sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var hehe=sf.parse(array(0)).getTime()
var minTime = sf.parse(array(0)).getTime()
val maxTime = sf.parse(array(1)).getTime()
val subNum = (maxTime - minTime)/partition.toLong
var midNum = minTime
for(i <- 0 to partition - 1){
minTime = midNum
midNum = midNum + subNum
if(i == 0){
resArray.append(sf.format(minTime) -> sf.format(midNum))
}else if(i == partition - 1){
resArray.append(sf.format(minTime) -> sf.format(maxTime))
}else{
resArray.append(sf.format(minTime) -> sf.format(midNum))
}
}
// The first interval is closed on both ends; later intervals exclude their lower bound so boundary rows are not read twice
for (elem <- resArray) {
if (elem._1 == sf.format(origMinTime)) {
lastArray += column + " >= '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
} else {
lastArray += column + " > '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
}
}
}else{
val sf = new SimpleDateFormat("yyyyMMdd HH:mm:ss")
var minTime = sf.parse(array(0)).getTime()
val maxTime = sf.parse(array(1)).getTime()
val subNum = (maxTime - minTime)/partition.toLong
var midNum = minTime
for(i <- 0 to partition - 1){
minTime = midNum
midNum = midNum + subNum
if(i == 0){
resArray.append(sf.format(minTime) -> sf.format(midNum))
}else if(i == partition - 1){
resArray.append(sf.format(minTime) -> sf.format(maxTime))
}else{
resArray.append(sf.format(minTime) -> sf.format(midNum))
}
}
// Same predicate construction as in the first format branch
for (elem <- resArray) {
if (elem._1 == sf.format(origMinTime)) {
lastArray += column + " >= '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
} else {
lastArray += column + " > '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
}
}
}
lastArray
} catch {
case e: Exception =>
e.printStackTrace()
lastArray
} finally {
// Close the connection whether or not the query succeeded
if (conn != null) conn.close()
}
}
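// Illustrative output of DateTypeConn (the dates are made up): for createTime
// ranging from 2021-01-01 00:00:00 to 2021-01-03 00:00:00 and partition = 2,
// the returned predicates look like
//   createTime >= '2021-01-01 00:00:00' and createTime <= '2021-01-02 00:00:00'
//   createTime > '2021-01-02 00:00:00' and createTime <= '2021-01-03 00:00:00'
// Each string becomes the WHERE clause of one JDBC partition in
// spark.read.jdbc(url, table, predicates, connectionProperties).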
// Build the SparkSession (with Hive support) used for the MySQL-to-Hive import
def getSpark = {
val conf = new SparkConf()
.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
.set("spark.sql.crossJoin.enabled", "true")
.setAppName("mysql in hive").setMaster("local[*]")
val hiveSpark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
hiveSpark
}
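// Note on getSpark above: enableHiveSupport() talks to the Hive metastore, so
// hive-site.xml (or equivalent metastore / spark.sql.warehouse.dir settings)
// must be on the classpath at runtime; without it Spark silently falls back to
// a local embedded metastore and the tables will not show up in the real Hive.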
// Write a DataFrame from Hive back into a MySQL table
def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: String) = {
var table = "result"
val prop = new Properties //配置文件中的key 与 spark 中的 key 不同 所以 创建prop 按照spark 的格式 进行配置数据库
prop.setProperty("user", "root")
prop.setProperty("password", "root")
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("url", "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8")
if (saveMode.equalsIgnoreCase(SaveMode.Overwrite.toString)) {
var conn: Connection = null
try {
conn = DriverManager.getConnection(
prop.getProperty("url"),
prop.getProperty("user"),
prop.getProperty("password")
)
val stmt = conn.createStatement
table = table.toUpperCase
stmt.execute(s"truncate table $table") // truncate rather than drop so the table structure is preserved; the data is then appended below
conn.close()
}
catch {
case e: Exception =>
println("MySQL Error:")
e.printStackTrace()
}
}
dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
}
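// A minimal usage sketch for the Hive-to-MySQL path (the connection settings and
// the target database are the ones hard-coded above; the query is illustrative):
//   val df = getSpark.sql("select * from stu1")
//   saveASMysqlTable(df, "result", "overwrite") // truncate first, then append
//   saveASMysqlTable(df, "result", "append")    // plain append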
}