我是靠谱客的博主 包容火,最近开发中收集的这篇文章主要介绍[大数据]flume日志收集,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

工作中再次使用到了Flume,记录下flume配置过程中的一些问题。

涉及到的知识点:

flume,exec source,file channel,avro sink,cumtom sink开发


一、引言

首先,为什么使用flume肯定不需要再说明了,成熟的技术框架,

各大公司都采用的技术解决方案。现在的FlumeNG是个轻量级的

工具,脱离hadoop,用作别的日志收集,实时计算也是很好用。

相关用户文档:https://flume.apache.org/FlumeUserGuide.html


二、基本概念

Flume中有以下几个核心概念:

Event,一个数据单元,带有可选消息头,如果自己写custom source

或者sink的话会接触到。

Flow:Event从源点到目的点的迁移抽象

Agent:一个独立的Flume进程,包含Source,Channel,Sink组件

Source:用来消费传递给它的Events,它是整个flume的入口

Channel:中转Event的临时存储,保存Source传递过来的数据,并

导入到Sink

Sink:负责移除Events,收集数据或者转发到下一个流动环节中去。必然是重点。


Flume的可靠性设计:

sink只有在event被转发或者被存储到指定的容器中时才会移除channel中的数据

所有的操作都是事务操作。所以只要放入到Flume中的数据,可以确保数据能够

从Flume中流出。 针对异常恢复,Channel可以支持文件管道,通过文件对channel

的数据进行暂存。


下面介绍自己项目过程中使用的技术方案:

NginxLog--->Flume Agent-->Collector-->RabbitMQ/HDFS

web服务器集群中部署Agent,读取日志,然后传递到下一个采集中转机中

中转机再将日志发送给消息队列RabbitMQ和写入HDFS存储。

中转机使用load_balance配置。


对于Source,选用了execSource,

当然为了容错考虑,用网上那种tail -F肯定是不行的

设计时需要考虑,程序宕机了怎么恢复。所以采用的技术方案是用python实现一个

更好的日志文件监控,把处理的进度写到一个文件存下来,每次启动时

加载配置文件看处理到哪了,然后继续处理。

代码是在github上找的一个改的

#coding:utf-8
#!/usr/bin/python
import sys, os, glob, time
import json
import traceback
from stat import *
from urllib import unquote
import hashlib
def sha1_dict(d):
j = json.dumps(d, sort_keys=True)
return hashlib.sha1(j).hexdigest()
def parseLineLogic(line):
listparam=line.split('##')
if len(listparam) != 6:
return
dict={}
dict['logtime']=listparam[0]
dict['ip']=listparam[1]
(dict['type'],dict['url'],httpversion)=listparam[3].split(' ')
dict['agent']=listparam[4]
param = listparam[5]
if (len(param) < 4):
param = dict['url'].split('?')[1]
paramdict = {}
paramlist = param.split('&')
for val in paramlist:
(key,value) = val.split('=')
paramdict[key] = value
if 'data' in paramdict:
paramdict['data'] = json.loads(unquote(paramdict['data']))
dict['param'] = paramdict
dict['hashid'] =
sha1_dict(dict)
print json.dumps(dict)
class WatchedFile:
def __init__(self, path, inode, length):
self.path = path
self.inode = inode
self.length = length
self.fd = open(path, "r")
self.newdata = True
self.offset = 0
self.data = bytearray("")
def close(self):
self.fd.close()
def reset(self):
self.close()
self.fd = open(self.path, "r")
self.offset = 0
self.length = 0
self.data = bytearray("")
def __repr__(self):
return "WatchedFile(path = %s, inode = %d, offset = %d, length = %d, newdata = '%s')"

% (self.path, self.inode, self.offset, self.length, str(self.newdata))
statusFile = 'tail.status'
def loadWatchFilesStatus():
FileStatus = {}
try:
statusF = open(statusFile,'r')
for line in statusF.readlines():
FileObj = json.loads(line)
#print json.dumps(FileObj)
watchFile = WatchedFile(path=FileObj['path'], inode=FileObj['inode'], length=FileObj['length'])
watchFile.newdata = True if FileObj['offset'] < FileObj['length'] else False
watchFile.offset = FileObj['offset']
watchFile.fd.seek(watchFile.offset)
FileStatus[FileObj['inode']] = watchFile
except:
#traceback.print_exc()
pass
#print 'files load:%d' %len(FileStatus)
return FileStatus
def saveWatchedFilesStatus(Files):
statusF = open(statusFile,'w')
for obFile in Files.values():
#print obFile
dt = {}
dt['inode'] = obFile.inode
dt['path'] = obFile.path
dt['offset'] = obFile.offset
dt['length'] = obFile.length
dt['newdata'] = obFile.newdata
statusF.write(json.dumps(dt)+'n')
statusF.close()
def tail(pattern, processor):
watchedFiles = loadWatchFilesStatus()
#watchedFiles = {}
while True:
# file all items matching the pattern
for path in glob.iglob(pattern):
try:
stat = os.stat(path)
# only watch regular files
if S_ISREG(stat.st_mode):
if stat.st_ino in watchedFiles:
#print 'in watchedFies'
# update length for files already being watched
#print stat
watchedFile = watchedFiles[stat.st_ino]
if stat.st_size > watchedFile.length:
watchedFile.newdata = True
elif stat.st_size < watchedFile.length:
watchedFile.reset()
if stat.st_size > 0:
watchedFile.newdata = True
watchedFile.length = stat.st_size
#print watchedFile
else:
#print 'add watch file',path,stat.st_ino,stat.st_size
watchedFiles[stat.st_ino] = WatchedFile(path, stat.st_ino, stat.st_size)
except OSError:
# thrown by either os.stat or open
pass
#
for watchedFile in watchedFiles.values():
#
if not watchedFile.newdata:
for watchedFile in watchedFiles.values():
if watchedFile.newdata:
length = watchedFile.length - watchedFile.offset
if length > 0:
data = watchedFile.fd.read(length)
if data:
watchedFile.data += bytearray(data)
watchedFile.offset += processor(watchedFile.path, watchedFile.data)
watchedFile.newdata = False
# remove files which no longer exist
inodes = watchedFiles.keys()
for inode in inodes:
watchedFile = watchedFiles[inode]
if not os.path.isfile(watchedFile.path):
watchedFile.close()
del watchedFiles[inode]
saveWatchedFilesStatus(watchedFiles)
try:
time.sleep(1)
except KeyboardInterrupt:
sys.exit(0)
#break
def line_processor(path, buff):
offset = 1
bytesRead = 0
#print buff
while offset > 0:
offset = buff.find("n")
if offset > 0:
offset += 1 # include n
line = buff[:offset]
del buff[:offset]
#print "%s=%s" % (line.strip(), path)
try:
parseLineLogic(str(line))
except:
#
traceback.print_exc()
pass
bytesRead += offset
return bytesRead
if __name__ == '__main__':
statusFile = sys.argv[2]
tail(sys.argv[1], line_processor)

之后的AvroSink,AvroSource,FileChannel都比较简单没什么好说的了。

在说下CustomSink的实现,由于项目多人开发,接口人给了一个java发消息的接口
所以只能自己实现CustomSink了
其实这个也很简单

class MQConstants {
public final static String CONFIG_GroupName = "msgGroupName";
public final static String CONFIG_Topic = "msgTopic";
public final static String CONFIG_Tag = "msgTag";
public final static String CONFIG_Key = "msgKey";
}
//继承自AbstractSink接口,并支持配置
public class LogSink extends AbstractSink implements Configurable {
private static final Logger log = LoggerFactory.getLogger(LogSink.class);
private String _GroupName;
private String _Topic;
private String _Tag;
private String _Key;
public LogSink(){
}
//读取配置
@Override
public void configure(Context context) {
_GroupName = context.getString(MQConstants.CONFIG_GroupName,"gateDataProducerGroup");
_Topic = context.getString(MQConstants.CONFIG_Topic, "tradeDataTopic");
_Tag = context.getString(MQConstants.CONFIG_Tag, "gateData");
_Key = context.getString(MQConstants.CONFIG_Key, "gateData");
}
@Override
public synchronized void stop() {
super.stop();
}
private void resetConnection(){
}
//实现Process过程,从代码可以看出,sink的处理都是在事务中的
@Override
public Status process() throws EventDeliveryException {
Transaction tx = getChannel().getTransaction();
try {
tx.begin();
Event e = getChannel().take();
if(e==null){
tx.rollback();
return Status.BACKOFF;
}
try {
ProducerFactory.send(_GroupName,_Topic,_Tag,_Key,e.getBody());
tx.commit();
} catch(Exception ex){
log.error(this.getName() + " - Exception while publishing...", ex);
throw ex;
}
return Status.READY;
} catch (Exception ex) {
tx.rollback();
if(log.isErrorEnabled())
log.error(this.getName() + " - Exception while publishing...", ex);
return Status.BACKOFF;
} finally {
tx.close();
}
}
}

代码中需要替换的就是ProducerFactory的发送逻辑了。可以换成自己要的别的代码
ProducerFactory.send(_GroupName,_Topic,_Tag,_Key,e.getBody());
工程依赖的jar包为 flume-core-x.x.x.jar


附:过程中遇到的hdfsSink找不到class的问题,由于自己的flume使用的是官网下的,

hadoop使用的cdh版本,运行的时候发现找不到hadoop的相关class,这里最简单的

做法就是把hadoop share里common的jar包复制到flume的lib目录下。

当然指定flume-env.sh里面指定FLUME_CLASSPATH也是可以的

参考文章:http://www.cnblogs.com/cswuyg/p/4498804.html


flume的一些思考:

本人在设计flume过程中对一些概念的理解吧

source层:

avrosource 监听一个网络端口,通过avro协议来收取数据。通常用于flumeagent之间传输

Thriftsource 监听一个网络接口用Thrift协议来传输数据,thrift是fb开源的rpc工具,适用性就更

广泛了,之前有在百度使用过用来接收别人传输的数据。

ExecSource 这个用来监听单个文件,或者自己生产数据,抓取网络数据也可以用。

但是exec有个严重的问题就是运行的程序无法知道数据是否被channel接收了。

所以更好的方案就是自己自定义customsource来实现对应的方法。

spooling directory source这个自己一开始也比较关注,但是使用条件要求太高

要实时性就要平凡的动日志文件,而且出错了就停止了,很蛋疼

所以说考虑要收集的源头,如果是文件就用exec,spoolingdirectory,或者custom source

如果是网络数据就用thriftsource,netcatsource,或者httpsource,avrosource


sink层:

sink是持久化,或者消费数据,或者转发数据的环节,这里没啥好选的,

用了什么技术就用什么sink,没有的可以去github上搜搜别人的代码


channel层

这里主要考虑filechannel和memorychannel,为了安全肯定使用filechannel好些

但是memory速度快。调试的时候一个ctrl+c数据就拜拜



Flume监控的各个字段的含义

网上相关资料太少,只能通过源码来了解相关字段的意义

SinkCounter的使用:

这里参照hdfsSink中官方的写法

在configure的时候初始化

sinkCounter = new SinkCounter(getName());

在start的时候

调用sinkCounter.start();

结束的时候调用sinkCounter.stop();

然后可以给这个几个计数器加值

    private static final String COUNTER_CONNECTION_CREATED = "sink.connection.creation.count";  //打开链接(hdfs在第一次传输数据时会调用)
    private static final String COUNTER_CONNECTION_CLOSED = "sink.connection.closed.count"; //关闭链接
    private static final String COUNTER_CONNECTION_FAILED = "sink.connection.failed.count";//链接失败,超时
    private static final String COUNTER_BATCH_EMPTY = "sink.batch.empty";  //批量处理数据时,没有拿到消息
    private static final String COUNTER_BATCH_UNDERFLOW = "sink.batch.underflow"//批量处理输出时,没有超过设置的batchSize;
    private static final String COUNTER_BATCH_COMPLETE = "sink.batch.complete";//批量处理数据时,取到了batchSize的数据量
    private static final String COUNTER_EVENT_DRAIN_ATTEMPT = "sink.event.drain.attempt";尝试写入数据
    private static final String COUNTER_EVENT_DRAIN_SUCCESS = "sink.event.drain.sucess"; //成功写入数据条数



最后

以上就是包容火为你收集整理的[大数据]flume日志收集的全部内容,希望文章能够帮你解决[大数据]flume日志收集所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(76)

评论列表共有 0 条评论

立即
投稿
返回
顶部