Overview
1. This one is a real trap
(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - kafka.utils.Logging$class.error(Logging.scala:97)] Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test
(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - kafka.utils.Logging$class.error(Logging.scala:103)] Producer connection to DEV33:9092 unsuccessful
java.nio.channels.UnresolvedAddressException
(SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:89)] Failed to send producer request with correlation id 17 to broker 2 with data for partitions [test,0]
After checking and retrying over and over, I finally figured out where the problem was.
The configuration file was as follows:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/xfs/logs/tomcat/xfs-cs/logs/xfs_cs_41
# Describe the sink
#a1.sinks.k2.type=logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 192.168.0.72:9092,192.168.0.73:9092,192.168.0.83:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
#a1.sinks.k1.custom.partition.key=kafkaPartition
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#a1.sinks.k1.max.message.size=1000000
#a1.sinks.k1.custom.encoding=UTF-8
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
This is the line that caused the problem:
a1.sinks.k1.brokerList = 192.168.0.72:9092,192.168.0.73:9092,192.168.0.83:9092
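The log shows the producer trying to reach DEV33:9092, so the brokers are most likely advertising themselves by hostname, and this machine could not resolve those names (hence the UnresolvedAddressException). Add the hostname-to-IP mappings: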
[root@DEMO41 conf]# vim /etc/hosts
192.168.0.72 dev22
192.168.0.73 dev23
192.168.0.83 dev33
Then I changed that line in the configuration file to use hostnames, and the problem went away:
a1.sinks.k1.brokerList = dev22:9092,dev23:9092,dev33:9092
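A quick way to confirm this kind of problem before restarting Flume is to check whether every broker hostname resolves on the machine running the agent. The snippet below is only a minimal sketch: the function name `unresolved_hosts` is my own, and it assumes the broker list uses the same `host:port,host:port` form as `brokerList` above.

```python
import socket


def unresolved_hosts(broker_list, resolve=socket.gethostbyname):
    """Return the hosts in a 'host:port,host:port' list that fail to resolve.

    `resolve` is injectable so the check can be tried without real DNS.
    """
    failed = []
    for entry in broker_list.split(","):
        # Drop the port; keep only the host part.
        host = entry.strip().rsplit(":", 1)[0]
        try:
            resolve(host)
        except OSError:  # socket.gaierror is a subclass of OSError
            failed.append(host)
    return failed
```

Running `unresolved_hosts("dev22:9092,dev23:9092,dev33:9092")` on the Flume host should return an empty list once /etc/hosts is set up. It is worth including the name that appears in the error log (DEV33 here), since that is the name the producer actually tried to connect to.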
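2. brokerList must contain at least one Kafka broker
A quick search suggested the cause might be that the broker list property should be named agent.sinks.kafkaSink.brokerList rather than agent.sinks.kafkaSink.metadata.broker.list. In my case, though, I had simply mistyped the sink name: it should have been agent.sinks.k2.brokerList = dev22:9092,dev23:9092,dev33:9092, but I had written k1.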
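Since the real cause was a brokerList written under the wrong sink name, a small script can catch it by looking for Kafka sinks that have no brokerList of their own. This is a rough sketch, not part of Flume: `parse_properties` and `kafka_sinks_missing_broker_list` are names I made up, and the parser only handles simple `key = value` lines.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def kafka_sinks_missing_broker_list(props, agent):
    """Return declared sinks of type KafkaSink that have no brokerList set."""
    missing = []
    for name in props.get(f"{agent}.sinks", "").split():
        sink_type = props.get(f"{agent}.sinks.{name}.type", "")
        broker_list = props.get(f"{agent}.sinks.{name}.brokerList", "")
        if sink_type.endswith("KafkaSink") and not broker_list:
            missing.append(name)
    return missing


# The mistake described above: k2 is the Kafka sink, but brokerList sits on k1.
sample = """
agent.sinks = k1 k2
agent.sinks.k1.type = hdfs
agent.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList = dev22:9092,dev23:9092,dev33:9092
"""
print(kafka_sinks_missing_broker_list(parse_properties(sample), "agent"))
```

The check keys on the sink type rather than on the sink name, so it still works when the sinks are named differently.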
3. The agent gets stuck here and does nothing
2016-08-12 16:15:05,869 (conf-file-poller-0) [WARN - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:133)] No configuration found for this host:agent
2016-08-12 16:15:05,881 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{} sinkRunners:{} channels:{} }
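After checking, it turned out I had the agent name wrong! The name the agent is started with (here "agent", as the log shows) has to match the prefix used for the properties in the configuration file.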
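To see at a glance which agent names a configuration file actually defines, you can collect the first dotted segment of every property key and compare it with the name passed to flume-ng via --name. The helper below is just an illustrative sketch; `agent_names` and the sample values are my own.

```python
def agent_names(config_text):
    """Collect the first dotted segment of every property key in the file."""
    names = set()
    for line in config_text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key = line.split("=", 1)[0].strip()
            names.add(key.split(".", 1)[0])
    return names


# Hypothetical mismatch: the file uses the prefix a1, the agent is started as "agent".
config = "a1.sources = r1\na1.sinks = k1\na1.channels = c1\n"
started_as = "agent"  # the value given to --name on the command line
if started_as not in agent_names(config):
    print(f"no properties for {started_as!r}; file defines {sorted(agent_names(config))}")
```

If the two do not match, Flume starts with an empty configuration (no sources, sinks, or channels), which is exactly what the log above shows.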
4. What on earth is this?
[WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:590)] Could not configure source r1 due to: Failed to configure component!
org.apache.flume.conf.ConfigurationException: Failed to configure component!
at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:111)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:567)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:346)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:213)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:127)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:109)
at org.apache.flume.node.PropertiesFileConfigurationProvider.getFlumeConfiguration(PropertiesFileConfigurationProvider.java:189)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:89)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.conf.ConfigurationException: No channels set for r1
at org.apache.flume.conf.source.SourceConfiguration.configure(SourceConfiguration.java:69)
... 15 more
The root cause is at the bottom of the trace: "No channels set for r1". Adding the following line fixed it:
agent.sources.r1.channels = c1 c2
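The same check can be done up front by listing every declared source that has no channels property. Note that sources use the plural `channels` while sinks use the singular `channel`, which is easy to mix up. The code below is a minimal sketch with helper names of my own choosing, and it only understands simple `key = value` lines.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def sources_without_channels(props, agent):
    """Return declared sources whose '<agent>.sources.<name>.channels' is missing or empty."""
    return [
        name
        for name in props.get(f"{agent}.sources", "").split()
        if not props.get(f"{agent}.sources.{name}.channels", "").split()
    ]


# Sample reproducing the error: r1 is declared but never bound to a channel.
broken = """
agent.sources = r1
agent.channels = c1 c2
agent.sources.r1.type = exec
"""
print(sources_without_channels(parse_properties(broken), "agent"))
```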
Here is the configuration file that runs without errors:
agent.sources = r1
agent.channels = c1 c2
agent.sinks = k1 k2
############################define source begin
##define source-r1-exec
agent.sources.r1.channels = c1 c2
agent.sources.r1.type = exec
agent.sources.r1.command = tail -f /opt/xfs/logs/tomcat/xfs-cs/logs/xfs_cs_1
############################define sink begin
##define sink-k1-hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://192.168.0.71:9000/flumetest/%y-%m-%d/%H
agent.sinks.k1.hdfs.filePrefix = cs-%H
agent.sinks.k1.hdfs.round = true
agent.sinks.k1.hdfs.roundValue = 1
agent.sinks.k1.hdfs.roundUnit = hour
agent.sinks.k1.hdfs.useLocalTimeStamp = true
agent.sinks.k1.hdfs.minBlockReplicas = 1
agent.sinks.k1.hdfs.fileType = DataStream
#agent.sinks.k1.hdfs.writeFormat=Text
agent.sinks.k1.hdfs.rollInterval = 3600
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.idleTimeout = 0
##define sink-k2-kafka
agent.sinks.k2.channel = c2
agent.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k2.topic = test
agent.sinks.k2.brokerList = dev22:9092,dev23:9092,dev33:9092
agent.sinks.k2.requiredAcks = 1
agent.sinks.k2.batchSize = 20
#agent.sinks.k2.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k2.producer.type = async
#agent.sinks.k2.batchSize = 100
agent.sources.r1.selector.type = replicating
############################define channel begin
##define c1
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /opt/soft/apache-flume-1.6.0-bin/checkpoint
agent.channels.c1.dataDirs = /opt/soft/apache-flume-1.6.0-bin/dataDir
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 100
##define c2
agent.channels.c2.type = memory
agent.channels.c2.capacity = 1000000
agent.channels.c2.transactionCapacity = 100