I am 靠谱客 blogger 优雅睫毛膏. This article on Flume examples, collected during recent development work, seemed worth sharing as a reference.

Overview

I. Flume Examples
1. Example 1: Avro
An Avro client can send a given file to Flume; the Avro source receives it over the Avro RPC mechanism.
1) Create the agent configuration file avro.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume

[root@logsrv03 apache-flume-1.6.0-bin]# bin/flume-ng agent -c . -f conf/avro.conf -n a1 -Dflume.root.logger=INFO,console

3) Create the file to send (in the current directory, ./)

[root@logsrv03 apache-flume-1.6.0-bin]# echo "hello boy" > ./log.0

4) Send the file with avro-client (-H also accepts a hostname; mine is logsrv03)

[root@logsrv03 apache-flume-1.6.0-bin]# bin/flume-ng avro-client -c . -H 172.17.6.148 -p 4141 -F ./log.0
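If several files need to be shipped, avro-client can simply be invoked once per file. A minimal sketch, assuming the agent from avro.conf is listening on port 4141 and using localhost in place of the IP above (the sample file names are made up):

```shell
# Send each file to the running Avro source, one avro-client call per file.
for f in ./log.0 ./log.1; do
  [ -f "$f" ] || echo "sample payload for $f" > "$f"   # create missing samples
  # || true: keep looping even if the agent is not up yet
  bin/flume-ng avro-client -c . -H localhost -p 4141 -F "$f" || true
done
```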

5) The server console on logsrv03 then shows the following; note the last line:
15/08/17 16:11:06 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/08/17 16:11:06 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/avro.conf
15/08/17 16:11:06 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/08/17 16:11:06 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:11:06 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:11:06 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/08/17 16:11:06 INFO node.AbstractConfigurationProvider: Creating channels
15/08/17 16:11:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/08/17 16:11:06 INFO node.AbstractConfigurationProvider: Created channel c1
15/08/17 16:11:06 INFO source.DefaultSourceFactory: Creating instance of source r1, type avro
15/08/17 16:11:06 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/08/17 16:11:07 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/08/17 16:11:07 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Avro source r1: { bindAddress: 0.0.0.0, port: 4141 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@245babce counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/08/17 16:11:07 INFO node.Application: Starting Channel c1
15/08/17 16:11:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/08/17 16:11:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/08/17 16:11:07 INFO node.Application: Starting Sink k1
15/08/17 16:11:07 INFO node.Application: Starting Source r1
15/08/17 16:11:07 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 0.0.0.0, port: 4141 }…
15/08/17 16:11:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/08/17 16:11:07 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/08/17 16:11:07 INFO source.AvroSource: Avro source r1 started.
15/08/17 16:11:19 INFO ipc.NettyServer: [id: 0xe795f282, /172.17.6.148:56953 => /172.17.6.148:4141] OPEN
15/08/17 16:11:19 INFO ipc.NettyServer: [id: 0xe795f282, /172.17.6.148:56953 => /172.17.6.148:4141] BOUND: /172.17.6.148:4141
15/08/17 16:11:19 INFO ipc.NettyServer: [id: 0xe795f282, /172.17.6.148:56953 => /172.17.6.148:4141] CONNECTED: /172.17.6.148:56953
15/08/17 16:11:19 INFO ipc.NettyServer: [id: 0xe795f282, /172.17.6.148:56953 :> /172.17.6.148:4141] DISCONNECTED
15/08/17 16:11:19 INFO ipc.NettyServer: [id: 0xe795f282, /172.17.6.148:56953 :> /172.17.6.148:4141] UNBOUND
15/08/17 16:11:19 INFO ipc.NettyServer: [id: 0xe795f282, /172.17.6.148:56953 :> /172.17.6.148:4141] CLOSED
15/08/17 16:11:19 INFO ipc.NettyServer: Connection to /172.17.6.148:56953 disconnected.
15/08/17 16:11:22 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
2. Example 2: Spool
The spooling directory source watches a configured directory for newly added files and reads the data out of each file.
1) Create the configuration file spool.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /usr/local/apache-flume-1.6.0-bin/logs
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume

[root@logsrv03 apache-flume-1.6.0-bin]# bin/flume-ng agent -c . -f conf/spool.conf -n a1 -Dflume.root.logger=INFO,console

3) Drop a file into the logs directory (create the logs folder under the Flume home first)

[root@logsrv03 apache-flume-1.6.0-bin]# echo "spool test" > ./logs/spool_test.log
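The spooling directory source requires that a file be complete and never modified once it appears in spoolDir, so in practice it is safer to write the file elsewhere and rename it in. A minimal sketch, assuming the spool directory is ./logs as above:

```shell
SPOOL_DIR=./logs              # assumed spoolDir, matching the config above
mkdir -p "$SPOOL_DIR"
STAMP=$(date +%s)
TMP=$(mktemp ./spoolXXXXXX)   # write the file outside the watched directory first
echo "spool test $STAMP" > "$TMP"
# mv within one filesystem is an atomic rename, so the source only ever
# sees a complete file; the name must also be unique within spoolDir
mv "$TMP" "$SPOOL_DIR/spool_$STAMP.log"
```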

4) The console shows the following:
15/08/17 16:31:06 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/08/17 16:31:06 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/spool.conf
15/08/17 16:31:06 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/08/17 16:31:06 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:31:06 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:31:06 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/08/17 16:31:06 INFO node.AbstractConfigurationProvider: Creating channels
15/08/17 16:31:06 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/08/17 16:31:07 INFO node.AbstractConfigurationProvider: Created channel c1
15/08/17 16:31:07 INFO source.DefaultSourceFactory: Creating instance of source r1, type spooldir
15/08/17 16:31:07 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/08/17 16:31:07 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/08/17 16:31:07 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Spool Directory source r1: { spoolDir: /usr/local/apache-flume-1.6.0-bin/logs } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@c25b3d5 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/08/17 16:31:07 INFO node.Application: Starting Channel c1
15/08/17 16:31:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/08/17 16:31:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/08/17 16:31:07 INFO node.Application: Starting Sink k1
15/08/17 16:31:07 INFO node.Application: Starting Source r1
15/08/17 16:31:07 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /usr/local/apache-flume-1.6.0-bin/logs
15/08/17 16:31:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/08/17 16:31:07 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/08/17 16:31:52 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
15/08/17 16:31:52 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /usr/local/apache-flume-1.6.0-bin/logs/spool_test.log_COMPLETED to /usr/local/apache-flume-1.6.0-bin/logs/spool_test.log_COMPLETED.COMPLETED
15/08/17 16:31:53 INFO sink.LoggerSink: Event: { headers:{file=/usr/local/apache-flume-1.6.0-bin/logs/spool_test.log_COMPLETED} body: 73 70 6F 6F 6C 20 74 65 73 74 spool test }
3. Example 3: Exec
The exec source runs a given command and uses its output as the event stream; when using tail, the file must hold enough content for output to show up.
1) Create the agent configuration file exec.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /usr/local/apache-flume-1.6.0-bin/log_exec_tail
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume
[root@logsrv03 apache-flume-1.6.0-bin]# bin/flume-ng agent -c . -f conf/exec.conf -n a1 -Dflume.root.logger=INFO,console
3) Use a loop with echo to generate enough lines in the file (create the file first)

[root@logsrv03 apache-flume-1.6.0-bin]#for i in {1..100};do echo "exec test$i">>./log_exec_tail;echo $i;sleep 0.1;done;
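A note on the tail flag used above: tail -F (capital F) re-opens the file by name, so the exec source keeps receiving events after the log is rotated, whereas tail -f would keep following the old, renamed file. A small sketch of the rotation case (file names assumed):

```shell
LOG=./log_exec_tail
for i in 1 2 3; do echo "exec test $i" >> "$LOG"; done
mv "$LOG" "$LOG.1"              # simulate logrotate: old file renamed away
echo "after rotate" >> "$LOG"   # a new file appears under the original name;
                                # tail -F notices and follows it, tail -f would not
```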

4) The console then shows the following:
15/08/17 16:40:27 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/08/17 16:40:27 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/exec.conf
15/08/17 16:40:27 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/08/17 16:40:27 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:40:27 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:40:27 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/08/17 16:40:27 INFO node.AbstractConfigurationProvider: Creating channels
15/08/17 16:40:27 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/08/17 16:40:27 INFO node.AbstractConfigurationProvider: Created channel c1
15/08/17 16:40:27 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
15/08/17 16:40:27 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/08/17 16:40:27 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/08/17 16:40:27 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6caf446 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/08/17 16:40:27 INFO node.Application: Starting Channel c1
15/08/17 16:40:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/08/17 16:40:27 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/08/17 16:40:27 INFO node.Application: Starting Sink k1
15/08/17 16:40:27 INFO node.Application: Starting Source r1
15/08/17 16:40:27 INFO source.ExecSource: Exec source starting with command:tail -F /usr/local/apache-flume-1.6.0-bin/log_exec_tail
15/08/17 16:40:27 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/08/17 16:40:27 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 31 exec test91 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 32 exec test92 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 33 exec test93 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 34 exec test94 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 35 exec test95 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 36 exec test96 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 37 exec test97 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 38 exec test98 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 39 exec test99 }
15/08/17 16:40:31 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 30 30 exec test100 }
15/08/17 16:40:36 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 exec test1 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 32 exec test2 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 33 exec test3 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 exec test4 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 35 exec test5 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 36 exec test6 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 38 exec test8 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 39 exec test9 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 30 exec test10 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 31 exec test11 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 32 exec test12 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 33 exec test13 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 34 exec test14 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 35 exec test15 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 36 exec test16 }
15/08/17 16:40:38 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 31 37 exec test17 }
····················
15/08/17 16:40:40 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 33 38 exec test38 }
15/08/17 16:40:40 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 33 39 exec test39 }
15/08/17 16:40:40 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 30 exec test40 }
15/08/17 16:40:40 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 31 exec test41 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 32 exec test42 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 33 exec test43 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 34 exec test44 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 35 exec test45 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 36 exec test46 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 37 exec test47 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 38 exec test48 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 34 39 exec test49 }
15/08/17 16:40:45 INFO sink.LoggerSink: Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 35 30 exec test50 }
4. Example 4: Syslogtcp
The syslogtcp source listens on a TCP port for syslog data.
1) Create the agent configuration file syslog_tcp.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume

[root@logsrv03 apache-flume-1.6.0-bin]# bin/flume-ng agent -c . -f conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console

3) Send a test syslog message

[root@logsrv03 apache-flume-1.6.0-bin]# echo "hello syslog_test" | nc localhost 5140
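The WARN about "Invalid Syslog data" in the log output comes from the plain-text line lacking the RFC 3164 priority header the syslog source expects. A hedged sketch of a well-formed message (PRI 13 = facility user * 8 + severity notice; the tag name is made up):

```shell
# <PRI>timestamp hostname tag: message  -- RFC 3164 framing
PRI=13                                   # facility user(1)*8 + severity notice(5)
MSG="<${PRI}>$(date '+%b %d %H:%M:%S') $(hostname) flume_test: hello syslog_test"
# -w 1: give up after a second; || true so the sketch works even with no agent up
printf '%s\n' "$MSG" | nc -w 1 localhost 5140 || true
```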

4) The console shows the following:
15/08/17 16:50:59 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/08/17 16:50:59 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/system_tcp.conf
15/08/17 16:50:59 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/08/17 16:50:59 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:50:59 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 16:50:59 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/08/17 16:50:59 INFO node.AbstractConfigurationProvider: Creating channels
15/08/17 16:50:59 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/08/17 16:50:59 INFO node.AbstractConfigurationProvider: Created channel c1
15/08/17 16:50:59 INFO source.DefaultSourceFactory: Creating instance of source r1, type syslogtcp
15/08/17 16:50:59 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/08/17 16:50:59 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/08/17 16:50:59 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@328bdd6b counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/08/17 16:50:59 INFO node.Application: Starting Channel c1
15/08/17 16:50:59 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/08/17 16:50:59 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/08/17 16:50:59 INFO node.Application: Starting Sink k1
15/08/17 16:50:59 INFO node.Application: Starting Source r1
15/08/17 16:51:00 INFO source.SyslogTcpSource: Syslog TCP Source starting…
15/08/17 16:51:13 WARN source.SyslogUtils: Event created from Invalid Syslog data.
15/08/17 16:51:14 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 68 65 6C 6C 6F 2C 73 79 73 6C 6F 67 5F 74 65 73 hello,syslog_test }
5. Example 5: JSONHandler
The HTTP source accepts Flume events POSTed as JSON; JSONHandler is its default handler.
1) Create the agent configuration file post_json.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 8888
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start Flume

[root@logsrv03 apache-flume-1.6.0-bin]# bin/flume-ng agent -c . -f conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console

3) Send a JSON-formatted POST request

curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello boy"}]' http://localhost:8888
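The JSON body is an array, so a single POST can carry several events at once. A minimal sketch, assuming the HTTP source above is listening on 8888 (the header keys and event bodies are made up):

```shell
PAYLOAD='[{"headers":{"a":"a1"},"body":"event one"},
          {"headers":{"a":"a2"},"body":"event two"}]'
echo "$PAYLOAD" | python3 -m json.tool > /dev/null    # sanity-check the JSON first
# || true: the request simply fails if the agent is not running
curl -sS -X POST -d "$PAYLOAD" http://localhost:8888 || true
```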

4) The console shows the following:

15/08/17 17:06:11 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
15/08/17 17:06:11 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/post_json.conf
15/08/17 17:06:11 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
15/08/17 17:06:11 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 17:06:11 INFO conf.FlumeConfiguration: Processing:k1
15/08/17 17:06:11 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
15/08/17 17:06:11 INFO node.AbstractConfigurationProvider: Creating channels
15/08/17 17:06:11 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
15/08/17 17:06:11 INFO node.AbstractConfigurationProvider: Created channel c1
15/08/17 17:06:11 INFO source.DefaultSourceFactory: Creating instance of source r1, type org.apache.flume.source.http.HTTPSource
15/08/17 17:06:11 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
15/08/17 17:06:11 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
15/08/17 17:06:11 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.http.HTTPSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4ef9bdff counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
15/08/17 17:06:11 INFO node.Application: Starting Channel c1
15/08/17 17:06:12 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
15/08/17 17:06:12 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
15/08/17 17:06:12 INFO node.Application: Starting Sink k1
15/08/17 17:06:12 INFO node.Application: Starting Source r1
15/08/17 17:06:12 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
15/08/17 17:06:12 INFO mortbay.log: jetty-6.1.26
15/08/17 17:06:12 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:8888
15/08/17 17:06:12 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
15/08/17 17:06:12 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
15/08/17 17:06:58 INFO sink.LoggerSink: Event: { headers:{b=b1, a=a1} body: 68 65 6C 6C 6F 20 62 6F 79 hello boy }
