我是靠谱客的博主 隐形纸飞机,最近开发中收集的这篇文章主要介绍flume使用自定义inteceptor,以及遇到 java.lang.InstantiationException,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

主要内容

  1. flume conf 配置:拦截器+选择器
  2. 自定义拦截器
  3. 自定义拦截器使用方式与可能遇到的问题

 

1 flume 配置

#interceptor 注意 自定义的interceptor后面的"$Builder"一定不能忘记
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type= com.china.flume.interceptor.LogEtlInterceptor$Builder
a1.sources.r1.interceptors.i2.type= com.china.flume.interceptor.LogTypeInterceptor$Builder
# 选择器,按照一定规则选择出信息后,有两种分发方式 multiplexing(指定特定的 sink) replicating(发给所有的 sink)
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic-start = c1
a1.sources.r1.selector.mapping.topic-event = c2

2 自定义拦截器

  1. 实现 
    org.apache.flume.interceptor.Interceptor 接口
  2. 重写对应的方法:
    intercept,initialize等
    

           其中 intercept 为拦截的主要逻辑

 

    3. 实现内部静态接口 Interceptor.Builder 一定是静态的,否则会无法初始化

具体初始化调用的为 InterceptorBuilderFactory.newInstance ==> Class 类里面的newInstance

具体样例代码

package com.china.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* create by l 2020/8/5 下午5:09
*/
public class LogEtlInterceptor implements Interceptor {
@Override
public void initialize() {}
// 主要处理逻辑
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
if(log.contains("start")){
boolean b = LogUtils.validateStart(log);
if(b){
return event;
}
}else {
if(LogUtils.validateEvent(log)){
return event;
}
}
return null;
}
@Override
public List<Event> intercept(List<Event> events) {
for (final Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {}
// 重新指定 builder,注意,一定是静态的
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new LogEtlInterceptor();
}
@Override
public void configure(Context context) {
}
}
}

 

    3 自定义拦截器使用方式

打 jar 包,(可以不带依赖)上传到对应的flume/lib 目录;flume 指定配置文件

可能遇到的错误

=============
20/08/05 17:27:07 INFO source.DefaultSourceFactory: Creating instance of source r1, type TAILDIR
20/08/05 17:27:07 ERROR channel.ChannelProcessor: Could not instantiate Builder. Exception follows.
java.lang.InstantiationException: com.china.flume.interceptor.LogEtlInterceptor$Builder
        at java.lang.Class.newInstance(Class.java:427)
        at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
        at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
        at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodException: com.china.flume.interceptor.LogEtlInterceptor$Builder.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.newInstance(Class.java:412)
        ... 14 more
20/08/05 17:27:07 ERROR node.AbstractConfigurationProvider: Source r1 has been removed due to an error during configuration
org.apache.flume.FlumeException: Interceptor.Builder not constructable.
        at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:119)
        at org.apache.flume.channel.ChannelProcessor.configure(ChannelProcessor.java:82)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:348)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:101)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InstantiationException: com.china.flume.interceptor.LogEtlInterceptor$Builder
        at java.lang.Class.newInstance(Class.java:427)
        at org.apache.flume.interceptor.InterceptorBuilderFactory.newInstance(InterceptorBuilderFactory.java:50)
        at org.apache.flume.channel.ChannelProcessor.configureInterceptors(ChannelProcessor.java:111)
        ... 12 more
Caused by: java.lang.NoSuchMethodException: com.china.flume.interceptor.LogEtlInterceptor$Builder.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.newInstance(Class.java:412)
        ... 14 more
==============

上面我遇到的这个报错为实现 implements Interceptor.Builder 没有添加 static,导致 init 失败,java 反射还得再看看了

 

最后

以上就是隐形纸飞机为你收集整理的flume使用自定义inteceptor,以及遇到 java.lang.InstantiationException的全部内容,希望文章能够帮你解决flume使用自定义inteceptor,以及遇到 java.lang.InstantiationException所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(40)

评论列表共有 0 条评论

立即
投稿
返回
顶部