我是靠谱客的博主 怕黑菠萝,这篇文章主要介绍dataX同步mysql至hive,现在分享给大家,希望可以做个参考。

1、前言

我们的业务数据基本都是在数据库中,如果需要离线同步到hdfs我们就需要使用dataX工具。使用dataX只需要学好json脚本,配置好数据源和路径就可以了。以下是我的一个mysql同步到HIve,以上的变量都可以通过传参统一一个脚本处理。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
{ "job": { "setting": { "speed": { "channel": 3 } }, "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username": "${username}", "password": "${password}", "connection": [ { "jdbcUrl": [ "${jdbcUrl}" ], "querySql": [ "select id,create_time,update_time from ${sourceTableName} where update_time<'${endTime}' " ] } ] } }, "writer": { "name": "hdfswriter", "parameter": { "column": [ { "name": "id", "type": "string" }, { "name": "create_time", "type": "string" }, { "name": "update_time", "type": "string" } ], "isCompress": "${isCompress}", "defaultFS": "${hdfsPath}", "fieldDelimiter": "${fieldDelimiter}", "fileName": "${fileName}", "fileType": "${fileType}", "path": "${path}", "writeMode": "${writeMode}" } } } ] } }

2、参数调用和传参

很多人在问如何使用脚本变量如何传参。
首先我们要会使用dataX如何调用这个脚本。

复制代码
1
2
python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/day=$DT_TIME" $DATAX_JSON_FILE;

在这个命令中你会发现将所有的变量都通过shell命令传递进去了。后续的这些变量传递我在更新。之所以这么多变量其主要是为了方便后续的脚本更新和调度运行。
对于开发人员只需要关心主要逻辑就行了。
有了这个基础脚本,我们就可以将HIVE上的一些功能一起合并到shell脚本中:

  • 增量同步,保留全部数据。
  • 全量同步,全量同步只保留固定周期的历史全量。
  • 刷新元数据。
  • 通知更新成功。
  • 多个mysql业务库的匹配。
  • 生产业务库密码的保护。

3、封装shell调用脚本

基于上面的考虑。封装dataX的调用脚本。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
#!/bin/bash source /etc/profile DATAX_HOME="/home/data/datax" SHELL_PATH="/home/data/dw_datax" SCRIPT_PATH=${SHELL_PATH}/job DATAX_LOG=${SHELL_PATH}/logs/datax.log.`date "+%Y-%m-%d"` HDFS_PATH="hdfs://hdfs-cluster" #START_TIME=$(date -d "-1 day" +%Y-%m-%d) #END_TIME=$(date "+%Y-%m-%d") #DT_TIME=$(date -d "-1 day" +%Y%m%d) START_TIME="" END_TIME="" DT_TIME="" #失效日期 INVALID_DATE="" #失效天数 INVALID_DAYS=180 #是否清除失效数据:默认清除 IS_CLEAR_INVALID_DATA=1 #参数 ISCOMPRESS="false" WRITEMODE="nonConflict" FIELDDELIMITER="|" FILETYPE="orc" PATH_HIVE="/user/hive/warehouse/" MYSQL_URL="" #数据库用户名 MYSQL_USERNAME="admin" #数据库密码 MYSQL_PASSWD="123456" #默认同步目标库名 TARGET_DB_NAME="ods" #同步源库名 SOURCE_DB_NAME="" #同步源表名 SOURCE_TABLE_NAME="" #业务名称 BUSINESS_NAME="" #datax json文件 DATAX_JSON_FILE=/temp # 数据库实例信息 declare -A db_instance_conf # 数据库用户名 declare -A db_instance_user_conf # 数据库密码 declare -A db_instance_pwd_conf # 数据库实例与库映射关系 declare -A db_instance_maps # 初始化数据库实例配置 function initInstanceConf() { # 主业务线 ywx1 db_instance_conf["db_main_data"]="jdbc:mysql://192.168.1.1:3306/" db_instance_user_conf["db_main_data"]="admin" db_instance_pwd_conf["db_main_data"]="123456" # 业务线2 ywx2 db_instance_conf["db_data"]="jdbc:mysql://192.168.1.2:3306/" db_instance_user_conf["db_data"]="admin" db_instance_pwd_conf["db_data"]="123456" ... } # 初始化库和数据库实例映射关系 function initDbAndInstanceMaps() { #主业务线 db_instance_maps["ywx1_db_main"]="db_main_data" #业务线2 db_instance_maps["ywx2_db_data"]="db_data" #业务线3 db_instance_maps["ywx3_db_insurance"]="db_ywx3" ... ... db_instance_maps["dss_db_dss"]="db_dss" } #时间处理 传入参数 yyyy-mm-dd function DateProcess() { echo "日期时间为"$1 if echo $1 | grep -Eq "[0-9]{4}-[0-9]{2}-[0-9]{2}" && date -d $1 +%Y%m%d > /dev/null 2>&1 then : START_TIME=$(date -d $1 "+%Y-%m-%d") END_TIME=$(date -d "$1 +1 day" +%Y-%m-%d) DT_TIME=$(date -d $1 +"%Y%m%d") INVALID_DATE=$(date -d "$1 -$INVALID_DAYS day" +%Y%m%d) echo 时间正确: $START_TIME / $END_TIME / $DT_TIME / $INVALID_DATE; else echo "输入的日期格式不正确,应为yyyy-mm-dd"; exit 1; fi; } function DataConnect() { db_business_key="$BUSINESS_NAME""_""$SOURCE_DB_NAME" db_instance_key=${db_instance_maps["$db_business_key"]} echo $db_business_key $db_instance_key if [ ! -n "$db_instance_key" ]; then echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加" exit 1; fi db_instance_value=${db_instance_conf["$db_instance_key"]} MYSQL_USERNAME=${db_instance_user_conf["$db_instance_key"]} MYSQL_PASSWD=${db_instance_pwd_conf["$db_instance_key"]} echo $db_instance_value if [ ! -n "$db_instance_value" ]; then echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加" exit 1; fi MYSQL_URL="$db_instance_value$SOURCE_DB_NAME" } #每天运行 执行dataX function BaseDataxMysql2Hive() { #清除重复同步数据分区&新增分区 hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION(day='$DT_TIME');ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME ADD IF NOT EXISTS PARTITION (day='$DT_TIME')"; #执行同步 echo "开始执行同步" if ! python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/pt_day=$DT_TIME" $DATAX_JSON_FILE;then echo "command failed" exit 1; fi echo "同步结束" #删除定义的失效日期数据 if(($IS_CLEAR_INVALID_DATA==1));then echo "清除失效$INVALID_DATE天数的历史数据" hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION (pt_day<=${INVALID_DATE});" fi #同步分区元数据 #hive -e "ANALYZE TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME PARTITION (day=${DT_TIME}) COMPUTE STATISTICS;" #删除分区数据 } function parseArgs() { while getopts ":d:ab:s:m:f:t:n:u:p:" opt do case $opt in d) echo "参数d的值$OPTARG" DateProcess $OPTARG ;; a) IS_CLEAR_INVALID_DATA=0 echo "参数a的值$OPTARG" ;; b) echo "参数b的值$OPTARG" BUSINESS_NAME=$OPTARG ;; m) echo "参数m的值$OPTARG" SOURCE_DB_NAME=$OPTARG ;; s) echo "参数s的值$OPTARG" SOURCE_TABLE_NAME=$OPTARG ;; f) echo "参数f的值$OPTARG" DATAX_JSON_FILE=$OPTARG ;; n) echo "参数n的值$OPTARG" TARGET_DB_NAME=$OPTARG ;; t) echo "参数t的值$OPTARG" TARGET_TABLE_NAME=$OPTARG ;; u) echo "参数u的值$OPTARG" MYSQL_USERNAME=$OPTARG ;; p) echo "参数t的值$OPTARG" MYSQL_PASSWD=$OPTARG ;; ?) echo "未知参数" exit 1;; :) echo "没有输入任何选项 $OPTARG" ;; esac done } function judgeParams() { if [ ! -n "$DT_TIME" ] ;then echo "you have not input a etlDate! format {-d yyyy-mm-dd} " exit 1; fi if [ ! -n "$BUSINESS_NAME" ] ;then echo "you have not input a businessName! incloud(xxx,xxxx,x,xx) example {-b xxx}" exit 1; fi if [ ! -n "$SOURCE_DB_NAME" ] ;then echo "you have not input a sourceDB!" exit 1; fi if [ ! -n "$SOURCE_TABLE_NAME" ] ;then echo "you have not input a sourceTable example {-s user_info}!" exit 1; fi if [ ! -n "$DATAX_JSON_FILE" ] ;then echo "you have not input a dataxJson! example {-f ods_ywx1_user_info_di.json}" exit 1; fi if [ ! -n "$TARGET_TABLE_NAME" ] ;then echo "you have not input a targetTable! example {-t ods_ywx1_user_info_di}" exit 1; fi } function startsync() { #初始化数据库实例 initInstanceConf #初始化库和数据库实例映射关系 initDbAndInstanceMaps #解析参数 parseArgs "$@" #初始化数据链接 DataConnect #判断参数 judgeParams #同步数据 BaseDataxMysql2Hive } # -d: 处理时间 # -b:业务线 (ywx,ywx1,ywx1,...,ywxn) # -m:源数据库 # -a:增量数据不清除分区数据:默认清除 # -s:源数据表 # -n:目标数据库 # -t:目标数据表 # -f:datax同步json文件 # -p:密码 # -u:用户名 startsync "$@"

有了这个shell脚本,后续对于同步的一些同步完成功能的通知以及新功能都可以新增。同时又新形成了一个数据同步的规范性和开发的规范性。

4、调度平台调度脚本

有了上面的脚本,我们就可以只需要写好源表和目的表的名称。同时通过 -a 来区别增量还是全量同步进行处理。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
#源表 -s SOURCE_TABLE_NAME="user_info" #目标表 -t TARGET_TABLE_NAME="ods_main_user_info_df" #datax 文件 -f DATAX_FILE="${BASE_DIR_PATH}/ods_main_user_info_df.json" ETL_DATE=${ETL_DATE} BUSINESS_NAME=${BUSINESS_NAME} SOURCE_DB_NAME=${SOURCE_DB_NAME} #!/bin/bash source /etc/profile sh dataxsync.sh -d $ETL_DATE -b $BUSINESS_NAME -m $SOURCE_DB_NAME -s $SOURCE_TABLE_NAME -t $TARGET_TABLE_NAME -f $DATAX_FILE

我们可以发现这个脚本中包含了四个变量:
${BASE_DIR_PATH}
${ETL_DATE}
${BUSINESS_NAME}
${SOURCE_DB_NAME}

这几个变量主要是通过调度平台传入,
BASE_DIR_PATH:dataX脚本的统一地址,之所以弄这个目录主要是为了区分不同业务线。
ETL_DATE:每天同步的时间 : yyyy-mm-dd,同时我们可以再脚本上多增加几个时间,通过这个变量转换出来(yyyyMMdd, yyyyMMdd-1…)
BUSINESS_NAME:业务线的标识,我们也是可以用主题域区分。主要是用来识别数据库
SOURCE_DB_NAME:业务库的表名。
这一套的使用,我们搭配小海豚调度一起使用。定时运行,定时调度。

最后

以上就是怕黑菠萝最近收集整理的关于dataX同步mysql至hive的全部内容,更多相关dataX同步mysql至hive内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部