我是靠谱客的博主 会撒娇睫毛膏,这篇文章主要介绍Flume详细配置,现在分享给大家,希望可以做个参考。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
Flume: ===================== Flume是一种分布式的、可靠的、可用的服务,可以有效地收集、聚合和移动大量的日志数据。 它有一个基于流数据的简单而灵活的体系结构。 它具有健壮性和容错能力,具有可调的可靠性机制和许多故障转移和恢复机制。 它使用一个简单的可扩展数据模型,允许在线分析应用程序。 source:源 对channel而言,相当于生产者,通过接收各种格式数据发送给channel进行传输 channel:通道 相当于数据缓冲区,接收source数据发送给sink sink:沉槽 对channel而言,相当于消费者,通过接收channel数据通过指定数据类型发送到指定位置 Event: =============== flume传输基本单位: head + body flume安装: ================ 1、解压 2、符号链接 3、配置环境变量并使其生效 4、修改配置文件 1)重命名flume-env.ps1.template为flume-env.ps1 2)重命名flume-env.sh.template为flume-env.sh 3)修改flume-env.sh,配置jdk目录,添加 export JAVA_HOME=/soft/jdk 5、flume 查看版本 flume-ng version flume使用: ========================= //flume可以将配置文件写在zk上 //flume运行命令 flume-ng agent -n a1 -f xxx.conf /flume-ng agent -n xx -f xxx.conf agent: a1 source: s1 channel:c1 sink: n1 使用方法: 1、编写配置文件r_nc.conf # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2、启动flume,指定配置文件 flume-ng agent -n a1 -f r_nc.conf 3、启动另一个会话,进行测试 nc localhost 8888 //用户手册 http://flume.apache.org/FlumeUserGuide.html 后台运行程序: ============================================= ctrl + z :将程序放在后台运行 =====> [1]+ Stopped flume-ng agent -n a1 -f r_nc.conf 通过 bg %1 的方式将程序后台运行 通过jobs查看后台任务 通过 fg %1 的方式将程序放在前台运行

复制代码

 

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
flume: 海量日志数据的收集、聚合和移动 flume-ng agent -n a1 -f xxx.conf source 相对于channel是生产者 //netcat channel 类似于缓冲区 //memory sink 相对于channel是消费者 //logger Event: header + body k v data source: ============================================ 1、序列(seq)源:多用作测试 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = seq # 总共发送的事件个数 a1.sources.r1.totalEvents = 1000 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2、压力(stress)源:多用作负载测试 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = org.apache.flume.source.StressSource # 单个事件大小,单位:byte a1.sources.r1.size = 10240 # 事件总数 a1.sources.r1.maxTotalEvents = 1000000 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 3、滚动目录(Spooldir)源:监听指定目录新文件产生,并将新文件数据作为event发送 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = spooldir # 设置监听目录 a1.sources.r1.spoolDir = /home/centos/spooldir # 通过以下配置指定消费完成后文件后缀 #a1.sources.r1.fileSuffix = .COMPLETED # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 4、exec源 //通过执行linux命令产生新数据 //典型应用 tail -F (监听一个文件,文件增长的时候,输出追加数据) //不能保证数据完整性,很可能丢失数据 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = exec # 配置linux命令 a1.sources.r1.command = tail -F /home/centos/readme.txt # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 5、Taildir源 //监控目录下文件 //文件类型可通过正则指定 //有容灾机制 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = TAILDIR # 设置source组 可设置多个 a1.sources.r1.filegroups = f1 # 设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件 a1.sources.r1.filegroups.f1 = /home/centos/taildir/.* # 设置定位文件的位置 # a1.sources.r1.positionFile ~/.flume/taildir_position.json # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 sink: ==================================== 1、fileSink //多用作数据收集 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = file_roll # 配置目标文件夹 a1.sinks.k1.sink.directory = /home/centos/file # 设置滚动间隔,默认30s,设为0则不滚动,成为单个文件 a1.sinks.k1.sink.rollInterval = 0 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2、hdfsSink //默认以seqFile格式写入 //k:LongWritable //v: BytesWritable // # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = hdfs # 配置目标文件夹 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/ # 配置文件前缀 a1.sinks.k1.hdfs.filePrefix = events- # 滚动间隔,秒 a1.sinks.k1.hdfs.rollInterval = 0 # 触发滚动文件大小,byte a1.sinks.k1.hdfs.rollSize = 1024 # 配置使用本地时间戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true # 配置输出文件类型,默认SequenceFile # DataStream文本格式,不能设置压缩编解码器 # CompressedStream压缩文本格式,需要设置编解码器 a1.sinks.k1.hdfs.fileType = DataStream # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 3、hiveSink: //hiveserver帮助:hive --service help //1、hive --service metastore 启动hive的metastore服务,metastore地址:thrift://localhost:9083 //2、将hcatalog的依赖放在/hive/lib下,cp hive-hcatalog* /soft/hive/lib (位置/soft/hive/hcatalog/share/hcatalog) //3、创建hive事务表 //SET hive.support.concurrency=true; SET hive.enforce.bucketing=true; SET hive.exec.dynamic.partition.mode=nonstrict; SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; SET hive.compactor.initiator.on=true; SET hive.compactor.worker.threads=1; //create table myhive.weblogs(id int, name string, age int) clustered by(id) into 2 buckets row format delimited fields terminated by 't' stored as orc tblproperties('transactional'='true'); # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = hive a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083 a1.sinks.k1.hive.database = myhive a1.sinks.k1.hive.table = weblogs a1.sinks.k1.useLocalTimeStamp = true #输入格式,DELIMITED和json #DELIMITED 普通文本 #json json文件 a1.sinks.k1.serializer = DELIMITED #输入字段分隔符,双引号 a1.sinks.k1.serializer.delimiter = "," #输出字段分隔符,单引号 a1.sinks.k1.serializer.serdeSeparator = 't' #字段名称,","分隔,不能有空格 a1.sinks.k1.serializer.fieldnames =id,name,age # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 4、hbaseSink //SimpleHbaseEventSerializer将rowKey和col设置了默认值,不能自定义 //RegexHbaseEventSerializer可以手动指定rowKey和col字段名称 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = hbase a1.sinks.k1.table = flume_hbase a1.sinks.k1.columnFamily = f1 a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer # 配置col正则手动指定 # rowKeyIndex手动指定rowKey,索引以0开头 a1.sinks.k1.serializer.colNames = ROW_KEY,name,age a1.sinks.k1.serializer.regex = (.*),(.*),(.*) a1.sinks.k1.serializer.rowKeyIndex=0 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 5、asynchbaseSink //异步hbaseSink //异步机制,写入速度快 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = asynchbase a1.sinks.k1.table = flume_hbase a1.sinks.k1.columnFamily = f1 a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 channel:缓冲区 ===================================== 1、memorychannel a1.channels.c1.type = memory # 缓冲区中存留的最大event个数 a1.channels.c1.capacity = 1000 # channel从source中每个事务提取的最大event数 # channel发送给sink每个事务发送的最大event数 a1.channels.c1.transactionCapacity = 100 2、fileChannel: //检查点和数据存储在默认位置时,当多个channel同时开启 //会导致文件冲突,引发其他channel会崩溃 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels = c1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint a1.channels.c1.dataDirs = /home/centos/flume/data # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 memoryChannel:快速,但是当设备断电,数据会丢失 FileChannel: 速度较慢,即使设备断电,数据也不会丢失 Avro =============================================== source # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4444 # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 *********************************************************************************************** *启动avro客户端,发送数据: * * flume-ng avro-client -H localhost -p 4444 -R ~/avro/header.txt -F ~/avro/user0.txt * * 指定ip 指定端口 指定header文件 指定数据文件 * *********************************************************************************************** sink # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /home/centos/taildir/.* # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.bind = 192.168.23.101 a1.sinks.k1.port = 4444 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 Flume跃点: ===================================== 1、将s101的flume发送到其他节点 xsync.sh /soft/flume xsync.sh /soft/apache-flume-1.8.0-bin/ 2、切换到root用户,分发环境变量文件 su root xsync.sh /etc/profile exit 3、配置文件 1)配置s101 //hop.conf 设置source:avro 设置sink: hdfs # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4444 # 配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/hop/%y-%m-%d/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.rollSize = 1024 a1.sinks.k1.hdfs.useLocalTimeStamp = true a1.sinks.k1.hdfs.fileType = DataStream # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2)配置s102-s104 //hop2.conf 设置source:taildir 设置sink: avro # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /home/centos/taildir/.* # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.23.101 a1.sinks.k1.port = 4444 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 4、在s102-s104创建~/taildir文件夹 xcall.sh "mkdir ~/taildir" 5、启动s101的flume flume-ng agent -n a1 -f /soft/flume/conf/hop.conf 6、分别启动s102-s104的flume,并将其放在后台运行 flume-ng agent -n a1 -f /soft/flume/conf/hop2.conf & 7、进行测试,分别在s102-s104的taildir中创建数据,观察hdfs数据情况 s102]$ echo 102 > taildir/1.txt s103]$ echo 103 > taildir/1.txt s104]$ echo 104 > taildir/1.txt interceptor:拦截器 ================================== 是source端组件:负责修改或删除event 每个source可以配置多个拦截器 ===> interceptorChain 1、Timestamp Interceptor //时间戳拦截器 + header # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 # 给拦截器起名 a1.sources.r1.interceptors = i1 # 指定拦截器类型 a1.sources.r1.interceptors.i1.type = timestamp # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 2、Static Interceptor //静态拦截器 + header 3、Host Interceptor //主机拦截器 + header 4、设置拦截器链: # 将agent组件起名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 a1.sources.r1.interceptors = i1 i2 i3 a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.interceptors.i2.type = host a1.sources.r1.interceptors.i3.type = static a1.sources.r1.interceptors.i3.key = location a1.sources.r1.interceptors.i3.value = NEW_YORK # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 channel selector:通道挑选器 ==================================== 是source端组件:负责将event发送到指定的channel,相当于分区 当一个source设置多个channel时,默认以副本形式向每个channel发送一个event拷贝 1、replication副本通道挑选器 //默认挑选器,source将所有channel发送event副本 //设置source x 1, channel x 3, sink x 3 // nc memory file # 将agent组件起名 a1.sources = r1 a1.sinks = k1 k2 k3 a1.channels = c1 c2 c3 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 8888 a1.sources.r1.selector.type = replicating # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 a1.channels.c3.type = memory a1.channels.c3.capacity = 1000 a1.channels.c3.transactionCapacity = 100 # 配置sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /home/centos/file1 a1.sinks.k1.sink.rollInterval = 0 a1.sinks.k2.type = file_roll a1.sinks.k2.sink.directory = /home/centos/file2 a1.sinks.k2.sink.rollInterval = 0 a1.sinks.k3.type = file_roll a1.sinks.k3.sink.directory = /home/centos/file3 a1.sinks.k3.sink.rollInterval = 0 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 c2 c3 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2 a1.sinks.k3.channel = c3 2、Multiplexing 多路复用通道挑选器 //选择avro源发送文件 # 将agent组件起名 a1.sources = r1 a1.sinks = k1 k2 k3 a1.channels = c1 c2 c3 # 配置source a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4444 # 配置通道挑选器 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = country a1.sources.r1.selector.mapping.CN = c1 a1.sources.r1.selector.mapping.US = c2 a1.sources.r1.selector.default = c3 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 a1.channels.c3.type = memory a1.channels.c3.capacity = 1000 a1.channels.c3.transactionCapacity = 100 # 配置sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /home/centos/file1 a1.sinks.k1.sink.rollInterval = 0 a1.sinks.k2.type = file_roll a1.sinks.k2.sink.directory = /home/centos/file2 a1.sinks.k2.sink.rollInterval = 0 a1.sinks.k3.type = file_roll a1.sinks.k3.sink.directory = /home/centos/file3 a1.sinks.k3.sink.rollInterval = 0 # 绑定channel-source, channel-sink a1.sources.r1.channels = c1 c2 c3 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2 a1.sinks.k3.channel = c3 1、创建file1 file2 file3文件夹,家目录 mkdir file1 file2 file3 2、创建文件夹country,并放入头文件和数据 创建头文件CN.txt、US.txt、OTHER.txt CN.txt ===> country CN US.txt ===> country US OTHER.txt ===> country OTHER 创建数据 1.txt 1.txt ====> helloworld 3、运行flume flume-ng agent -n a1 -f /soft/flume/selector_multi.conf 4、运行Avro客户端 flume-ng avro-client -H localhost -p 4444 -R ~/country/US.txt -F ~/country/1.txt ===> 查看file2 flume-ng avro-client -H localhost -p 4444 -R ~/country/CN.txt -F ~/country/1.txt ===> 查看file1 flume-ng avro-client -H localhost -p 4444 -R ~/country/OTHER.txt -F ~/country/1.txt ===> 查看file3 sinkProcessor ================================= sink Runner 运行一个 sink Group sink Group 是由一个或多个 sink 构成 sink Runner 告诉 sink Group 处理下一批 event sink Group 含有一个 sink Processor , 负责指定一个 sink 来处理这批数据 2、failover 容灾 //将所有sink设置一个优先级 //数量越大,优先级越高 //当数据传入时,优先级最高的sink负责处理 //当sink挂掉,次高优先级的sink被激活,继续处理数据 //channel和sink必须一对一 a1.sources = r1 a1.sinks = s1 s2 s3 a1.channels = c1 c2 c3 # Describe/configure the source a1.sources.r1.type = seq a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = s1 s2 s3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.s1 = 5 a1.sinkgroups.g1.processor.priority.s2 = 10 a1.sinkgroups.g1.processor.priority.s3 = 15 a1.sinkgroups.g1.processor.maxpenalty = 10000 # Describe the sink a1.sinks.s1.type = file_roll a1.sinks.s1.sink.directory = /home/centos/file1 a1.sinks.s2.type = file_roll a1.sinks.s2.sink.directory = /home/centos/file2 a1.sinks.s3.type = file_roll a1.sinks.s3.sink.directory = /home/centos/file3 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c2.type = memory a1.channels.c3.type = memory # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 c3 a1.sinks.s1.channel = c1 a1.sinks.s2.channel = c2 a1.sinks.s3.channel = c3 Event事件是由Source端封装输入数据的字节数组得来的 Event event = EventBuilder.withBody(body); Sink中的process方法返回两种状态: 1、READY //一个或多个event成功分发 2、BACKOFF //channel中没有数据提供给sink flume中事务的生命周期: tx.begin() //开启事务,之后执行操作 tx.commit() //提交事务,操作完成后由此提交 tx.rollback() //回滚事务,出现异常可以采取回滚措施 tx.close() //关闭事务,最后一定要关闭事务

最后

以上就是会撒娇睫毛膏最近收集整理的关于Flume详细配置的全部内容,更多相关Flume详细配置内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部