我是靠谱客的博主 无聊机器猫,最近开发中收集的这篇文章主要介绍电商数仓项目(八) Flume(3) 生产者和消费者配置一、生产数据写到kafka二、消费kafka数据写到hdfs,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
目录
- 一、生产数据写到kafka
- 二、消费kafka数据写到hdfs
本节讲解Flume 生产者和消费者配置。
源码下载
一、生产数据写到kafka
- 将上节生成的flume-interceptor-1.0.0.jar文件上传到$FLUME_HOME/lib目录下
- 在$FLUME_HOME/conf目录中创建file-flume-kafka.conf文件,
文件目录:/u01/gmall/data/in/log-data
读取的文件:app开头的文件
内容如下:
#定义Agent必需的组件名称,同时指定本配置文件的Agent名称为a1
a1.sources=r1
a1.channels=c1 c2
#定义Source组件相关配置
#使用Taildir Source
a1.sources.r1.type = TAILDIR
#配置Taildir Source,保存断点位置文件的目录
a1.sources.r1.positionFile = /u01/flume-1.9.0/position/log_position.json
#配置监控目录组
a1.sources.r1.filegroups = f1
#配置目录组下的目录,可配置多个目录
a1.sources.r1.filegroups.f1 = /u01/gmall/data/in/log-data/app.*
#配置Source发送数据的目标Channel
a1.sources.r1.channels = c1 c2
#拦截器
#配置拦截器名称
a1.sources.r1.interceptors =
i1 i2
#配置拦截器名称,需要写明全类名
a1.sources.r1.interceptors.i1.type = com.jack.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.jack.flume.interceptor.LogTypeInterceptor$Builder
#配置Channel选择器
#配置选择器类型
a1.sources.r1.selector.type = multiplexing
#配置选择器识别header中的key
a1.sources.r1.selector.header = topic
#配置不同的header信息,发往不同的Channel
a1.sources.r1.selector.mapping.topicStart = c1
a1.sources.r1.selector.mapping.topicEvent = c2
# configure channel配置Channel
#配置Channel类型为Kafka Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
#配置Kafka集群节点服务器列表
a1.channels.c1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
#配置该Channel发往Kafka的Topic,该Topic需要在Kafka中提前创建
a1.channels.c1.kafka.topic = topicStart
#配置不将header信息解析为event内容
a1.channels.c1.parseAsFlumeEvent = false
#配置该Kafka Channel所属的消费者组名,为实现multiplexing类型的Channel选择器,应将2个Kafka Channel配置相同的消费者组
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#配置同上
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.channels.c2.kafka.topic = topicEvent
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
- 分发到其他服务器(node01,node02)
[jack@node01 bin]$ cd /u01/bin
[jack@node01 bin]$ xsync /u01/flume-1.9.0
- 生产者统一启动集群
名称:flume_p.sh
存放目录:/u01/bin
问题跟踪:在使用中如果出现问题,在控制台使用 -Dflume.root.logger=DEBUG,console来跟踪问题
#!/bin/bash
pgmName=`basename $0`
pgmName=${pgmName##*/}
# remove the path
pgmExt=${pgmName##*.}
# get the extension
pgmName=${pgmName%.*}
# get the program name
USAGE="Usage: $pgmName.$pgmExt <start/stop>n"
if [ $# -lt 1 ]
then
echo $USAGE
exit 1
fi
case $1 in
"start"){
for i in node01 node02
do
echo "----------- start $i to collect flume ------------"
ssh $i " source /etc/profile;nohup $FLUME_HOME/bin/flume-ng agent --conf-file $FLUME_HOME/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
done
};;
"stop"){
for i in node01 node02
do
echo "----------- end $i to collect flume ------------"
ssh $i "ps -ef|grep file-flume-kafka | grep -v grep |awk '{print $2}' | xargs kill"
done
};;
esac
二、消费kafka数据写到hdfs
- 在node03节点,在$FLUME_HOME/conf目录中创建 kafka-flume-hdfs.conf 文件,hdfs文件使用lzo压缩。接收不同类型的文件放到不同的hdfs目录中。
## Flume Agent组件声明
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## Source1属性配置
#配置Source类型为Kafka Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
#配置Kafka Source每次从Kafka Topic中拉取的event个数
a1.sources.r1.batchSize = 5000
#配置拉取数据批次间隔为2000毫秒
a1.sources.r1.batchDurationMillis = 2000
#配置Kafka集群地址
a1.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
#配置Source 对接Kafka主题
a1.sources.r1.kafka.topics=topicStart
## source2属性配置,与Source1配置类似,只是消费主题不同
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sources.r2.kafka.topics=topicEvent
## Channel1属性配置
#配置Channel类型为File Channel
a1.channels.c1.type = file
#配置存储File Channel传输数据的断点信息目录
a1.channels.c1.checkpointDir = /u01/flume-1.9.0/checkpoint/behavior1
#配置File Channel传输数据的存储位置
a1.channels.c1.dataDirs = /u01/flume-1.9.0/data/behavior1
#配置File Channel的最大存储容量
a1.channels.c1.maxFileSize = 2146435071
#配置File Channel最多存储event的个数
a1.channels.c1.capacity = 1000000
#配置Channel满时put事务的超时时间
a1.channels.c1.keep-alive = 6
## Channel2属性配置同Channel1,注意需要配置不同的目录路径
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /u01/flume-1.9.0/checkpoint/behavior2
a1.channels.c2.dataDirs = /u01/flume-1.9.0/data/behavior2
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## Sink1属性配置
#配置Sink1类型为HDFS Sink
a1.sinks.k1.type = hdfs
#配置发到HDFS的存储路径
a1.sinks.k1.hdfs.path = hdfs://node01:9000/gmall/log/topicStart/%Y-%m-%d
#配置HDFS落盘文件的文件名前缀
a1.sinks.k1.hdfs.filePrefix = logstart-
##Sink2 属性配置同Sink1
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://node01:9000/gmall/log/topicEvent/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 避免产生大量小文件的相关属性配置
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制输出文件是压缩文件
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
- 消费者统一启动集群
文件名:flume_c.sh
目录:/u01/bin
#!/bin/bash
pgmName=`basename $0`
pgmName=${pgmName##*/}
# remove the path
pgmExt=${pgmName##*.}
# get the extension
pgmName=${pgmName%.*}
# get the program name
USAGE="Usage: $pgmName.$pgmExt <start/stop>n"
if [ $# -lt 1 ]
then
echo $USAGE
exit 1
fi
case $1 in
"start"){
for i in node03
do
echo "----------- start $i to collect flume ------------"
ssh $i "source /etc/profile;nohup $FLUME_HOME/bin/flume-ng agent --conf-file $FLUME_HOME/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
done
};;
"stop"){
for i in node03
do
echo "----------- end $i to collect flume ------------"
ssh $i "ps -ef|grep kafka-flume-hdfs | grep -v grep |awk '{print $2}' | xargs kill"
done
};;
esac
电商数仓项目(一) 系统规划和配置
电商数仓项目(二) Maven 安装和hadoop-lzo编译
电商数仓项目(三) hadoop3.2.2 安装与配置
电商数仓项目(四) 模拟电商日志数据开发
电商数仓项目(五) azkaban安装、配置和使用
电商数仓项目(六) zookeeper安装和配置
电商数仓项目(七) kafka 安装、配置和简单操作
电商数仓项目(八) Flume(1) 安装和配置
电商数仓项目(八) Flume(2) 拦截器开发
最后
以上就是无聊机器猫为你收集整理的电商数仓项目(八) Flume(3) 生产者和消费者配置一、生产数据写到kafka二、消费kafka数据写到hdfs的全部内容,希望文章能够帮你解决电商数仓项目(八) Flume(3) 生产者和消费者配置一、生产数据写到kafka二、消费kafka数据写到hdfs所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复