我是靠谱客的博主 糟糕猫咪,最近开发中收集的这篇文章主要介绍flink自定义trigger详解,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

适用的场景解释:

[1]中有句话是这样的:

"其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。"

这句话的意思是,默认的自带的trigger一般是基于EventTime的。

那么这1000 个元素可能跨度是一小时,也可能跨度是两小时,对吧

但是显然默认的Trigger只能是盯着EventTime(时间戳)来决定是否触发计算,并不能根据元素个数进行触发。

也就是说,默认的Trigger盯着的跨度是"时间差"。而不是"个数差"

讲人话就是:

①例如Flink的Trigger默认每隔一天输出统计数据,

②但是不支持默认每隔一千个订单输出统计数据。

但是注意这里的一千个统计数据可能超过一天,甚至超过一周,耗时可能不固定。

因为你想啊,代码都是要把逻辑写死的对吧?

一千个订单可能一开始耗时一周,后来耗时一个月。那程序要怎么根据变化的时间来锁定一千个订单触发一次?

显然做不到,这个时候我们就希望锁定"个数间隔"、“个数差”,这个时候就需要自定义Trigger

 

官方文档说明:

 

下面是官方文档[4]中Triggers这一节的内容概括

.

需要override的函数函数作用
onElement()数据(element)被加入window的时候会调用该函数
onEventTime() 

当一个注册的Event-Time定时器触发

onProcessingTime() 当一个注册的Processing-Time定时器触发
onMerge()

与有状态触发器(stateful triggers)和当两个窗口整合的时候整合(merge)状态相关。

例如使用session windows

clear()window清理数据需要

 

前面三个用来设定调用事件(invocation event)以后如何操作,

所以这些"操作"必须是一个TriggerResult

也就是说,前三个函数返回的TriggerResult可以是下面几种选择:

返回的TriggerResult作用
CONTINUE什么都不做
FIRE触发计算
PURGE删除窗口中的所有数据
FIRE_AND_PURG触发计算后删除窗口中所有数据

然后是Fire and Purge这一节的内容:

触发计算时,返回的一定是FIRE或者FIRE_AND_PURG(这个话仅仅是来自官方文档的翻译,其实Intellij提示的选项并不仅仅是上面几个)

 

 

具体示范代码参考[5]即可


private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);
 
    /**
     * 窗口最大数据量
     */
    private int maxCount;
    /**
     * event time / process time
     */
    private TimeCharacteristic timeType;
    /**
     * 用于储存窗口当前数据量的状态对象
     */
    private ReducingStateDescriptor<Long> countStateDescriptor =
            new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);
 
 
    public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {
 
        this.maxCount = maxCount;
        this.timeType = timeType;
    }
 
 
    private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx);
        return TriggerResult.FIRE_AND_PURGE;
    }
 
 
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.add(1L);
 
        if (countState.get() >= maxCount) {
            LOG.info("fire with count: " + countState.get());
            return fireAndPurge(window, ctx);
        }
        if (timestamp >= window.getEnd()) {
            LOG.info("fire with tiem: " + timestamp);
            return fireAndPurge(window, ctx);
        } else {
            return TriggerResult.CONTINUE;
        }
    }
 
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.ProcessingTime) {
            return TriggerResult.CONTINUE;
        }
 
        if (time >= window.getEnd()) {
            return TriggerResult.CONTINUE;
        } else {
            LOG.info("fire with process tiem: " + time);
            return fireAndPurge(window, ctx);
        }
    }
 
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        if (timeType != TimeCharacteristic.EventTime) {
            return TriggerResult.CONTINUE;
        }
 
        if (time >= window.getEnd()) {
            return TriggerResult.CONTINUE;
        } else {
            LOG.info("fire with event tiem: " + time);
            return fireAndPurge(window, ctx);
        }
    }
 
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
        countState.clear();
    }
 
    /**
     * 计数方法
     */
    class Sum implements ReduceFunction<Long> {
 
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}

 

 

Reference:

[1]flink自定义trigger-实现窗口随意输出

[2]Flink 自定义Trigger

[3]Flink 自定义trigger

[4]flink官方文档-窗口

[5]Flink 自定义触发器

最后

以上就是糟糕猫咪为你收集整理的flink自定义trigger详解的全部内容,希望文章能够帮你解决flink自定义trigger详解所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部