Overview
Flume was originally developed by Cloudera and later contributed to Apache; it is now a top-level Apache open-source project.
Basic introduction: According to the official documentation, Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Its architecture, based on streaming data flows, is simple and flexible. It is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple, extensible data model that allows for online analytic applications.
Typical use: In industry, Flume is mainly used to collect massive, distributed logs. A common setup feeds the full log volume into Hadoop for offline analysis while a real-time stream goes to online analysis.
Official documentation: https://flume.apache.org/FlumeUserGuide.html
Installing and Running Flume
Prerequisites:
- Java runtime environment: Java 1.6 or later (Java 1.7 recommended)
- Others: sufficient memory and disk space, plus read/write permissions on the directories to be collected
Installation and startup:
Extremely simple: download, unpack, edit a configuration file, and it is ready to run.
Download:
$: wget http://apache.dataguru.cn/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
$: tar -xzvf apache-flume-1.6.0-bin.tar.gz
Run:
Start it via the shell script under the bin directory, for example:
$: bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
A simple example (based on the official documentation)
1. Configure Java
$: cd conf
$: cp flume-env.sh.template flume-env.sh
$: vim flume-env.sh
# set JAVA_HOME, for example:
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
2. Fill in the configuration file
$: cp flume-conf.properties.template example.conf
$: vim example.conf
# fill in the following content:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start Flume
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
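A quick way to catch wiring mistakes in a config like example.conf is to check that every source and sink is bound to a declared channel. Below is a minimal Python sketch of such a check (an illustrative validator written for this article, not Flume's own parser):

```python
# Simplified Flume-properties parser and binding check.
# Assumption: single agent, no continuation lines or inline comments.
CONF = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""

def parse(conf):
    # Turn "key = value" lines into a dict, skipping blanks and comments.
    props = {}
    for line in conf.strip().splitlines():
        if line and not line.startswith("#"):
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def check_bindings(props, agent="a1"):
    # Every source's channels and every sink's channel must be declared.
    channels = set(props[f"{agent}.channels"].split())
    for src in props[f"{agent}.sources"].split():
        assert set(props[f"{agent}.sources.{src}.channels"].split()) <= channels
    for sink in props[f"{agent}.sinks"].split():
        assert props[f"{agent}.sinks.{sink}.channel"] in channels
    return True

print(check_bindings(parse(CONF)))  # True when every binding is valid
```

A missing `a1.sinks.k1.channel` line is one of the most common reasons a freshly written agent silently does nothing, so a check like this pays off quickly.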
Common Flume Architectures
- Cascade architecture
flume agent1.1 \
                -> flume agent2 -> flume agent3
flume agent1.2 /
This architecture is for simple data relaying; we once used it to route data from our Alibaba Cloud servers back to the internal network for testing. The agent1 tier is deployed on each Alibaba Cloud host for data collection; agent2, also on Alibaba Cloud, aggregates the data and forwards it over an ssh tunnel, so that everything flows to agent3 on the internal network for storage and use.
flume agent1: responsible for data collection; sample configuration:
### Main ###
a1.sources = src-exec1 src-cdir
a1.channels = ch-file1 ch-file2
a1.sinks = sink-avro1 sink-avro2

### Source ###
# exec source
a1.sources.src-exec1.type = exec
a1.sources.src-exec1.command = tail -F /data/java_logs/java1/bbs/mc/info.log
a1.sources.src-exec1.channels = ch-file1
# exec interceptor set
a1.sources.src-exec1.interceptors = i1-1 i1-2
a1.sources.src-exec1.interceptors.i1-1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.src-exec1.interceptors.i1-1.preserveExisting = false
a1.sources.src-exec1.interceptors.i1-1.hostHeader = clct-host
a1.sources.src-exec1.interceptors.i1-2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# custom spooldir
a1.sources.src-cdir.type = com.jfz.cdp.flume.source.CustomSpoolDirectorySource
a1.sources.src-cdir.channels = ch-file2
a1.sources.src-cdir.spoolDir = ../data/spoolDir_in
a1.sources.src-cdir.fileHeader = true
a1.sources.src-cdir.basenameHeader = true
a1.sources.src-cdir.decodeErrorPolicy = IGNORE
a1.sources.src-cdir.deletePolicy = immediate
a1.sources.src-cdir.skipReadFileModifyTimeLessThanMillis = 60000
# custom spooldir interceptor set
a1.sources.src-cdir.interceptors = i2-1 i2-2
a1.sources.src-cdir.interceptors.i2-1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.src-cdir.interceptors.i2-1.preserveExisting = false
a1.sources.src-cdir.interceptors.i2-1.hostHeader = clct-host
a1.sources.src-cdir.interceptors.i2-2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

### Channel ###
# file channel 1 set
a1.channels.ch-file1.type = file
a1.channels.ch-file1.checkpointDir = ../data/fileChannels/ch-file1/checkpoint
a1.channels.ch-file1.dataDirs = ../data/fileChannels/ch-file1/data
# file channel 2 set
a1.channels.ch-file2.type = file
a1.channels.ch-file2.checkpointDir = ../data/fileChannels/ch-file2/checkpoint
a1.channels.ch-file2.dataDirs = ../data/fileChannels/ch-file2/data

### Sink ###
# sink1
a1.sinks.sink-avro1.type = avro
a1.sinks.sink-avro1.channel = ch-file1
a1.sinks.sink-avro1.hostname = 10.162.95.96
a1.sinks.sink-avro1.port = 50001
a1.sinks.sink-avro1.threads = 150
# sink2
a1.sinks.sink-avro2.type = avro
a1.sinks.sink-avro2.channel = ch-file2
a1.sinks.sink-avro2.hostname = 10.162.95.96
a1.sinks.sink-avro2.port = 50002
a1.sinks.sink-avro2.threads = 150
flume agent2: middle tier that relays the data; sample configuration:
### Main ###
a1.sources = src-avro1 src-avro2
a1.channels = ch-file1 ch-file2
a1.sinks = sink-avro1 sink-avro2

### Source ###
# avro source 1 for the real-time stream
a1.sources.src-avro1.type = avro
a1.sources.src-avro1.channels = ch-file1
a1.sources.src-avro1.bind = 0.0.0.0
a1.sources.src-avro1.port = 50001
a1.sources.src-avro1.threads = 150
# avro interceptor 1
a1.sources.src-avro1.interceptors = i1-1 i1-2
a1.sources.src-avro1.interceptors.i1-1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.src-avro1.interceptors.i1-1.preserveExisting = true
a1.sources.src-avro1.interceptors.i1-1.hostHeader = clct-host
a1.sources.src-avro1.interceptors.i1-2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.src-avro1.interceptors.i1-2.preserveExisting = true
# avro source 2 from spooldir
a1.sources.src-avro2.type = avro
a1.sources.src-avro2.channels = ch-file2
a1.sources.src-avro2.bind = 0.0.0.0
a1.sources.src-avro2.port = 50002
a1.sources.src-avro2.threads = 150
# avro interceptor 2
a1.sources.src-avro2.interceptors = i2-1 i2-2
a1.sources.src-avro2.interceptors.i2-1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.src-avro2.interceptors.i2-1.preserveExisting = true
a1.sources.src-avro2.interceptors.i2-1.hostHeader = clct-host
a1.sources.src-avro2.interceptors.i2-2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.src-avro2.interceptors.i2-2.preserveExisting = true

### Channel ###
# file channel 1 set
a1.channels.ch-file1.type = file
a1.channels.ch-file1.checkpointDir = ../data/fileChannels/ch-file1/checkpoint
a1.channels.ch-file1.dataDirs = ../data/fileChannels/ch-file1/data
# file channel 2 set
a1.channels.ch-file2.type = file
a1.channels.ch-file2.checkpointDir = ../data/fileChannels/ch-file2/checkpoint
a1.channels.ch-file2.dataDirs = ../data/fileChannels/ch-file2/data

### Sink ###
# sink1
a1.sinks.sink-avro1.type = avro
a1.sinks.sink-avro1.channel = ch-file1
a1.sinks.sink-avro1.hostname = 127.0.0.1
a1.sinks.sink-avro1.port = 60001
a1.sinks.sink-avro1.threads = 150
# sink2
a1.sinks.sink-avro2.type = avro
a1.sinks.sink-avro2.channel = ch-file2
a1.sinks.sink-avro2.hostname = 127.0.0.1
a1.sinks.sink-avro2.port = 60002
a1.sinks.sink-avro2.threads = 150
flume agent3: prepares the data for storage; sample configuration:
### Main ###
a1.sources = src-avro1 src-avro2
a1.channels = ch-file1 ch-file2
a1.sinks = sink-rfm sink-hdfs2

### Source ###
# avro source 1 for the real-time stream
a1.sources.src-avro1.type = avro
a1.sources.src-avro1.channels = ch-file1
a1.sources.src-avro1.bind = 0.0.0.0
a1.sources.src-avro1.port = 60001
a1.sources.src-avro1.threads = 150
# avro interceptor 1
a1.sources.src-avro1.interceptors = i1-1 i1-2
a1.sources.src-avro1.interceptors.i1-1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.src-avro1.interceptors.i1-1.preserveExisting = true
a1.sources.src-avro1.interceptors.i1-1.hostHeader = clct-host
a1.sources.src-avro1.interceptors.i1-2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.src-avro1.interceptors.i1-2.preserveExisting = true
# avro source 2 from spooldir
a1.sources.src-avro2.type = avro
a1.sources.src-avro2.channels = ch-file2
a1.sources.src-avro2.bind = 0.0.0.0
a1.sources.src-avro2.port = 60002
a1.sources.src-avro2.threads = 150
# avro interceptor 2
a1.sources.src-avro2.interceptors = i2-1 i2-2
a1.sources.src-avro2.interceptors.i2-1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.src-avro2.interceptors.i2-1.preserveExisting = true
a1.sources.src-avro2.interceptors.i2-1.hostHeader = clct-host
a1.sources.src-avro2.interceptors.i2-2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.src-avro2.interceptors.i2-2.preserveExisting = true

### Channel ###
# file channel 1 set
a1.channels.ch-file1.type = file
a1.channels.ch-file1.checkpointDir = ../data/fileChannels/ch-file1/checkpoint
a1.channels.ch-file1.dataDirs = ../data/fileChannels/ch-file1/data
# file channel 2 set
a1.channels.ch-file2.type = file
a1.channels.ch-file2.checkpointDir = ../data/fileChannels/ch-file2/checkpoint
a1.channels.ch-file2.dataDirs = ../data/fileChannels/ch-file2/data

### Sink ###
# improved rolling file sink1
a1.sinks.sink-rfm.type = com.jfz.cdp.flume.sinks.ImprovedRollingFileSink
a1.sinks.sink-rfm.channel = ch-file1
a1.sinks.sink-rfm.sink.directory = ../data/logs/%Y-%m-%d
a1.sinks.sink-rfm.sink.fileName = %H-%M-%S
a1.sinks.sink-rfm.sink.rollInterval = 3600
a1.sinks.sink-rfm.sink.useLocalTime = false
# sink2 to hdfs
a1.sinks.sink-hdfs2.type = hdfs
a1.sinks.sink-hdfs2.channel = ch-file2
a1.sinks.sink-hdfs2.hdfs.path = /user/dadeng/flume_logs/%{category}/dt=%Y-%m-%d
a1.sinks.sink-hdfs2.hdfs.filePrefix = %{clct-host}_%{basename}
a1.sinks.sink-hdfs2.hdfs.fileType = DataStream
a1.sinks.sink-hdfs2.hdfs.rollSize = 102400000
a1.sinks.sink-hdfs2.hdfs.rollCount = 500000
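The escape sequences in hdfs.path and sink.directory come in two kinds: %{header} expands from event headers (set by the Host/Timestamp interceptors upstream), and %Y/%m/%d-style escapes expand from the event's timestamp header. A minimal Python sketch of the idea (a simplified re-implementation written for this article, not Flume's own escaping code; the sample header values are made up):

```python
import re
from datetime import datetime, timezone

def expand_path(template, headers):
    # %{name} -> the corresponding event header value
    path = re.sub(r"%\{(.+?)\}", lambda m: headers[m.group(1)], template)
    # strftime-style escapes from the "timestamp" header (epoch milliseconds)
    ts = datetime.fromtimestamp(int(headers["timestamp"]) / 1000, tz=timezone.utc)
    return ts.strftime(path)

headers = {"category": "bbs", "timestamp": "1455532800000"}  # illustrative values
print(expand_path("/user/dadeng/flume_logs/%{category}/dt=%Y-%m-%d", headers))
# /user/dadeng/flume_logs/bbs/dt=2016-02-15
```

This is also why the TimestampInterceptor appears in every source above: without a timestamp header, the time-based escapes in the HDFS sink path cannot be resolved.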
- Three-tier architecture, with a control tier in the middle for load balancing and avoiding single points of failure; suitable for reliable delivery of the full data volume.
Example of agent1 sending the same data to two control agents:
## source section omitted here
a1.channels = ch-file1 ch-file2
a1.sinks = sink-avro1-1 sink-avro1-2 sink-avro2-1 sink-avro2-2
# declare both sink groups once (a second a1.sinkgroups line would override this one)
a1.sinkgroups = sg-avro1 sg-avro2

# file channel 1 set
a1.channels.ch-file1.type = file
a1.channels.ch-file1.checkpointDir = ../data/fileChannels/ch-file1/checkpoint
a1.channels.ch-file1.dataDirs = ../data/fileChannels/ch-file1/data
# file channel 2 set
a1.channels.ch-file2.type = file
a1.channels.ch-file2.checkpointDir = ../data/fileChannels/ch-file2/checkpoint
a1.channels.ch-file2.dataDirs = ../data/fileChannels/ch-file2/data

# sink group 1 with load balancing
a1.sinkgroups.sg-avro1.sinks = sink-avro1-1 sink-avro1-2
a1.sinkgroups.sg-avro1.processor.type = load_balance
a1.sinkgroups.sg-avro1.processor.backoff = true
# sink1 to 10.1.2.51:41414
a1.sinks.sink-avro1-1.type = avro
a1.sinks.sink-avro1-1.channel = ch-file1
a1.sinks.sink-avro1-1.hostname = 10.1.2.51
a1.sinks.sink-avro1-1.port = 41414
# sink2 to 10.1.2.52:41414
a1.sinks.sink-avro1-2.type = avro
a1.sinks.sink-avro1-2.channel = ch-file1
a1.sinks.sink-avro1-2.hostname = 10.1.2.52
a1.sinks.sink-avro1-2.port = 41414

# sink group 2 with load balancing
a1.sinkgroups.sg-avro2.sinks = sink-avro2-1 sink-avro2-2
a1.sinkgroups.sg-avro2.processor.type = load_balance
a1.sinkgroups.sg-avro2.processor.backoff = true
# sink1 to 10.1.2.51:41415
a1.sinks.sink-avro2-1.type = avro
a1.sinks.sink-avro2-1.channel = ch-file2
a1.sinks.sink-avro2-1.hostname = 10.1.2.51
a1.sinks.sink-avro2-1.port = 41415
# sink2 to 10.1.2.52:41415
a1.sinks.sink-avro2-2.type = avro
a1.sinks.sink-avro2-2.channel = ch-file2
a1.sinks.sink-avro2-2.hostname = 10.1.2.52
a1.sinks.sink-avro2-2.port = 41415
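The load_balance processor with backoff rotates events across the sinks in a group and temporarily skips a sink that has recently failed. A hedged Python sketch of this selection logic (illustrative class and parameter names, not Flume internals):

```python
import time

class LoadBalancer:
    """Round-robin selection over sinks, with a per-sink backoff window."""

    def __init__(self, sinks, backoff_secs=2.0):
        self.sinks = sinks
        self.backoff_secs = backoff_secs
        self.blacklist_until = {s: 0.0 for s in sinks}  # monotonic deadline per sink
        self.idx = 0

    def next_sink(self):
        # Walk at most one full round looking for a sink not backing off.
        now = time.monotonic()
        for _ in range(len(self.sinks)):
            sink = self.sinks[self.idx % len(self.sinks)]
            self.idx += 1
            if self.blacklist_until[sink] <= now:
                return sink
        raise RuntimeError("all sinks are backing off")

    def mark_failed(self, sink):
        # Skip this sink until its backoff window expires.
        self.blacklist_until[sink] = time.monotonic() + self.backoff_secs

lb = LoadBalancer(["10.1.2.51:41414", "10.1.2.52:41414"])
first = lb.next_sink()
lb.mark_failed(first)   # simulate a failed delivery to the first sink
print(lb.next_sink())   # the other sink is selected while the first backs off
```

Because each sink group drains its own channel, events buffered in ch-file1 are never lost when one control agent is down; they simply flow through the surviving sink of the group.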
Common Flume configuration pitfalls
Source
Flume offers many source types; the most commonly used are "spooldir", "exec", and "avro".
spooldir: suitable for important log transfers, where the data has usually already been written out to files before transmission.
NOTE: spooldir has two pitfalls. 1. If an undecodable byte sequence shows up during transfer, Flume stops serving, so it is best to add "a1.sources.src-cdir.decodeErrorPolicy = IGNORE". 2. Files placed in the spool directory must not be modified afterwards; if you cp a large file into the spool directory, Flume may start reading it while it is still being written, which again stops the service. To work around this, we developed a CustomSpooldirSource that temporarily skips any file modified within the window given by the "skipReadFileModifyTimeLessThanMillis" setting.
In addition, spooldir can achieve near-real-time transfer by splitting the log into new files at short intervals.
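The skip-recently-modified workaround above can be illustrated with a short sketch (a simplified re-implementation written for this article; skipReadFileModifyTimeLessThanMillis is our custom source's setting, and the helper name is ours): only files that have been quiet for the configured window are considered ready to read.

```python
import os
import tempfile
import time

def ready_files(spool_dir, min_quiet_millis=60000, now=None):
    """Return files whose last modification is at least min_quiet_millis old."""
    now = time.time() if now is None else now
    ready = []
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        if (now - os.path.getmtime(path)) * 1000 >= min_quiet_millis:
            ready.append(name)
    return ready

# demo: a freshly written file is skipped, a "quiet" one is picked up
spool = tempfile.mkdtemp()
open(os.path.join(spool, "fresh.log"), "w").close()
old_path = os.path.join(spool, "old.log")
open(old_path, "w").close()
os.utime(old_path, (time.time() - 120, time.time() - 120))  # backdate mtime 2 min
print(ready_files(spool))  # ['old.log']
```

A file still being cp'd keeps getting its mtime bumped, so it stays out of the ready set until the copy finishes and the quiet window elapses.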
exec: mainly runs tail -F xxx.log to pick up changes in real time.
avro: receives data on a specified port of a specified host; mainly used to move data between agents. It can also be integrated with log4j so that log data streams to Flume in real time over avro.
Configuring log4j to stream logs to Flume automatically
maven: add one dependency jar
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>${flume.version}</version>
</dependency>
Add the following to the log4j configuration:
log4j.logger.flume = INFO, flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 10.1.2.50
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout = org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern = %m%n