Overview
Flume_03 Notes
Requirement: collect 10,000 lines of log data (generated with Java or a shell script)
and land them on HDFS using compressed storage (bzip2).
source: exec / taildir
channel: memory / file
sink: hdfs => bzip2
# script to generate the log data (run it in /home/hadoop/tmp so 1.log matches the taildir path below)
for X in {1..10000}
do
echo "${X}" >> 1.log
done
1. Store the files on HDFS in a compressed format; compression codec: bzip2
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/hadoop/tmp/1.log
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata13:9000/flume/log/
a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.codeC=bzip2
# file name prefix and suffix
a1.sinks.k1.hdfs.filePrefix=events
a1.sinks.k1.hdfs.fileSuffix=.bz2
# file rolling policy
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start: start the agents from downstream to upstream so that no data is lost.
flume-ng agent \
--name a1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/taildir-mem-hdfs.conf1 \
-Dflume.root.logger=info,console
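To verify the result, a minimal sketch (assuming the HDFS client is on the PATH, the agent has rolled at least one file, and the fileSuffix is .bz2 as configured above, so the codec can be recognized from the extension):

# list the compressed output files
hdfs dfs -ls /flume/log/
# -text decompresses by file extension; the total line count should be 10000
hdfs dfs -text /flume/log/events*.bz2 | wc -l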
Channel choice:
file channel:
agent:
agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1
agent1.sources.r1.type = TAILDIR
agent1.sources.r1.filegroups = f1
agent1.sources.r1.filegroups.f1=/home/hadoop/tmp/codec01.log
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/project/flume/checkpoint/codec
agent1.channels.c1.dataDirs = /home/hadoop/project/flume/data/codec
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://bigdata13:9000/flume/bzip2/
agent1.sinks.k1.hdfs.fileType=CompressedStream
agent1.sinks.k1.hdfs.writeFormat=Text
# file name prefix and suffix
agent1.sinks.k1.hdfs.filePrefix=events
agent1.sinks.k1.hdfs.fileSuffix=.bz2
agent1.sinks.k1.hdfs.codeC=bzip2
# file rolling policy
agent1.sinks.k1.hdfs.rollInterval=60
agent1.sinks.k1.hdfs.rollSize=134217728
agent1.sinks.k1.hdfs.rollCount=1000
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
Start:
flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/taildir-mem-hdfs.conf2 \
-Dflume.root.logger=info,console
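The file channel persists its state to disk, which is what lets the agent recover buffered events after a restart. A quick sanity check, using the checkpoint and data directories from the config above:

# checkpoint metadata written periodically by the file channel
ls -l /home/hadoop/project/flume/checkpoint/codec
# the data (log) files that hold the queued events
ls -l /home/hadoop/project/flume/data/codec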
High availability (HDFS review):
nn: maintains the HDFS namespace => stores the block metadata for the whole filesystem and serves client read/write requests
snn: periodically merges the NameNode's fsimage and edit logs => also holds the block metadata of the whole HDFS
    the checkpoint interval defaults to 1 hour (3600 s); you can confirm the configured value as shown below
snn is not a hot standby of the nn; it is only a cold backup
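A quick way to check the effective checkpoint interval on the cluster (a sketch, assuming the HDFS client and its configuration are available on this node):

hdfs getconf -confKey dfs.namenode.checkpoint.period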
Failover and load balancing:
To protect against a sink failing, Flume provides sink processors.
Sink Processors:
  failover - fault tolerance: events fail over to the next sink by priority
  load_balance - load balancing, with two selectors (a random-selector sketch follows after this list):
    1. random: events are distributed randomly
    2. round_robin: events are distributed in turn
  default - the Default Sink Processor (a single sink, no sink group)
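For reference, a minimal sink-group sketch using the random selector instead of round_robin (the names a1, g1, k1, k2 are illustrative; the full round_robin example is worked through below):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random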
Requirement: read data from port 1111, send it to ports 2222 and 3333, and finally print the data to the console.
Load balancing (load_balance), using 3 agents:
agent1:
  source: netcat
  channel: memory
  sink: avro (two sinks)
agent2 (port 2222):
  source: avro
  channel: memory
  sink: logger
agent3 (port 3333):
  source: avro
  channel: memory
  sink: logger
agent1:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111
agent1.channels.c1.type = memory
# define sink to port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata32
agent1.sinks.k1.port = 2222
# define sink to port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata32
agent1.sinks.k2.port = 3333
# define the sink processor (sink group)
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
agent2 (port 2222):
agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1
agent2.sources.r1.type = avro
agent2.sources.r1.bind = bigdata32
agent2.sources.r1.port = 2222
agent2.channels.c1.type = memory
agent2.sinks.k1.type = logger
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1
agent3 (port 3333):
agent3.sources = r1
agent3.sinks = k1
agent3.channels = c1
agent3.sources.r1.type = avro
agent3.sources.r1.bind = bigdata32
agent3.sources.r1.port = 3333
agent3.channels.c1.type = memory
agent3.sinks.k1.type = logger
agent3.sources.r1.channels = c1
agent3.sinks.k1.channel = c1
Start the agents, from downstream to upstream (agent3, then agent2, then agent1):
flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent3.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent2.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent1.conf \
-Dflume.root.logger=info,console
telnet bigdata32 1111
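Instead of typing lines into telnet by hand, a small bash loop can push test messages (a sketch; it relies on bash's /dev/tcp device, so run it from bash on a host that can reach bigdata32). With round_robin you should see the lines alternate between the consoles of agent2 and agent3:

for i in {1..10}
do
  echo "msg-${i}" > /dev/tcp/bigdata32/1111
done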
Fault tolerance: what happens when a sink fails
  failover: fault tolerance
  load_balance: load balancing
load_balance:
1. Splits the data across sinks, adds parallelism, and relieves the pressure on any single sink.
2. If the second or the third agent goes down, all data is sent to the sink whose agent is still alive.
   Relevant parameters: processor.backoff = true (temporarily blacklist a failed sink) and
   processor.selector.maxTimeOut (the maximum backoff time, in milliseconds).
agent1:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111
agent1.channels.c1.type = memory
# define sink to port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata32
agent1.sinks.k1.port = 2222
# define sink to port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata32
agent1.sinks.k2.port = 3333
# define the sink processor (failover)
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 5
agent1.sinkgroups.g1.processor.priority.k2 = 10
agent1.sinkgroups.g1.processor.maxpenalty = 2000
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c1
flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/sink/agent1_failover.conf \
-Dflume.root.logger=info,console
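With failover, k2 (port 3333) has the higher priority (10 vs 5), so events should first show up on agent3's console. A rough way to verify the failover behaviour (a sketch, assuming agent2 and agent3 were started as in the load_balance example):

# 1. send a few lines; they should appear on agent3 (port 3333, priority 10)
telnet bigdata32 1111
# 2. stop agent3 (Ctrl+C in its terminal, or kill its pid)
# 3. send a few more lines; they should now appear on agent2 (port 2222, priority 5),
#    and the failed sink is retried after the maxpenalty period
telnet bigdata32 1111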
Flume core components:
sources
  interceptors: process the collected data - transformation / data cleansing
  channel selectors: decide which channel(s) the collected data is routed to
channels
sinks
  sink processors: decide which sink in a group the collected data is sent to
(a minimal skeleton showing where each of these pieces attaches follows below)
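A minimal, illustrative properties skeleton (the agent/component names a1, r1, k1, k2, g1, i1 are assumptions) showing where each piece is attached:

# interceptor: attached to a source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
# channel selector: attached to a source
a1.sources.r1.selector.type = replicating
# sink processor: attached to a sink group
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover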
Requirement: define one agent that collects data from port 1111 and sends one copy to HDFS
and another copy to a logger.
agent:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1 c2
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111
#0 configure the source's channel selector
agent1.sources.r1.selector.type = replicating
agent1.sources.r1.channels = c1 c2
#1 configure the two channels
agent1.channels.c1.type = memory
agent1.channels.c2.type = memory
# define the hdfs sink
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path = hdfs://bigdata32:9000/flume/channel_selector/
agent1.sinks.k1.hdfs.fileType=DataStream
agent1.sinks.k1.hdfs.writeFormat=Text
# file name prefix and suffix
agent1.sinks.k1.hdfs.filePrefix=events
agent1.sinks.k1.hdfs.fileSuffix=.log
agent1.sinks.k1.hdfs.useLocalTimeStamp=true
# file rolling policy
agent1.sinks.k1.hdfs.rollInterval=60
agent1.sinks.k1.hdfs.rollSize=134217728
agent1.sinks.k1.hdfs.rollCount=1000
# define the logger sink
agent1.sinks.k2.type = logger
# wire sources and sinks to channels
agent1.sources.r1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
Start the agent:
flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channle/agent_logger_hdfs.conf \
-Dflume.root.logger=info,console
telnet bigdata32 1111
Channel selectors:
1. replicating => every channel receives an identical copy of the data
Exercise:
1. Do the same thing with three agents:
agent1: receive data on port 1111 and send it to ports 2222 and 3333
agent2: receive on port 2222 and send to a logger
agent3: receive on port 3333 and send to a logger
Single-agent implementation:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1 c2
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata32
agent1.sources.r1.port = 1111
#0 configure the source's channel selector
agent1.sources.r1.selector.type = replicating
agent1.sources.r1.channels = c1 c2
#1 configure the two channels
agent1.channels.c1.type = memory
agent1.channels.c2.type = memory
# define sink to port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata32
agent1.sinks.k1.port = 2222
# define sink to port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata32
agent1.sinks.k2.port = 3333
# wire sources and sinks to channels
agent1.sources.r1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
Start the agents (downstream first):
flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/one2many/agent3.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/one2many/agent2.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/one2many/agent1.conf \
-Dflume.root.logger=info,console
telnet bigdata32 1111
Multi-agent implementation:
agent1:
agent1.sources = r1
agent1.sinks = k1 k2
agent1.channels = c1 c2
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata13
agent1.sources.r1.port = 1111
#0 configure the source's channel selector
agent1.sources.r1.selector.type = replicating
agent1.sources.r1.channels = c1 c2
agent1.channels.c1.type = memory
agent1.channels.c2.type = memory
# define sink to port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata13
agent1.sinks.k1.port = 2222
# define sink to port 3333
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = bigdata13
agent1.sinks.k2.port = 3333
# define the sink processor (sink group)
agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = k1 k2
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut = 10000
agent1.sources.r1.channels = c1 c2
agent1.sinks.k1.channel = c1
agent1.sinks.k2.channel = c2
agent2 (port 2222):
agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1
agent2.sources.r1.type = avro
agent2.sources.r1.bind = bigdata13
agent2.sources.r1.port = 2222
agent2.channels.c1.type = memory
# define the hdfs sink
agent2.sinks.k1.type = hdfs
agent2.sinks.k1.hdfs.path = hdfs://bigdata13:9000/flume/channel_selector/
agent2.sinks.k1.hdfs.fileType=DataStream
agent2.sinks.k1.hdfs.writeFormat=Text
# file name prefix and suffix
agent2.sinks.k1.hdfs.filePrefix=events
agent2.sinks.k1.hdfs.fileSuffix=.log
agent2.sinks.k1.hdfs.useLocalTimeStamp=true
# file rolling policy
agent2.sinks.k1.hdfs.rollInterval=60
agent2.sinks.k1.hdfs.rollSize=134217728
agent2.sinks.k1.hdfs.rollCount=1000
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1
agent3 (port 3333):
agent3.sources = r1
agent3.sinks = k2
agent3.channels = c2
agent3.sources.r1.type = avro
agent3.sources.r1.bind = bigdata13
agent3.sources.r1.port = 3333
agent3.channels.c2.type = memory
agent3.sinks.k2.type = logger
agent3.sources.r1.channels = c2
agent3.sinks.k2.channel = c2
Start agent3:
flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channel/agent3.conf \
-Dflume.root.logger=info,console

Start agent2:
flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channel/agent2.conf \
-Dflume.root.logger=info,console

Start agent1:
flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/channel/agent1.conf \
-Dflume.root.logger=info,console
Requirement: collect several kinds of logs into one aggregation agent, then have that agent route the data to specific destinations.
agent1:
agent1.sources = r1
agent1.sinks = k1
agent1.channels = c1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = bigdata13
agent1.sources.r1.port = 1111
# add an interceptor => data cleansing + tag each event with a header
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = dl2262
agent1.sources.r1.interceptors.i1.value = boy
#0 wire the source to its channel
agent1.sources.r1.channels = c1
#1 configure the channel
agent1.channels.c1.type = memory
# define the avro sink to port 2222
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = bigdata13
agent1.sinks.k1.port = 2222
# wire sources and sinks to channels
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
agent2:
agent2.sources = r1
agent2.sinks = k1
agent2.channels = c1
agent2.sources.r1.type = netcat
agent2.sources.r1.bind = bigdata13
agent2.sources.r1.port = 1112
# add an interceptor => data cleansing + tag each event with a header
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
agent2.sources.r1.interceptors.i1.key = dl2262
agent2.sources.r1.interceptors.i1.value = girl
#0 wire the source to its channel
agent2.sources.r1.channels = c1
#1 configure the channel
agent2.channels.c1.type = memory
# define the avro sink to port 2222
agent2.sinks.k1.type = avro
agent2.sinks.k1.hostname = bigdata13
agent2.sinks.k1.port = 2222
# wire sources and sinks to channels
agent2.sources.r1.channels = c1
agent2.sinks.k1.channel = c1
agent3:
agent3.sources = r1
agent3.sinks = k1
agent3.channels = c1
agent3.sources.r1.type = netcat
agent3.sources.r1.bind = bigdata13
agent3.sources.r1.port = 1113
# add an interceptor => data cleansing + tag each event with a header
agent3.sources.r1.interceptors = i1
agent3.sources.r1.interceptors.i1.type = static
agent3.sources.r1.interceptors.i1.key = dl2262
agent3.sources.r1.interceptors.i1.value = tea
#0 wire the source to its channel
agent3.sources.r1.channels = c1
#1 configure the channel
agent3.channels.c1.type = memory
# define the avro sink to port 2222
agent3.sinks.k1.type = avro
agent3.sinks.k1.hostname = bigdata13
agent3.sinks.k1.port = 2222
# wire sources and sinks to channels
agent3.sources.r1.channels = c1
agent3.sinks.k1.channel = c1
agent4:
agent4.sources = r1
agent4.sinks = k1 k2 k3
agent4.channels = c1 c2 c3
agent4.sources.r1.type = avro
agent4.sources.r1.bind = bigdata13
agent4.sources.r1.port = 2222
#0 configure the multiplexing channel selector
agent4.sources.r1.selector.type = multiplexing
agent4.sources.r1.selector.header = dl2262
agent4.sources.r1.selector.mapping.boy = c1
agent4.sources.r1.selector.mapping.girl = c2
agent4.sources.r1.selector.default = c3
agent4.sources.r1.channels = c1 c2 c3
#1 configure the three channels
agent4.channels.c1.type = memory
agent4.channels.c2.type = memory
agent4.channels.c3.type = memory
# define the logger sinks
agent4.sinks.k1.type =logger
agent4.sinks.k2.type =logger
agent4.sinks.k3.type =logger
# wire sources and sinks to channels
agent4.sources.r1.channels = c1 c2 c3
agent4.sinks.k1.channel = c1
agent4.sinks.k2.channel = c2
agent4.sinks.k3.channel = c3
Start the agents (agent4 first, then agent3, agent2, agent1):
flume-ng agent \
--name agent4 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent4.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent3 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent3.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent2 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent2.conf \
-Dflume.root.logger=info,console

flume-ng agent \
--name agent1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/many2one/agent1.conf \
-Dflume.root.logger=info,console
telnet bigdata13 1111
telnet bigdata13 1112
telnet bigdata13 1113
Supplement:
Channel capacity:
1. capacity - the total capacity: the maximum number of events the channel can hold
2. transactionCapacity - the transaction capacity: the maximum number of events per transaction,
   which applies to both sides: 1. the source putting data into the channel
   2. the sink taking data out of the channel
(a small configuration sketch follows below)
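A minimal memory-channel sketch with both capacities set explicitly (the values are illustrative; transactionCapacity must not exceed capacity):

a1.channels.c1.type = memory
# maximum number of events the channel can hold
a1.channels.c1.capacity = 10000
# maximum number of events per put/take transaction
a1.channels.c1.transactionCapacity = 1000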
Monitoring: the component to monitor is the channel.
Options:
1. Flume can report metrics to the Ganglia framework [requires installing Ganglia + ...]
2. Start the agent with a few extra parameters and fetch the metrics over HTTP [recommended - easy]
JSON data => HTTP endpoint data =>
  1. front-end developers render it in a dashboard
  2. scrape the HTTP endpoint => MySQL => visualization
Ways to fetch the data:
  Java SE: URL API
  Spring Boot
  Scala: Source API
  Linux command to fetch the JSON: curl + the metrics URL
Persist it: save the data to a file (see the curl sketch below)
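A sketch of fetching the metrics JSON with curl and persisting it to a file (the host and port match the monitoring example further down; the output path is illustrative):

# print the metrics JSON to the console
curl -s http://bigdata13:9527/metrics
# persist a timestamped copy to disk
curl -s http://bigdata13:9527/metrics > /home/hadoop/tmp/flume_metrics_$(date +%Y%m%d%H%M%S).json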
Metric fields explained:
SOURCE:
OpenConnectionCount (number of open connections)
Type (component type)
AppendBatchAcceptedCount (number of append batches accepted into the channel)
AppendBatchReceivedCount (number of append batches received by the source)
EventAcceptedCount (number of events successfully put into the channel)
AppendReceivedCount (number of append calls received by the source so far)
StartTime (component start time)
StopTime (component stop time)
EventReceivedCount (number of events successfully received by the source)
AppendAcceptedCount (number of appended events put into the channel)
CHANNEL:
EventPutSuccessCount (number of events successfully put into the channel)
ChannelFillPercentage (how full the channel is, as a percentage)
Type (component type)
EventPutAttemptCount (number of attempts to put events into the channel)
ChannelSize (number of events currently in the channel)
StartTime (component start time)
StopTime (component stop time)
EventTakeSuccessCount (number of events successfully taken from the channel)
ChannelCapacity (channel capacity)
EventTakeAttemptCount (number of attempts to take events from the channel)
SINK:
BatchCompleteCount (number of completed batches)
ConnectionFailedCount (number of failed connections)
EventDrainAttemptCount (number of events the sink attempted to drain)
ConnectionCreatedCount (number of connections created)
Type (component type)
BatchEmptyCount (number of empty batches taken)
ConnectionClosedCount (number of connections closed)
EventDrainSuccessCount (number of events successfully sent)
StartTime (component start time)
StopTime (component stop time)
BatchUnderflowCount (number of batches smaller than the configured batch size)
How do we monitor Flume via HTTP + JSON?
Requirement:
log file => output to logger
agent:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1=/home/hadoop/tmp/dt01.log
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent with HTTP monitoring enabled:
flume-ng agent \
--name a1 \
--conf ${FLUME_HOME}/conf \
--conf-file /home/hadoop/project/flume/monitor/agent.conf \
-Dflume.root.logger=info,console \
-Dflume.monitoring.type=http \
-Dflume.monitoring.port=9527
If the agent fails to start with an error, try restarting it (check first whether an old process/pid is still holding the port).
The JSON metrics are exposed at:
http://bigdata13:9527/metrics
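A sketch of pulling a single field out of that JSON, assuming jq is installed; the top-level keys are named after the components, e.g. "CHANNEL.c1" for channel c1 in the agent above:

curl -s http://bigdata13:9527/metrics | jq '."CHANNEL.c1".ChannelFillPercentage'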