我是靠谱客的博主 负责金毛,最近开发中收集的这篇文章主要介绍Flume(二)日志采集Flume启动停止脚本项目经验之Flume组件选型消费者Flume配置 采集通道启动/停止脚本,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
目录
日志采集Flume启动停止脚本
项目经验之Flume组件选型
消费者Flume配置
Flume时间戳拦截器
消费者Flume启动停止脚本
采集通道启动/停止脚本
日志采集Flume启动停止脚本
[doudou@hadoop102 bin]$ vim fl.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop102 hadoop103
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1
&"
done
};;
"stop"){
for i in hadoop102 hadoop103
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk
'{print $2}' | xargs -n1 kill -9 "
done
};;
esac
[doudou@hadoop102 bin]$ chmod 777 fl.sh
项目经验之Flume组件选型
FileChannel和MemoryChannel区别
MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
选型:
金融类公司、对钱要求非常准确的公司通常会选择FileChannel
传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。
消费者Flume配置
Flume时间戳拦截器
在com.atguigu.flume.interceptor包下创建TimeStampInterceptor类
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimeStampInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
//获取header
Map<String,String> headers=event.getHeaders();
//获取body中的ts
byte[] body=event.getBody();//获取body
String log=new String(body, StandardCharsets.UTF_8);//转换编译语言
JSONObject jsonObject=JSONObject.parseObject(log);//处理该日志
String ts=jsonObject.getString("ts");//截取ts
//将ts赋值给timestamp
headers.put("timestamp",ts);
return event;
}
@Override
public List<Event> intercept(List<Event> list) {
for (Event event:list){
intercept(event);
}
return list;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimeStampInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
打包上传jar包
[doudou@hadoop104 lib]$ rz -E
[doudou@hadoop104 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
在hadoop104的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
[doudou@hadoop104 conf]$ vim kafka-flume-hdfs.conf
## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.doudouflume.interceptor.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
#控制生成的小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
消费者Flume启动停止脚本
[doudou@hadoop102 bin]$ vim f2.sh
#! /bin/bash
case $1 in
"start"){
for i in hadoop104
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt
2>&1 &"
done
};;
"stop"){
for i in hadoop104
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print $2}' | xargs -n1 kill"
done
};;
esac
[doudou@hadoop102 bin]$ chmod 777 f2.sh
采集通道启动/停止脚本
[doudou@hadoop102 bin]$ vim cluster.sh
#!/bin/bash
case $1 in
"start"){
echo ================== 启动 集群 ==================
#启动 Zookeeper集群
zk.sh start
#启动 Hadoop集群
hdp.sh start
#启动 Kafka采集集群
kf.sh start
#启动 Flume采集集群
f1.sh start
#启动 Flume消费集群
f2.sh start
};;
"stop"){
echo ================== 停止 集群 ==================
#停止 Flume消费集群
f2.sh stop
#停止 Flume采集集群
f1.sh stop
#停止 Kafka采集集群
kf.sh stop
#停止 Hadoop集群
hdp.sh stop
#停止 Zookeeper集群
zk.sh stop
};;
esac
[doudou@hadoop102 bin]$ chmod 777 cluster.sh
最后
以上就是负责金毛为你收集整理的Flume(二)日志采集Flume启动停止脚本项目经验之Flume组件选型消费者Flume配置 采集通道启动/停止脚本的全部内容,希望文章能够帮你解决Flume(二)日志采集Flume启动停止脚本项目经验之Flume组件选型消费者Flume配置 采集通道启动/停止脚本所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复