我是靠谱客的博主 潇洒鸡翅,最近开发中收集的这篇文章主要介绍Flume使用1. 架构2. 安装3. 简单案例4. Agent内部原理5. Flume的拓扑结构6. Flume事务7. 自定义Source、interceptors、sink8. 监控数据流9.常见问题,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文档查看地址:http://flume.apache.org/FlumeUserGuide.html

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。

Flume的作用就是实时读取服务器本地磁盘的数据,并将数据写入HDFS中

1. 架构

 名词解释:

  1. Agent:Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。Agent主要有3个部分组成,Source、Channel、Sink。

  2. Source:Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directorynetcat、sequence generator、syslog、http、legacy。

  3. Channel:Channel是位于Source和Sink之间的缓冲区。从Source采集的数据会被封装成一个event,然后将event放入channel中,供sink读取。
    因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。Flume自带两种Channel:Memory Channel(内存中的队列,在程序死亡,机器宕机的情况下会丢失数据)File Channel(所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

  4. Sink:Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

    Sink组件目的地包括hdfsloggeravro、thrift、ipc、fileHBase、solr、自定义。

  5. Event:传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由HeaderBody两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。

  6. Interceptors:在source将event放入到channel之前,调用拦截器对event进行拦截和处理!Flume支持拦截器链,即由多个拦截器组合而成!通过指定拦截器链中拦截器的顺序,event将按照顺序依次被拦截器进行处理!

  7. Channel Selectors:Channel Selectors用于source组件将event传输给多个channel的场景。常用的有replicating(默认)和multiplexing两种类型。replicating负责将event复制到多个channel,而multiplexing则根据event的属性和配置的参数进行匹配,匹配成功则发送到指定的channel!

  8. Sink Processors:用户可以将多个sink组成一个整体(sink组),Sink Processors可用于提供组内的所有sink的负载平衡功能,或在时间故障的情况下实现从一个sink到另一个sink的故障转移。一般情况下,当多个sink从一个channel取数据时,为了保证数据的顺序,由sink processor从多个sink中挑选一个sink,由这个sink干活!

2. 安装

下载地址:http://archive.apache.org/dist/flume/

1. 下载apache-flume-1.9.0-bin.tar.gz包,上传到服务器,并解压

2. 配置flume的环境变量

3. 保证有JAVA_HOME

4. 查看版本:flume-ng version

5. 运行flume

 flume-ng  agent -c 配置文件所在的目录  -n agent的名称  -f agent配置文件   -Dproperty=value

3. 简单案例

3.1 实时监控端口数据

参考博客: https://blog.csdn.net/weixin_42063239/article/details/88362390

3.2 实时监控单个文件

需求:实时监控Hive日志,并上传到HDFS中

做法:使用EXEC SOURCE 

Execsouce的缺点:execsource和异步的source一样,无法做到在source向channel中放入event突发故障时,及时通知客户端,暂停生成数据,容易造成数据丢失!如果希望数据有强的可靠性保证,可以考虑使用SpoolingDirSource或TailDirSource或自己写Source自己控制!

可查看博客:https://blog.csdn.net/qq_41338249/article/details/101420440

3.3 监控多个文件

3.3.1 SpoolingDirSource

SpoolingDirSource用于监听多个文件。

SpoolingDirSource指定本地磁盘的一个目录为"Spooling(自动收集)"的目录!这个source可以读取目录中新增的文件,将文件的内容封装为event!  SpoolingDirSource在读取一整个文件到channel之后,它会采取策略,要么删除文件(是否可以删除取决于配置),要么对文件进程一个完成状态的重命名,这样可以保证source持续监控新的文件!SpoolingDirSource和execsource不同,SpoolingDirSource是可靠的!即使flume被杀死或重启,依然不丢数据!但是为了保证这个特性,付出的代价是,一旦flume发现以下情况,flume就会报错,停止!
   ①一个文件已经被放入目录,在采集文件时,不能被修改
   ②文件的名在放入目录后又被重新使用(出现了重名的文件)

1.  创建配置文件flume-dir-hdfs.conf(文件名自定义),并放入FLUME_HOME/conf文件夹中

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#组名.属性名=属性值,使用SpoolingDirSource时,type属性必须为spooldir
#spoolDir表示SpoolingDirSource监控哪个目录读取文件
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/usr/local/workspace/flume/upload
#忽略所有以.tmp结尾的文件,不上传
a1.sources.r1.ignorePattern=\S*\.tmp
#source处理完成后的文件自动添加的完成标识即文件名后缀
a1.sources.r1.fileSuffix=.COMPLETED

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#定义sink
a1.sinks.k1.type=hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要设置useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://hadoop101:9000/flume/%Y%m%d/%H/%M
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix=logs-

#以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动
#是否将时间戳向下舍
a1.sinks.k1.hdfs.round=true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue=1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit=minute

#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp=true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize=100

#以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用!
#30秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval=30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize=134217700
#每写多少个event滚动一次,0表示不配置此项,文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount=0
#以不压缩的文本形式保存数据
a1.sinks.k1.hdfs.fileType=DataStream 


#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!绑定source和sink到channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

2. 指定配置文件并启动flume

# -n表示指定agent名称,由于我们配置文件中使用的a1,所以这里也写a1
# -c 指定flume的配置文件目录, -f指定配置文件, -c 和-f 是没有关系的,各自配置各自地址 
flume-ng -n a1 agent -c /usr/local/workspace/flume/conf/ -f /usr/local/workspace/flume/conf/flume-spooldir-source.conf -Dflume.root.logger=DEBUG,console

3. 上传文件测试

向SpoolingDirSource监控的目录(/usr/local/workspace/flume/upload)插入文件h.txt,插入完成,等待上传至HDFS,再次查看upload中的文件,发现文件名已被重命名为h.txt.COMPLETED,且成功上传至HDFS上

4. 遇到的bug

版本信息:我的hadoop版本为3.1.4,flume版本为1.9

说明:当我向SpoolingDirSource监控的目录插入文件后,预期结果应该是正确将文件上传至HDFS的flume目录,但是此时却报错了:

2021-08-05 23:53:47,350 ERROR hdfs.HDFSEventSink: process failed
java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
	at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
	at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
	at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572)
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)

原因猜测:flume lib下的guava包可能跟hadoop的guava包冲突了

解决办法:删除flume下的guava包,或者将hadoop中的guava包中复制一份到flume中

3.3.2 TailDirSource

  1. Taildir Source在工作时,会将读取文件的最后的位置记录在一个json文件中,一旦agent重启,会从之前已经记录的位置,继续执行tail操作!所以它相对于SpoolingDirSource可以读取多个文件最新追加写入的内容!
  2. Taildir Source是可靠的,即使flume出现了故障或挂掉。
  3. Json文件中,位置是可以修改,修改后,Taildir Source会从修改的位置进行tail操作!如果JSON文件丢失了,此时会重新从每个文件的第一行,重新读取,这会造成数据的重复!
  4. Taildir Source目前只能读文本文件!

常见问题: TailDirSource采集的文件,不能随意重命名!如果日志在正在写入时,名称为 xxxx.tmp,写入完成后,滚动,改名为xxx.log,此时一旦匹配规则可以匹配上述名称,就会发生数据的重复采集!

配置文件:

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#组名名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/usr/local/workspace/flume/a.txt
a1.sources.r1.filegroups.f2=/usr/local/workspace/flume/b.txt
#a1.sources.r1.positionFile=/usr/local/workspace/tail-json/taildir_position.json

#定义sink
#sink将获取的数据打印到控制台上
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100


#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

注意:配置中定义的f1和f2均是文件,而不是文件夹,配置文件夹是无法监听的,一个filegroup想监听多个文件,可写正则表达式来匹配文件

4. Agent内部原理

ChannelSelector:

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

SinkProcessor

SinkProcessor有以下三种:
1.Default Sink Processor:如果agent中,只有一个sink,默认就使用Default Sink Processor,这个sink processor是强制用户将sink组成一个组。如果有多个sink,多个sink对接一个channel,不能选择Default Sink Processor。
2.Failover Sink Processor: 维护了一个多个sink的有优先级的列表,按照优先级保证,至少有一个sink是可以干活的!如果根据优先级发现,优先级高的sink故障了,故障的sink会被转移到一个故障的池中冷却!在冷却时,故障的sink也会不管尝试发送event,一旦发送成功,此时会将故障的sink再移动到存活的池中!

3.Load balancing Sink Processor:负载均衡的sink processor! Load balancing Sink Processor维持了sink组中active状态的sink!使用round_robin 或 random 算法,来分散sink组中存活的sink之间的负载!

5. Flume的拓扑结构

如果AgentA需要将Event对象发送到其他的agent进程中!那么 AgentA的sink必须为AvroSink,其他的agent在接收时,必须选择AvroSource!注意两个flume之间传输的是event,传输时使用的序列化方式。

5.1 简单串联

 将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

需求:agent1监听主机hadoop101的44444端口收集数据,然后将数据传输到agent2主机为hadoop102的33333

配置文件如图所示:

agent1
#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop101
a1.sources.r1.port=44444

#定义sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop102
a1.sinks.k1.port=33333
#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

--------------------------------------------------------
#agent2
#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=avro
a1.sources.r1.bind=hadoop102
a1.sources.r1.port=33333

#定义sink
a1.sinks.k1.type=logger

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

5.2 复制和多路复用

 Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。

5.3 聚合

 一般的使用场景为:

1. Flume1和Flume2负责外网环境下采集数据,传输给内网环境的Flume3

2. 统一处理采集的数据

6. Flume事务

 概念:

  1. putList: source在向channel放入数据时的缓冲区!putList在初始化时,需要根据一个固定的size初始化,这个size在channel中设置,在channel中,这个size由参数transactionCapacity决定!
  2. takeList: sink在向channel拉取数据时的缓冲区!

6.1 事务流程

put事务流程:

source将封装好的一批event(批的数量可以通过source的参数定义),先放入到putList中,放入完成后,一次性commit(),这批event就可以写入到channel!写入完成后,清空putList,开始下一批数据的写入!假如一批event中的某些event在放入putList时,发生了异常,此时要执行rollback(),rollback()直接清空putList。

take事务流程:

sink不断从channel中拉取event,每拉取一个event,这个event会先放入takeList中!当一个batchSize的event全部拉取到takeList中之后,此时由sink执行写出处理!假如在写出过程中,发送了异常,此时执行回滚!将takeList中所有的event全部回滚到channel! 反之,如果写出没有异常,执行commit(),清空takeList!

注意:假设takeList中有50个event,sink一批获取50个event开始写出数据,当sink写到第30个event时出现了异常,那么前30个的event写出依然是成功的,只是会将takeList的这50个event全部回滚到channel中,当下次sink再次取event进行写操作时,前30个event就会被重复写出。

6.2 数量大小关系

  1. batchSize:  每个Source和Sink都可以配置一个batchSize的参数。 这个参数代表一次性到channel中put|take 多少个event!
  2. transactionCapacity: putList和takeList的初始值!
  3. capacity: channel中存储event的容量大小!

大小原则为:整个数据流转的流程中,前面的批处理大小要小于等于后面的容量

那么大小关系应该是:batchSize <=  transactionCapacity <=  capacity

如果不遵循该规则,可能会存在容量大小不够的问题,比如source的batchSize为100,而transactionCapacity为50,那么当source将一批event放入putList时,明显putList的容量不够,会报异常

7. 自定义Source、interceptors、sink

7.1 自定义拦截器

https://flume.apache.org/FlumeUserGuide.html#flume-interceptors

7.2 自定义Source

http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#source

7.3 自定义sink

http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#sink

8. 监控数据流

 在使用flume期间,我们需要监控以下数据:

  1.  channel当前的容量是多少?
  2. channel当前已经使用了多少容量?
  3. source向channel中put成功了多少个event?
  4. sink从channel中take成功了多少个event?

此时我们可以使用JMX技术,它是 java的监控扩展模块,J可以帮助我们实时监控一个java进程中需要了解的参数,可以实时修改java进程中某个对象的参数!

实现JMX监控的流程需要如下组件:

①MBean(monitor bean): 监控的参数封装的Bean
②JMX的monitor服务:这个服务可以在程序希望获取到MBean参数时,来请求服务,请求后服务帮我们对MBean的参数进行读写!flume已经提供了基于JMX的服务实现,如果我们希望使用,只需要启动此服务即可!
③客户端:客户端帮我们来向JMX服务发送请求,显示服务返回的Mbean的结果

MBean和JMX的monitor服务Flume已帮我们完成,我们只需要选择合适的客户端即可,客户端有如下几种方式:

  1. 使用JCONSOLE程序查看:在flume的conf/env.sh文件中,配置
    export JAVA_OPTS=”-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false”
  2.  使用web浏览器向JMX服务发请求查看,返回JSON格式的数据,这种方式更灵活,可以对接公司自己的大数据监控服务
    bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
  3.  使用第三方框架:官方有提供Ganglia框架,该框架自带web页面查看数据

9.常见问题

9.1 Flume采集数据会丢失吗

根据Flume的架构原理,Flume是不可能丢失数据的,其内部有完善的事务机制,Source到Channel是事务性的,Channel到Sink是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是Channel采用memoryChannel,agent宕机导致数据丢失,或者Channel存储数据已满,导致Source不再写入,未写入的数据丢失。

Flume不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。或者6.1中的注意也有可能会有数据的重复

最后

以上就是潇洒鸡翅为你收集整理的Flume使用1. 架构2. 安装3. 简单案例4. Agent内部原理5. Flume的拓扑结构6. Flume事务7. 自定义Source、interceptors、sink8. 监控数据流9.常见问题的全部内容,希望文章能够帮你解决Flume使用1. 架构2. 安装3. 简单案例4. Agent内部原理5. Flume的拓扑结构6. Flume事务7. 自定义Source、interceptors、sink8. 监控数据流9.常见问题所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部