Overview
1. Single-Agent Flow Configuration
1.1 Official Documentation
http://flume.apache.org/FlumeUserGuide.html#avro-source
A flow links a source to a sink through a channel. For a given agent, you list its sources, sinks, and channels, then point each source and sink at a channel. A source instance can write to multiple channels, but a sink instance can only read from one channel. The format is as follows:
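The general pattern, as given in the Flume User Guide (the names in angle brackets are placeholders for your own component names):

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>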
Worked example: an agent named agent_foo receives data from an external Avro client and sends it through a memory channel to HDFS. The configuration file foo.config might look like this:
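A minimal wiring for this flow, following the example in the Flume User Guide (only the flow structure is shown; the per-component properties come in the next step):

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1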
This makes events flow from avro-appserver-src-1 to hdfs-sink-1 through the memory channel mem-channel-1. When the agent is started with foo.config as its configuration file, it instantiates that flow.
Configuring Individual Components
After defining the flow, you need to set the properties of each source, sink, and channel. Each component's properties are set separately, in its own namespace under the agent.
The "type" property must be set for every component so that Flume knows what kind of object to instantiate. Each source, sink, and channel type has its own set of required properties, and all of them must be set as needed. In the previous example, the flow runs from the avro-appserver-src-1 source through the memory channel mem-channel-1 to the hdfs-sink-1 sink. The following example shows the configuration of these components.
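A sketch of those component properties, modeled on the Flume User Guide example (the bind address, port, capacity, and HDFS path are illustrative values, not required ones):

# properties of avro-appserver-src-1
agent_foo.sources.avro-appserver-src-1.type = avro
agent_foo.sources.avro-appserver-src-1.bind = localhost
agent_foo.sources.avro-appserver-src-1.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000

# properties of hdfs-sink-1
agent_foo.sinks.hdfs-sink-1.type = hdfs
agent_foo.sinks.hdfs-sink-1.hdfs.path = hdfs://namenode/flume/webdata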
1.2 Test Example (1)
Flow configuration (single agent)
Case 1: use Flume to monitor a directory; whenever a new file appears in the directory, print its contents to the console.
# File name: sample1.properties
# Configuration:
Create two directories on the Linux system: one to hold the configuration files (flumetest) and one to hold the files to be read (flume).
# Monitor the specified directory; when a new file appears, print its contents to the console
# Configure an agent; the agent name can be anything
# Declare the agent's sources, sinks, and channels; the component names can be anything
a1.sources=s1
a1.channels=c1
a1.sinks=k1
# Configure the source, using the name declared under the agent's sources
# Source parameters differ per source type; look them up in the Flume documentation
# Configure a spooling-directory source; the flume directory holds the files to be read
a1.sources.s1.type=spooldir
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume
# Configure the channel, using the name declared under the agent's channels
# Configure a memory channel
a1.channels.c1.type=memory
# Configure the sink, using the name declared under the agent's sinks
# Configure a logger sink
a1.sinks.k1.type=logger
# Bind the components. Note: the source property is 'channels' (plural, a source can feed several), the sink property is 'channel' (singular)
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the sample1.properties configuration file to the flumetest directory on the Linux system.
Start Flume with this command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/sample1.properties --name a1 -Dflume.root.logger=INFO,console
--conf specifies Flume's configuration directory
--conf-file specifies the agent configuration file for this collection job
--name specifies the agent name
-Dflume.root.logger=INFO,console prints the collected events to the console
Part of the startup log:
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Creating channels
18/05/05 20:28:16 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/05 20:28:16 INFO source.DefaultSourceFactory: Creating instance of source s1, type spooldir
18/05/05 20:28:16 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/05 20:28:16 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/05 20:28:16 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Spool Directory source s1: { spoolDir: /home/hadoop/apps/apache-flume-1.8.0-bin/flume } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@101f0f3a counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/05 20:28:16 INFO node.Application: Starting Channel c1
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/05 20:28:16 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/05 20:28:16 INFO node.Application: Starting Sink k1
18/05/05 20:28:16 INFO node.Application: Starting Source s1
18/05/05 20:28:16 INFO source.SpoolDirectorySource: SpoolDirectorySource source starting with directory: /home/hadoop/apps/apache-flume-1.8.0-bin/flume
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/05 20:28:17 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
Create a file hello.txt on the Linux system:
[hadoop@hadoop02 ~]$ vi hello.txt
hello
world
Copy the file into the spooling directory set in the configuration:
a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume
Using the command:
[hadoop@hadoop02 ~]$ cp hello.txt ~/apps/apache-flume-1.8.0-bin/flume
Result:
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
18/05/05 20:30:10 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt to /home/hadoop/apps/apache-flume-1.8.0-bin/flume/hello.txt.COMPLETED
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }
18/05/05 20:30:14 INFO sink.LoggerSink: Event: { headers:{} body: 77 6F 72 6C 64 world }
1.3 Test Case (2)
Case 2: simulate reading data from a web server into HDFS in real time.
About the Exec Source
An Exec source runs a given Unix command and ingests whatever the command writes to standard output (here, tail -F on a log file). To simulate web server output we need a writer process that stays running.
Create an empty file:
[hadoop@hadoop02 tomcat]$ touch catalina.out
[hadoop@hadoop02 tomcat]$ ll
total 0
-rw-rw-r--. 1 hadoop hadoop 0 May  6 12:19 catalina.out
Run a loop that appends the current date to the file once per second:
[hadoop@hadoop02 tomcat]$ while true; do echo `date` >> catalina.out; sleep 1; done
Watch it with this command (the data keeps growing):
[hadoop@hadoop02 tomcat]$ tail -F catalina.out
Sun May 6 12:24:57 CST 2018
Sun May 6 12:24:58 CST 2018
Sun May 6 12:24:59 CST 2018
Sun May 6 12:25:00 CST 2018
Sun May 6 12:25:01 CST 2018
# File name: case_hdfs.properties
# Configuration (everything runs on the same node):
The source reads tomcat/catalina.out, which is continuously appended to; tail -F keeps delivering the newly appended lines.
# Configure an agent; the agent name can be anything
# Declare the agent's sources, sinks, and channels; the component names can be anything
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# Configure the source, using the name declared under the agent's sources
# Source parameters differ per source type; look them up in the Flume documentation
# Configure an exec source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/hadoop/tomcat/catalina.out
# Configure the channel, using the name declared under the agent's channels
# Configure a memory channel
a1.channels.c1.type = memory
# Configure the sink, using the name declared under the agent's sinks
# Configure an HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M
# Round down the timestamp used in the directory path (bucket directories by 1 minute)
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File name prefix and suffix
a1.sinks.k1.hdfs.filePrefix = taobao
a1.sinks.k1.hdfs.fileSuffix = log
# File rolling: roll the current file when any threshold is hit (10 s, 1024 bytes, or 10 events, whichever comes first)
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 1024
a1.sinks.k1.hdfs.rollCount = 10
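# DataStream writes the raw event body as-is, instead of the default SequenceFile container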
a1.sinks.k1.hdfs.fileType = DataStream
# Bind the source to its channel ('channels', plural)
a1.sources.s1.channels = c1
# Bind the sink to its channel ('channel', singular)
a1.sinks.k1.channel = c1
Run the agent:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_hdfs.properties --name a1 -Dflume.root.logger=INFO,console
Part of the run log:
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 16:09:44 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/05/06 16:09:44 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 16:09:44 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 16:09:44 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 16:09:44 INFO source.DefaultSourceFactory: Creating instance of source s1, type exec
18/05/06 16:09:44 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: hdfs
18/05/06 16:09:44 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 16:09:44 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@c992421 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 16:09:44 INFO node.Application: Starting Channel c1
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 16:09:44 INFO node.Application: Starting Sink k1
18/05/06 16:09:44 INFO node.Application: Starting Source s1
18/05/06 16:09:44 INFO source.ExecSource: Exec source starting with command: tail -F /home/hadoop/tomcat/catalina.out
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: s1 started
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
18/05/06 16:09:44 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
View the result on the HDFS node:
How many files are produced is determined by the roll settings in the configuration.
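One way to inspect the output, sketched assuming the agent wrote to the default filesystem under the path configured above (the timestamp segment of each file name is generated at roll time, so a glob is used):

hdfs dfs -ls -R /flume/events
hdfs dfs -cat /flume/events/*/*/taobao.*log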
1.4 Test Case (3)
Case 3: read data from a TCP port and print it to the console.
# File name: case_tcp.properties
# Configuration (everything runs on the same node, hadoop02)
# Read input data from the specified TCP port with a netcat source and print it to the console
a1.sources=s1
a1.channels=c1
a1.sinks=k1
a1.sources.s1.type=netcat
a1.sources.s1.bind=192.168.123.102
a1.sources.s1.port=55555
a1.channels.c1.type=memory
a1.sinks.k1.type=logger
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
Upload the case_tcp.properties configuration file to the flumetest directory on the Linux system.
Start Flume with this command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_tcp.properties --name a1 -Dflume.root.logger=INFO,console
Part of the startup log:
op/apps/apache-flume-1.8.0-bin/flumetest/case_tcp.properties
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Added sinks: k1 Agent: a1
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Processing:k1
18/05/06 10:41:34 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/05/06 10:41:34 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 10:41:34 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 10:41:34 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 10:41:34 INFO source.DefaultSourceFactory: Creating instance of source s1, type netcat
18/05/06 10:41:34 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: logger
18/05/06 10:41:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 10:41:34 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@738ed94d counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 10:41:34 INFO node.Application: Starting Channel c1
18/05/06 10:41:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 10:41:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 10:41:34 INFO node.Application: Starting Sink k1
18/05/06 10:41:34 INFO node.Application: Starting Source s1
18/05/06 10:41:34 INFO source.NetcatSource: Source starting
18/05/06 10:41:34 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.123.102:55555]
Open another window on the same node:
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ telnet 192.168.123.102 55555
-bash: telnet: command not found
The telnet command is not installed, so install it with yum (switch to the root user first):
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ su
Password:
[root@hadoop02 apache-flume-1.8.0-bin]# yum install telnet
Loaded plugins: fastestmirror, refresh-packagekit, security
Setting up Install Process
Determining fastest mirrors
epel/metalink                                             | 6.2 kB     00:00
 * base: mirrors.sohu.com
 * epel: mirrors.tongji.edu.cn
 * extras: mirror.bit.edu.cn
 * updates: mirror.bit.edu.cn
base                                                      | 3.7 kB     00:00
epel                                                      | 4.7 kB     00:00
epel/primary_db                                           | 6.0 MB     00:12
extras                                                    | 3.4 kB     00:00
updates                                                   | 3.4 kB     00:00
updates/primary_db                                        | 7.0 MB     00:03
Resolving Dependencies
--> Running transaction check
---> Package telnet.x86_64 1:0.17-48.el6 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

===============================================================================
 Package            Arch            Version                Repository     Size
===============================================================================
Installing:
 telnet             x86_64          1:0.17-48.el6          base           58 k

Transaction Summary
===============================================================================
Install       1 Package(s)

Total download size: 58 k
Installed size: 109 k
Is this ok [y/N]: y
Downloading Packages:
telnet-0.17-48.el6.x86_64.rpm                             |  58 kB     00:00
Running rpm_check_debug
Running Transaction Test
Transaction Test Succeeded
Running Transaction
  Installing : 1:telnet-0.17-48.el6.x86_64                                 1/1
  Verifying  : 1:telnet-0.17-48.el6.x86_64                                 1/1

Installed:
  telnet.x86_64 1:0.17-48.el6

Complete!
[root@hadoop02 apache-flume-1.8.0-bin]#
Once installed, run the command again (back as the hadoop user):
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ telnet 192.168.123.102 55555
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
Type hello in the session:
[hadoop@hadoop02 apache-flume-1.8.0-bin]$ telnet 192.168.123.102 55555
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
hello
OK
In the agent's window the event is printed to the console (the trailing 0D byte is the carriage return that telnet sends):
18/05/06 10:42:28 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2. Multiple Flows in a Single Agent
This is an extension of the previous setup: one agent hosts two flows, one from an external Avro client to HDFS and another from the output of a Linux command (tail) to an Avro sink.
A single Flume agent can contain several independent flows. You can list multiple sources, sinks, and channels in one configuration file, and these components can be connected to form multiple flows.
You connect each source and sink to its own channel to set up two different flows. For example, to configure an agent named agent_foo with two flows, one from an external Avro client to HDFS and another from the output of tail to an Avro sink, the configuration looks like this:
2.1 Official Example
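The two-flow wiring below follows the example in the Flume User Guide; the component names are the guide's, and the per-component properties (types, ports, paths) still have to be set as in section 1:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2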
3. Multi-Agent Flow Configuration
Consolidating flows
To set up a multi-hop flow, the Avro sink of the first hop must point at the Avro source of the next hop; the first Flume agent then forwards events to the next one. For example, if an Avro client periodically sends files (one event per file) to a local Flume agent, the local agent can forward the events on to another agent that has the storage attached.
The configuration is as follows.
3.1 Official Example
Here the avro-forward-sink of weblog-agent is connected to the avro-collection-source of hdfs-agent. The end result: events coming from the external appserver source end up stored in HDFS.
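A sketch of the two halves, modeled on the user guide's example (the host 10.1.1.100 and port 10000 are illustrative; the sink's hostname/port must match the source's bind/port on the next hop):

# weblog-agent: Avro sink that forwards events to the collector
weblog-agent.sinks = avro-forward-sink
weblog-agent.sinks.avro-forward-sink.type = avro
weblog-agent.sinks.avro-forward-sink.hostname = 10.1.1.100
weblog-agent.sinks.avro-forward-sink.port = 10000

# hdfs-agent: Avro source that receives the forwarded events
hdfs-agent.sources = avro-collection-source
hdfs-agent.sources.avro-collection-source.type = avro
hdfs-agent.sources.avro-collection-source.bind = 10.1.1.100
hdfs-agent.sources.avro-collection-source.port = 10000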
3.2 Test Case (2)
# Configuration (the two agents now run on different nodes; to run both on the same node, the agent names must differ, e.g. a1 and a2)
On different nodes the port numbers may be the same (one agent sends, the other receives).
On the same node the two ports must differ.
# File name: case_source.properties
# Configure an agent; the agent name can be anything
# Declare the agent's sources, sinks, and channels; the component names can be anything
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# Configure the source, using the name declared under the agent's sources
# Source parameters differ per source type; look them up in the Flume documentation
# Configure a netcat source
a1.sources.s1.type = netcat
a1.sources.s1.bind = 192.168.123.102
a1.sources.s1.port = 44455
# Configure the channel, using the name declared under the agent's channels
# Configure a memory channel
a1.channels.c1.type = memory
# Configure the sink, using the name declared under the agent's sinks
# Configure an avro sink that forwards to the next agent
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.123.103
a1.sinks.k1.port = 44466
# Bind the source to its channel
a1.sources.s1.channels = c1
# Bind the sink to its channel
a1.sinks.k1.channel = c1
# File name: case_sink.properties
# Configure an agent; the agent name can be anything
# Declare the agent's sources, sinks, and channels; the component names can be anything
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# Configure the source, using the name declared under the agent's sources
# Source parameters differ per source type; look them up in the Flume documentation
# Configure an avro source
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.123.103
a1.sources.s1.port = 44466
# Configure the channel, using the name declared under the agent's channels
# Configure a memory channel
a1.channels.c1.type = memory
# Configure the sink, using the name declared under the agent's sinks
# Configure a logger sink
a1.sinks.k1.type = logger
# Bind the source to its channel
a1.sources.s1.channels = c1
# Bind the sink to its channel
a1.sinks.k1.channel = c1
The hadoop03 node (192.168.123.103) must be started first, so that the Avro source is listening before the Avro sink tries to connect.
Start command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_sink.properties --name a1 -Dflume.root.logger=INFO,console
Part of the startup log:
2018-05-06 17:06:59,944 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-05-06 17:07:00,133 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-05-06 17:07:00,134 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-05-06 17:07:00,141 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-05-06 17:07:00,142 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2018-05-06 17:07:00,149 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source s1: { bindAddress: 192.168.123.103, port: 44466 }...
2018-05-06 17:07:01,029 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
2018-05-06 17:07:01,054 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: s1 started
2018-05-06 17:07:01,064 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source s1 started.
2018-05-06 17:07:32,758 (New I/O server boss #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x5fadd41f, /192.168.123.102:54177 => /192.168.123.103:44466] OPEN
2018-05-06 17:07:32,761 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x5fadd41f, /192.168.123.102:54177 => /192.168.123.103:44466] BOUND: /192.168.123.103:44466
2018-05-06 17:07:32,761 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x5fadd41f, /192.168.123.102:54177 => /192.168.123.103:44466] CONNECTED: /192.168.123.102:54177
Then start the hadoop02 node (192.168.123.102):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_source.properties --name a1 -Dflume.root.logger=INFO,console
Part of the startup log:
18/05/06 17:07:32 INFO node.AbstractConfigurationProvider: Creating channels
18/05/06 17:07:32 INFO channel.DefaultChannelFactory: Creating instance of channel c1 type memory
18/05/06 17:07:32 INFO node.AbstractConfigurationProvider: Created channel c1
18/05/06 17:07:32 INFO source.DefaultSourceFactory: Creating instance of source s1, type netcat
18/05/06 17:07:32 INFO sink.DefaultSinkFactory: Creating instance of sink: k1, type: avro
18/05/06 17:07:32 INFO sink.AbstractRpcSink: Connection reset is set to 0. Will not reset connection to next hop
18/05/06 17:07:32 INFO node.AbstractConfigurationProvider: Channel c1 connected to [s1, k1]
18/05/06 17:07:32 INFO node.Application: Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:s1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@67749305 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/05/06 17:07:32 INFO node.Application: Starting Channel c1
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/05/06 17:07:32 INFO node.Application: Starting Sink k1
18/05/06 17:07:32 INFO node.Application: Starting Source s1
18/05/06 17:07:32 INFO sink.AbstractRpcSink: Starting RpcSink k1 { host: 192.168.123.103, port: 44466 }...
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
18/05/06 17:07:32 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
18/05/06 17:07:32 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: 192.168.123.103, port: 44466
18/05/06 17:07:32 INFO sink.AvroSink: Attempting to create Avro Rpc client.
18/05/06 17:07:32 INFO source.NetcatSource: Source starting
18/05/06 17:07:32 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.123.102:44455]
18/05/06 17:07:32 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
18/05/06 17:07:33 INFO sink.AbstractRpcSink: Rpc sink k1 started.
Test it:
Write data on hadoop02 (192.168.123.102); note that telnet connects to this node's own address:
[hadoop@hadoop02 tomcat]$ telnet 192.168.123.102 44455
Trying 192.168.123.102...
Connected to 192.168.123.102.
Escape character is '^]'.
hello
OK
world
OK
The events appear on the hadoop03 (192.168.123.103) console:
2018-05-06 17:08:58,204 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2018-05-06 17:09:28,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 72 6C 64 0D world. }
4. Fan-Out Flows (Replicating and Multiplexing)
Flume supports fanning a flow out from one source to multiple channels. There are two fan-out modes: replicating and multiplexing. In a replicating flow, every event is sent to all configured channels. In a multiplexing flow, an event is sent to only a subset of the channels. To fan out a flow, you specify the source's list of channels and the fan-out rule, by adding a channel "selector" of type replicating or multiplexing. For a multiplexing selector, you then specify the selection rules; if you do not specify a selector, it defaults to replicating.
A multiplexing selector has a further set of properties to fork the flow: a mapping from the values of an event header to sets of channels. The selector checks the configured header on each event; if the value matches a mapping, the event is sent to all channels mapped to that value; otherwise it is sent to the channels configured as the default.
Mappings for different values may share channels, and the default may contain any number of channels. The following example multiplexes a single flow down two paths: the agent has a single Avro source and two channels connected to two sinks.
4.1 Official Example
The selector checks a header called "State". If the value is "CA" the event is sent to mem-channel-1, if it is "AZ" to jdbc-channel-2, and if it is "NY" to both. If the "State" header is unset or matches none of the three, the event goes to the default channel, mem-channel-1.
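A selector stanza matching that description, modeled on the user guide's example (the source and channel names follow the text above; current guide versions use a file channel where jdbc-channel-2 appears here):

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1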
4.2 Test Case (2): Replicating
# Configuration (the agents run on different nodes; to run them on the same node, the agent names must differ, e.g. a1 and a2)
On different nodes the port numbers may be the same (one agent sends, the other receives).
On the same node the ports must differ.
(Start the agents hosting the Avro sources that k1 and k2 point at first, so their ports are listening, then start the agent with the Avro sinks.)
# File name: case_replicate_sink.properties (on 192.168.123.102)
# A configuration with 2 channels and 2 sinks
# Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.s1.type = netcat
a1.sources.s1.port = 44455
a1.sources.s1.bind = 192.168.123.102
a1.sources.s1.selector.type = replicating
a1.sources.s1.channels = c1 c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.123.103
a1.sinks.k1.port = 44466
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 192.168.123.104
a1.sinks.k2.port = 44466
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
Start command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_replicate_sink.properties --name a1 -Dflume.root.logger=INFO,console
Part of the log:
2018-05-06 19:18:00,047 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2018-05-06 19:18:00,047 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:205)] Rpc sink k2: Building RpcClient with hostname: 192.168.123.104, port: 44466
2018-05-06 19:18:00,047 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2018-05-06 19:18:00,047 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2018-05-06 19:18:00,079 (lifecycleSupervisor-1-7) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)] Source starting
2018-05-06 19:18:00,084 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:287)] Starting RpcSink k1 { host: 192.168.123.103, port: 44466 }...
2018-05-06 19:18:00,084 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
2018-05-06 19:18:00,084 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
2018-05-06 19:18:00,085 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:205)] Rpc sink k1: Building RpcClient with hostname: 192.168.123.103, port: 44466
2018-05-06 19:18:00,085 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2018-05-06 19:18:00,124 (lifecycleSupervisor-1-1) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
2018-05-06 19:18:00,127 (lifecycleSupervisor-1-5) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
2018-05-06 19:18:00,127 (lifecycleSupervisor-1-7) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.123.102:44455]
2018-05-06 19:18:00,681 (lifecycleSupervisor-1-5) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:301)] Rpc sink k2 started.
2018-05-06 19:18:00,682 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:301)] Rpc sink k1 started.
# File name: case_replicate_k1.properties (configured on hadoop03, i.e. 192.168.123.103)
# Name the components on this agent
a1.sources = s1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.s1.type = avro
a1.sources.s1.channels = c1
a1.sources.s1.bind = 192.168.123.103
a1.sources.s1.port = 44466
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
Start command:
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_replicate_k1.properties --name a1 -Dflume.root.logger=INFO,console
Part of the log:
(AbstractConfigurationProvider.java:201)] Created channel c1
2018-05-06 19:08:09,357 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source s1, type avro
2018-05-06 19:08:09,390 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2018-05-06 19:08:09,396 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [s1, k1]
2018-05-06 19:08:09,405 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.103, port: 44466 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3a803148 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2018-05-06 19:08:09,421 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-05-06 19:08:09,546 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-05-06 19:08:09,554 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-05-06 19:08:09,561 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-05-06 19:08:09,563 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
2018-05-06 19:08:09,565 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source s1: { bindAddress: 192.168.123.103, port: 44466 }...
2018-05-06 19:08:10,244 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: s1: Successfully registered new MBean.
2018-05-06 19:08:10,255 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: s1 started
2018-05-06 19:08:10,258 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source s1 started.
# File name: case_replicate_k2.properties (configured on hadoop04, i.e. 192.168.123.104)
# Name the components on this agent
a3.sources = s1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.s1.type = avro
a3.sources.s1.channels = c1
a3.sources.s1.bind = 192.168.123.104
a3.sources.s1.port = 44466
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
Start command (note: the agent here is named a3, so the command must pass --name a3):
bin/flume-ng agent --conf conf --conf-file /home/hadoop/apps/apache-flume-1.8.0-bin/flumetest/case_replicate_k2.properties --name a3 -Dflume.root.logger=INFO,console
Part of the log:
roperty(FlumeConfiguration.java:1016)] Processing:k1
2018-05-06 19:07:32,260 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
2018-05-06 19:07:32,304 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a3]
2018-05-06 19:07:32,304 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-05-06 19:07:32,333 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
2018-05-06 19:07:32,352 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
2018-05-06 19:07:32,353 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source s1, type avro
2018-05-06 19:07:32,393 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
2018-05-06 19:07:32,400 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [s1, k1]
2018-05-06 19:07:32,409 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{s1=EventDrivenSourceRunner: { source:Avro source s1: { bindAddress: 192.168.123.104, port: 44466 } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@3a803148 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
2018-05-06 19:07:32,426 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
2018-05-06 19:07:32,594 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
2018-05-06 19:07:32,597 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2018-05-06 19:07:32,603 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2018-05-06 19:07:32,604 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source s1
# Generate a test event (the netcat source above listens on 192.168.123.102:44455)
echo "hello via channel selector" | nc 192.168.123.102 44455
4.3 Test Case (3): Multiplexing
# File name: case_multi_sink.properties
# A configuration with 2 channels and 2 sinks
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = 172.25.4.33
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# File name: case_multi_s1.properties
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = 172.25.4.23
a2.sources.r1.port = 4545
# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# File name: case_multi_s2.properties
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.channels = c1
a3.sources.r1.bind = 172.25.4.33
a3.sources.r1.port = 4545
# Describe the sink
a3.sinks.k1.type = logger
a3.sinks.k1.channel = c1
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Start the agents with the Avro sources first, so their ports are listening
flume-ng agent -c . -f case_multi_s1.properties -n a2 -Dflume.root.logger=INFO,console
flume-ng agent -c . -f case_multi_s2.properties -n a3 -Dflume.root.logger=INFO,console
# Then start the agent with the Avro sinks
flume-ng agent -c . -f case_multi_sink.properties -n a1 -Dflume.root.logger=INFO,console
# Generate test POST requests whose 'state' header exercises the mappings in the configuration
curl -X POST -d '[{ "headers" :{"state" : "CZ"},"body" : "TEST1"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "US"},"body" : "TEST2"}]' http://localhost:5140
curl -X POST -d '[{ "headers" :{"state" : "SH"},"body" : "TEST3"}]' http://localhost:5140
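Given the mappings above, TEST1 (state=CZ) is routed to c1 and TEST2 (state=US) to c2, while TEST3 (state=SH) matches no mapping and falls through to the default channel c1. So TEST1 and TEST3 should appear on a2's console, and TEST2 on a3's console.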