我是靠谱客的博主 无语御姐,这篇文章主要介绍flume+kafka+zookeeper 单机实现实时数据的获取,现在分享给大家,希望可以做个参考。

之前在做大数据的时候,一直不知道数据是怎么上传到hdfs的,问了架构师用flume,自己也一直想玩一下flume,无奈没太多的时间,今天有点时间,就查找资料,搭建了一个单机环境下的日志监控。所有资料全部来源与网络,我只是做了一个简单的整合。

首先,第一步安装flume。

1.安装flume,首先要安装好jvm。

2.下载flume。地址 http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

3.解压项目,进入conf下面,默认的配置文件带一个后缀是.template,去掉这个后缀。

4.修改flume-env.sh 设置jdk的安装目录,

5.校验flume是否安装成功,可以进入bin目录下,输入:

复制代码
1
flume-ng version


查看是否输出版本信息,即为以成功。

6.测试flume的功能,修改配置文件:flume-conf.properties

复制代码
1
2
3
#Spool监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:     #1) 拷贝到spool目录下的文件不可以再打开编辑。     #2) spool目录下不可包含相应的子目录

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = spooldir a1.sources.r1.channels = c1 a1.sources.r1.spoolDir = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/logs a1.sources.r1.fileHeader = true # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 # logser可以看做是flume服务的名称,每个flume都由sources、channels和sinks三部分组成 # sources可以看做是数据源头、channels是中间转存的渠道、sinks是数据后面的去向 logser.sources = src_launcherclick logser.sinks = kfk_launcherclick logser.channels = ch_launcherclick # source # 源头类型是TAILDIR,就可以实时监控以追加形式写入文件的日志 logser.sources.src_launcherclick.type = TAILDIR # positionFile记录所有监控的文件信息 logser.sources.src_launcherclick.positionFile = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log1/taildir_position.json # 监控的文件组 logser.sources.src_launcherclick.filegroups = f1 # 文件组包含的具体文件,也就是我们监控的文件 logser.sources.src_launcherclick.filegroups.f1 = /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/.* # interceptor # 写kafka的topic即可 logser.sources.src_launcherclick.interceptors = i1 i2 logser.sources.src_launcherclick.interceptors.i1.type=static logser.sources.src_launcherclick.interceptors.i1.key = type logser.sources.src_launcherclick.interceptors.i1.value = launcher_click logser.sources.src_launcherclick.interceptors.i2.type=static logser.sources.src_launcherclick.interceptors.i2.key = topic logser.sources.src_launcherclick.interceptors.i2.value = launcher_click # channel logser.channels.ch_launcherclick.type = memory logser.channels.ch_launcherclick.capacity = 10000 logser.channels.ch_launcherclick.transactionCapacity = 1000 # kfk sink # 指定sink类型是Kafka,说明日志最后要发送到Kafka logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink # Kafka broker logser.sinks.kfk_launcherclick.brokerList = 10.0.5.203:9092 # Bind the source and sink to the channel logser.sources.src_launcherclick.channels = ch_launcherclick logser.sinks.kfk_launcherclick.channel = ch_launcherclick


安装kafka:(zookeeper的安装不再写了,也很简单,网上都是例子:http://blog.csdn.net/wo541075754/article/details/56483533)

1.下载kafka。地址:http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz

2.解压,之后可以直接启动:

复制代码
1
bin/kafka-server-start.sh config/server.properties &
结下来整合:

1.启动flume:

复制代码
1
./flume-ng agent -c . -f ../conf/flume-conf.properties -n logser -Dflume.root.logger=INFO
2.重新打开一个窗口,启动kafka的消费模式,监听: launcher_click 主题。命令:

复制代码
1
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning

3.新建一个窗口:对

复制代码
1
/Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log这个目录下的文件进行追加,命令:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
复制代码
echo "spool test2s nihaoi www.baidu.com" >> /Users/haomaiche/Downloads/apache-flume-1.7.0-bin/log/spool_text1sd.log
这个时候,在kafka的监听窗口会打印:
$bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic launcher_click --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
spool test2s
spool test2s nihaoi www
spool test2s nihaoi www.baidu.com
下面是用java监听输出数据:
	1.maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

	2.代码:
	
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("launcher_click"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.println("内容是:"+ record.value());
        }
    }
}
复制代码
参考资料:
Flume1.5.0入门:安装、部署、及flume的案例:http://www.aboutyun.com/thread-8917-1-1.html
kafka的安装  http://www.cnblogs.com/wangyangliuping/p/5546465.html
flume实时日志分析  http://itindex.net/detail/56956-flume-kafka-sparkstreaming
kafka的java代码来源: http://blog.csdn.net/lnho2015/article/details/51353936
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码
复制代码

最后

以上就是无语御姐最近收集整理的关于flume+kafka+zookeeper 单机实现实时数据的获取的全部内容,更多相关flume+kafka+zookeeper内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(65)

评论列表共有 0 条评论

立即
投稿
返回
顶部