我是靠谱客的博主 无语御姐,最近开发中收集的这篇文章主要介绍flume+kafka+zookeeper 单机实现实时数据的获取,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

之前在做大数据的时候,一直不知道数据是怎么上传到hdfs的,问了架构师用flume,自己也一直想玩一下flume,无奈没太多的时间,今天有点时间,就查找资料,搭建了一个单机环境下的日志监控。所有资料全部来源与网络,我只是做了一个简单的整合。

首先,第一步安装flume。

1.安装flume,首先要安装好jvm。

2.下载flume。地址 http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

3.解压项目,进入conf下面,默认的配置文件带一个后缀是.template,去掉这个后缀。

4.修改flume-env.sh 设置jdk的安装目录,

5.校验flume是否安装成功,可以进入bin目录下,输入:

flume-ng version


查看是否输出版本信息,即为以成功。

6.测试flume的功能,修改配置文件:flume-conf.properties

       #Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:
    #1) 拷贝到spool目录下的文件不可以再打开编辑。
    #2) spool目录下不可包含相应的子目录

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1



# logser可以看做是flume服务的名称,每个flume都由sources、channels和sinks三部分组成
# sources可以看做是数据源头、channels是中间转存的渠道、sinks是数据后面的去向
logser.sources = src_launcherclick
logser.sinks = kfk_launcherclick
logser.channels = ch_launcherclick

# source
# 源头类型是TAILDIR,就可以实时监控以追加形式写入文件的日志
logser.sources.src_launcherclick.type = TAILDIR
# positionFile记录所有监控的文件信息
logser.sources.src_launcherclick.positionFile = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log1/taildir_position.json
# 监控的文件组
logser.sources.src_launcherclick.filegroups = f1
# 文件组包含的具体文件,也就是我们监控的文件
logser.sources.src_launcherclick.filegroups.f1 = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/.*

# interceptor
# 写kafka的topic即可
logser.sources.src_launcherclick.interceptors = i1 i2
logser.sources.src_launcherclick.interceptors.i1.type=static
logser.sources.src_launcherclick.interceptors.i1.key = type
logser.sources.src_launcherclick.interceptors.i1.value = launcher_click
logser.sources.src_launcherclick.interceptors.i2.type=static
logser.sources.src_launcherclick.interceptors.i2.key = topic
logser.sources.src_launcherclick.interceptors.i2.value = launcher_click

# channel
logser.channels.ch_launcherclick.type = memory
logser.channels.ch_launcherclick.capacity = 10000
logser.channels.ch_launcherclick.transactionCapacity = 1000

# kfk sink
# 指定sink类型是Kafka,说明日志最后要发送到Kafka
logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker
logser.sinks.kfk_launcherclick.brokerList = 10.0.5.203:9092

# Bind the source and sink to the channel
logser.sources.src_launcherclick.channels = ch_launcherclick
logser.sinks.kfk_launcherclick.channel = ch_launcherclick


安装kafka:(zookeeper的安装不再写了,也很简单,网上都是例子:http://blog.csdn.net/wo541075754/article/details/56483533)

1.下载kafka。地址:http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz

2.解压,之后可以直接启动:

     bin/kafka-server-start.sh config/server.properties &
结下来整合:

1.启动flume:

./flume-ng agent -c . -f ../conf/flume-conf.properties -n logser -Dflume.root.logger=INFO
2.重新打开一个窗口,启动kafka的消费模式,监听: launcher_click 主题。命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning

3.新建一个窗口:对

    /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log这个目录下的文件进行追加,命令:
echo "spool test2s nihaoi www.baidu.com" >> /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/spool_text1sd.log
这个时候,在kafka的监听窗口会打印:
$bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
spool test2s
spool test2s nihaoi www
spool test2s nihaoi www.baidu.com
下面是用java监听输出数据:
	1.maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

	2.代码:
	
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("launcher_click"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.println("内容是:"+ record.value());
        }
    }
}
参考资料:
Flume1.5.0入门:安装、部署、及flume的案例:http://www.aboutyun.com/thread-8917-1-1.html
kafka的安装  http://www.cnblogs.com/wangyangliuping/p/5546465.html
flume实时日志分析  http://itindex.net/detail/56956-flume-kafka-sparkstreaming
kafka的java代码来源: http://blog.csdn.net/lnho2015/article/details/51353936










最后

以上就是无语御姐为你收集整理的flume+kafka+zookeeper 单机实现实时数据的获取的全部内容,希望文章能够帮你解决flume+kafka+zookeeper 单机实现实时数据的获取所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部