首先,我们要创建两个conf文件供Flume来使用,分别为flume-kafka.conf和kafka-flume-hdfs.conf,然后我们创建一个kafka的topic,叫做flume,我们通过这个topic来检验flume的数据是否传输给了kafka。
下面是示例代码。
首先创建flume-kafka.conf
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# define a1.sources = r1 a1.sinks = k1 a1.channels = c1 # source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /opt/wdp/flume/datas # sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = //这里写你自己的kafka端口号,如master:9092 a1.sinks.k1.kafka.topic = flume a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # bind a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
这个flume-kafka.conf主要是用来监听目录的,当我们向目录中mv或cp数据是,flume将会把数据传输给kafka
然后建立kafka的topic
复制代码
1
2kafka-topics.sh --create --bootstrap-server master:9092 --replication-factor 1 --partitions 1 --topic flume
接下来可以先运行flume的flume-kafka.conf然后kafka的topic开启消费者模式,看一看数据能否传输到kafka中
复制代码
1
2
3bin/flume-ng agent --conf conf/ --name a1 --conf-file job1/flume-kafka.conf kafka-console-consumer.sh --bootstrap-server master:9092 --topic flume
如果数据正常到达kafka中,我们来写kafka-flume-hdfs.conf
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43# Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #a1.sources.r1.type = netcat a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a2.sources.r2.batchSize=100 a2.sources.r2.batchDurationMillis=2000 a2.sources.r2.kafka.bootstrap.servers = master:9092 a2.sources.r2.kafka.topics= flume #对于sink的配置描述 传递到hdfs上面 a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://master:9000/flume/data/%Y%m%d/%H a2.sinks.k2.hdfs.filePrefix = yinhang- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #积攒多少个Event才flush到HDFS一次 a2.sinks.k2.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 6 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 #文件的滚动与Event数量无关 a2.sinks.k2.hdfs.rollCount = 0 #最小副本数 a2.sinks.k2.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
这时候再运行该conf
复制代码
1
2bin/flume-ng agent --conf conf/ --name a2 --conf-file job1/kafka-flume-hdfs.conf
即可将数据导入到hdfs中。
在这个试验中,一定要注意的是
a2.sources.r2.batchSize=100
a2.channels.c2.transactionCapacity = 100
这两个的参数一定要设置成一样的。
初次试验有什么不对的地方,希望大家指正。
感谢观看,希望对你有所帮助。
最后
以上就是危机紫菜最近收集整理的关于采用Flume-kafka-Flume将数据导入到HDFS中的全部内容,更多相关采用Flume-kafka-Flume将数据导入到HDFS中内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复