概述
首先先介绍一下背景,博主是hadoop大数据小白一枚,相信也有很多自学的大数据的人都跟我处境一样,在很多人已经在讨论大数据架构,实时分析,离线分析,优化等等问题的时候,我们还在闭门造车地玩着手里的那套hdfs+hbase+zookeeper+hive+sqoop+jdbc+mysql 的原始框架,然而还是到懂不懂的
hadoop集群的环境和查询处理都还是十分容易搭建,但是遇到一个十分棘手的问题,基于使用hive这个工具来操作离线数据,很麻烦的就是数据查询出结果以后的一个存储问题,虽然我们知道hive不支持insert 和update,但却又支持HQL ,真是又爱又恨的,除了查询结果的保存还有数据的导出,下面就是一个十分亲切的运行模式:
但是 ,如果都手动去操作实在太麻烦了,而且博主还是在Windows下面的虚拟机跑的集群,令人感到郁闷的是 hive都发展这么长时间了,对于相关的问题还是没有一个好的处理方式,网上的资料十分少,而且全都是什么hive导入导出方式、数据格式等等,
然后在遇到这两个老大难问题以后又去咨询大牛们,然后什么es+hbase、impala+hbase等等,我是有找一个工具更替hive的想法,而且越来越强烈,刚才这两个就是后期的方向,但目前还是先把自己熟悉的东西找到解决的办法了来,
目前有两个问题:
问题一:hive查询数据存储的问题,当然不是insert into / insert overweight 解决的问题,是如何把要经过多次查询的多个结果保存在以datetime(rowkey)为主键的表中,就是一行有多列,一个条件肯定是不能满足需求的
问题二: 就是hive导出 hive中的表过程十分复杂(其实也不复杂,就想抹去sqoop这个中间环节)
目前只解决了问题二,问题一待更新
注意: 1.记住在写udf这个类的时候 要导入hbase、hive、hadoop的包,不然很多错
2.在hive中添加jar包的时候 ,注意版本信息 和 路径
3. 方法的名字 注意,创建function的 导包注意
4. 本文是导入mysql ,其他数据库需要更改驱动
关于这个问题,网上提供了很多文章,但是可能会有不太明确的地方,我只在阐述一点:
hive>add jar /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.2.jar;
Added /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.2.jar to class path
Added resource: /usr/lib/hive/lib/hive-contrib-0.9.0-cdh4.1.2.jar
hive>add jar /usr/share/java/mysql-connector-java-5.1.17.jar;
Added /usr/share/java/mysql-connector-java-5.1.17.jar to class path
Added resource: /usr/share/java/mysql-connector-java-5.1.17.jar
hive>CREATE TEMPORARY FUNCTION dboutput AS 'org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput';
(需要特别指出,as后面的 class路径很容易出错,在创建的udf函数的时候,只需要写包名.class名即可)
注意:在执行udf函数的时候,务必确保之前步骤都完成,然后确定hive表和本地/目标表的字段一一对应正确拼写
hive>select dboutput('jdbc:mysql://localhost/result','root','123456','INSERT INTO dc(code,size) VALUES (?,?)',code,size) from accesslog limit 10;
注:result为mysql数据库名,dc为数据库result中的表名 dc(code,size)括号中的字段为mysql表dc字段,values(?,?)对应hive统计结果的值后面的code,size为hive表中的字段,accesslog表示hive中的表名称。
说明下:
1. 前两步是将需要到的jar包加载到当前的classpath中。
2. 第3步是创建一个临时的方法,就跟数据库函数的max()啥的都一样,创建完之后你就可以使用这个函数。
3. dboutput输入意思是:
1.jdbcurl 2.数据库用户名 3 密码 4 sql 5之后的就是刚才sql用到的预查询用到的参数。
4. dboutput输出有3个,0:成功 1 sql报错 2 数据库连接错误。 5. 鉴于一般每次重启hive的cli 都需要重新执行前3步,比较麻烦,有2个办法可以解决
(1)修改hive源码,FunctionRegistry.java 添加这个UDF(这个比较麻烦,我没有试验过)
(2)在启动hive的时候加载文件,使用hive -i xxx ,xxx是你的配置文件,你可以把前三步的命令都写进去,那他每次启动的时候就自动加载了。
通过以上步骤即可将hive统计结果直接导入到mysql数据库中。
绿色部分为其他博友贡献的,我在试验的时候发现总提示找不到org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput
后来经过琢磨才弄明白org.apache.Hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput部分自己要去编写,编写后打成jar包用add jar添加进去就可以了。
我是把这个方法编写后达成了udf.jar的包 udf下直接写GenericUDFDBOutput,就是说没有那么多层次的包。
add jar /home/hadoop/hive-0.9.0/lib/hive-contrib-0.9.0.jar;
add jar /home/hadoop/hive-0.9.0/lib/mysql-connector-java-5.1.10.jar;
add jar /home/hadoop/hive-0.9.0/lib/hive_contrib.jar;
add jar /home/hadoop/hive-0.9.0/lib/hive-exec-0.9.0.jar;
add jar /home/hadoop/hive-0.9.0/lib/udf.jar;
CREATE TEMPORARY FUNCTION dboutput AS 'GenericUDFDBOutput';
select dboutput('jdbc:mysql://192.168.239.100/hive','hadoop','hadoop','INSERT INTO testroom(Name,CtfId) VALUES (?,?)',Name,CtfId) from hive_hbase limit 10;
咖啡色部分为我自己的代码结果运行成功。
其中GenericUDFDBOutput源码如下,是我借用别人的;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
@Description(name = "dboutput",
value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments])"
+ " - sends data to a jdbc driver",
extended = "argument 0 is the JDBC connection stringn"
+ "argument 1 is the user namen"
+ "argument 2 is the passwordn"
+ "argument 3 is an SQL query to be used in the PreparedStatementn"
+ "argument (4-n) The remaining arguments must be primitive and are "
+ "passed to the PreparedStatement objectn")
@UDFType(deterministic = false)
public class GenericUDFDBOutput extends GenericUDF {
private static final Log LOG = LogFactory
.getLog(GenericUDFDBOutput.class.getName());
private transient ObjectInspector[] argumentOI;
private transient Connection connection = null;
private String url;
private String user;
private String pass;
private final IntWritable result = new IntWritable(-1);
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentTypeException {
argumentOI = arguments;
// this should be connection url,username,password,query,column1[,columnn]*
for (int i = 0; i < 4; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be ""
+ Constants.STRING_TYPE_NAME + "", but ""
+ arguments[i].getTypeName() + "" is found");
}
}
}
for (int i = 4; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primative" + ", but ""
+ arguments[i].getTypeName() + "" is found");
}
}
return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
url = ((StringObjectInspector) argumentOI[0])
.getPrimitive;
user = ((StringObjectInspector) argumentOI[1])
.getPrimitive;
pass = ((StringObjectInspector) argumentOI[2])
.getPrimitive;
try {
connection = DriverManager.getConnection(url, user, pass);
} catch (SQLException ex) {
LOG.error("Driver loading or connection issue", ex);
result.set(2);
}
if (connection != null) {
try {
PreparedStatement ps = connection
.prepareStatement(((StringObjectInspector) argumentOI[3])
.getPrimitive;
for (int i = 4; i < arguments.length; ++i) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
ps.setObject(i - 3, poi.getPrimitive;
}
ps.execute();
ps.close();
result.set(0);
} catch (SQLException e) {
LOG.error("Underlying SQL exception", e);
result.set(1);
} finally {
try {
connection.close();
} catch (Exception ex) {
LOG.error("Underlying SQL exception during close", ex);
}
}
}
return result;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("dboutput(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
}
最后,当实现了之前的所有操作,需要把udf函数加入hive启动服务里面,不然如果每次需要使用udf函数都需要重新添加jar文件,然后才能使用,下面是收集的一些不错的方法:
通过设置hive的配置文件hive-site.xml 加入
在配置文件中增加配置
hive.aux.jars.path
file:///jarpath/all_new1.jar,file:///jarpath/all_new2.jar
保存即可。
该方法比第一种方法方便很多。不需要每次启动Hive执行命令加入,只是配置稍微复杂一些。
3. 在${HIVE_HOME中创建文件夹auxlib ,然后将自定义jar文件放入该文件夹中。
个人推荐这种方法,方便快捷。
ref:http://blog.sina.com.cn/s/blog_13088c10a0102wxit.html
最后
以上就是动听蜻蜓为你收集整理的hive表直接入库本地mysql,用udf从将hive的查询结果直接写入mysql数据库中的全部内容,希望文章能够帮你解决hive表直接入库本地mysql,用udf从将hive的查询结果直接写入mysql数据库中所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复