**Overview**
Log file format:
2018-10-31-15-46-26|PDAT20181031T330494|24496|TDZA20181031D344959|epicc
The fields, separated by "|", are in order: time, policyno, sumpremium, proposalno, qudao.
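A record can be split on the delimiter to recover the named fields; a quick shell sketch using awk on the sample line above:

```shell
# Split one sample record on '|' into its five named fields.
line='2018-10-31-15-46-26|PDAT20181031T330494|24496|TDZA20181031D344959|epicc'
echo "$line" | awk -F'|' \
  '{printf "time=%s policyno=%s sumpremium=%s proposalno=%s qudao=%s\n", $1, $2, $3, $4, $5}'
```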
**Flume configuration file**
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command = tail -F /home/hadoop/bigData/test.log
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://10.133.12.104:9083
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.partition = %y%m%d
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.hive.table = kuaibao
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "|"
a1.sinks.k1.serializer.serdeSeparator = '|'
a1.sinks.k1.serializer.fieldnames =time,policyno,sumpremium,proposalno,qudao
a1.sinks.k1.useLocalTimeStamp = true
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
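To exercise the pipeline end to end, records matching the log format can be appended to the file tailed by the exec source; a minimal sketch (here /tmp/test.log stands in for /home/hadoop/bigData/test.log, and the record values are made up for testing):

```shell
# Append one synthetic record to the file tailed by the exec source.
LOG=/tmp/test.log                 # real host: /home/hadoop/bigData/test.log
now=$(date '+%Y-%m-%d-%H-%M-%S')  # matches the time format of the sample line
day=$(date '+%Y%m%d')
printf '%s|PDAT%sT000001|100|TDZA%sD000001|epicc\n' "$now" "$day" "$day" >> "$LOG"
tail -n 1 "$LOG"
```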
**Hive configuration file (hive-site.xml)**
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://10.133.12.104:3306/hive?characterEncoding=UTF-8&amp;createDatabaseIfNotExist=true&amp;useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver Class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>1q2w3e4r__QWE</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
**Creating the table in Hive**
CREATE TABLE kuaibao (time STRING, policyno STRING, sumpremium STRING, proposalno STRING, qudao STRING)
PARTITIONED BY (day STRING)
CLUSTERED BY (time) INTO 5 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
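Once the agent is delivering data, ingestion can be verified from the Hive CLI; a sketch of plausible checks (the partition values follow the %y%m%d pattern configured on the sink):

```sql
-- Confirm the sink created today's partition
SHOW PARTITIONS kuaibao;
-- Row counts per partition, and a peek at the data
SELECT day, COUNT(*) FROM kuaibao GROUP BY day;
SELECT * FROM kuaibao LIMIT 5;
```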
**Starting the Hive metastore**
hive --service metastore &
**Starting Flume**
./flume-ng agent --conf conf --conf-file /home/hadoop/DataCleaning/flume/apache-flume-1.6.0-bin/conf/FlumeKHive.conf --name a1 -Dflume.root.logger=INFO,console
**Common problems**
Problem 1:
org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thr
at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:98)
at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Solution:
Copy all the jar files from hcatalog/share/hcatalog under the Hive installation directory into the lib directory of the Flume installation.
Problem 2:
Exception in thread "PollableSourceRunner-KafkaSource-source1" java.lang.OutOfMemoryError: Java heap space
Solution:
Edit the flume-ng startup script and increase the JVM heap size, e.g.:
vi flume-ng
JAVA_OPTS="-Xmx500m"
Problem 3:
java.lang.NullPointerException: Expected timestamp
Solution:
Add a1.sinks.k1.useLocalTimeStamp = true to the Flume configuration (k1 is the sink name). The Hive sink needs a timestamp to resolve the %y%m%d escape in the partition value, so when events carry no timestamp header this property must be set.