概述
首先,我们要创建两个conf文件供Flume来使用,分别为flume-kafka.conf和kafka-flume-hdfs.conf,然后我们创建一个kafka的topic,叫做flume,我们通过这个topic来检验flume的数据是否传输给了kafka。
下面是示例代码。
首先创建flume-kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/wdp/flume/datas
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = //这里写你自己的kafka端口号,如master:9092
a1.sinks.k1.kafka.topic = flume
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这个flume-kafka.conf主要是用来监听目录的,当我们向目录中mv或cp数据是,flume将会把数据传输给kafka
然后建立kafka的topic
kafka-topics.sh --create --bootstrap-server master:9092 --replication-factor 1 --partitions 1 --topic flume
接下来可以先运行flume的flume-kafka.conf然后kafka的topic开启消费者模式,看一看数据能否传输到kafka中
bin/flume-ng agent --conf conf/ --name a1 --conf-file job1/flume-kafka.conf
kafka-console-consumer.sh --bootstrap-server master:9092 --topic flume
如果数据正常到达kafka中,我们来写kafka-flume-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
#a1.sources.r1.type = netcat
a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a2.sources.r2.batchSize=100
a2.sources.r2.batchDurationMillis=2000
a2.sources.r2.kafka.bootstrap.servers = master:9092
a2.sources.r2.kafka.topics= flume
#对于sink的配置描述 传递到hdfs上面
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://master:9000/flume/data/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = yinhang-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 6
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小副本数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
这时候再运行该conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file job1/kafka-flume-hdfs.conf
即可将数据导入到hdfs中。
在这个试验中,一定要注意的是
a2.sources.r2.batchSize=100
a2.channels.c2.transactionCapacity = 100
这两个的参数一定要设置成一样的。
初次试验有什么不对的地方,希望大家指正。
感谢观看,希望对你有所帮助。
最后
以上就是危机紫菜为你收集整理的采用Flume-kafka-Flume将数据导入到HDFS中的全部内容,希望文章能够帮你解决采用Flume-kafka-Flume将数据导入到HDFS中所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复