
Importing large volumes of MySQL data into Hive with time-based partitioning (Spark, Scala)

Note: the code covers both the partitioned MySQL-to-Hive import and the Hive-to-MySQL export, and is written in Scala.
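The core mechanism is Spark's predicate-based JDBC read: you pass an array of WHERE-clause fragments and Spark runs one query, and therefore one partition, per fragment, so a large table is pulled in parallel instead of through a single connection. Here is a minimal sketch of just that call; the test database, stu1 table, createTime column and date values mirror the example below and are assumptions, not a fixed API of this article's program.

import java.util.Properties
import org.apache.spark.sql.SparkSession

object PredicateReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("predicate read sketch")
      .master("local[*]")
      .getOrCreate()

    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")
    props.put("driver", "com.mysql.jdbc.Driver")

    // One element per partition: Spark runs "select * from stu1 where <predicate>" for each element
    val predicates = Array(
      "createTime >= '2020-01-01 00:00:00' and createTime <= '2020-06-30 23:59:59'",
      "createTime >  '2020-06-30 23:59:59' and createTime <= '2020-12-31 23:59:59'"
    )

    val df = spark.read.jdbc("jdbc:mysql://localhost:3306/test", "stu1", predicates, props)
    println(s"partitions: ${df.rdd.getNumPartitions}") // equals predicates.length
    spark.stop()
  }
}

The full program below builds this predicate array automatically from the minimum and maximum values of the time column.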

package datasource

import java.sql.{Connection, DriverManager}
import java.text.SimpleDateFormat
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import scala.collection.mutable.ArrayBuffer

object DataBaseRead {

  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/test"
    val tablename = "stu1"
    val user = "root"
    val password = "root"
    val column = "createTime"  // partition column
    val coltype = "date"       // partition column type: "date" or "long"
    val partition = "50"       // number of partitions
    // val frame: DataFrame = getSpark.sql("select * from hehe")
    // saveASMysqlTable(frame, "result", "append")
    ReadSql(url, tablename, user, password, column, coltype, partition)
  }

  // Read a MySQL table into Hive
  def ReadSql(url: String, tablename: String, user: String, password: String,
              column: String, coltype: String, partition: String): Unit = {
    // val conf = new SparkConf().setAppName("bigdata test").setMaster("local[*]")
    // val spark = SparkSession.builder().config(conf).getOrCreate()
    val spark: SparkSession = getSpark

    val prop = new Properties()
    prop.put("driver", "com.mysql.jdbc.Driver")
    prop.put("url", url)
    prop.put("dbtable", tablename)
    prop.put("user", user)
    prop.put("password", password)

    // If the partition column is a long (e.g. an auto-increment id), the JDBC reader
    // needs extra arguments: column name, lower bound, upper bound and partition count.
    if (coltype.toLowerCase() == "long") {
      val ab = LongTypeConn("com.mysql.jdbc.Driver", url, user, password, column, tablename)
      val lowerNum = ab(0)
      val upperNum = ab(1)
      val longFrame = spark.read.jdbc(
        prop.getProperty("url"),
        prop.getProperty("dbtable"),
        column, lowerNum, upperNum, partition.toInt,
        prop
      )
      // longFrame.write.mode(SaveMode.Overwrite).json("D:\\out")
    }
    // If the partition column is a time column, the reader takes one extra argument:
    // the array of custom time-range predicates we build ourselves.
    else if (coltype.toLowerCase() == "date") {
      val arr2 = DateTypeConn("com.mysql.jdbc.Driver", url, user, password, column, tablename, partition.toInt)
      val strings: Array[String] = arr2.toArray[String]
      for (elem <- strings) {
        println(elem)
      }

      // Build the DataFrame: Spark issues one query per predicate, i.e. one partition per time range
      val dateFrame = spark.read.jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), strings, prop)
      dateFrame.createTempView("temp")

      // Translate the DataFrame schema into a Hive CREATE TABLE statement
      val dtypes: Array[(String, String)] = dateFrame.dtypes
      var arr = ""
      for (elem <- dtypes) {
        arr += elem
      }
      val ctr = "create table if not exists " + tablename + "(" + "\n\t"
      val str: String = arr
        .replace("IntegerType", "int").replace("StringType", "string")
        .replace("DoubleType", "double").replace("DateType", "date")
        .replace("LongType", "bigint").replace("TimestampType", "string")
        .replace(",", " ").replace(")", ",\n\t").replace("(", "")
        .dropRight(3)
      val end = "\n\t)"
      val fengen = ","
      val fenge = "\nrow format delimited fields terminated by '" + fengen + "'"
      val sparkSql = ctr + str + end + fenge
      println(sparkSql)

      spark.sql(sparkSql)
      spark.sql("insert into table " + tablename + " select * from temp")
      spark.close()
      // dateFrame.write.mode(SaveMode.Overwrite).json("C:\\Users\\86186\\Desktop\\out")
    }
  }

  /*
   * If the partition column is a Long (e.g. an id), fetch its min and max so Spark
   * can split [min, max] into the configured number of partitions.
   */
  def LongTypeConn(driver: String, url: String, user: String, password: String,
                   column: String, tablename: String): ArrayBuffer[Long] = {
    var conn: Connection = null
    val array = new ArrayBuffer[Long]()
    try {
      Class.forName(driver)
      conn = DriverManager.getConnection(url, user, password)
      val stat = conn.createStatement()
      val rs = stat.executeQuery("select min(" + column + ") as minNum, max(" + column + ") as maxNum from " + tablename)
      while (rs.next()) {
        array.append(rs.getLong("minNum"))
        array.append(rs.getLong("maxNum"))
      }
      array
    } catch {
      case e: Exception =>
        e.printStackTrace()
        array
    } finally {
      if (conn != null) conn.close()
    }
  }

  /*
   * If the partition column is a time column, split the table's [min, max] time range
   * into intervals and return one "column > 'a' and column <= 'b'" predicate per interval.
   */
  def DateTypeConn(driver: String, url: String, user: String, password: String,
                   column: String, tablename: String, partition: Int): ArrayBuffer[String] = {
    var conn: Connection = null
    val array = new ArrayBuffer[String]()
    val resArray = ArrayBuffer[(String, String)]()
    val lastArray = ArrayBuffer[String]()
    try {
      Class.forName(driver)
      conn = DriverManager.getConnection(url, user, password)
      val stat = conn.createStatement()
      val rs = stat.executeQuery("select min(" + column + ") as minNum, max(" + column + ") as maxNum from " + tablename)
      while (rs.next()) {
        array.append(rs.getString("minNum"))
        array.append(rs.getString("maxNum"))
      }

      // Time formats vary, so adapt this part to your own data;
      // two common formats are handled here, chosen by whether the value contains "-".
      val sf =
        if (array(0).contains("-")) new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        else new SimpleDateFormat("yyyyMMdd HH:mm:ss")

      val firstTime = sf.parse(array(0)).getTime() // overall minimum; the first interval keeps >= so the earliest row is included
      var minTime = sf.parse(array(0)).getTime()
      val maxTime = sf.parse(array(1)).getTime()
      val subNum = (maxTime - minTime) / partition.toLong
      var midNum = minTime
      for (i <- 0 until partition) {
        minTime = midNum
        midNum = midNum + subNum
        if (i == partition - 1) {
          // last interval runs to the true maximum to cover integer-division remainder
          resArray.append(sf.format(minTime) -> sf.format(maxTime))
        } else {
          resArray.append(sf.format(minTime) -> sf.format(midNum))
        }
      }
      for (elem <- resArray) {
        if (elem._1 == sf.format(firstTime)) {
          lastArray += column + " >= '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
        } else {
          lastArray += column + " > '" + elem._1 + "' and " + column + " <= '" + elem._2 + "'"
        }
      }
      lastArray
    } catch {
      case e: Exception =>
        e.printStackTrace()
        lastArray
    } finally {
      if (conn != null) conn.close()
    }
  }

  // Build a Hive-enabled SparkSession for the MySQL-to-Hive import
  def getSpark: SparkSession = {
    val conf = new SparkConf()
      .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
      .set("spark.sql.crossJoin.enabled", "true")
      .setAppName("mysql in hive")
      .setMaster("local[*]")
    val hiveSpark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    hiveSpark
  }

  // Write a DataFrame (e.g. read from Hive) back into MySQL
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: String): Unit = {
    var table = tableName
    val prop = new Properties
    // The keys in our own config differ from the ones Spark expects,
    // so build a Properties object using Spark's JDBC key names
    prop.setProperty("user", "root")
    prop.setProperty("password", "root")
    prop.setProperty("driver", "com.mysql.jdbc.Driver")
    prop.setProperty("url", "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8")

    if (saveMode.equalsIgnoreCase(SaveMode.Overwrite.toString)) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        // truncate first, then append, so the table structure is preserved
        stmt.execute(s"truncate table $table")
        conn.close()
      } catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      }
    }
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
  }
}

