我是靠谱客的博主 忧虑鸵鸟,最近开发中收集的这篇文章主要介绍大数据----【Flume、常用组件、load-balance、failover、日志采集汇总、Flume中小文件频繁滚动注意事项】Flume,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Flume

1 概述

Flume 是 Cloudera 提供的一个高可用的,高可靠的分布式的海量日志采集、聚合和传输软件

Flume核心 :

  • 数据源(source)
  • 目的地(sink)
  • 数据传输通道(channel)

Flume版本 :

  • FlumeOG(0.9X版本的统称) , 老版本
  • FlumeNG(1.X版本的统称) , 该版本常用

2 . 运行机制

Flume 系统中核心的角色是 agent,agent 本身是一个 Java 进程,一般运行在日志收集节点。

在这里插入图片描述

每一个 agent 相当于一个数据传递员,内部有三个组件:
Source:采集源,用于跟数据源对接,以获取数据;
Sink:下沉地,采集数据的传送目的,用于往下一级 agent 传递数据或者往最终存储系统传递数据;
Channel:agent 内部的数据传输通道,用于从 source 将数据传递到 sink;在整个数据的传输的过程中,流动的是 event,它是 Flume 内部数据传输的最基本单元

一个完整的 event 包括:event headers、event body、event 信息,其中event 信息就是 flume 收集到的日记记录。

3. Flume采集结构图

3.1 简单结构(单个agent)

在这里插入图片描述

3.2 复杂结构(多级agent串联)

在这里插入图片描述

4. Flume的安装部署

  • 将安装包放到/export/servers下 , 解压到当前文件夹
  • 配置环境变量JAVA_HOME , 将配置文件中注释掉的JAVA_HOME放开并修改即可export JAVA_HOME=/export/servers/jdk1.8.0_65
    测试 :
  • 1 、 先在 flume 的 的 conf 目录下新建一个文件vi netcat-logger.conf
#从网络端口接收数据,下沉到logger
#采集配置文件,netcat-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动命令:
bin/flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

传入数据:
telnet localhost 44444
如果没有telnet , 下载即可: yum -y install telnet
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
看到返回OK说明配置成功
  • 2 、 启动 agent 去采集数据

    首先切换到flume目录下 , 然后执行下列命令

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console

    • --conf/-c : 指定flume自身的配置文件所在目录
    • --conf-file/-f : 指定我们所描述的采集方案
    • --name/-n : 指定我们这个agent的名字
    • -Dflume.root.logger=INFO,console 开启日志记录功能
  • 3 、 测试

先要往 agent 采集监听的端口上发送数据,让 agent 有数据可采。

随便在一个能跟 agent 节点联网的机器上:

telnet anget-hostname port (例子 :telnet localhost 44444

5. Flume中常用组件

5.1 采集目录到DHFS

  • spooldir source

需求 : 服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

思路 : 根据需求定义一下3大要素

  • 采集源,即 source——监控文件目录 : spooldir
  • 下沉目标,即 sink——HDFS 文件系统 : hdfs sink
  • source和sink之间的传递通道——channel , 可用 file channel 也可以用内存 channel

**作用 : **监控一个指定的目录 该目录只要有新文件产生 就把新文件采集该目录一定不能有重名文件产生 否则报错 且罢工 , 将数据采集传输到hdfs

配置文件编写vi spooldir-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
##注意:不能往监控目中重复丢同名文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/logs2
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

监视文件夹 /root/logs2
启动命令: bin/flume-ng agent -c ./conf -f ./conf/spool-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

capacity:默认该通道中最大的可以存储的 event 数量

trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

测试: 往/root/logs2放文件(mv ././xxxFile /home/hadoop/flumeSpool),但是不要在里面生成文件

扩展 :

即控制了文件夹的滚动方式

a1.sinks.k1.hdfs.round = true 表示是否开启时间上的舍弃 , 默认false
a1.sinks.k1.hdfs.roundValue = 10 舍弃的值 默认1
a1.sinks.k1.hdfs.roundUnit = minute 默认seconds
控制了文件夹的滚动 如果为false 不滚动 如果为true 滚动
该目录每 10 分钟新生成一个 17:30:00 17:40:00

控制了文件的滚动方式
a1.sinks.k1.hdfs.rollInterval = 3 时间间隔 默认30秒
a1.sinks.k1.hdfs.rollSize = 20 大小 默认1024bytes
a1.sinks.k1.hdfs.rollCount = 5 event数量 默认10个
如果三个属性都存在 谁先满足就触发
如果把某个属性设置为0 意味着不以该属性为标准

5.2 采集文件到HDFS

  • exec source

需求 : 比如业务系统使用 j log4j 生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到 hdfs

思路 : 根据需求定义一下3大要素

  • 采集源,即 source——监控文件内容更新 : exec ‘tail -F file’
  • 下沉目标,即 sink——HDFS 文件系统 : hdfs sink
  • Source和sink之间的传递通道——channel,可用 file channel 也可以用内存 channel

可以执行linux命令把命令执行的结果作为数据采集

模拟一个文件不断变化

while true;do date >> /root/logs/test.log;sleep 0.5;done
或者
#!/bin/bash
while true
do
	date >> /root/logs/test.log
	sleep 0.5
done

配置文件的编写 : vi tail-hdfs.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H-%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream



# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令前先启动hdfs服务

启动命令bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

测试 : while true;do date >> /root/logs/test.log;sleep 0.5;done

如果没有此文件夹创建即可

6. Flume的load-balance(负载均衡)、failover(容错)

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。

解决了一台机器或者服务处理不了 多个一起处理的问题 但是不能重复处理

常见的负载均衡规则:轮询(round_robin) random随机 权重 等

Load balancing Sink Processor 能够实现 load balance 功能,如下图Agent1 是一个路由节点,负责将 Channel 暂存的 Event 均衡到对应的多个 Sink组件上,而每个 Sink 组件分别连接到一个独立的 Agent 上,示例配置,如下所示:

在这里插入图片描述

  • avro source / avro sink

当涉及两级flume之间数据传递的时候 使用avro即可 指定传递数据的ip 和端口

agent1.sinks.k2.hostname = node-3
agent1.sinks.k2.port = 52020

当涉及多级flume串联的时候 优先启动远离数据源的那级

需求 : 解决一台机器或者服务处理不了 , 用多个解决即可

node-1上编写vi exec-avro.conf

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#设置组名
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /root/logs/123.log

# 设置下沉1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node-2
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node-3
agent1.sinks.k2.port = 52020

#设置下沉组
agent1.sinkgroups.g1.sinks = k1 k2

#设置为负载均衡
agent1.sinkgroups.g1.processor.type = load_balance
#如果开启 , 即将失败的sink放入黑名单
agent1.sinkgroups.g1.processor.backoff = true
#设置为轮询 , 还支持random
agent1.sinkgroups.g1.processor.selector = round_robin
#在黑名单放置的超时时间 , 超时结束时 , 若仍然无法接收 ,则超时时间呈指数增长
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

启动命令 : bin/flume-ng agent -c conf -f conf/exec-avro.conf -n agent1 -Dflume.root.logger=INFO,console

node-2上编写vi avro-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node-2
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 命令: bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

node-3 上编写vi avro-logger.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#描述轮询资源参数
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = node-3
a1.sources.r1.port = 52020

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令 : bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

测试 : while true;do date >> /root/logs/123.log;sleep 0.5;done

在这里插入图片描述

在这里插入图片描述

两台机器轮询输出

failover具体流程

Failover Sink Processor 维护一个优先级 Sink 组件列表,只要有一个 Sink组件可用,Event 就被传递到下一个组件。故障转移机制的作用是将失败的 Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦 Sink 成功发送一个事件,它将恢复到活动池。 Sink 具有与之相关的优先级,数量越大,优先级越高。

主要解决单点故障问题 , 备份越多 , 容错能力越强 , 同一时刻只能有一个对外提供服务

当正在干活挂了 备用的才会顶上去 从而保证服务连续可靠 即所谓的容忍了错误的发生

重写node-1中的vi exec-avro.conf

#agent1 name
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2

#set gruop
agent1.sinkgroups = g1

#set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
#监视文件
agent1.sources.r1.command = tail -F /root/logs/456.log

# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = node-2
agent1.sinks.k1.port = 52020

# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = node-3
agent1.sinks.k2.port = 52020

#set sink group
agent1.sinkgroups.g1.sinks = k1 k2

#set failover
agent1.sinkgroups.g1.processor.type = failover
#设置等级
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

同样是先启动原理数据源的那级 , 即node-2 , node-3

bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

在启动node-1

bin/flume-ng agent -c conf -f conf/exec-avro.conf -n agent1 -Dflume.root.logger=INFO,console

测试代码 : while true;do date >> /root/logs/456.log;sleep 0.5;done

我们发现当我们停止node-2时 , node-1会弹出信息 , 然后再去连接node-3保持继续工作

7. Flume实战案例

Flume拦截器

没有拦截器默认采集的数据的形式
Event: { headers:{} body:20 31 37 3A 35 32 Sat Nov 17 17:52 }

使用拦截器之后如下:在event heander中可以插入自定义kv对
Event: { headers:{type=access} body:20 31 37 3A 35 32 Sat Nov 17 17:52 }
Event: { headers:{type=web} body:20 31 37 3A 35 32 Sat Nov 17 17:52 }
Event: { headers:{type=nginx} body:20 31 37 3A 35 32 Sat Nov 17 17:52 }

%{type}可以提取key对应的value值

案例一 : 日志的采集和汇总

案例场景 : A、B 两台日志服务机器实时生产日志主要类型为 access.log、nginx.log、web.log

要求 : 把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上然后统一收集到 hdfs 中。
但是在 hdfs 中要求的目录为:
/source/logs/access/20160101/**
/source/logs/nginx/20160101/**
/source/logs/web/20160101/**

场景分析

在这里插入图片描述

在这里插入图片描述

流程处理分析

在这里插入图片描述

功能实现

  1. 在服务器 A (node-1)和服务器 B(node-2) 上创建配置文件

    在node-1 上编写 vi exec_source_avro_sink.conf

# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#资源状态
a1.sources.r1.type = exec
#监测的文件
a1.sources.r1.command = tail -F /root/logs1/access.log
#拦截器名字
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
#监测的文件
a1.sources.r2.command = tail -F /root/logs1/nginx.log
#拦截器名字
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

a1.sources.r3.type = exec
#监测的文件
a1.sources.r3.command = tail -F /root/logs1/web.log
#拦截器名字
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
#static 拦截器的功能就是往采集到的数据的 header 中插入自
#己定义的 key-value 对
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node-2
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

启动命令 : 后启动

bin/flume-ng agent -c conf -f conf/exec_source_avro_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

node-2 上编写vi avro_source_hdfs_sink.conf

#定义agent名, source、channel、sink的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1


#定义source
a1.sources.r1.type = avro
a1.sources.r1.bind = node-2
a1.sources.r1.port =41414

#添加时间拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder


#定义channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 2000
a1.channels.c1.transactionCapacity = 1000

#定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://node-1:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix =events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
#时间类型
#a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize =0

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#批量写入hdfs的个数
a1.sinks.k1.hdfs.batchSize = 1
#flume操作hdfs的线程数(包括新建,写入等)
a1.sinks.k1.hdfs.threadsPoolSize=10
#操作hdfs超时时间
a1.sinks.k1.hdfs.callTimeout=30000

#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令 : 先启动

bin/flume-ng agent -c conf -f conf/avro_source_hdfs_sink.conf -name a1 -Dflume.root.logger=DEBUG,console

测试命令 :

while true;do echo "access access" >> /root/logs1/access.log;sleep 0.5;done
while true;do echo "nginx nginx" >> /root/logs1/nginx.log;sleep 0.5;done
while true;do echo "web web" >> /root/logs1/web.log;sleep 0.5;done

扩展 :

Flume中遇到小文件频繁滚动注意事项

hdfs.minBlockReplicas是为了让flume感知不到hdfs的块复制,这样滚动方式配置才不会受影响。

假如hdfs的副本为3.那么配置的滚动时间为10秒,那么在第二秒的时候,flume检测到hdfs在复制块,那么这时候flume就会滚动,这样导致flume的滚动方式受到影响。所以配置flume hdfs.minBlockReplicas配置为1,就检测不到副本的复制了。但是hdfs的副本还是3

最后

以上就是忧虑鸵鸟为你收集整理的大数据----【Flume、常用组件、load-balance、failover、日志采集汇总、Flume中小文件频繁滚动注意事项】Flume的全部内容,希望文章能够帮你解决大数据----【Flume、常用组件、load-balance、failover、日志采集汇总、Flume中小文件频繁滚动注意事项】Flume所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部