[flume] Source r1 has been removed due to an error during configuration
The downstream Flume agent fails at startup
// Problem
Info: Including Hadoop libraries found via (/opt/module/hadoop-3.1.3/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
+ exec /opt/module/jdk1.8.0_212/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/module/flume/conf:/opt/module/flume/lib/*:/opt/module/hadoop-3.1.3/etc/hadoop:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/common/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/hdfs/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/mapreduce/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn:/opt/module/hadoop-3.1.3/share/hadoop/yarn/lib/*:/opt/module/hadoop-3.1.3/share/hadoop/yarn/*:/lib/*' -Djava.library.path=:/opt/module/hadoop-3.1.3/lib/native org.apache.flume.node.Application -n a1 -f /opt/module/applog/kafka-flume-hdfs.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2022-07-05 11:05:50,278 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2022-07-05 11:05:50,284 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:138)] Reloading configuration file:/opt/module/applog/kafka-flume-hdfs.conf
2022-07-05 11:05:50,290 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,292 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,292 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2022-07-05 11:05:50,292 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,292 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,292 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,292 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2022-07-05 11:05:50,293 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,293 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2022-07-05 11:05:50,293 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,293 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,293 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,294 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2022-07-05 11:05:50,294 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2022-07-05 11:05:50,296 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,296 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,296 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1117)] Added sinks: k1 Agent: a1
2022-07-05 11:05:50,296 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,296 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,296 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,297 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:c1
2022-07-05 11:05:50,297 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,297 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,297 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:r1
2022-07-05 11:05:50,297 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addComponentConfig(FlumeConfiguration.java:1203)] Processing:k1
2022-07-05 11:05:50,298 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'a1' has no configfilters.
2022-07-05 11:05:50,316 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [a1]
2022-07-05 11:05:50,317 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels
2022-07-05 11:05:50,325 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type file
2022-07-05 11:05:50,359 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
2022-07-05 11:05:50,360 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type org.apache.flume.source.kafka.KafkaSource
2022-07-05 11:05:50,369 (conf-file-poller-0) [INFO - org.apache.flume.source.kafka.KafkaSource.doConfigure(KafkaSource.java:417)] Group ID was not specified. Using flume as the group id.
2022-07-05 11:05:50,385 (conf-file-poller-0) [ERROR - org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:118)] Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: com.linmeng.flume.interceptor.TimestampInterceptor$Build
    at java.lang.Class.newInstance(Class.java:427)
    at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
    at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
    at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:342)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: com.linmeng.flume.interceptor.TimestampInterceptor$Build.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.newInstance(Class.java:412)
    ... 14 more
2022-07-05 11:05:50,388 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:355)] Source r1 has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
    at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
    at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:342)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: com.linmeng.flume.interceptor.TimestampInterceptor$Build
    at java.lang.Class.newInstance(Class.java:427)
    at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
    at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
    ... 12 more
Caused by: java.lang.NoSuchMethodException: com.linmeng.flume.interceptor.TimestampInterceptor$Build.<init>()
    at java.lang.Class.getConstructor0(Class.java:3082)
    at java.lang.Class.newInstance(Class.java:412)
    ... 14 more
2022-07-05 11:05:50,389 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: hdfs
2022-07-05 11:05:50,689 (conf-file-poller-0) [INFO - com.hadoop.compression.lzo.GPLNativeCodeLoader.<clinit>(GPLNativeCodeLoader.java:52)] Loaded native gpl library from the embedded binaries
2022-07-05 11:05:50,693 (conf-file-poller-0) [INFO - com.hadoop.compression.lzo.LzoCodec.<clinit>(LzoCodec.java:76)] Successfully loaded & initialized native-lzo library [hadoop-lzo rev 52decc77982b58949890770d22720a91adce0c3f]
2022-07-05 11:05:50,701 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:120)] Channel c1 connected to [k1]
2022-07-05 11:05:50,704 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:162)] Starting new configuration:{ sourceRunners:{} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@fdb5e31 counterGroup:{ name:null counters:{} } }} channels:{c1=FileChannel c1 { dataDirs: [/opt/data/flume/data/behavior1] }} }
2022-07-05 11:05:50,704 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:169)] Starting Channel c1
2022-07-05 11:05:50,706 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:278)] Starting FileChannel c1 { dataDirs: [/opt/data/flume/data/behavior1] }...
2022-07-05 11:05:50,758 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2022-07-05 11:05:50,758 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2022-07-05 11:05:50,770 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.<init>(Log.java:356)] Encryption is not enabled
2022-07-05 11:05:50,770 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:406)] Replay started
2022-07-05 11:05:50,777 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:418)] Found NextFileID 5, from [/opt/data/flume/data/behavior1/log-4, /opt/data/flume/data/behavior1/log-3, /opt/data/flume/data/behavior1/log-1, /opt/data/flume/data/behavior1/log-5, /opt/data/flume/data/behavior1/log-2]
2022-07-05 11:05:50,782 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:55)] Starting up with /opt/data/behavior1/flume/checkpoint/behavior1/checkpoint and /opt/data/behavior1/flume/checkpoint/behavior1/checkpoint.meta
2022-07-05 11:05:50,782 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:59)] Reading checkpoint metadata from /opt/data/behavior1/flume/checkpoint/behavior1/checkpoint.meta
2022-07-05 11:05:50,852 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FlumeEventQueue.<init>(FlumeEventQueue.java:115)] QueueSet population inserting 0 took 0
2022-07-05 11:05:50,855 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:457)] Last Checkpoint Tue Jul 05 10:59:47 CST 2022, queue depth = 0
2022-07-05 11:05:50,857 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.doReplay(Log.java:542)] Replaying logs with v2 replay logic
2022-07-05 11:05:50,859 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:249)] Starting replay of [/opt/data/flume/data/behavior1/log-1, /opt/data/flume/data/behavior1/log-2, /opt/data/flume/data/behavior1/log-3, /opt/data/flume/data/behavior1/log-4, /opt/data/flume/data/behavior1/log-5]
2022-07-05 11:05:50,860 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:262)] Replaying /opt/data/flume/data/behavior1/log-1
2022-07-05 11:05:50,867 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.getDefaultDirectMemorySize(DirectMemoryUtils.java:112)] Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)
2022-07-05 11:05:50,869 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.allocate(DirectMemoryUtils.java:48)] Direct Memory Allocation: Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 18874368, Remaining = 18874368
2022-07-05 11:05:50,936 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:660)] Checkpoint for file(/opt/data/flume/data/behavior1/log-1) is: 1656987195500, which is beyond the requested checkpoint time: 1656989987074 and position 0
2022-07-05 11:05:50,936 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:262)] Replaying /opt/data/flume/data/behavior1/log-2
2022-07-05 11:05:50,937 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:660)] Checkpoint for file(/opt/data/flume/data/behavior1/log-2) is: 1656989500806, which is beyond the requested checkpoint time: 1656989987074 and position 0
2022-07-05 11:05:50,937 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:262)] Replaying /opt/data/flume/data/behavior1/log-3
2022-07-05 11:05:50,937 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:660)] Checkpoint for file(/opt/data/flume/data/behavior1/log-3) is: 1656989764692, which is beyond the requested checkpoint time: 1656989987074 and position 0
2022-07-05 11:05:50,937 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:262)] Replaying /opt/data/flume/data/behavior1/log-4
2022-07-05 11:05:50,937 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:660)] Checkpoint for file(/opt/data/flume/data/behavior1/log-4) is: 1656989913084, which is beyond the requested checkpoint time: 1656989987074 and position 0
2022-07-05 11:05:50,938 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:262)] Replaying /opt/data/flume/data/behavior1/log-5
2022-07-05 11:05:50,938 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:660)] Checkpoint for file(/opt/data/flume/data/behavior1/log-5) is: 1656989987074, which is beyond the requested checkpoint time: 1656989987074 and position 0
2022-07-05 11:05:50,938 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:345)] read: 0, put: 0, take: 0, rollback: 0, commit: 0, skip: 0, eventCount:0
2022-07-05 11:05:50,938 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FlumeEventQueue.replayComplete(FlumeEventQueue.java:417)] Search Count = 0, Search Time = 0, Copy Count = 0, Copy Time = 0
2022-07-05 11:05:50,948 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:505)] Rolling /opt/data/flume/data/behavior1
2022-07-05 11:05:50,948 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:990)] Roll start /opt/data/flume/data/behavior1
2022-07-05 11:05:50,950 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$Writer.<init>(LogFile.java:220)] Opened /opt/data/flume/data/behavior1/log-6
2022-07-05 11:05:50,962 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:1006)] Roll end
2022-07-05 11:05:50,962 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint(EventQueueBackingStoreFile.java:230)] Start checkpoint for /opt/data/behavior1/flume/checkpoint/behavior1/checkpoint, elements to sync = 0
2022-07-05 11:05:50,972 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:255)] Updating checkpoint metadata: logWriteOrderID: 1656990350791, queueSize: 0, queueHead: 0
2022-07-05 11:05:50,989 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:1065)] Updated checkpoint for file: /opt/data/flume/data/behavior1/log-6 position: 0 logWriteOrderID: 1656990350791
2022-07-05 11:05:50,990 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:289)] Queue Size after replay: 0 [channel=c1]
2022-07-05 11:05:50,990 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:196)] Starting Sink k1
2022-07-05 11:05:50,991 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2022-07-05 11:05:50,992 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
Flume interceptor code
package com.linmeng.flume.interceptor;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {

    private JsonParser jp;

    @Override
    public void initialize() {
        jp = new JsonParser();
    }

    /**
     * Add a timestamp to a single event.
     */
    @Override
    public Event intercept(Event event) {
        // 1. Get the headers and body of the incoming event
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();

        // 2. Read the time at which the log was generated from the body and write it to the headers
        String s = new String(body, StandardCharsets.UTF_8);
        JsonElement parse = jp.parse(s);
        JsonObject aj = parse.getAsJsonObject();
        long timestamp = aj.get("ts").getAsLong();
        // NOTE: the key is spelled "timestap" in the original code; Flume's HDFS sink
        // reads the "timestamp" header when resolving %Y-%m-%d in hdfs.path.
        headers.put("timestap", Long.toString(timestamp));

        // 3. Return the event
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Declared without "static" here; this is the version that produced the error above.
    public class Build implements Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
// Configuration file: kafka-flume-hdfs
# Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1

# source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.linmeng.flume.interceptor.TimestampInterceptor$Build

# channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/data/behavior1//flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/data/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.rollInterval = 20
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Output file type: gzip-compressed stream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Wiring
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
Solution:
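1. The nested class that implements Builder must be static, so add the static modifier to Build. Flume creates the builder reflectively through Class.newInstance(), which needs a no-argument constructor. The constructor of a non-static inner class implicitly takes the enclosing instance as a parameter, so no such constructor exists, hence the NoSuchMethodException for TimestampInterceptor$Build.<init>() in the log.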
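2. Several interceptor JARs had previously been copied into flume/lib, which caused a conflict between them. Deleting the other JARs and keeping only the current one resolves it.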
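The first point can be reproduced without Flume at all. The following is a minimal sketch, not the Flume code itself: the Builder interface here is a hypothetical stand-in for Flume's Interceptor.Builder, and the class names BuilderReflectionDemo, InnerBuild and StaticBuild are made up for illustration. It only checks, via reflection, whether each nested class has a public no-argument constructor.

```java
public class BuilderReflectionDemo {

    /** Hypothetical stand-in for Flume's Interceptor.Builder (illustration only). */
    public interface Builder {
        String build();
    }

    /** Non-static inner class: its constructor implicitly takes the enclosing instance. */
    public class InnerBuild implements Builder {
        @Override
        public String build() {
            return "inner";
        }
    }

    /** Static nested class: it has a genuine no-argument constructor. */
    public static class StaticBuild implements Builder {
        @Override
        public String build() {
            return "static";
        }
    }

    /** Returns true if the given class exposes a public no-argument constructor. */
    public static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // The inner class has no no-arg constructor, so reflective creation fails.
        System.out.println("InnerBuild:  " + hasNoArgConstructor(InnerBuild.class));
        // The static nested class can be created reflectively.
        System.out.println("StaticBuild: " + hasNoArgConstructor(StaticBuild.class));
        Builder builder = StaticBuild.class.getDeclaredConstructor().newInstance();
        System.out.println(builder.build());
    }
}
```

In the interceptor above, the corresponding change is to declare the nested class as public static class Build implements Builder, then rebuild the JAR and replace the copy in flume/lib.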