I'm 悲凉店员, a blogger at 靠谱客. This article is a hands-on introduction to Flume, the big-data log collection framework, shared here for reference.

Table of Contents

I. Environment Preparation
1. Download the packages
2. Install Flume
3. Edit the configuration files

II. Environment Variables
1. Configure the Java environment variables
2. Configure the Flume environment variables

III. Flume Sources
1. NetCat Source
2. Avro Source
3. Exec Source
4. Spooling Directory Source
5. Thrift Source
6. JMS Source
7. Kafka Source
8. NetCat TCP Source
9. NetCat UDP Source
10. Sequence Generator Source
11. Syslog TCP Source
12. Multiport Syslog TCP Source
13. Syslog UDP Source
14. HTTP Source
15. Stress Source
16. Avro Legacy Source
17. Thrift Legacy Source
18. Custom Source
19. Scribe Source

IV. Flume Sinks
1. HDFS Sink
2. Hive Sink
3. Logger Sink
4. Avro Sink
5. Thrift Sink
6. IRC Sink
7. File Roll Sink
8. Null Sink
9. HBase1Sink
10. HBase2Sink
11. AsyncHBaseSink
12. MorphlineSolrSink
13. ElasticSearchSink
14. Kite Dataset Sink
15. Kafka Sink
16. TLS Kafka Sink
17. HTTP Sink
18. Custom Sink
19. Custom Sources and Sinks

V. Flume Channels
1. Memory Channel
2. JDBC Channel
3. Kafka Channel
4. TLS Kafka Channel
5. File Channel
6. Spillable Memory Channel
7. Custom Channel

VI. Flume Channel Selectors
1. Replicating Channel Selector (default)
2. Multiplexing Channel Selector
3. Custom Channel Selector

VII. Flume Sink Processors
1. Default Sink Processor
2. Failover Sink Processor
3. Load Balancing Sink Processor

VIII. Flume Event Serializers
1. Body Text Serializer
2. "Flume Event" Avro Event Serializer
3. Avro Event Serializer

IX. Flume Interceptors
1. Default Interceptor
2. Timestamp Interceptor
3. Host Interceptor
4. Static Interceptor
5. Remove Header Interceptor
6. UUID Interceptor
7. Morphline Interceptor
8. Search and Replace Interceptor
9. Regex Filtering Interceptor
10. Regex Extractor Interceptor

X. Flume Configuration
1. Environment Variable Config Filter
2. External Process Config Filter
3. Hadoop Credential Store Config Filter
4. Log4J Appender
5. Load Balancing Log4J Appender


I. Environment Preparation

Official Flume documentation: Documentation — Apache Flume

1. Download the packages

JDK 1.8: Java Downloads | Oracle

Flume 1.9.0: Download — Apache Flume

2. Install Flume

tar zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/
cd /usr/local && ln -s apache-flume-1.9.0-bin flume

3. Edit the configuration files

cd /usr/local/flume/conf
cp flume-conf.properties.template flume-conf.properties
cp flume-env.ps1.template flume-env.ps1
cp flume-env.sh.template flume-env.sh

II. Environment Variables

1. Configure the Java environment variables

export JAVA_HOME=/usr/java/jdk1.8.0_241-amd64
export PATH=$PATH:$JAVA_HOME/bin

2. Configure the Flume environment variables

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:$FLUME_HOME/bin

III. Flume Sources

1. NetCat Source

Create a file named example.conf under /usr/local/flume with the following content.

The source listens on a port, the sink writes to the log, and the channel is in memory; the channel holds at most 1000 events, and each source put (or sink take) transaction handles up to 100 events.

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the Flume agent with the configuration file example.conf and the agent name a1; messages received by the source are displayed on the console as log output.

flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

You can also use the short options: -c specifies Flume's configuration directory, -f the configuration file that defines the components, and -n the agent name; -Dflume.root.logger=INFO,console sets Flume's runtime logging.

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example.conf -n a1 -Dflume.root.logger=INFO,console

telnet localhost 44444

The agent listens on local port 44444; sending messages to that port with telnet simulates the source side, and you can see the sink print each received message to the console log.

Flume also supports environment variables in configuration files (for property values only); variables can likewise be set in the conf/flume-env.sh file.

Change the port that the source in example.conf listens on to:

a1.sources.r1.port = ${BIND_PORT}

The parameter -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties must be added:

BIND_PORT=44444 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

2. Avro Source

Create an example directory under the Flume root, move example.conf into it, and make a copy named netcat_source.conf:

cd $FLUME_HOME && mkdir example

mv $FLUME_HOME/example.conf $FLUME_HOME/example && cp $FLUME_HOME/example/example.conf $FLUME_HOME/example/netcat_source.conf

cd $FLUME_HOME/example && cp example.conf avro_source.conf && vim avro_source.conf

Modify avro_source.conf as follows:

a1.sources.r1.type = avro
a1.sources.r1.bind = ${BIND_IP}
a1.sources.r1.port = ${BIND_PORT}

Start the agent:

BIND_IP=localhost BIND_PORT=55555 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/avro_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Start the Avro client:

flume-ng avro-client -c $FLUME_HOME/conf -H localhost -p 55555 -F /etc/profile

3. Exec Source

cd $FLUME_HOME/example && cp example.conf exec_tail_source.conf && vim exec_tail_source.conf

Copy example.conf to exec_tail_source.conf and change the following lines:

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F ${FLUME_HOME}/example/test.log
# a1.sources.r1.bind = ${BIND_IP}
# a1.sources.r1.port = ${BIND_PORT}

Start the agent:

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/exec_tail_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Write data to the monitored file:

ping 127.0.0.1 >> ${FLUME_HOME}/example/test.log

tail -F ${FLUME_HOME}/example/test.log

4. Spooling Directory Source

cd $FLUME_HOME/example && cp example.conf spooldir_source.conf && vim spooldir_source.conf

Copy example.conf to spooldir_source.conf and change the following lines:

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = ${FLUME_HOME}/example/test_spooldir
a1.sources.r1.fileSuffix = .csv
a1.sources.r1.fileHeader = true
# a1.sources.r1.bind = ${BIND_IP}
# a1.sources.r1.port = ${BIND_PORT}

Start the agent (the config references ${FLUME_HOME}, so the environment-variable resolver is needed here too):

flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/spooldir_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Write a couple of files into the spooling directory:

mkdir -p $FLUME_HOME/example/test_spooldir && cd $FLUME_HOME/example/test_spooldir
echo 111 >> 1.txt
echo 222 >> 2.txt
ls -l

5. Thrift Source

cd $FLUME_HOME/example && cp example.conf thrift_source.conf && vim thrift_source.conf

Copy example.conf to thrift_source.conf and change the following lines:

a1.sources.r1.type = thrift
a1.sources.r1.bind = ${BIND_IP}
a1.sources.r1.port = ${BIND_PORT}

Start the agent:

BIND_IP=0.0.0.0 BIND_PORT=55555 flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/example/thrift_source.conf -n a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Send data with a simple Thrift RPC client:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.util.Arrays;

//import org.apache.flume.api.SecureRpcClientFactory;

public class MyFlumeRpcClient01 {
    public static void main(String[] args) {
//        String hostname = "127.0.0.1";
        String hostname = "192.168.0.181";
        int port = 55555;
        System.out.println((null == args) + "\t" + String.valueOf(args.length));
        System.out.println(Arrays.toString(args));
        // Allow host and port to be overridden from the command line
        if (null != args && args.length != 0) {
            hostname = args[0];
            port = Integer.valueOf(args[1]);
        }
        System.out.println(hostname);
        System.out.println(port);
        RpcClient client = RpcClientFactory.getThriftInstance(hostname, port);
        Event event;
        for (int i = 0; i < 10; i++) {
            // Send the current timestamp as the event body
            event = EventBuilder.withBody(String.valueOf(System.currentTimeMillis()), Charset.forName("UTF-8"));
            try {
                client.append(event);
                Thread.sleep(600);
            } catch (EventDeliveryException | InterruptedException e) {
                e.printStackTrace();
                // Recreate the client on delivery failure
                client.close();
                client = RpcClientFactory.getThriftInstance(hostname, port);
            }
        }
        client.close();
    }
}

Maven configuration:

<!-- Flume dependencies -->
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>

Package with Maven:

mvn clean package -DskipTests

Run the client with java -cp:

java -cp real-time-1.0-jar-with-dependencies.jar com.xtd.java.flume.MyFlumeRpcClient01

The Flume console receives the timestamp data sent by the Thrift client.

6. JMS Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

7. Kafka Source

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
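To sanity-check a Kafka source, you can publish a few messages with the console producer that ships with Kafka (a sketch; the script name, flags, and broker address vary by Kafka version and installation):

kafka-console-producer.sh --broker-list localhost:9092 --topic test1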

8. NetCat TCP Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

9. NetCat UDP Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

10. Sequence Generator Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

11. Syslog TCP Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
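To send a test syslog message to this source, the util-linux logger command can target a remote TCP listener (a sketch; flag support varies across logger implementations):

logger -T -n localhost -P 5140 "hello flume"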

12. Multiport Syslog TCP Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port

13. Syslog UDP Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

14. HTTP Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
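With the default JSONHandler (that is, if the custom handler lines above are omitted), events are POSTed as a JSON array of header/body objects, so a quick test could look like this (port as configured above):

curl -X POST -H "Content-Type: application/json" -d '[{"headers":{"h1":"v1"},"body":"hello flume"}]' http://localhost:5140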

15. Stress Source

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

16. Avro Legacy Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

17. Thrift Legacy Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

18. Custom Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

19. Scribe Source

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

IV. Flume Sinks

1. HDFS Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

2. Hive Sink

The Hive sink needs more configuration than most; the settings vary with the table's columns, partitions, and field delimiter. Create the Hive table as follows:

create table weblogs (
    id int,
    msg string
)
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
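Note that the Hive sink writes through Hive streaming ingest, which generally requires ACID transactions to be enabled on the Hive side; a typical hive-site.xml sketch (property names per the Hive ACID documentation — verify against your Hive version):

hive.support.concurrency = true
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on = true
hive.compactor.worker.threads = 1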

hive_sink.conf

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames = id,,msg

3. Logger Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

4. Avro Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

5. Thrift Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

6. IRC Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

7. File Roll Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

8. Null Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

9. HBase1Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1

10. HBase2Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1

11. AsyncHBaseSink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

12. MorphlineSolrSink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

13. ElasticSearchSink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

14. Kite Dataset Sink
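A minimal configuration sketch, assuming the Kite Dataset sink jars are on the Flume classpath; the sink class and the kite.dataset.uri property are taken from the Flume 1.9 user guide, and the dataset URI below is a hypothetical example:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode/apps/flume/datasets/events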

15. Kafka Sink

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
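To verify delivery, you can read the topic back with the console consumer bundled with Kafka (a sketch; script name and flags depend on your Kafka installation):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning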

16. TLS Kafka Sink

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

17. HTTP Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

18. Custom Sink

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

19. Custom Sources and Sinks

Flume developer guide: Flume 1.9.0 Developer Guide — Apache Flume. A minimal custom-source sketch follows.
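Here is a minimal sketch of a pollable custom source matching the org.example.MySource example in the Custom Source section; the prefix property and the one-second sleep are illustrative assumptions, not part of the developer guide:

package org.example;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String prefix;

    @Override
    public void configure(Context context) {
        // Reads a1.sources.r1.prefix from the agent config (hypothetical property)
        prefix = context.getString("prefix", "my-event");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // Build one event and hand it to the channel processor
            Event event = EventBuilder.withBody(
                    (prefix + "-" + System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
            getChannelProcessor().processEvent(event);
            Thread.sleep(1000); // throttle this toy generator
            return Status.READY;
        } catch (Exception e) {
            return Status.BACKOFF;
        }
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 1000L;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 5000L;
    }
}

Package the class into a jar, place it under $FLUME_HOME/lib (or a plugins.d directory), and reference it with a1.sources.r1.type = org.example.MySource as shown above.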

V. Flume Channels

1. Memory Channel

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

2. JDBC Channel

a1.channels = c1
a1.channels.c1.type = jdbc

3. Kafka Channel

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

4. TLS Kafka Channel

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

5. File Channel

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

6. Spillable Memory Channel

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

7. Custom Channel

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

VI. Flume Channel Selectors

1. Replicating Channel Selector (default)

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

2. Multiplexing Channel Selector

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

3. Custom Channel Selector

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

VII. Flume Sink Processors

1. Default Sink Processor

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

2. Failover Sink Processor

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

3. Load Balancing Sink Processor

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random


VIII. Flume Event Serializers

1. Body Text Serializer

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

2. "Flume Event" Avro Event Serializer

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

3. Avro Event Serializer

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

IX. Flume Interceptors

1. Default Interceptor

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

2. Timestamp Interceptor

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

3. Host Interceptor

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

4. Static Interceptor

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

5. Remove Header Interceptor
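A sketch using the remove_header type documented in the user guide; this drops a single header by name (the header name is an example):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = remove_header
a1.sources.r1.interceptors.i1.withName = datacenter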

6. UUID Interceptor
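A sketch based on the builder class named in the user guide; id is the documented default header name:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = id
a1.sources.r1.interceptors.i1.preserveExisting = true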

7. Morphline Interceptor

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

8. Search and Replace Interceptor

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace
# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

9. Regex Filtering Interceptor
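A sketch using the documented regex_filter type; with excludeEvents = true, events whose body matches the pattern are dropped (the pattern itself is an example):

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^DEBUG.*
a1.sources.r1.interceptors.i1.excludeEvents = true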

10. Regex Extractor Interceptor

a1.sources.r1.interceptors.i1.regex = (\d):(\d):(\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

a1.sources.r1.interceptors.i1.regex = ^(?:\n)?(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

X. Flume Configuration

1. Environment Variable Config Filter

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = env
a1.sources.r1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123

2. External Process Config Filter

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/passwordResolver.sh
a1.configfilters.f1.charset = UTF-8
a1.sources.r1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/generateUniqId.sh
a1.configfilters.f1.charset = UTF-8
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume/agent_${f1['agent_name']} # will be /var/log/flume/agent_1234

3. Hadoop Credential Store Config Filter

a1.sources = r1
a1.channels = c1
a1.configfilters = f1
a1.configfilters.f1.type = hadoop
a1.configfilters.f1.credential.provider.path = jceks://file/<path_to_jceks file>
a1.sources.r1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value from the credential store

4. Log4J Appender

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

5. Load Balancing Log4J Appender

log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume

Conclusion

That wraps up 悲凉店员's hands-on guide to Flume, the big-data log collection framework. For more on the topic, see the other articles on 靠谱客.

This content was collected and organized from community contributions for learning and reference; copyright belongs to the original authors.