Flume: Writing to HDFS and Kafka Simultaneously

Overview

Preface: this article walks through configuring a single Flume agent with a fan-out flow that writes data to both Kafka and HDFS. The Hadoop, Zookeeper, and Kafka deployments involved are all pseudo-distributed (single node).

1. Basic Environment

1.1 Hardware Environment

One virtual machine with 4 GB of RAM and 2 CPU cores.

1.2 Component Versions

Component | Version | Baidu Netdisk link
Flume | flume-ng-1.6.0-cdh5.7.0.tar.gz | https://pan.baidu.com/s/11QeF7rk2rqnOrFankr4TzA (access code: 3ojw)
Zookeeper | zookeeper-3.4.6.tar.gz | https://pan.baidu.com/s/1upNcB53WGWP_89lhYnqP6g (access code: j50f)
Kafka | kafka_2.11-0.10.0.0.tgz | https://pan.baidu.com/s/1TpU6QPnoF1tuUy-7HnGgmQ (access code: aapj)

Note (a pitfall I hit): I could not download Kafka 0.10 from the official site directly in the browser. GitHub was no help either, since it only offers the source code, which cannot be installed and run as-is. In the end, pasting the download link into Xunlei (Thunder) made the download work.

2. Installation and Deployment

2.1 Installing Flume

Omitted here; refer to the Flume deployment steps in "Flume之生产正确的使用方式一 (Single Agent)" — it is very straightforward. A minimal sketch follows.
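
For completeness, here is a minimal deployment sketch following the same pattern as the Zookeeper and Kafka installs below. The extracted directory name and the JAVA_HOME path are assumptions; adjust them to your environment.

# Extract
[hadoop@hadoop001 soft]$ tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C ~/app/
# Add environment variables (directory name is an assumption)
[hadoop@hadoop001 soft]$ vim ~/.bash_profile
export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
[hadoop@hadoop001 soft]$ source ~/.bash_profile
# Point Flume at the local JDK (JAVA_HOME path is an assumption)
[hadoop@hadoop001 soft]$ cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
[hadoop@hadoop001 soft]$ vim $FLUME_HOME/conf/flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_144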

2.2 Installing Standalone Zookeeper
# Extract
[hadoop@hadoop001 ~]$ cd ~/soft/
[hadoop@hadoop001 soft]$ tar -zxvf zookeeper-3.4.6.tar.gz -C ~/app/
[hadoop@hadoop001 soft]$ cd ~/app/zookeeper-3.4.6/
# Change the Zookeeper data directory
[hadoop@hadoop001 zookeeper-3.4.6]$ cp conf/zoo_sample.cfg conf/zoo.cfg
[hadoop@hadoop001 zookeeper-3.4.6]$ mkdir -p ~/app/zookeeper-3.4.6/data
[hadoop@hadoop001 zookeeper-3.4.6]$ vim conf/zoo.cfg
dataDir=/home/hadoop/app/zookeeper-3.4.6/data
# Add environment variables
[hadoop@hadoop001 zookeeper-3.4.6]$ vim ~/.bash_profile
export ZOOKEEPER_HOME=/home/hadoop/app/zookeeper-3.4.6
export PATH=$ZOOKEEPER_HOME/bin:$PATH
[hadoop@hadoop001 zookeeper-3.4.6]$ source ~/.bash_profile
[hadoop@hadoop001 zookeeper-3.4.6]$ which zkServer.sh
~/app/zookeeper-3.4.6/bin/zkServer.sh
# Start
[hadoop@hadoop001 zookeeper-3.4.6]$ zkServer.sh start
# Check the status; if the output shows "standalone", Zookeeper started correctly
[hadoop@hadoop001 zookeeper-3.4.6]$ zkServer.sh status
# Enter the ZK client shell
[hadoop@hadoop001 zookeeper-3.4.6]$ zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 1] ls /
# Stop
[hadoop@hadoop001 zookeeper-3.4.6]$ zkServer.sh stop
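
For reference, the resulting conf/zoo.cfg for this standalone setup needs only a few lines; everything except dataDir is the zoo_sample.cfg default:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/hadoop/app/zookeeper-3.4.6/data
clientPort=2181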

2.3 Installing Standalone Kafka
# Extract
[hadoop@hadoop001 soft]$ cd ~/soft
[hadoop@hadoop001 soft]$ tar -zxvf kafka_2.11-0.10.0.0.tgz -C ~/app/
# Change the data storage directory
[hadoop@hadoop001 soft]$ cd ~/app/kafka_2.11-0.10.0.0/
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ mkdir -p ~/app/kafka_2.11-0.10.0.0/datalogdir
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ vim config/server.properties
log.dirs=/home/hadoop/app/kafka_2.11-0.10.0.0/datalogdir
# Add environment variables
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ vim ~/.bash_profile
export KAFKA_HOME=/home/hadoop/app/kafka_2.11-0.10.0.0
export PATH=$KAFKA_HOME/bin:$PATH
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ source ~/.bash_profile
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ which kafka-topics.sh
~/app/kafka_2.11-0.10.0.0/bin/kafka-topics.sh
# Start
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-server-start.sh config/server.properties
# Test: create a topic
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wsk_test
# Test: list topics
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
# Test: console producer
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wsk_test
# Test: console consumer
[hadoop@hadoop001 kafka_2.11-0.10.0.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wsk_test --from-beginning
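
For a standalone broker, the only edit needed in config/server.properties is the log.dirs change above; the other relevant values are already the shipped 0.10 defaults, shown here for reference (the broker listens on port 9092 by default):

broker.id=0
zookeeper.connect=localhost:2181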

3. Configuring the Flume Job

Use Flume's Taildir Source to collect data and send it to both Kafka and HDFS: the source replicates each event into two memory channels, one feeding the HDFS sink and the other the Kafka sink. The full configuration is as follows:

Taildir-HdfsAndKafka-Agent.sources = taildir-source
Taildir-HdfsAndKafka-Agent.channels = c1 c2
Taildir-HdfsAndKafka-Agent.sinks = hdfs-sink kafka-sink
Taildir-HdfsAndKafka-Agent.sources.taildir-source.type = TAILDIR
Taildir-HdfsAndKafka-Agent.sources.taildir-source.filegroups = f1
Taildir-HdfsAndKafka-Agent.sources.taildir-source.filegroups.f1 = /home/hadoop/data/flume/HdfsAndKafka/input/.*
Taildir-HdfsAndKafka-Agent.sources.taildir-source.positionFile = /home/hadoop/data/flume/HdfsAndKafka/taildir_position/taildir_position.json
Taildir-HdfsAndKafka-Agent.sources.taildir-source.selector.type = replicating
Taildir-HdfsAndKafka-Agent.channels.c1.type = memory
Taildir-HdfsAndKafka-Agent.channels.c2.type = memory
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.type = hdfs
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/flume/HdfsAndKafka/%Y%m%d%H%M
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.filePrefix = wsktest-
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.rollInterval = 10
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.rollSize = 100000000
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Taildir-HdfsAndKafka-Agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
Taildir-HdfsAndKafka-Agent.sinks.kafka-sink.brokerList = localhost:9092
Taildir-HdfsAndKafka-Agent.sinks.kafka-sink.topic = wsk_test
Taildir-HdfsAndKafka-Agent.sources.taildir-source.channels = c1 c2
Taildir-HdfsAndKafka-Agent.sinks.hdfs-sink.channel = c1
Taildir-HdfsAndKafka-Agent.sinks.kafka-sink.channel = c2
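
One caveat: both memory channels run with Flume's defaults (capacity 100 events, transactionCapacity 100), which overflow easily under bursty input. If needed, the limits can be raised explicitly; the values below are illustrative, not tuned:

Taildir-HdfsAndKafka-Agent.channels.c1.capacity = 10000
Taildir-HdfsAndKafka-Agent.channels.c1.transactionCapacity = 1000
Taildir-HdfsAndKafka-Agent.channels.c2.capacity = 10000
Taildir-HdfsAndKafka-Agent.channels.c2.transactionCapacity = 1000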

Startup command:

flume-ng agent \
--name Taildir-HdfsAndKafka-Agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/Taildir-HdfsAndKafka-Agent.conf \
-Dflume.root.logger=INFO,console
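
This runs in the foreground with logs on the console. To keep the agent alive after logging out, a common variant (an assumption, not from the original) is to background it with nohup and log to a file instead:

nohup flume-ng agent \
--name Taildir-HdfsAndKafka-Agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/Taildir-HdfsAndKafka-Agent.conf \
> ~/flume-agent.log 2>&1 &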

Test results: omitted. A quick way to verify the flow yourself is sketched below.
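
The sketch (the file name and content are arbitrary) appends a line into the monitored directory and checks both sinks, using the paths and topic from the configuration above:

# Generate input for the Taildir source
[hadoop@hadoop001 ~]$ mkdir -p /home/hadoop/data/flume/HdfsAndKafka/input
[hadoop@hadoop001 ~]$ echo "hello flume" >> /home/hadoop/data/flume/HdfsAndKafka/input/test.log
# Kafka side: the line should appear in the console consumer
[hadoop@hadoop001 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic wsk_test --from-beginning
# HDFS side: files roll into minute-based directories
[hadoop@hadoop001 ~]$ hdfs dfs -ls -R /flume/HdfsAndKafka/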
