我是靠谱客的博主 机灵未来,最近开发中收集的这篇文章主要介绍Flume之——配置多个Sink源(一个Source对应多个Channel和Sink),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

转载请注明出处:https://blog.csdn.net/l1028386804/article/details/98055100

配置模型如下图:

Flume的配置如下:

myagent.sources = r1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.r1.selector.type = replicating
# 订阅/配置Source源
myagent.sources.r1.type = http
myagent.sources.r1.port = 5140
myagent.sources.r1.handler = org.apache.flume.source.http.JSONHandler

myagent.sources.r1.channels = c1 c2

# 设置Memmory Channel
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100

myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100

#####发送到Kafka####
# 订阅k1 Sink
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20

#####发送到HDFS#####
# 订阅Sink
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### 一个小时保存一次
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### 文件达到1M写新文件
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize=1048576
myagent.sinks.k2.hdfs.rollCount=0

myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1

配置说明如下图所示:

直接监听Nginx日志变化的配置如下:

myagent.sources = s1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.s1.selector.type = replicating
# 订阅/配置Source源
myagent.sources.s1.batchsize=10
myagent.sources.s1.type = exec
myagent.sources.s1.command = tail -F /usr/local/nginx-1.17.2/logs/access.log
 
myagent.sources.s1.channels = c1 c2
 
# 设置Memmory Channel
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100
 
myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100
 
#####发送到Kafka####
# 订阅k1 Sink
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20
 
#####发送到HDFS#####
# 订阅Sink
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### 一个小时保存一次
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### 文件达到1M写新文件
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize=1048576
myagent.sinks.k2.hdfs.rollCount=0
 
myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1

注意:flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用,可以将minBlockReplicas设置为1。如下设置项:

myagent.sinks.k2.hdfs.minBlockReplicas = 1

参考文章链接如下:

1.Flume中的HDFS Sink配置参数说明:
http://lxw1234.com/archives/2015/10/527.htm
2.Flume(NG)架构设计要点及配置实践
http://shiyanjun.cn/archives/915.html
3.Flume NG 简介及配置实战
https://yq.aliyun.com/articles/50487
4.flume官网
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

 

最后

以上就是机灵未来为你收集整理的Flume之——配置多个Sink源(一个Source对应多个Channel和Sink)的全部内容,希望文章能够帮你解决Flume之——配置多个Sink源(一个Source对应多个Channel和Sink)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部