These notes, Flume_03 (【Flume_03_笔记】), were collected during recent development work by 繁荣大碗, a blogger on 靠谱客, and are shared here as a reference.

Overview

Flume_03 Notes

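Requirement: 1. Collect 10,000 lines of log data (generated with Java or a shell script)
and land them on HDFS using compressed storage (bzip2).

source: exec or taildir
channel: memory or file
sink: hdfs
=> bzip2

# Script that generates the log data
for X in {1..10000}
do
echo "${X}" >> 1.log
done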
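
For repeated test runs, the loop above can be wrapped in a small function. This is only a sketch: `gen_log` is a hypothetical helper name, and it assumes `seq` is available. It writes all lines with a single append instead of reopening the file on every iteration.

```shell
# gen_log FILE COUNT: append the numbers 1..COUNT to FILE, one per line.
gen_log() {
  seq 1 "$2" >> "$1"
}
```

Usage, with the path that the taildir source in these notes watches: `gen_log /home/hadoop/tmp/1.log 10000`.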

1. Store the files in a compressed format. Codec: bzip2

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/hadoop/tmp/1.log

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata13:9000/flume/log/
a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.codeC=bzip2
# File prefix and suffix (.bz2 rather than .log, so the file name matches its bzip2 content)
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.fileSuffix=.bz2
# File rolling
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Startup: start the agents from the last one in the chain back to the first, so that no data is lost.

flume-ng agent \
--name a1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/taildir-mem-hdfs.conf1 \
-Dflume.root.logger=info,console

Choosing a channel:

file channel:

agent:

agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1

agent1.sources.r1.type = TAILDIR
agent1.sources.r1.filegroups = f1
agent1.sources.r1.filegroups.f1=/home/hadoop/tmp/codec01.log

agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/project/flume/checkpoint/codec
agent1.channels.c1.dataDirs = /home/hadoop/project/flume/data/codec

agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://bigdata13:9000/flume/bzip2/
agent1.sinks.k1.hdfs.fileType=CompressedStream
agent1.sinks.k1.hdfs.writeFormat=Text
# File prefix and suffix
agent1.sinks.k1.hdfs.filePrefix=events
agent1.sinks.k1.hdfs.fileSuffix=.bz2
agent1.sinks.k1.hdfs.codeC=bzip2
# File rolling
agent1.sinks.k1.hdfs.rollInterval=60
agent1.sinks.k1.hdfs.rollSize=134217728
agent1.sinks.k1.hdfs.rollCount=1000

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Startup:

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/taildir-mem-hdfs.conf2 \
-Dflume.root.logger=info,console

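High availability:

nn (NameNode): maintains the HDFS namespace => stores the block metadata for all of HDFS and serves client read/write requests
snn (SecondaryNameNode): merges the NameNode's image files (fsimage and edit log) => also holds the block metadata for all of HDFS
The merge runs once an hour by default (3600 s).

The snn is not a hot standby for the nn; it is only a cold backup.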

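Failover and load balancing:

Flume provides sink processors to guard against sink failures.

Sink Processors:
failover: fault tolerance
load_balance: load balancing
1. random: send each batch of data to a randomly chosen sink
2. round_robin: send data to the sinks in turn
Default Sink Processor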

Requirement: read data from port 1111, send it to ports 2222 and 3333, and finally print it to the console.

load_balance:

Three agents:
agent1:
source: netcat
channel: memory
sink: avro (two sinks)

agent2: port 2222
source: avro
channel: memory
sink: logger

agent3: port 3333
source: avro
channel: memory
sink: logger

agent1:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1

agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111

agent1.channels.c1.type = memory

# Define the avro sink for port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata32
agent1.sinks.k1.port = 2222

# Define the avro sink for port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata32
agent1.sinks.k2.port = 3333

# Define the sink processor
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
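
To make the two load_balance selectors concrete, here is a small shell simulation of how round_robin and random choose a sink. It only illustrates the selection order and is not Flume's implementation; the function names `round_robin_pick` and `random_pick` are made up for this sketch.

```shell
# round_robin_pick N SINK...: print the sink used for the N-th event (0-based),
# cycling through the sinks in order.
round_robin_pick() (
  n="$1"; shift
  idx=$(( n % $# ))
  shift "$idx"
  printf '%s\n' "$1"
)

# random_pick SINK...: print one sink chosen at random.
random_pick() (
  idx=$(awk -v n="$#" 'BEGIN { srand(); print int(rand() * n) }')
  shift "$idx"
  printf '%s\n' "$1"
)

# Events 0..3 alternate between the two avro sinks.
for i in 0 1 2 3; do
  echo "event $i -> $(round_robin_pick "$i" k1 k2)"
done
```

The functions use a subshell body, `( ... )`, so their variables stay local without relying on shell-specific keywords.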

agent2: port 2222
agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1

agent2.sources.r1.type = avro
agent2.sources.r1.bind = bigdata32
agent2.sources.r1.port = 2222

agent2.channels.c1.type = memory
agent2.sinks.k1.type = logger

agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1

agent3: port 3333
agent3.sources = r1
agent3.sinks = k1
agent3.channels = c1

agent3.sources.r1.type = avro
agent3.sources.r1.bind = bigdata32
agent3.sources.r1.port = 3333

agent3.channels.c1.type = memory
agent3.sinks.k1.type = logger

agent3.sources.r1.channels = c1
agent3.sinks.k1.channel = c1

Start the agents:
Start them from back to front (downstream agents first).

flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent3.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent2.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent1.conf \
-Dflume.root.logger=info,console

telnet bigdata32 1111
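
The back-to-front start order can also be scripted. The following is a rough sketch under several assumptions: `start_agents` is a hypothetical helper, each agent's config file is assumed to be named `<agent>.conf`, `/opt/flume` is only a placeholder fallback for `FLUME_HOME`, and the fixed `sleep` is a crude stand-in for checking that the downstream avro source is listening. By default it only prints the commands.

```shell
# start_agents CONF_DIR AGENT...: start the agents in the order given.
# DRY_RUN defaults to 1, in which case the commands are only printed.
start_agents() (
  conf_dir="$1"; shift
  for name in "$@"; do
    cmd="flume-ng agent --name $name --conf ${FLUME_HOME:-/opt/flume}/conf --conf-file $conf_dir/$name.conf -Dflume.root.logger=info,console"
    if [ "${DRY_RUN:-1}" = "1" ]; then
      echo "$cmd"
    else
      # Unquoted on purpose so the string splits into arguments
      # (the paths must not contain spaces).
      nohup $cmd > "$name.out" 2>&1 &
      sleep 5   # crude pause before starting the next (upstream) agent
    fi
  done
)

# Downstream agents first: agent3, agent2, then agent1.
start_agents /home/hadoop/project/flume/sink agent3 agent2 agent1
```

Running it with `DRY_RUN=0` would launch the agents in the background and write each agent's console output to `<agent>.out` in the current directory.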

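Fault tolerance: what happens when a sink fails

failover: fault tolerance
load_balance: load balancing

load_balance:
1. Splits the data across the sinks, which adds parallelism and reduces the pressure on each sink.
2. If agent2 or agent3 goes down, all data is sent to the sink whose agent is still running.

processor.backoff = true: failed sinks are backed off (skipped for a while) before being retried
processor.selector.maxTimeOut: upper limit of that backoff period, in milliseconds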

agent1:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1

agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111

agent1.channels.c1.type = memory

# Define the avro sink for port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata32
agent1.sinks.k1.port = 2222

# Define the avro sink for port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata32
agent1.sinks.k2.port = 3333

# Define the sink processor
# A larger number means a higher priority, so k2 (10) is the active sink and k1 (5) is the standby
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 5
agent1.sinkgroups.g1.processor.priority.k2 = 10
agent1.sinkgroups.g1.processor.maxpenalty = 2000

agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent1_failover.conf \
-Dflume.root.logger=info,console
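
The effect of the priorities in the failover config can be illustrated with a small simulation. This is a sketch of the selection rule only (the live sink with the largest priority value is used), not Flume's code; `failover_pick` and the `name:priority:state` argument format are invented for the example.

```shell
# failover_pick "name:priority:state"...: print the live sink with the highest priority.
# state is "up" or "down"; nothing is printed if every sink is down.
failover_pick() (
  best=""; best_prio=-1
  for entry in "$@"; do
    name=${entry%%:*}
    rest=${entry#*:}
    prio=${rest%%:*}
    state=${rest#*:}
    if [ "$state" = "up" ] && [ "$prio" -gt "$best_prio" ]; then
      best=$name; best_prio=$prio
    fi
  done
  if [ -n "$best" ]; then printf '%s\n' "$best"; fi
)

failover_pick k1:5:up k2:10:up     # both alive: the higher priority (k2) wins
failover_pick k1:5:up k2:10:down   # k2 has failed: traffic falls back to k1
```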

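Core Flume components:

sources

interceptors: process the collected events (data transformation / data cleansing)

channel selectors: decide which channel(s) each collected event is sent to

channels

sinks

sink processors: decide which sink the collected events are sent to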

Requirement: define one agent that collects data from port 1111 and sends one copy to HDFS
and another copy to the logger.

agent:

agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1 c2

agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111

# 0. Configure the source's channel selector
agent1.sources.r1.selector.type = replicating
agent1.sources.r1.channels = c1 c2

# 1. Configure the two channels
agent1.channels.c1.type = memory
agent1.channels.c2.type = memory

# Define the hdfs sink
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://bigdata32:9000/flume/channel_selector/
agent1.sinks.k1.hdfs.fileType=DataStream
agent1.sinks.k1.hdfs.writeFormat=Text
# File prefix and suffix
agent1.sinks.k1.hdfs.filePrefix=events
agent1.sinks.k1.hdfs.fileSuffix=.log
agent1.sinks.k1.hdfs.useLocalTimeStamp=true
# File rolling
agent1.sinks.k1.hdfs.rollInterval=60
agent1.sinks.k1.hdfs.rollSize=134217728
agent1.sinks.k1.hdfs.rollCount=1000

# Define the logger sink
agent1.sinks.k2.type = logger

# Wire the components together
agent1.sources.r1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2

Start the agent:
flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channle/agent_logger_hdfs.conf \
-Dflume.root.logger=info,console

telnet bigdata32 1111

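channel_selector:
1. replicating => every channel receives the same data

Homework:

1. Do the same thing as above with three agents:
agent1: receives data on port 1111 and sends it to ports 2222 and 3333
agent2: receives data on port 2222 and sends it to the logger
agent3: receives data on port 3333 and sends it to the logger

agent1 implementation: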
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1 c2

agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111

# 0. Configure the source's channel selector
agent1.sources.r1.selector.type = replicating
agent1.sources.r1.channels = c1 c2

# 1. Configure the two channels
agent1.channels.c1.type = memory
agent1.channels.c2.type = memory

# Define the avro sink for port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata32
agent1.sinks.k1.port = 2222

# Define the avro sink for port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata32
agent1.sinks.k2.port = 3333

# Wire the components together
agent1.sources.r1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2

Start the agents:

flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/one2many/agent3.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/one2many/agent2.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/one2many/agent1.conf \
-Dflume.root.logger=info,console

telnet bigdata32 1111

Multi-agent implementation:

agent1:

agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1 c2

agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata13
agent1.sources.r1.port = 1111

# 0. Configure the source's channel selector
agent1.sources.r1.selector.type = replicating
agent1.sources.r1.channels = c1 c2

agent1.channels.c1.type = memory
agent1.channels.c2.type = memory

# Define the avro sink for port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata13
agent1.sinks.k1.port = 2222

# Define the avro sink for port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata13
agent1.sinks.k2.port = 3333

# Define the sink processor
# Note: k1 and k2 read from different channels (c1 and c2), and the replicating
# selector already copies every event into both, so this group does not split the traffic.
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

agent1.sources.r1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2

agent2: port 2222

agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1

agent2.sources.r1.type = avro
agent2.sources.r1.bind = bigdata13
agent2.sources.r1.port = 2222

agent2.channels.c1.type = memory

# Define the hdfs sink
agent2.sinks.k1.type = hdfs
agent2.sinks.k1.hdfs.path = hdfs://bigdata13:9000/flume/channel_selector/
agent2.sinks.k1.hdfs.fileType=DataStream
agent2.sinks.k1.hdfs.writeFormat=Text
# File prefix and suffix
agent2.sinks.k1.hdfs.filePrefix=events
agent2.sinks.k1.hdfs.fileSuffix=.log
agent2.sinks.k1.hdfs.useLocalTimeStamp=true
# File rolling
agent2.sinks.k1.hdfs.rollInterval=60
agent2.sinks.k1.hdfs.rollSize=134217728
agent2.sinks.k1.hdfs.rollCount=1000

agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1

agent3: port 3333

agent3.sources = r1
agent3.sinks = k2
agent3.channels = c2

agent3.sources.r1.type = avro
agent3.sources.r1.bind = bigdata13
agent3.sources.r1.port = 3333

agent3.channels.c2.type = memory
agent3.sinks.k2.type = logger

agent3.sources.r1.channels = c2
agent3.sinks.k2.channel = c2

Start agent3:

flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channel/agent3.conf \
-Dflume.root.logger=info,console

Start agent2:

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channel/agent2.conf \
-Dflume.root.logger=info,console

Start agent1:

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channel/agent1.conf \
-Dflume.root.logger=info,console

Requirement: several kinds of logs are collected into one agent, which then routes each event to its designated destination.

agent1:
agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1

agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata13
agent1.sources.r1.port = 1111

# Add an interceptor => data cleansing + tagging each event
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = dl2262
agent1.sources.r1.interceptors.i1.value = boy
# 0. Configure the source's channel
agent1.sources.r1.channels = c1
# 1. Configure the channel
agent1.channels.c1.type = memory
# Define the avro sink for port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata13
agent1.sinks.k1.port = 2222
# Wire the components together
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

agent2:
agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1

agent2.sources.r1.type = netcat
agent2.sources.r1.bind = bigdata13
agent2.sources.r1.port = 1112

# Add an interceptor => data cleansing + tagging each event
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
agent2.sources.r1.interceptors.i1.key = dl2262
agent2.sources.r1.interceptors.i1.value = girl
# 0. Configure the source's channel
agent2.sources.r1.channels = c1
# 1. Configure the channel
agent2.channels.c1.type = memory
# Define the avro sink for port 2222
agent2.sinks.k1.type = avro
agent2.sinks.k1.hostname = bigdata13
agent2.sinks.k1.port = 2222
# Wire the components together
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1

agent3:
agent3.sources = r1
agent3.sinks = k1
agent3.channels = c1

agent3.sources.r1.type = netcat
agent3.sources.r1.bind = bigdata13
agent3.sources.r1.port = 1113

# Add an interceptor => data cleansing + tagging each event
agent3.sources.r1.interceptors = i1
agent3.sources.r1.interceptors.i1.type = static
agent3.sources.r1.interceptors.i1.key = dl2262
agent3.sources.r1.interceptors.i1.value = tea
# 0. Configure the source's channel
agent3.sources.r1.channels = c1
# 1. Configure the channel
agent3.channels.c1.type = memory
# Define the avro sink for port 2222
agent3.sinks.k1.type = avro
agent3.sinks.k1.hostname = bigdata13
agent3.sinks.k1.port = 2222
# Wire the components together
agent3.sources.r1.channels = c1
agent3.sinks.k1.channel = c1

agent4:

agent4.sources = r1
agent4.sinks = k1 k2 k3
agent4.channels = c1 c2 c3

agent4.sources.r1.type = avro
agent4.sources.r1.bind = bigdata13
agent4.sources.r1.port = 2222

# 0. Configure the source's channel selector
agent4.sources.r1.selector.type = multiplexing
agent4.sources.r1.selector.header = dl2262
agent4.sources.r1.selector.mapping.boy = c1
agent4.sources.r1.selector.mapping.girl = c2
agent4.sources.r1.selector.default = c3
agent4.sources.r1.channels = c1 c2 c3
# 1. Configure the three channels
agent4.channels.c1.type = memory
agent4.channels.c2.type = memory
agent4.channels.c3.type = memory
# Define the logger sinks
agent4.sinks.k1.type = logger
agent4.sinks.k2.type = logger
agent4.sinks.k3.type = logger
# Wire the components together
agent4.sources.r1.channels = c1 c2 c3
agent4.sinks.k1.channel = c1
agent4.sinks.k2.channel = c2
agent4.sinks.k3.channel = c3
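
The routing that agent4's multiplexing selector performs on the dl2262 header can be mimicked in a few lines of shell. This is only an illustration of the mapping in the config above (boy to c1, girl to c2, anything else to the default c3); `route_event` is a made-up name, not part of Flume.

```shell
# route_event HEADER_VALUE: print the channel that the mapping above would select.
route_event() {
  case "$1" in
    boy)  echo c1 ;;
    girl) echo c2 ;;
    *)    echo c3 ;;   # selector.default
  esac
}

# The three upstream agents tag their events with boy, girl and tea.
for value in boy girl tea; do
  echo "dl2262=$value -> $(route_event "$value")"
done
```

Events tagged `tea` by agent3 have no explicit mapping, so they end up in the default channel.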

Startup:
flume-ng agent \
--name agent4 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent4.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent3.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent2.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent1.conf \
-Dflume.root.logger=info,console

telnet bigdata13 1111
telnet bigdata13 1112
telnet bigdata13 1113

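Supplement:

Channel capacity:

1. capacity: the total number of events the channel can hold

2. transactionCapacity: the number of events handled per transaction

Transactions happen on both sides of the channel: 1. the source putting data in

2. the sink taking data out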
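
As a sketch of how these two settings appear for a memory channel, the snippet below writes a small config fragment and checks that transactionCapacity does not exceed capacity. The values 10000 and 1000, the file path, and the `prop` helper are illustrative assumptions, not recommendations taken from these notes.

```shell
conf="${TMPDIR:-/tmp}/memory-channel-capacity.conf"   # hypothetical path

cat > "$conf" <<'EOF'
a1.channels.c1.type = memory
# Maximum number of events the channel can hold (illustrative value)
a1.channels.c1.capacity = 10000
# Maximum number of events per put/take transaction (illustrative value)
a1.channels.c1.transactionCapacity = 1000
EOF

# prop KEY FILE: print the value of a "key = value" line.
prop() { sed -n "s/^$1[[:space:]]*=[[:space:]]*//p" "$2"; }

cap=$(prop a1.channels.c1.capacity "$conf")
txn=$(prop a1.channels.c1.transactionCapacity "$conf")

# A transaction larger than the whole channel could never fit into it.
if [ "$txn" -le "$cap" ]; then
  echo "ok: transactionCapacity ($txn) <= capacity ($cap)"
else
  echo "invalid: transactionCapacity ($txn) > capacity ($cap)"
fi
```

The batch sizes configured on sources and sinks should likewise stay at or below transactionCapacity, since each batch is put or taken within a single transaction.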

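Monitoring: the component to watch is the channel

Options:

1. Flume can report its metrics to Ganglia (this requires installing Ganglia).

2. Pass a few parameters when starting the agent and fetch the metrics over HTTP (recommended, because it is the easy way).
JSON data => HTTP endpoint data =>

1. front-end developers can show it in a dashboard
2. collect the HTTP endpoint data => MySQL => visualization

Getting the data:
Java SE: API => URL
Spring Boot:
Scala: Source API
Fetching the JSON from the Linux command line: curl + URL

Persisting: save the data to a file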

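Metric reference:
SOURCE:
OpenConnectionCount (number of open connections)
Type (component type)
AppendBatchAcceptedCount (number of batches successfully committed to the channel)
AppendBatchReceivedCount (number of batches received by the source)
EventAcceptedCount (number of events successfully put into the channel)
AppendReceivedCount (number of single, non-batched events received by the source)
StartTime (time the component started)
StopTime (time the component stopped)
EventReceivedCount (number of events received by the source)
AppendAcceptedCount (number of single, non-batched events put into the channel)
CHANNEL:
EventPutSuccessCount (number of events successfully put into the channel)
ChannelFillPercentage (percentage of the channel's capacity in use)
Type (component type)
EventPutAttemptCount (number of attempts to put an event into the channel)
ChannelSize (number of events currently in the channel)
StartTime (time the component started)
StopTime (time the component stopped)
EventTakeSuccessCount (number of events successfully taken from the channel)
ChannelCapacity (capacity of the channel)
EventTakeAttemptCount (number of attempts to take an event from the channel)
SINK:
BatchCompleteCount (number of batches that reached the full batch size)
ConnectionFailedCount (number of failed connections)
EventDrainAttemptCount (number of events the sink attempted to deliver)
ConnectionCreatedCount (number of connections created)
Type (component type)
BatchEmptyCount (number of batches that came back empty because the channel had no events)
ConnectionClosedCount (number of connections closed)
EventDrainSuccessCount (number of events successfully delivered)
StartTime (time the component started)
StopTime (time the component stopped)
BatchUnderflowCount (number of batches that held fewer events than the batch size)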
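
As a worked example of how two of the channel metrics relate, ChannelFillPercentage can be recomputed from ChannelSize and ChannelCapacity. The helper below is a sketch; `fill_percentage` is a hypothetical name and the numbers in the usage lines are made up.

```shell
# fill_percentage SIZE CAPACITY: print SIZE / CAPACITY as a percentage with two decimals.
fill_percentage() {
  awk -v size="$1" -v cap="$2" \
    'BEGIN { if (cap > 0) printf "%.2f\n", size / cap * 100; else print "n/a" }'
}

fill_percentage 25 100     # a channel holding 25 events out of a capacity of 100
fill_percentage 9000 10000 # a channel that is close to full
```

A value that keeps climbing toward 100 suggests the sink is draining the channel more slowly than the source fills it.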

How do you monitor Flume over HTTP + JSON?
Requirement:
log file => output to the logger

agent:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/hadoop/tmp/dt01.log

a1.channels.c1.type = memory
a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent:

flume-ng agent \
--name a1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/monitor/agent.conf \
-Dflume.root.logger=info,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=9527

If Flume reports an error on startup, try restarting it => look up the agent's pid first.
JSON data:

http://bigdata13:9527/metrics
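
A single metric can be pulled out of that JSON with standard command-line tools. The function below is a minimal sketch: it assumes the endpoint returns a flat object of the form {"TYPE.name":{"Metric":"value",...}} with string values, `metric_value` is a made-up helper name, and the sample JSON is invented for illustration rather than captured from a real agent.

```shell
# metric_value COMPONENT METRIC: read the metrics JSON on stdin and print one value.
# Assumes a flat {"TYPE.name":{"Metric":"value",...}} layout with string values.
metric_value() {
  tr -d ' \n\t' |
    grep -o "\"$1\":{[^}]*}" |
    grep -o "\"$2\":\"[^\"]*\"" |
    cut -d'"' -f4
}

# Made-up sample in that layout, for trying the function without a running agent.
sample='{"CHANNEL.c1":{"ChannelCapacity":"100","ChannelSize":"25","Type":"CHANNEL"}}'
printf '%s' "$sample" | metric_value CHANNEL.c1 ChannelSize

# Against a live agent (host and port as used in these notes):
# curl -s http://bigdata13:9527/metrics | metric_value CHANNEL.c1 ChannelFillPercentage
```

For anything beyond a quick check, a real JSON parser is a safer choice than grep, since this approach depends on the exact layout assumed above.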
