概述
1、前言
我们的业务数据基本都是在数据库中,如果需要离线同步到hdfs我们就需要使用dataX工具。使用dataX只需要学好json脚本,配置好数据源和路径就可以了。以下是我的一个mysql同步到HIve,以上的变量都可以通过传参统一一个脚本处理。
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "${username}",
"password": "${password}",
"connection": [
{
"jdbcUrl": [
"${jdbcUrl}"
],
"querySql": [
"select id,create_time,update_time from ${sourceTableName} where update_time<'${endTime}' "
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
}
],
"isCompress": "${isCompress}",
"defaultFS": "${hdfsPath}",
"fieldDelimiter": "${fieldDelimiter}",
"fileName": "${fileName}",
"fileType": "${fileType}",
"path": "${path}",
"writeMode": "${writeMode}"
}
}
}
]
}
}
2、参数调用和传参
很多人在问如何使用脚本变量如何传参。
首先我们要会使用dataX如何调用这个脚本。
python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/day=$DT_TIME" $DATAX_JSON_FILE;
在这个命令中你会发现将所有的变量都通过shell命令传递进去了。后续的这些变量传递我在更新。之所以这么多变量其主要是为了方便后续的脚本更新和调度运行。
对于开发人员只需要关心主要逻辑就行了。
有了这个基础脚本,我们就可以将HIVE上的一些功能一起合并到shell脚本中:
- 增量同步,保留全部数据。
- 全量同步,全量同步只保留固定周期的历史全量。
- 刷新元数据。
- 通知更新成功。
- 多个mysql业务库的匹配。
- 生产业务库密码的保护。
3、封装shell调用脚本
基于上面的考虑。封装dataX的调用脚本。
#!/bin/bash
source /etc/profile
DATAX_HOME="/home/data/datax"
SHELL_PATH="/home/data/dw_datax"
SCRIPT_PATH=${SHELL_PATH}/job
DATAX_LOG=${SHELL_PATH}/logs/datax.log.`date "+%Y-%m-%d"`
HDFS_PATH="hdfs://hdfs-cluster"
#START_TIME=$(date -d "-1 day" +%Y-%m-%d)
#END_TIME=$(date "+%Y-%m-%d")
#DT_TIME=$(date -d "-1 day" +%Y%m%d)
START_TIME=""
END_TIME=""
DT_TIME=""
#失效日期
INVALID_DATE=""
#失效天数
INVALID_DAYS=180
#是否清除失效数据:默认清除
IS_CLEAR_INVALID_DATA=1
#参数
ISCOMPRESS="false"
WRITEMODE="nonConflict"
FIELDDELIMITER="|"
FILETYPE="orc"
PATH_HIVE="/user/hive/warehouse/"
MYSQL_URL=""
#数据库用户名
MYSQL_USERNAME="admin"
#数据库密码
MYSQL_PASSWD="123456"
#默认同步目标库名
TARGET_DB_NAME="ods"
#同步源库名
SOURCE_DB_NAME=""
#同步源表名
SOURCE_TABLE_NAME=""
#业务名称
BUSINESS_NAME=""
#datax json文件
DATAX_JSON_FILE=/temp
# 数据库实例信息
declare -A db_instance_conf
# 数据库用户名
declare -A db_instance_user_conf
# 数据库密码
declare -A db_instance_pwd_conf
# 数据库实例与库映射关系
declare -A db_instance_maps
# 初始化数据库实例配置
function initInstanceConf()
{
# 主业务线 ywx1
db_instance_conf["db_main_data"]="jdbc:mysql://192.168.1.1:3306/"
db_instance_user_conf["db_main_data"]="admin"
db_instance_pwd_conf["db_main_data"]="123456"
# 业务线2 ywx2
db_instance_conf["db_data"]="jdbc:mysql://192.168.1.2:3306/"
db_instance_user_conf["db_data"]="admin"
db_instance_pwd_conf["db_data"]="123456"
...
}
# 初始化库和数据库实例映射关系
function initDbAndInstanceMaps()
{
#主业务线
db_instance_maps["ywx1_db_main"]="db_main_data"
#业务线2
db_instance_maps["ywx2_db_data"]="db_data"
#业务线3
db_instance_maps["ywx3_db_insurance"]="db_ywx3"
...
...
db_instance_maps["dss_db_dss"]="db_dss"
}
#时间处理 传入参数 yyyy-mm-dd
function DateProcess()
{
echo "日期时间为"$1
if echo $1 | grep -Eq "[0-9]{4}-[0-9]{2}-[0-9]{2}" && date -d $1 +%Y%m%d > /dev/null 2>&1
then :
START_TIME=$(date -d $1 "+%Y-%m-%d")
END_TIME=$(date -d "$1 +1 day" +%Y-%m-%d)
DT_TIME=$(date -d $1 +"%Y%m%d")
INVALID_DATE=$(date -d "$1 -$INVALID_DAYS day" +%Y%m%d)
echo 时间正确: $START_TIME / $END_TIME / $DT_TIME / $INVALID_DATE;
else
echo "输入的日期格式不正确,应为yyyy-mm-dd";
exit 1;
fi;
}
function DataConnect()
{
db_business_key="$BUSINESS_NAME""_""$SOURCE_DB_NAME"
db_instance_key=${db_instance_maps["$db_business_key"]}
echo $db_business_key $db_instance_key
if [ ! -n "$db_instance_key" ]; then
echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加"
exit 1;
fi
db_instance_value=${db_instance_conf["$db_instance_key"]}
MYSQL_USERNAME=${db_instance_user_conf["$db_instance_key"]}
MYSQL_PASSWD=${db_instance_pwd_conf["$db_instance_key"]}
echo $db_instance_value
if [ ! -n "$db_instance_value" ]; then
echo "当前数据库连接信息不存在,请确认业务和数据库连接是否正确或联系管理员添加"
exit 1;
fi
MYSQL_URL="$db_instance_value$SOURCE_DB_NAME"
}
#每天运行 执行dataX
function BaseDataxMysql2Hive()
{
#清除重复同步数据分区&新增分区
hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION(day='$DT_TIME');ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME ADD IF NOT EXISTS PARTITION (day='$DT_TIME')";
#执行同步
echo "开始执行同步"
if ! python ${DATAX_HOME}/bin/datax.py -p"-DtargetDBName=$TARGET_DB_NAME -DtargetTableName=$TARGET_TABLE_NAME -DjdbcUrl=$MYSQL_URL -Dusername=$MYSQL_USERNAME -Dpassword=$MYSQL_PASSWD -DsourceTableName=$SOURCE_TABLE_NAME -DhdfsPath=$HDFS_PATH -DstartTime=${START_TIME} -DendTime=${END_TIME} -DisCompress=$ISCOMPRESS -DwriteMode=$WRITEMODE -DfileType=$FILETYPE -DfieldDelimiter='$FIELDDELIMITER' -DfileName=$TARGET_TABLE_NAME -Dpath=${PATH_HIVE}$TARGET_DB_NAME.db/$TARGET_TABLE_NAME/pt_day=$DT_TIME" $DATAX_JSON_FILE;then
echo "command failed"
exit 1;
fi
echo "同步结束"
#删除定义的失效日期数据
if(($IS_CLEAR_INVALID_DATA==1));then
echo "清除失效$INVALID_DATE天数的历史数据"
hive -e "ALTER TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME DROP IF EXISTS PARTITION (pt_day<=${INVALID_DATE});"
fi
#同步分区元数据
#hive -e "ANALYZE TABLE $TARGET_DB_NAME.$TARGET_TABLE_NAME PARTITION (day=${DT_TIME}) COMPUTE STATISTICS;"
#删除分区数据
}
function parseArgs()
{
while getopts ":d:ab:s:m:f:t:n:u:p:" opt
do
case $opt in
d)
echo "参数d的值$OPTARG"
DateProcess $OPTARG
;;
a)
IS_CLEAR_INVALID_DATA=0
echo "参数a的值$OPTARG"
;;
b)
echo "参数b的值$OPTARG"
BUSINESS_NAME=$OPTARG
;;
m)
echo "参数m的值$OPTARG"
SOURCE_DB_NAME=$OPTARG
;;
s)
echo "参数s的值$OPTARG"
SOURCE_TABLE_NAME=$OPTARG
;;
f)
echo "参数f的值$OPTARG"
DATAX_JSON_FILE=$OPTARG
;;
n)
echo "参数n的值$OPTARG"
TARGET_DB_NAME=$OPTARG
;;
t)
echo "参数t的值$OPTARG"
TARGET_TABLE_NAME=$OPTARG
;;
u)
echo "参数u的值$OPTARG"
MYSQL_USERNAME=$OPTARG
;;
p)
echo "参数t的值$OPTARG"
MYSQL_PASSWD=$OPTARG
;;
?)
echo "未知参数"
exit 1;;
:)
echo "没有输入任何选项 $OPTARG"
;;
esac done
}
function judgeParams()
{
if [ ! -n "$DT_TIME" ] ;then
echo "you have not input a etlDate! format {-d yyyy-mm-dd} "
exit 1;
fi
if [ ! -n "$BUSINESS_NAME" ] ;then
echo "you have not input a businessName! incloud(xxx,xxxx,x,xx) example {-b xxx}"
exit 1;
fi
if [ ! -n "$SOURCE_DB_NAME" ] ;then
echo "you have not input a sourceDB!"
exit 1;
fi
if [ ! -n "$SOURCE_TABLE_NAME" ] ;then
echo "you have not input a sourceTable example {-s user_info}!"
exit 1;
fi
if [ ! -n "$DATAX_JSON_FILE" ] ;then
echo "you have not input a dataxJson! example {-f ods_ywx1_user_info_di.json}"
exit 1;
fi
if [ ! -n "$TARGET_TABLE_NAME" ] ;then
echo "you have not input a targetTable! example {-t ods_ywx1_user_info_di}"
exit 1;
fi
}
function startsync()
{
#初始化数据库实例
initInstanceConf
#初始化库和数据库实例映射关系
initDbAndInstanceMaps
#解析参数
parseArgs "$@"
#初始化数据链接
DataConnect
#判断参数
judgeParams
#同步数据
BaseDataxMysql2Hive
}
# -d: 处理时间
# -b:业务线 (ywx,ywx1,ywx1,...,ywxn)
# -m:源数据库
# -a:增量数据不清除分区数据:默认清除
# -s:源数据表
# -n:目标数据库
# -t:目标数据表
# -f:datax同步json文件
# -p:密码
# -u:用户名
startsync "$@"
有了这个shell脚本,后续对于同步的一些同步完成功能的通知以及新功能都可以新增。同时又新形成了一个数据同步的规范性和开发的规范性。
4、调度平台调度脚本
有了上面的脚本,我们就可以只需要写好源表和目的表的名称。同时通过 -a 来区别增量还是全量同步进行处理。
#源表 -s
SOURCE_TABLE_NAME="user_info"
#目标表 -t
TARGET_TABLE_NAME="ods_main_user_info_df"
#datax 文件 -f
DATAX_FILE="${BASE_DIR_PATH}/ods_main_user_info_df.json"
ETL_DATE=${ETL_DATE}
BUSINESS_NAME=${BUSINESS_NAME}
SOURCE_DB_NAME=${SOURCE_DB_NAME}
#!/bin/bash
source /etc/profile
sh dataxsync.sh -d $ETL_DATE -b $BUSINESS_NAME -m $SOURCE_DB_NAME -s $SOURCE_TABLE_NAME -t $TARGET_TABLE_NAME -f $DATAX_FILE
我们可以发现这个脚本中包含了四个变量:
${BASE_DIR_PATH}
${ETL_DATE}
${BUSINESS_NAME}
${SOURCE_DB_NAME}
这几个变量主要是通过调度平台传入,
BASE_DIR_PATH:dataX脚本的统一地址,之所以弄这个目录主要是为了区分不同业务线。
ETL_DATE:每天同步的时间 : yyyy-mm-dd,同时我们可以再脚本上多增加几个时间,通过这个变量转换出来(yyyyMMdd, yyyyMMdd-1…)
BUSINESS_NAME:业务线的标识,我们也是可以用主题域区分。主要是用来识别数据库
SOURCE_DB_NAME:业务库的表名。
这一套的使用,我们搭配小海豚调度一起使用。定时运行,定时调度。
最后
以上就是怕黑菠萝为你收集整理的dataX同步mysql至hive的全部内容,希望文章能够帮你解决dataX同步mysql至hive所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复