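Overview
Main contents
- Flume conf configuration: interceptors + channel selector
- Writing a custom interceptor
- Using the custom interceptor and problems you may run into
1 Flume configuration
# Interceptors. Note: the "$Builder" suffix after a custom interceptor's class name must not be omitted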
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type= com.china.flume.interceptor.LogEtlInterceptor$Builder
a1.sources.r1.interceptors.i2.type= com.china.flume.interceptor.LogTypeInterceptor$Builder
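# Channel selector: once events have been picked out by some rule, there are two ways to distribute them: multiplexing (route to specific channels) or replicating (send to every channel)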
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic-start = c1
a1.sources.r1.selector.mapping.topic-event = c2
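To make the routing concrete, here is a minimal sketch in plain Java of what the multiplexing settings above express: read the event's topic header and pick the mapped channel. It is a simplified model, not Flume's selector implementation. The class name SelectorSketch and the choice to return an empty list for unmapped values are assumptions of this sketch (no selector.default is configured above).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of header-based routing; not Flume code.
final class SelectorSketch {
    // Mirrors a1.sources.r1.selector.header
    private static final String HEADER = "topic";

    // Mirrors a1.sources.r1.selector.mapping.*
    private static final Map<String, List<String>> MAPPING = new HashMap<>();
    static {
        MAPPING.put("topic-start", Collections.singletonList("c1"));
        MAPPING.put("topic-event", Collections.singletonList("c2"));
    }

    // Returns the channel names for an event's headers.
    // Assumption: unmapped or missing header values go nowhere.
    static List<String> channelsFor(Map<String, String> headers) {
        String value = headers.get(HEADER);
        List<String> channels = (value == null) ? null : MAPPING.get(value);
        return (channels == null) ? Collections.<String>emptyList() : channels;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("topic", "topic-start");
        System.out.println(channelsFor(headers)); // prints [c1]
    }
}
```

The header value itself has to be set earlier in the chain, which is presumably the job of the second interceptor (LogTypeInterceptor) in the configuration.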
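2 Custom interceptor
- Implement the org.apache.flume.interceptor.Interceptor interface
- Override its methods: initialize, intercept, close. intercept holds the main interception logic
- Implement the nested Interceptor.Builder interface in a nested class. That class must be static, otherwise it cannot be instantiated. Instantiation goes through InterceptorBuilderFactory.newInstance, which ends up calling Class.newInstance
Sample code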
package com.china.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * created by l, 2020/8/5 5:09 PM
 */
public class LogEtlInterceptor implements Interceptor {

    @Override
    public void initialize() {}

    // Main processing logic: return the event to keep it, or null to drop it
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        if (log.contains("start")) {
            if (LogUtils.validateStart(log)) {
                return event;
            }
        } else {
            if (LogUtils.validateEvent(log)) {
                return event;
            }
        }
        return null;
    }

    // Batch version: keep only the events that the single-event method accepts.
    // (Calling intercept(event) and ignoring its result would let invalid events through.)
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<>(events.size());
        for (final Event event : events) {
            Event result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    @Override
    public void close() {}

    // The builder Flume instantiates by reflection. Note: it must be static
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogEtlInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
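The sample calls LogUtils.validateStart and LogUtils.validateEvent, but the post does not show that class. The following is a hypothetical stand-in so the interceptor has something to compile against. The validation rules (a start log is a JSON object, an event log is "<13-digit timestamp>|<JSON object>") are assumptions made for illustration, not the author's actual checks, and the package declaration is left out to keep the sketch self-contained.

```java
// Hypothetical stand-in for the LogUtils helper used by LogEtlInterceptor.
// The rules below are assumed for illustration only.
final class LogUtils {
    private LogUtils() {}

    // Assumed rule: a start log is a single JSON object, i.e. "{...}".
    public static boolean validateStart(String log) {
        if (log == null) {
            return false;
        }
        String s = log.trim();
        return s.startsWith("{") && s.endsWith("}");
    }

    // Assumed rule: an event log looks like "<13-digit epoch millis>|{...}".
    public static boolean validateEvent(String log) {
        if (log == null) {
            return false;
        }
        String[] parts = log.split("\\|", 2);
        if (parts.length != 2) {
            return false;
        }
        String ts = parts[0].trim();
        if (ts.length() != 13 || !ts.chars().allMatch(Character::isDigit)) {
            return false;
        }
        String json = parts[1].trim();
        return json.startsWith("{") && json.endsWith("}");
    }
}
```

In a real project this class would sit in the same package as the interceptor, and the checks would match whatever format the upstream logs actually use.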
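3 Using the custom interceptor
Build a jar (it does not need to bundle its dependencies), upload it to the flume/lib directory, and point Flume at the configuration file.
Possible errors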
=============
20/08/05 17:27:07 INFO source.DefaultSourceFactory: Creating instance of source r1, type TAILDIR
20/08/05 17:27:07 ERROR channel.ChannelProcessor: Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: com.china.flume.interceptor.LogEtlInterceptor$Builder
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: com.china.flume.interceptor.LogEtlInterceptor$Builder.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
20/08/05 17:27:07 ERROR node.AbstractConfigurationProvider: Source r1 has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: com.china.flume.interceptor.LogEtlInterceptor$Builder
at java.lang.Class.newInstance(Class.java:427)
at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
... 12 more
Caused by: java.lang.NoSuchMethodException: com.china.flume.interceptor.LogEtlInterceptor$Builder.<init>()
at java.lang.Class.getConstructor0(Class.java:3082)
at java.lang.Class.newInstance(Class.java:412)
... 14 more
==============
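The error above happened because the class that implements Interceptor.Builder was not declared static, so initialization failed. The constructor of a non-static inner class implicitly takes the enclosing instance as a parameter, so the class has no no-argument constructor for Class.newInstance to call. That is exactly the NoSuchMethodException: ...LogEtlInterceptor$Builder.<init>() at the bottom of the trace. I still need to read up on Java reflection.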
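The failure can be reproduced without Flume. The sketch below, with made-up class names, tries to create a static nested class and a non-static inner class reflectively with no arguments. It uses getDeclaredConstructor().newInstance() rather than the Class.newInstance call seen in the trace, since the latter is deprecated on newer JDKs; both depend on a no-argument constructor being present.

```java
// Stand-alone demo (no Flume needed); all class names here are hypothetical.
final class BuilderReflectionDemo {

    // Non-static inner class: its constructor implicitly takes the outer instance.
    class InnerBuilder {}

    // Static nested class: has a real no-argument constructor.
    static class StaticBuilder {}

    // Returns true if the class can be instantiated reflectively with no arguments.
    static boolean hasNoArgConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException lands here for the inner class
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasNoArgConstructor(StaticBuilder.class)); // prints true
        System.out.println(hasNoArgConstructor(InnerBuilder.class));  // prints false
    }
}
```

Adding static to the Builder class in the interceptor removes the hidden constructor parameter, which is why the one-word change fixes the startup error.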