我是靠谱客的博主 勤恳洋葱,这篇文章主要介绍cdh5.8.0 flume+kafka用户行为日志数据采集方案详解一、日志模拟二、flume数据采集三、kafka数据四、消费kafka中的数据五、可能出现的问题及解决,现在分享给大家,希望可以做个参考。

组件版本
flume1.6.0+cdh5.8.0
kafka2.1.0+kafka4.0.0

一、日志模拟

1.1 模拟日志生成

复制代码
1
2
java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain >/data/opt/module/test.log

二、flume数据采集

2.1 Flume安装

2.1.1 日志采集Flume安装

1)添加服务
2)选择Flume,点击继续
3)选择节点
4)完成

2.2.2 日志采集Flume配置

1)Flume配置分析
图片: https://uploader.shimo.im/f/0X5ZXNUeD4clkwtO.png
Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。
2)Flume的流程配置如下:
(1)在CM管理页面上点击Flume
(2)在实例页面选择hadoop01上的Agent
(3)在CM管理页面hadoop01上Flume的配置中找到代理名称改为a1
(4)在配置文件如下内容(flume-kafka)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
a1.sources=r1 # 定义组件 a1.channels=c1 c2 a1.sinks=k1 k2 # configure source a1.sources.r1.type = com.wljs.flume.source.taildir.TaildirSource # TAILDIR方式读取数据 a1.sources.r1.positionFile = /data/opt/module/flume/log_position.json # 记录日志读取位置 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ # 读取日志位置 a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.wljs.flume.interceptor.LogETLInterceptor$Builder #ETL拦截器 a1.sources.r1.interceptors.i2.type = com.wljs.flume.interceptor.LogTypeInterceptor$Builder #日志类型拦截器 # selector a1.sources.r1.selector.type = multiplexing #多路选择器 a1.sources.r1.selector.header = topic # 根据日志类型分数据 a1.sources.r1.selector.mapping.topic_start = c1 # 根据日志类型分数据 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = memory a1.channels.c1.capacity=10000 # channel中存储的最大event数,默认值100。 a1.channels.c1.byteCapacityBufferPercentage=20 #缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位:百分比)。 a1.channels.c2.type = memory a1.channels.c2.capacity=10000 a1.channels.c2.byteCapacityBufferPercentage=20 # configure sink # start-sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = topic_start a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sinks.k1.kafka.flumeBatchSize = 2000 #Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。默认值为100。 a1.sinks.k1.kafka.producer.acks = 1 #kafka的ack a1.sinks.k1.channel = c1 # event-sink a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k2.kafka.topic = topic_event a1.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sinks.k2.kafka.flumeBatchSize = 2000 a1.sinks.k2.kafka.producer.acks = 1 a1.sinks.k2.channel = c2 注意:com.lsl.flume.interceptor.LogETLInterceptor和com.lsl.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。 2.2.3 线上配置 a1.sources=r1 # 定义组件 a1.channels=c1 c2 a1.sinks=k1 k2 # configure source a1.sources.r1.type = com.wljs.flume.source.taildir.TaildirSource # TAILDIR方式读取数据 a1.sources.r1.positionFile = /data/opt/module/flume/log_position.json # 记录日志读取位置 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ # 读取日志位置 a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.wljs.flume.interceptor.LogETLInterceptor$Builder #ETL拦截器 a1.sources.r1.interceptors.i2.type = com.wljs.flume.interceptor.LogTypeInterceptor$Builder #日志类型拦截器 # selector a1.sources.r1.selector.type = multiplexing #多路选择器 a1.sources.r1.selector.header = topic # 根据日志类型分数据 a1.sources.r1.selector.mapping.topic_start = c1 # 根据日志类型分数据 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = memory a1.channels.c1.capacity=10000 # channel中存储的最大event数,默认值100。 a1.channels.c1.byteCapacityBufferPercentage=20 #缓冲空间占Channel容量(byteCapacity)的百分比,为event中的头信息保留了空间,默认值20(单位:百分比)。 a1.channels.c2.type = memory a1.channels.c2.capacity=10000 a1.channels.c2.byteCapacityBufferPercentage=20 # configure sink # start-sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = topic_start a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sinks.k1.kafka.flumeBatchSize = 2000 #Producer端单次批量发送的消息条数,该值应该根据实际环境适当调整,增大批量发送消息的条数能够在一定程度上提高性能,但是同时也增加了延迟和Producer端数据丢失的风险。默认值为100。 a1.sinks.k1.kafka.producer.acks = 1 #kafka的ack a1.sinks.k1.channel = c1 # event-sink a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k2.kafka.topic = topic_event a1.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sinks.k2.kafka.flumeBatchSize = 2000 a1.sinks.k2.kafka.producer.acks = 1 a1.sinks.k2.channel = c2

(3)修改/opt/module/flume/log_position.json的读写权限

复制代码
1
2
3
4
5
[root@hadoop01 module]# mkdir -p /opt/module/flume [root@hadoop01 flume]# touch log_position.json [root@hadoop01 flume]# chmod 777 log_position.json [root@hadoop01 module]# xsync /opt/module/flume/
复制代码
1
2
注意:Json文件的父目录一定要创建好,并改好权限

2.2.4 Flume拦截器

复制代码
1
2
3
4
自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。 ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志 日志类型区分拦截器主要用于,将ic日志和iu日志区分开来,方便发往Kafka的不同Topic。

1)创建Maven工程flume-interceptor
2)创建包名:com.wljs.flume.interceptor
3)在pom.xml文件中添加如下配置

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.6.0-cdh5.8.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

4)在com.wljs.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.wljs.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; public class LogETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 1 获取数据 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); // 2 判断数据类型并向Header中赋值 if (log.contains("start")) { if (LogUtils.validateStart(log)){ return event; } }else { if (LogUtils.validateEvent(log)){ return event; } } // 3 返回校验结果 return null; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); if (intercept1 != null){ interceptors.add(intercept1); } } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogETLInterceptor(); } @Override public void configure(Context context) { } } }

4)Flume日志过滤工具类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.lsl.flume.interceptor; import org.apache.commons.lang.math.NumberUtils; public class LogUtils { public static boolean validateEvent(String log) { // 服务器时间 | json // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]} // 1 切割 String[] logContents = log.split("\|"); // 2 校验 if(logContents.length != 2){ return false; } //3 校验服务器时间 if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){ return false; } // 4 校验json if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){ return false; } return true; } public static boolean validateStart(String log) { if (log == null){ return false; } // 校验json if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){ return false; } return true; } }

5)Flume日志类型区分拦截器LogTypeInterceptor

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.lsl.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 区分日志类型: body header // 1 获取body数据 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); // 2 获取header Map<String, String> headers = event.getHeaders(); // 3 判断数据类型并向Header中赋值 if (log.contains("start")) { headers.put("topic","topic_start"); }else { headers.put("topic","topic_event"); } return event; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); interceptors.add(intercept1); } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogTypeInterceptor(); } @Override public void configure(Context context) { } } }

6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。

复制代码
1
2
注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。

7)采用root用户将flume-interceptor-1.0-SNAPSHOT.jar包放入到hadoop01的/data/opt/cloudera/parcels/CDH/lib/flume-ng/lib/文件夹下面。

复制代码
1
2
3
[root@hadoop01 lib]# ls | grep interceptor flume-interceptor-1.0-SNAPSHOT.jar

8)分发Flume到hadoop02

复制代码
1
2
[root@hadoop01 lib]$ xsync flume-interceptor-1.0-SNAPSHOT.jar

三、kafka数据

3.1 离线安装

3.1.1 下载

安装jar包:http://archive.cloudera.com/csds/
安装包下载:http://archive.cloudera.com/kafka/parcels/4.0.0/
图片: https://uploader.shimo.im/f/dtHh8T705Xg7xqn2.png

3.1.2 安装

  1. 上传下载下来的安装包至集群路径/data/opt/cloudera/parcel-repo并修改KAFKA-4.0.0-1.4.0.0.p0.1-precise.parcel.sha1 文件名为KAFKA-4.0.0-1.4.0.0.p0.1-precise.parcel.sha
  2. 创建/data/opt/cloudera/csd目录导入Kafka-1.2.0.jar
    图片: https://uploader.shimo.im/f/fJWmqshX0GkV5tWm.png
  3. 点击安装主页上的主机–>Parcel–>检查新Parcel–>选择操作【分发->激活】图片: https://uploader.shimo.im/f/rVRe9S6KIowEpSlz.png
    图片: https://uploader.shimo.im/f/BfQ5jdjVayMJZl2O.png

3.2 kafka消息

3.2.1 查看Kafka Topic列表

kafka-topics --zookeeper hadoop01:2181 --list

3.2.2 创建Kafka Topic

创建:启动日志主题、事件日志主题。
1)创建启动日志主题
kafka-topics --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --replication-factor 2 --partitions 5 --topic topic_start
2)创建事件日志主题
kafka-topics --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --replication-factor 2 --partitions 5 --topic topic_event

3.2.3 删除Kafka Topic

1)删除启动日志主题
bin/kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic topic_start
2)删除事件日志主题
bin/kafka-topics.sh --delete --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 --topic topic_event

3.2.4 Kafka生产消息

kafka-console-producer --broker-list hadoop01:9092 --topic topic_start

3.2.5 Kafka消费消息

kafka-console-consumer --bootstrap-server hadoop01:9092 --from-beginning --topic topic_start
–from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

3.2.6 查看Kafka Topic详情

kafka-topics --zookeeper hadoop01:2181 --describe --topic topic_start

四、消费kafka中的数据

4.1 Flume消费Kafka数据写到HDFS

1)集群规划
2)Flume配置分析
图片: https://uploader.shimo.im/f/D6DxTn1pZ7ox05to.png
3)Flume的具体配置如下:
(1)在CM管理页面hadoop03上Flume的配置中找到代理名称

复制代码
1
2
a1
复制代码
1
2
在配置文件如下内容(kafka-hdfs)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sources.r1.kafka.topics=topic_start ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 5000 # 一批写入 channel 的最大消息数 a1.sources.r2.batchDurationMillis = 2000# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。 a1.sources.r2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sources.r2.kafka.topics=topic_event ## channel1 a1.channels.c1.type=memory a1.channels.c1.capacity=100000 # 内存中存储 Event 的最大数 a1.channels.c1.transactionCapacity=10000 # source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大) ## channel2 a1.channels.c2.type=memory a1.channels.c2.capacity=100000 a1.channels.c2.transactionCapacity=10000 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/wljs/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = logstart- a1.sinks.k1.hdfs.round = true #是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符) a1.sinks.k1.hdfs.roundValue = 10 # 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30 a1.sinks.k1.hdfs.roundUnit = second # 向下舍入的单位,可选值: second 、 minute 、 hour ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/wljs/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = logevent- a1.sinks.k2.hdfs.round = true a1.sinks.k2.hdfs.roundValue = 10 a1.sinks.k2.hdfs.roundUnit = second ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 600 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 a1.sinks.k1.hdfs.rollSize = 134217728 #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 a1.sinks.k1.hdfs.rollCount = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) a1.sinks.k1.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制 a1.sinks.k2.hdfs.rollInterval = 600 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k2.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制 ## 控制输出文件是原生文件。 ## a1.sinks.k1.hdfs.fileType = CompressedStream ## a1.sinks.k2.hdfs.fileType = CompressedStream ## a1.sinks.k1.hdfs.codeC = lzop ## a1.sinks.k2.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2 4.2 线上配置 ## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 8000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sources.r1.kafka.topics=topic_start ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 8000 # 一批写入 channel 的最大消息数 a1.sources.r2.batchDurationMillis = 2000# 一个批次写入 channel 之前的最大等待时间(毫秒)。达到等待时间或者数量达到 batchSize 都会触发写操作。 a1.sources.r2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092 a1.sources.r2.kafka.topics=topic_event ## channel1 a1.channels.c1.type=memory a1.channels.c1.capacity=100000 # 内存中存储 Event 的最大数 a1.channels.c1.transactionCapacity=10000 # source 或者 sink 每个事务中存取 Event 的操作数量(不能比 capacity 大) ## channel2 a1.channels.c2.type=memory a1.channels.c2.capacity=100000 a1.channels.c2.transactionCapacity=10000 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/wljs/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = logstart- a1.sinks.k1.hdfs.round = true #是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符) a1.sinks.k1.hdfs.roundValue = 10 # 向下舍入(小于当前时间)的这个值的最高倍(单位取决于下面的 hdfs.roundUnit ) 例子:假设当前时间戳是18:32:01,hdfs.roundUnit = minute 如果roundValue=5,则时间戳会取为:18:30 如果roundValue=7,则时间戳会取为:18:28 如果roundValue=10,则时间戳会取为:18:30 a1.sinks.k1.hdfs.roundUnit = minute # 向下舍入的单位,可选值: second 、 minute 、 hour ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/wljs/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = logevent- a1.sinks.k2.hdfs.round = true a1.sinks.k2.hdfs.roundValue = 10 a1.sinks.k2.hdfs.roundUnit = minute ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 120 #当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 a1.sinks.k1.hdfs.rollSize = 0 #当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 a1.sinks.k1.hdfs.rollCount = 0 #当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) a1.sinks.k1.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制 a1.sinks.k2.hdfs.rollInterval = 120 a1.sinks.k2.hdfs.rollSize = 0 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k2.hdfs.minBlockReplicas = 1 #最小冗余数,让flume感知不到副本的复制 ## 控制输出文件 ## a1.sinks.k1.hdfs.fileType = CompressedStream ## a1.sinks.k2.hdfs.fileType = CompressedStream ## a1.sinks.k1.hdfs.codeC = lzop ## a1.sinks.k2.hdfs.codeC = lzop ## 控制输出文件是原生文件。 a1.sinks.k2.hdfs.fileType=DataStream #文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 a1.sinks.k2.hdfs.idleTimeout=65 # 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 a1.sinks.k2.hdfs.callTimeout=65000 # 允许HDFS操作文件的时间,比如:open、write、flush、close。如果HDFS操作超时次数增加,应该适当调高这个这个值。(毫秒) a1.sinks.k2.hdfs.threadsPoolSize=200 #每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) a1.sinks.k2.hdfs.fileType=DataStream a1.sinks.k2.hdfs.idleTimeout=65 a1.sinks.k2.hdfs.callTimeout=65000 a1.sinks.k2.hdfs.threadsPoolSize=200 ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2

4.3 日志生成数据传输到HDFS

1)将log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar上传都hadoop01的/opt/module目录
2)分发log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar到hadoop02

复制代码
1
2
[root@hadoop02 module]# xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在/root/bin目录下创建脚本lg.sh

复制代码
1
2
[root@hadoop02 bin]$ vim lg.sh

4)在脚本中编写如下内容

复制代码
1
2
3
4
5
6
#! /bin/bash for i in hadoop01 hadoop02 do ssh $i "java -classpath /data/opt/module/log-produce.jar com.lsl.appclient.AppMain $1 $2 >/data/opt/module/test.log &" done

5)修改脚本执行权限

复制代码
1
2
[root@hadoop02 bin]$ chmod 777 lg.sh

6)启动脚本

复制代码
1
2
[root@hadoop02 module]$ lg.sh

五、可能出现的问题及解决

5.1 flume

5.1.1 OOM 问题

Flume 启动时的最大堆内存大小默认是 20M,线上环境很容易 OOM,因此需要你在 flume-env.sh 中添加 JVM 启动参数:

复制代码
1
2
JAVA_OPTS="-Xms8192m -Xmx8192m -Xss256k -Xmn2g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"

5.1.2 cdh5.8.0-1.6.0 flume-tairdir问题

taildirSource组件不支持文件改名的。如果文件改名会认为是新文件,就会重新读取,这就导致了日志文件重读!!!

解决:下载1.7flume源码修改编译,上传至flume的lib目录下

最后

以上就是勤恳洋葱最近收集整理的关于cdh5.8.0 flume+kafka用户行为日志数据采集方案详解一、日志模拟二、flume数据采集三、kafka数据四、消费kafka中的数据五、可能出现的问题及解决的全部内容,更多相关cdh5.8.0内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(69)

评论列表共有 0 条评论

立即
投稿
返回
顶部