Post by 彪壮大叔 on 靠谱客: sinking logs from a Flume exec source into a Hive table, shared here as a reference.

Overview

Log file format:
2018-10-31-15-46-26|PDAT20181031T330494|24496|TDZA20181031D344959|epicc

The "|"-separated fields are, in order: time, policyno, sumpremium, proposalno, qudao.
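To make the field mapping concrete, here is a small sketch in plain Python (field names taken from the list above) that splits one log line on "|", roughly the way the sink's DELIMITED serializer will:

```python
# Parse one line of the log format described above.
# Field names follow the article: time, policyno, sumpremium, proposalno, qudao.
FIELDS = ["time", "policyno", "sumpremium", "proposalno", "qudao"]

def parse_line(line):
    """Split a pipe-delimited log line into a field dict."""
    values = line.rstrip("\n").split("|")
    if len(values) != len(FIELDS):
        raise ValueError("expected %d fields, got %d" % (len(FIELDS), len(values)))
    return dict(zip(FIELDS, values))

record = parse_line("2018-10-31-15-46-26|PDAT20181031T330494|24496|TDZA20181031D344959|epicc")
print(record["policyno"])  # PDAT20181031T330494
```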

**Flume configuration file**

# source: tail the log file
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/bigData/test.log

# sink: write into the transactional Hive table
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://10.133.12.104:9083
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.table = kuaibao
a1.sinks.k1.hive.partition = %y%m%d
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "|"
a1.sinks.k1.serializer.serdeSeparator = '|'
a1.sinks.k1.serializer.fieldnames = time,policyno,sumpremium,proposalno,qudao
a1.sinks.k1.useLocalTimeStamp = true

# channel: in-memory buffer between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# wire source and sink to the channel (the sink binding is easy to forget)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
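One way to catch wiring mistakes before starting the agent is to parse the properties file yourself. The sketch below (hypothetical helpers, standard Python only) checks that every source and sink is bound to a channel, which is one of the most common omissions in configs like the one above:

```python
# Sanity-check Flume agent wiring. A classic mistake is forgetting
# "a1.sinks.k1.channel = c1": the agent configuration then fails validation
# and the sink never drains the channel.
conf = """
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
"""

def parse_props(text):
    """Parse key=value lines into a dict, skipping blanks and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def check_wiring(props, agent="a1"):
    """Return a list of wiring problems (empty list means OK)."""
    problems = []
    for src in props.get(agent + ".sources", "").split():
        if not props.get("%s.sources.%s.channels" % (agent, src)):
            problems.append("source %s has no channels" % src)
    for snk in props.get(agent + ".sinks", "").split():
        if not props.get("%s.sinks.%s.channel" % (agent, snk)):
            problems.append("sink %s has no channel" % snk)
    return problems

print(check_wiring(parse_props(conf)))  # []
```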

**Hive configuration (hive-site.xml)**

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://10.133.12.104:3306/hive?characterEncoding=UTF-8&amp;createDatabaseIfNotExist=true&amp;useSSL=false</value>
  <description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>root</value>
  <description>username to use against metastore database</description>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>1q2w3e4r__QWE</value>
  <description>password to use against metastore database</description>
</property>
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
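If you want to double-check which ACID-related settings a hive-site.xml actually contains, the standard library is enough. A small sketch, using an inline fragment for illustration:

```python
# Read name/value pairs out of a hive-site.xml fragment to verify the
# ACID settings above (illustrative only; real files are larger).
import xml.etree.ElementTree as ET

fragment = """<configuration>
  <property><name>hive.support.concurrency</name><value>true</value></property>
  <property><name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property>
  <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
</configuration>"""

root = ET.fromstring(fragment)
settings = {p.findtext("name"): p.findtext("value") for p in root.findall("property")}
print(settings["hive.txn.manager"])
```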

**Create the table in Hive**

create table kuaibao (
  time string,
  policyno string,
  sumpremium string,
  proposalno string,
  qudao string
)
partitioned by (day string)
clustered by (time) into 5 buckets
stored as orc
tblproperties ('transactional' = 'true');
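The hive.partition = %y%m%d value in the sink config is an escape sequence that Flume resolves from the event's timestamp header (supplied here by useLocalTimeStamp = true), so each row lands in the matching day=YYMMDD partition of this table. The resolution is roughly strftime; a minimal sketch:

```python
# How the sink's "%y%m%d" partition escape resolves, roughly.
# gmtime is used here so the example is reproducible; with
# useLocalTimeStamp = true Flume uses the agent's local clock.
import time

def partition_value(epoch_seconds):
    return time.strftime("%y%m%d", time.gmtime(epoch_seconds))

print(partition_value(1540944000))  # 2018-10-31 UTC -> 181031
```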

**Start the Hive metastore**

hive --service metastore &

**Start Flume**

./flume-ng agent --conf conf --conf-file /home/hadoop/DataCleaning/flume/apache-flume-1.6.0-bin/conf/FlumeKHive.conf --name a1 -Dflume.root.logger=INFO,console

**Common problems**
Problem 1:

org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thr
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:98)
at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)

Solution:
Copy all the jars under hcatalog/share/hcatalog in the Hive installation directory into the lib directory of the Flume installation, for example (assuming HIVE_HOME and FLUME_HOME are set):
cp $HIVE_HOME/hcatalog/share/hcatalog/*.jar $FLUME_HOME/lib/

Problem 2:

Exception in thread "PollableSourceRunner-KafkaSource-source1" java.lang.OutOfMemoryError: Java heap space

Solution:
Increase the Flume JVM heap in the flume-ng launcher script:
vi flume-ng
JAVA_OPTS="-Xmx500m"

Problem 3:

java.lang.NullPointerException: Expected timestamp

Solution:
Add a1.sinks.k1.useLocalTimeStamp = true to the Flume configuration (k1 is the sink name).
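The error happens because the sink tries to resolve the %y%m%d partition escape from the event's timestamp header, and without useLocalTimeStamp = true (or a timestamp interceptor) that header is absent. A hypothetical mini-model of the failure:

```python
# Why "Expected timestamp" occurs: partition escapes are resolved from the
# event's "timestamp" header (epoch milliseconds). No header -> failure.
# This is an illustrative model, not Flume's actual code.
import time

def resolve_partition(headers):
    ts = headers.get("timestamp")
    if ts is None:
        raise ValueError("Expected timestamp in headers")
    return time.strftime("%y%m%d", time.gmtime(int(ts) / 1000))

try:
    resolve_partition({})  # no timestamp header -> fails
except ValueError as e:
    print(e)

print(resolve_partition({"timestamp": "1540944000000"}))  # 181031
```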
