概述
Kafka日志收集实现
- 使用github.com/Shopify/sarama连接kafka,并往其中写数据
- 使用github.com/hpcloud/tail读取日志文件
- 使用zookeeper做集群管理
- 使用ini做配置文件解析
关于Kafka原理及工作流程见https://blog.csdn.net/wzb_wzt/article/details/107367245
初始化kafka连接、以及往kafka发送数据的方法
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
//专门往kafka写日志的模块
//sarama v1.20之后的版本加入了zstd压缩算法,需要用到cgo,在Windows平台编译时会报错
var (
client sarama.SyncProducer
)
// Init 初始化生产者
func Init(addrs []string)(err error){
config:=sarama.NewConfig()
config.Producer.RequiredAcks=sarama.WaitForAll //ACK反馈机制,all(需要leader 和follow都确认了)
config.Producer.Partitioner=sarama.NewRandomPartitioner//指定写往哪个分区 新选出一个partition
config.Producer.Return.Successes=true//成功交付的信息将在success channel返回
//连接kafka
client, err = sarama.NewSyncProducer(addrs, config)
if err != nil {
fmt.Printf("producer close the err is %v",err)
return
}
//defer client.Close()
//不需要关闭 日志每时都在产生,所以不需要关闭
return
}
func SendToKafka(topic,data string)(err error){
//构造一个消息
msg:=&sarama.ProducerMessage{}
msg.Topic=topic
msg.Value=sarama.StringEncoder(data)
//发送消息
message, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed err:",err)
return err
}
fmt.Printf("pid:%v;offset:%vn",message,offset)
return
}
初始化日志文件,读取文件数据到通道
package taillog
import (
"github.com/hpcloud/tail"
)
var (
tailObj *tail.Tail
)
//Init 打开日志文件初始化
func Init(fileName string)(err error){
config := tail.Config{
ReOpen:
true,//重新打开
切换文件时,创建新文件
Follow:
true,//跟随文件
Location:
&tail.SeekInfo{Offset: 0, Whence: 2},//从文件哪个地方开始读
MustExist: false,//文件不存在不报错
Poll:
true,//
}
tailObj, err = tail.TailFile(fileName, config)
if err != nil {
return
}
return
}
//ReadChan 循环读取文件
func ReadChan() <-chan *tail.Line{
return tailObj.Lines
}
主函数调用
- 加载配置文件信息;采用ini来读取配置信息并与结构体绑定
- 初始化kafka连接
- 初始化日志文件
- 循环读取文件信息并写入Kafka
//配置文件
[kafka]
addrs=127.0.0.1:9092
topic=web_log
[taillog]
path=./my.log
//配置文件的结构体
//整个配置文件
package config
type AppConf struct {
KafkaConf `ini:"kafka"`
TailConf `ini:"taillog"`
}
//Kafka 配置文件结构体
type KafkaConf struct {
Address string `ini:"addrs"`
Topic string`ini:"topic"`
}
//Tail 配置文件结构体
type TailConf struct {
Path string `ini:"path"`
}
package main
import (
"fmt"
"gopkg.in/ini.v1"
"log"
"logagent/config"
"logagent/kafka"
"logagent/taillog"
"time"
)
//logAgent 入口程序
var (
iniFile =new(config.AppConf)
)
func run(){
//1读取文件
for{
select {
//2发送到kafka
case line:=<-taillog.ReadChan():
kafka.SendToKafka(iniFile.Topic,line.Text)
default:
time.Sleep(time.Second)
}
}
}
func main(){
//0.加载配置文件
err := ini.MapTo(iniFile, "./config/logagent.ini")
if err != nil {
log.Fatal(err)
}
//1.初始化kafka连接
err = kafka.Init([]string{iniFile.Address})
if err != nil {
fmt.Println("init kafka failed ,err:",err)
return
}
fmt.Println("init kafka success!!")
//2.打开日志文件准备收集日志
err = taillog.Init(iniFile.Path)
if err != nil {
fmt.Println("init tail failed ,err:",err)
return
}
fmt.Println("init tail success!!")
run()
}
最后
以上就是自由电脑为你收集整理的Kafka日志收集简单实现的全部内容,希望文章能够帮你解决Kafka日志收集简单实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复