概述
在数据处理的生态群里sqoop组件,占据了举足轻重地位。特别是将Hadoop和关系型数据库中的数据相互转移的sqoop是不二之选的工具。Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。
由于目前Sqoop1.4.6-CDH版本,把关系数据库mysql数据迁移到hive中,是不支持多级分区的。于是开启了两天魔改源码的节奏 - -
源码地址:https://github.com/apache/sqoopgithub.com
sqoop项目活跃度从客观上看还是比较活跃的。毕竟属于Apache开源顶级项目。
OK开启我的源码之路:
先整到本地idea,默认是maven
找到生成hive核心类TableDefWriter.java
sqoop传递参数如下所示都是存储在SqoopOptions中
分区信息如下:
生成创建hive表分区字符串拼接如下:
做了如下改造:
生成创建hive表分区字符串拼接如下:
分区信息如下:
数据库导入工具类——ImportTool
org.apache.sqoop.tool.ImportTool是sqoop实现数据库导入功能的工具类。
ImportTool最核心的两个属性
protected ConnManager manager;
private CodeGenTool codeGenerator;
ImportTool的run方法在run方法中执行init(options)时,会根据命令行参数,利用org.apache.sqoop.ConnFactory生成对应的ConnManager对象。
在importTable方法中,codeGenerator会将表结构对应的java类写入jar文件。这个jar文件和其他选项会构造出一个ImportJobContext对象。
如果是增量模式,initIncrementalConstraints(options, context)会把IncrementalTestColumn的最大值保存在options.incrementalLastValue属性中,作为下一次增量的起点。
manager利用ImportJobContext对象中的信息完成表数据的导入。
manager完成importTable或importQuery后,程序再根据命令行参数选择将新数据追加到老数据、将新老数据合并或者将新数据导入hive。如果是增量任务,整个ImportTool对象会被保存到metastore。
连接类——ConnManager
org.apache.sqoop.manager.ConnManager是SqoopTool导入导出数据的核心类。它有3个子类:DummyManager——主要用于单元测试
MainFrameManager——连接主机,目前只支持通过FTP协议
SqlManager——连接各种支持JDBC连接的数据库
SqlManager
SqlManager查询数据库主要有两种方式:本地模式和分布式模式
本地模式即直接在当前JVM中查询,比如查询表的结构。
分布式模式下进行数据导入导出的查询,如表的导入导出。importTable()方法是一个典型的例子。
根据命令行参数的选项,数据将被导入HBase、Accumulo或者HDFS。
在DataDrivenImportJob的runImport方法中,可以看到MapReduce Job的构造和提交过程。
Job的InputFormat的默认类型是DataDrivenDBInputFormat。OutputFormat默认为null,使用hadoop默认的FileOutputFormat。MapperClass根据输出文件格式的不同而不同。可能的文件格式有纯文本、avro、parquet等。
HBaseImportJob、AccumuloImportJob是DataDrivenImportJob的子类。为了将数据导入HBase 和Accumulo,它们实现了不同的MapperClass和OutputFormat。
SQOOP - HIVE
HiveConfig:hive配置实例类
HiveImport:连接Hive工具类
HiveTypes:Hive类型映射类
TableDefWriter:hive定义语法,生成语法核心类
HiveConfig:
获取HiveConfig配置API:getHiveConf。传入Configuration实例,利用Java反射机制反创建实例HiveConfClass。其中通过构造器newInstance一个Configuration实例。
设置HiveConfig配置API:addHiveConfigs。传入Configuration作为hiveConf实例+Configuration作为返回设置后实例conf。通过Map遍历hiveConf,通过conf.set重置conf配置。并返回Configuration的conf。
HiveImport:
导入表进hive的工具类,管理自身连接hive,同时协调使用此类的其它package的类。
构造器,入参SqoopOptions(Sqoop收集命令参数),ConnManager(连接器),Configuration(配置hive conf),generateOnly(?没太搞懂这个变量含义以及作用是个boolean类型)
importTable,核心API:入参inputTableName(导入hdfs文件名称);outputTableName(导入hive的表名称);createOnly(是否只创建表)
sqoop导入hive,实质是导入hdfs。因为hive的底层存储是hdfs,sqoop也是分了两步做了导入hive:导入hdfse(inputTableName)
load data进hive(outputTableName)先判断如果outputTableName == null,则inputTableName=outputTableName
是否debug模式,如果debug模式则,重置环境变量env
生成要运行的HQL语句,并重置hive连接,因为它可能已超时
isGenerateOnly,removeTempLogs在load data into hive时候把中间log日志文件删除
codec确定压缩方式,如果是lzop怎使用java反射机制new一个Tool来进行压缩处理
生成脚本文件,finally中删除处理后的脚本文件
HiveTypes:映射转换
isHiveTypeImprovised:如果SQL类型无法精确匹配转换为hive中type,我们必须把它转换成更通用类型
TableDefWriter:创建新的TableDefWriter以生成配置单元CREATE TABLE语句。
TableDefWriter分为两步走:
第一步:根据sqoop传递的参数,拿到指定schema和其它相关信息生成创建表的语句。
第二步:创建好指定表之后,开始把数据load进hive。
第一步API:getCreateTableStmt,生成创建hive表的语句,这个其实就是重点要修改的源码部分。判断是否外部表isHiveExternalTableSet
创建表schema信息映射,externalColTypes != null,则是在sqoop中指定了type类型
获取列名称,通过getColumnNames
StringBuilder sb = new StringBuilder() 用于生成hive语句的承载的变量
options中获取是否外部表,数据库,以及输出hdfs路径
检查结果集中是否存在所有显式映射列,否则抛出异常找不到column
生成分区信息,first是一个判断是否最后一个,若最后一个不调用 sb.append(",")。并同时拼接分区信息
hive创建语句小尾巴收个尾,例如“ROW FORMAT DELIMITED FIELDS TERMINATED BY”;“ LINES TERMINATED BY ”;“STORED AS INPUTFORMAT”等之类信息
最终生成一个完成hive创建语句
第二步API:getLoadDataStmt,LOAD DATA statement to import the data in HDFS into hive。getFinalPath获取,FileSystemUtil.makeQualified(new Path(tablePath), configuration)获取路径存储
初始化sb.append("LOAD DATA INPATH '");
doOverwriteHiveTable判断是否覆盖。若覆盖则sb.append(" OVERWRITE");
sb.append(" INTO TABLE `"),并获取database。拼接成database.table
增加分区信息。 sb.append(" PARTITION (");(此处需要修改源码地方)
最后
以上就是聪慧水杯为你收集整理的sqoop 抽取mysql 源码_SQOOP 源码浅析的全部内容,希望文章能够帮你解决sqoop 抽取mysql 源码_SQOOP 源码浅析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复