工作中再次使用到了Flume,记录下flume配置过程中的一些问题。
涉及到的知识点:
flume,exec source,file channel,avro sink,cumtom sink开发
一、引言
首先,为什么使用flume肯定不需要再说明了,成熟的技术框架,
各大公司都采用的技术解决方案。现在的FlumeNG是个轻量级的
工具,脱离hadoop,用作别的日志收集,实时计算也是很好用。
相关用户文档:https://flume.apache.org/FlumeUserGuide.html
二、基本概念
Flume中有以下几个核心概念:
Event,一个数据单元,带有可选消息头,如果自己写custom source
或者sink的话会接触到。
Flow:Event从源点到目的点的迁移抽象
Agent:一个独立的Flume进程,包含Source,Channel,Sink组件
Source:用来消费传递给它的Events,它是整个flume的入口
Channel:中转Event的临时存储,保存Source传递过来的数据,并
导入到Sink
Sink:负责移除Events,收集数据或者转发到下一个流动环节中去。必然是重点。
Flume的可靠性设计:
sink只有在event被转发或者被存储到指定的容器中时才会移除channel中的数据
所有的操作都是事务操作。所以只要放入到Flume中的数据,可以确保数据能够
从Flume中流出。 针对异常恢复,Channel可以支持文件管道,通过文件对channel
的数据进行暂存。
下面介绍自己项目过程中使用的技术方案:
NginxLog--->Flume Agent-->Collector-->RabbitMQ/HDFS
web服务器集群中部署Agent,读取日志,然后传递到下一个采集中转机中
中转机再将日志发送给消息队列RabbitMQ和写入HDFS存储。
中转机使用load_balance配置。
对于Source,选用了execSource,
当然为了容错考虑,用网上那种tail -F肯定是不行的
设计时需要考虑,程序宕机了怎么恢复。所以采用的技术方案是用python实现一个
更好的日志文件监控,把处理的进度写到一个文件存下来,每次启动时
加载配置文件看处理到哪了,然后继续处理。
代码是在github上找的一个改的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163#coding:utf-8 #!/usr/bin/python import sys, os, glob, time import json import traceback from stat import * from urllib import unquote import hashlib def sha1_dict(d): j = json.dumps(d, sort_keys=True) return hashlib.sha1(j).hexdigest() def parseLineLogic(line): listparam=line.split('##') if len(listparam) != 6: return dict={} dict['logtime']=listparam[0] dict['ip']=listparam[1] (dict['type'],dict['url'],httpversion)=listparam[3].split(' ') dict['agent']=listparam[4] param = listparam[5] if (len(param) < 4): param = dict['url'].split('?')[1] paramdict = {} paramlist = param.split('&') for val in paramlist: (key,value) = val.split('=') paramdict[key] = value if 'data' in paramdict: paramdict['data'] = json.loads(unquote(paramdict['data'])) dict['param'] = paramdict dict['hashid'] = sha1_dict(dict) print json.dumps(dict) class WatchedFile: def __init__(self, path, inode, length): self.path = path self.inode = inode self.length = length self.fd = open(path, "r") self.newdata = True self.offset = 0 self.data = bytearray("") def close(self): self.fd.close() def reset(self): self.close() self.fd = open(self.path, "r") self.offset = 0 self.length = 0 self.data = bytearray("") def __repr__(self): return "WatchedFile(path = %s, inode = %d, offset = %d, length = %d, newdata = '%s')" % (self.path, self.inode, self.offset, self.length, str(self.newdata)) statusFile = 'tail.status' def loadWatchFilesStatus(): FileStatus = {} try: statusF = open(statusFile,'r') for line in statusF.readlines(): FileObj = json.loads(line) #print json.dumps(FileObj) watchFile = WatchedFile(path=FileObj['path'], inode=FileObj['inode'], length=FileObj['length']) watchFile.newdata = True if FileObj['offset'] < FileObj['length'] else False watchFile.offset = FileObj['offset'] watchFile.fd.seek(watchFile.offset) FileStatus[FileObj['inode']] = watchFile except: #traceback.print_exc() pass #print 'files load:%d' %len(FileStatus) return FileStatus def saveWatchedFilesStatus(Files): statusF = open(statusFile,'w') for obFile in Files.values(): #print obFile dt = {} dt['inode'] = obFile.inode dt['path'] = obFile.path dt['offset'] = obFile.offset dt['length'] = obFile.length dt['newdata'] = obFile.newdata statusF.write(json.dumps(dt)+'n') statusF.close() def tail(pattern, processor): watchedFiles = loadWatchFilesStatus() #watchedFiles = {} while True: # file all items matching the pattern for path in glob.iglob(pattern): try: stat = os.stat(path) # only watch regular files if S_ISREG(stat.st_mode): if stat.st_ino in watchedFiles: #print 'in watchedFies' # update length for files already being watched #print stat watchedFile = watchedFiles[stat.st_ino] if stat.st_size > watchedFile.length: watchedFile.newdata = True elif stat.st_size < watchedFile.length: watchedFile.reset() if stat.st_size > 0: watchedFile.newdata = True watchedFile.length = stat.st_size #print watchedFile else: #print 'add watch file',path,stat.st_ino,stat.st_size watchedFiles[stat.st_ino] = WatchedFile(path, stat.st_ino, stat.st_size) except OSError: # thrown by either os.stat or open pass # for watchedFile in watchedFiles.values(): # if not watchedFile.newdata: for watchedFile in watchedFiles.values(): if watchedFile.newdata: length = watchedFile.length - watchedFile.offset if length > 0: data = watchedFile.fd.read(length) if data: watchedFile.data += bytearray(data) watchedFile.offset += processor(watchedFile.path, watchedFile.data) watchedFile.newdata = False # remove files which no longer exist inodes = watchedFiles.keys() for inode in inodes: watchedFile = watchedFiles[inode] if not os.path.isfile(watchedFile.path): watchedFile.close() del watchedFiles[inode] saveWatchedFilesStatus(watchedFiles) try: time.sleep(1) except KeyboardInterrupt: sys.exit(0) #break def line_processor(path, buff): offset = 1 bytesRead = 0 #print buff while offset > 0: offset = buff.find("n") if offset > 0: offset += 1 # include n line = buff[:offset] del buff[:offset] #print "%s=%s" % (line.strip(), path) try: parseLineLogic(str(line)) except: # traceback.print_exc() pass bytesRead += offset return bytesRead if __name__ == '__main__': statusFile = sys.argv[2] tail(sys.argv[1], line_processor)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59class MQConstants { public final static String CONFIG_GroupName = "msgGroupName"; public final static String CONFIG_Topic = "msgTopic"; public final static String CONFIG_Tag = "msgTag"; public final static String CONFIG_Key = "msgKey"; } //继承自AbstractSink接口,并支持配置 public class LogSink extends AbstractSink implements Configurable { private static final Logger log = LoggerFactory.getLogger(LogSink.class); private String _GroupName; private String _Topic; private String _Tag; private String _Key; public LogSink(){ } //读取配置 @Override public void configure(Context context) { _GroupName = context.getString(MQConstants.CONFIG_GroupName,"gateDataProducerGroup"); _Topic = context.getString(MQConstants.CONFIG_Topic, "tradeDataTopic"); _Tag = context.getString(MQConstants.CONFIG_Tag, "gateData"); _Key = context.getString(MQConstants.CONFIG_Key, "gateData"); } @Override public synchronized void stop() { super.stop(); } private void resetConnection(){ } //实现Process过程,从代码可以看出,sink的处理都是在事务中的 @Override public Status process() throws EventDeliveryException { Transaction tx = getChannel().getTransaction(); try { tx.begin(); Event e = getChannel().take(); if(e==null){ tx.rollback(); return Status.BACKOFF; } try { ProducerFactory.send(_GroupName,_Topic,_Tag,_Key,e.getBody()); tx.commit(); } catch(Exception ex){ log.error(this.getName() + " - Exception while publishing...", ex); throw ex; } return Status.READY; } catch (Exception ex) { tx.rollback(); if(log.isErrorEnabled()) log.error(this.getName() + " - Exception while publishing...", ex); return Status.BACKOFF; } finally { tx.close(); } } }
代码中需要替换的就是ProducerFactory的发送逻辑了。可以换成自己要的别的代码
1ProducerFactory.send(_GroupName,_Topic,_Tag,_Key,e.getBody());
1工程依赖的jar包为 flume-core-x.x.x.jar
附:过程中遇到的hdfsSink找不到class的问题,由于自己的flume使用的是官网下的,
hadoop使用的cdh版本,运行的时候发现找不到hadoop的相关class,这里最简单的
做法就是把hadoop share里common的jar包复制到flume的lib目录下。
当然指定flume-env.sh里面指定FLUME_CLASSPATH也是可以的
参考文章:http://www.cnblogs.com/cswuyg/p/4498804.html
flume的一些思考:
本人在设计flume过程中对一些概念的理解吧
source层:
avrosource 监听一个网络端口,通过avro协议来收取数据。通常用于flumeagent之间传输
Thriftsource 监听一个网络接口用Thrift协议来传输数据,thrift是fb开源的rpc工具,适用性就更
广泛了,之前有在百度使用过用来接收别人传输的数据。
ExecSource 这个用来监听单个文件,或者自己生产数据,抓取网络数据也可以用。
但是exec有个严重的问题就是运行的程序无法知道数据是否被channel接收了。
所以更好的方案就是自己自定义customsource来实现对应的方法。
spooling directory source这个自己一开始也比较关注,但是使用条件要求太高
要实时性就要平凡的动日志文件,而且出错了就停止了,很蛋疼
所以说考虑要收集的源头,如果是文件就用exec,spoolingdirectory,或者custom source
如果是网络数据就用thriftsource,netcatsource,或者httpsource,avrosource
sink层:
sink是持久化,或者消费数据,或者转发数据的环节,这里没啥好选的,
用了什么技术就用什么sink,没有的可以去github上搜搜别人的代码
channel层
这里主要考虑filechannel和memorychannel,为了安全肯定使用filechannel好些
但是memory速度快。调试的时候一个ctrl+c数据就拜拜
Flume监控的各个字段的含义
网上相关资料太少,只能通过源码来了解相关字段的意义
SinkCounter的使用:
这里参照hdfsSink中官方的写法
在configure的时候初始化
sinkCounter = new SinkCounter(getName());
在start的时候
调用sinkCounter.start();
结束的时候调用sinkCounter.stop();
然后可以给这个几个计数器加值
private static final String COUNTER_CONNECTION_CREATED = "sink.connection.creation.count"; //打开链接(hdfs在第一次传输数据时会调用)
private static final String COUNTER_CONNECTION_CLOSED = "sink.connection.closed.count"; //关闭链接
private static final String COUNTER_CONNECTION_FAILED = "sink.connection.failed.count";//链接失败,超时
private static final String COUNTER_BATCH_EMPTY = "sink.batch.empty"; //批量处理数据时,没有拿到消息
private static final String COUNTER_BATCH_UNDERFLOW = "sink.batch.underflow"//批量处理输出时,没有超过设置的batchSize;
private static final String COUNTER_BATCH_COMPLETE = "sink.batch.complete";//批量处理数据时,取到了batchSize的数据量
private static final String COUNTER_EVENT_DRAIN_ATTEMPT = "sink.event.drain.attempt";尝试写入数据
private static final String COUNTER_EVENT_DRAIN_SUCCESS = "sink.event.drain.sucess"; //成功写入数据条数
最后
以上就是包容火最近收集整理的关于[大数据]flume日志收集的全部内容,更多相关[大数据]flume日志收集内容请搜索靠谱客的其他文章。
发表评论 取消回复