我是靠谱客的博主 彪壮茉莉,最近开发中收集的这篇文章主要介绍Sqoop Mysql Hive同步,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

统一文件存储格式Parquet,使用Impala进行计算

全量同步:

sqoop import 
 --username 'root' 
 --password '000' 
 --connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
 --table a 
 --hcatalog-database default 
 --hcatalog-table a 
 --hcatalog-partition-keys dt 
 --hcatalog-partition-values '2022-02-08' 
 --fields-terminated-by '01' 
 --lines-terminated-by 'n' 
 --hive-drop-import-delims 
 --null-string '\N' 
 --null-non-string '\N' 
 --split-by id 
 -m 10
sqoop import 
--username root 
--password '000' 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8"  
--query "select id ,name, age, ads, birthday, hobday, gzads , source from student where ads='北京市丰台区南苑机场警备东路6号一区' and $CONDITIONS limit 100 " 
--hcatalog-database default 
--hcatalog-table student 
--hcatalog-partition-keys dt 
--hcatalog-partition-values '2022-02-08' 
--fields-terminated-by '001' 
--lines-terminated-by 'n' 
--hive-drop-import-delims 
--null-string '\N' 
--null-non-string '\N' 
--split-by id 
--m 3

增量同步:同时需要对历史修改的记录进行同步,且自动获取last-value最新时间,这里创建sqoop job 然后通过执行sqoop job 后可以自动保存增量数据里最新的last-value值就不用手动去指定了,但是sqoop 在自动保存最新的last-value值的时候要比原表中最后的last-value值要大一点,这个不会出现增量导入数据不一致的问题所以不用担心,并且由于hcatalog的lastmodified模式中merge-key不生效…也就导致了无法将Mysql里历史数据中修改过的记录和Hive中的历史数据进行合并,最后会导致Hive里数据重复,数据和MYSQL数据不一致的问题,所以在进行增量同步的时候使用传统方式,但是由于Hive中的表是Parquet格式,在同步的时候无法将MYSQL表里timestamp类型的数据转换成对应的类型,Sqoop在转换Parquet格式文件时会自动把Mysql里timestamp类型的数据转换成Int类型,这样就导致最后格式不对入库失败,所以需要通过map-column-java crt_date=String 来表示Sqoop在解析该字段的时候会把该字段自动转化成String来解析,通过map-column-hive crt_date=String指定Hive中日期字段的类型认为是String,这样才能解析成功,如果不加这2个参数,Mysql里如果有timestamp类型的数据最后会解析失败.

sqoop job 
--create ajob 
-- import 
--username 'root' 
--password '000' 
--table a 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
--map-column-java crt_date=String 
--map-column-hive crt_date=String 
--incremental lastmodified 
--check-column crt_date 
--last-value "2022-02-01 13:35:41" 
--merge-key id 
--target-dir /user/hive/warehouse/a/dt=2022-02-08 
--as-parquetfile 
--split-by id 
-m 3

sqoop job 
--create ajob 
-- import 
--username 'root' 
--password '000' 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
--table a 
--hive-database default 
--hive-table a 
--map-column-java crt_date=String 
--map-column-hive crt_date=String 
--fields-terminated-by '01' 
--incremental lastmodified 
--check-column crt_date 
--last-value "2022-02-07 13:35:41" 
--merge-key id 
--target-dir /user/hive/warehouse/a/dt=2022-02-08 
--as-parquetfile 
-m 1

sqoop import 
--username 'root' 
--password '000' 
--query "select id,name,crt_date from a where $CONDITIONS" 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8" 
--map-column-java crt_date=String 
--map-column-hive crt_date=String 
--incremental lastmodified 
--check-column crt_date 
--last-value "2022-02-09 17:42:19" 
--merge-key id 
--target-dir /user/hive/warehouse/a/dt=2022-02-08 
--as-parquetfile 
-m 1

这两种方式都可以,last-value随便指定一个就行了,跑完一次sqoop job以后 sqoop会自动保存last-value的值

sqoop job --exec ajob
sqoop job --delete ajob
sqoop job --show ajob
sqoop job --list a job 

最后基于Parquet格式导出Mysql

sqoop export 
--username root 
--password '000' 
--connect "jdbc:mysql://cm1:3306/demo?useUnicode=true&characterEncoding=utf8"  
--table student_hive 
--hcatalog-database default 
--hcatalog-table student 
--hcatalog-partition-keys dt 
--hcatalog-partition-values '2022-02-08' 
--num-mappers 3 

需要在impala里做的相关操作

alter table a add partition(dt='2022-02-08');
refresh a partition(dt='2022-02-08');
alter table a drop partition(dt='2022-02-08')

这里使用定时监听启动Sqoop启动指令,启动指令是由外部程序发送到MYSQL里的字符串,3分钟监听一次,如果查询到了该字符串就启动Sqoop,Sqoop运行完成成,将MYSQL里的启动指令字符串删除等待程序发送的下一次指令.

在这里插入图片描述

#!/bin/sh
hostname=localhost
port=3306
username=root
password=000

dbname=demo
tbname=sync_command

select_sql_tb1="select * from ${dbname}.${tbname} where syn_str='000001' and syn_table='tb1'"
tb1_result=$(/opt/mysql-5.7.25/bin/mysql -h${hostname} -P${port} -u${username} -p${password} 2>/dev/null -e "${select_sql_tb1}")
delete_sql_tb1="delete from ${dbname}.${tbname} where syn_str='000001' and syn_table='tb1'"
if [ ! -n "$tb1_result" ]; then
  echo "没有获取到tb1表同步指令" >> /opt/tb1.log
else
  echo "成功获取tb1表同步指令开始同步....." >> /opt/tb1.log
  echo $tb1_result
  /opt/mysql-5.7.25/bin/mysql -h${hostname} -P${port} -u${username} -p${password} 2>/dev/null -e "${delete_sql_tb1}"
  echo "已成功同步完成tb1表删除同步记录等待下次同步命令...." >>/opt/tb1.log 
fi

最后

以上就是彪壮茉莉为你收集整理的Sqoop Mysql Hive同步的全部内容,希望文章能够帮你解决Sqoop Mysql Hive同步所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部