概述
目录
一、概念
二、特征
三、常用命令示例
四、实战案例示例
1.全量导入(将数据从mysql导入到HDFS指定目录)
2.全量导入(将数据从mysql导入到已有的hive表)
3.全量导入(将数据从mysql导入到hive,hive表不存在,导入时自动创建hive表)
4.批量全库导入(将mysql全库数据批量导入到hive)
5.增量导入-append模式(将mysql数据增量导入hadoop)
6.增量导入-lastmodified模式(将mysql时间列大于等于阈值的数据增量导入HDFS)
7.全量导出(将hdfs全量导出到mysql表)
一、概念
Sqoop是一款开源的etl工具,主要用于在Hadoop(Hive)与传统数据库(mysql、postgresql...)间进行数据的传递,可以将关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导出到关系型数据库中。
二、特征
Sqoop是专为大数据批量传输而设计,能够分割数据集并创建map task任务来处理每个区块。
三、常用命令示例
sqoop-list-databases 列出服务器上存在的数据库清单
sqoop-list-tables 列出服务器上存在的数据表清单
sqoop-job 用来生成一个sqoop的任务,生成后,该任务并不执行,除非使用命令执行该任务。
sqoop import 从RDBMS导入到HDFS
sqoop export 从HDFS导出到RDBMS
--connect jdbc:mysql://ip:port/db_name 连接mysql数据库
--username username 数据库用户名
--password password 数据库密码
--table table_name 源头数据表
--target-dir /user/hive/warehouse/... 指定导入的目录,若不指定就会导入默认的HDFS存储路径
--delete-target-dir HDFS地址如果存在的话删除,一般都是用在全量导入,增量导入的时候加该参数会报错
--num-mappers 1 maptask数量
-m 3 (使用3个mapper任务,即进程,并发导入。一般RDBMS的导出速度控制在60~80MB/s,每个 map 任务的处理速度5~10MB/s 估算所需并发map数。)
--split-by id (根据id字段来切分工作单元实现哈希分片,从而将不同分片的数据分发到不同 map 任务上去跑,避免数据倾斜。)
--autoreset-to-one-mapper 如果表没有主键,导入时使用一个mapper执行
--input-null-string '\N' 空值转换
--input-null-non-string '\N' 非空字符串替换
--fields-terminated-by "t" 字符串分割
--query 'select * from test_table where id>10 and $CONDITIONS' ($CONDITIONS必须要加上就相当于一个配置参数,sql语句用单引号,用了SQL查询就不能加参数--table )
-hive-home <dir> 重写$HIVE_HOME
-hive-import 插入数据到hive当中,使用hive的默认分隔符
-hive-overwrite 重写插入
-create-hive-table 建表,如果表已经存在,该操作会报错
-hive-table <table-name> 设置到hive当中的表名
-hive-drop-import-delims 导入到hive时删除 n, r, and 001
-hive-delims-replacement 导入到hive时用自定义的字符替换掉 n, r, and 001
-hive-partition-key hive分区的key
-hive-partition-value <v> hive分区的值
-map-column-hive <map> 类型匹配,sql类型对应到hive类型
--direct 是为了利用某些数据库本身提供的快速导入导出数据的工具,比如mysql的mysqldump
--direct-split-size 在使用上面direct直接导入的基础上,对导入的流按字节数分块,特别是使用直连模式从PostgreSQL导入数据的时候,可以将一个到达设定大小的文件分为几个独立的文件。
--columns <列名> 指定列
--z, -–compress 打开压缩功能
--compression-codec < c > 使用Hadoop的压缩,默认为gzip压缩
--fetch-size < n > 从数据库一次性读入的记录数
--append 将数据追加到hdfs中已经存在的dataset中。使用该参数,sqoop将把数据先导入到一个临时目录中,然后重新给文件命名到一个正式的目录中,以避免和该目录中已存在的文件重名。
--as-avrodatafile 导入数据格式为avro
--as-sequencefile 导入数据格式为sqeuqncefile
--as-textfile 导入数据格式为textfile
--as-parquetfile 导入数据格式为parquet
详细内容可以参考Sqoop用户手册(英文版):
http://archive.cloudera.com/cdh/3/sqoop/SqoopUserGuide.html
四、实战案例示例
1.全量导入(将数据从mysql导入到HDFS指定目录)
# 全量导入(将数据从mysql导入到HDFS指定目录)
sqoop import --connect jdbc:mysql://ip:prot/db
--username username --password password
--query 'select * from mysql_table_name where $CONDITIONS'
--target-dir /user/hive/warehouse/...
--delete-target-dir
--fields-terminated-by 't'
--hive-drop-import-delims
--null-string '\N'
--null-non-string '\N'
--split-by id
-m 1
2.全量导入(将数据从mysql导入到已有的hive表)
# 全量导入(将数据从mysql导入到已有的hive表)
sqoop import --connect jdbc:mysql://ip:prot/db
--username username --password password
--table mysql_table_name
--hive-import
--hive-database hive_db_name
--hive-table hive_table_name
--fields-terminated-by 't'
--hive-overwrite
--null-string '\N'
--null-non-string '\N'
--split-by id
-m 1
3.全量导入(将数据从mysql导入到hive,hive表不存在,导入时自动创建hive表)
# 全量导入(将数据从mysql导入到hive,hive表不存在,导入时自动创建hive表)
sqoop import --connect jdbc:mysql://ip:prot/db
--username username --password password
--table mysql_table_name
--hive-import
--hive-database hive_db_name
--create-hive-table
--hive-table hive_table_name
--fields-terminated-by 't'
--null-string '\N'
--null-non-string '\N'
--split-by id
-m 1
#--hive-table hive_table_name
#该参数不添加时默认hive建表表名与mysql表名一致
4.批量全库导入(将mysql全库数据批量导入到hive)
# 全库导入(将mysql全库数据批量导入到hive)
sqoop import-all-tables
--connect jdbc:mysql://ip:prot/db
--username username --password password
--hive-database hive_db_name
--create-hive-table
--hive-import
--hive-overwrite
--fields-terminated-by 't'
--exclude-tables 'drop_table'
--autoreset-to-one-mapper
--as-textfile
#--exclude-tables 'drop_table'
# (此参数可以 exclude掉不需要import的表(多个表逗号分隔))
5.增量导入-append模式(将mysql数据增量导入hadoop)
#增量导入-append模式(将mysql数据增量导入hive表)
sqoop import jdbc:mysql://ip:prot/db
--username username --password password
--table mysql_table_name
--hive-import
--hive-database hive_db_name
--hive-table hive_table_name
--hive-drop-import-delims
--fields-terminated-by 't'
--null-string '\N'
--null-non-string '\N'
--incremental append
--check-column column_name
--last-value 10
--split-by column_name
-m 1
#增量导入-append模式(将mysql数据增量导入hdfs)
sqoop import jdbc:mysql://ip:prot/db
--username username --password password
--table mysql_table_name
--target-dir /user/hive/warehouse/hive_db_name.db/hive_table_name
--incremental append
--check-column column_name
--last-value 10
--split-by column_name
-m 1
#--incremental append 基于递增列的增量导入(将递增列大于阈值的所有数据导入hadoop)
#--check-column column_name 递增列
#--last-value 数字 阈值
6.增量导入-lastmodified模式(将mysql时间列大于等于阈值的数据增量导入HDFS)
#增量导入-lastmodified模式(将mysql时间列大于等于阈值的数据增量导入HDFS)
#lastmodified模式不支持直接导入Hive表,但是可以使用导入HDFS的方法,--target-dir设置成Hive table在HDFS中的关联位置即可)
sqoop import --connect jdbc:mysql://ip:prot/db
--username username --password password
--table mysql_table_name
--target-dir /user/hive/warehouse/hive_db_name.db/hive_table_name
--null-string '\N'
--null-non-string '\N'
--split-by id
-m 1
--fields-terminated-by 't'
--hive-drop-import-delims
--incremental lastmodified
--check-column time_column_name
--last-value '2022-09-09 10:00:01'
#--incremental lastmodified 基于时间列的增量导入(将时间列大于阈值的所有数据导入hdfs)
#--check-column time_column_name 时间列
#--last-value 时间 阈值
7.全量导出(将hdfs全量导出到mysql表)
#全量导出(将hdfs全量导出到mysql表)
sqoop export --jdbc:mysql://ip:prot/db
--username username --password password
--table mysql_table_name
--columns column1,column2,column3
--export-dir /user/hive/warehouse//hive_db_name.db/hive_table_name
--input-fields-terminated-by '