概述
窗口触发器
1. 为什么要有触发器
- 决定何时触发窗口后续的逻辑执行。每个窗口都有一个默认的触发器,时间窗口默认watermark超过EndTime就触发计算
窗口类型 | 触发器 | 触发时机 |
---|---|---|
EventTime(Tumblng/Sliding/Session) | EventTimeTrigger | 一旦Watermark没过窗口的EndTime,该窗口触发 |
ProcessingTime(Tumblng/Sliding/Session) | ProcessingTimeTrigger | 一旦系统时间没过窗口的EndTime,该窗口触发 |
GlobalWindow | NeverTrigger | 永不触发 |
2. 触发器怎么使用
- Flink中定义了Trigger抽象类,任何trigger必须继承Trigger类,Flink官方提供了几种常用的trigger实现,同时,用户可以根据需求自定义trigger。
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
/*
*
* 窗口中每来一条数据被调用一次
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/*
* 在一个ProcessingTime定时器触发的时候调用
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/*
* 在一个EventTime定时器触发的时候调用
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
public boolean canMerge() {
return false;
}
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
public abstract void clear(W window, TriggerContext ctx) throws Exception;
}
- 前三方法决定着如何通过返回一个TriggerResult枚举类来操作输入事件
CONTINUE:什么都不做
FIRE:触发计算
PURE:清除窗口的元素
FIRE_AND_PURE:触发计算和清除窗口元素
5. ProcessingTimeTrigger 源码分析
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private ProcessingTimeTrigger() {}
/*
*
*在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())将会注册一个ProcessingTime定时器,时间参数是window.maxTimestamp(),也就是窗口的最终时间,当时间到达这个窗口最终时间,定时器触发并调用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,触发窗口中数据的计算。
*/
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
// only register a timer if the time is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the time is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "ProcessingTimeTrigger()";
}
/**
* Creates a new trigger that fires once system time passes the end of the window.
*/
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
}
- 需要注意的是ProcessingTimeTrigger类只会在窗口的最终时间到达的时候触发窗口函数的计算,计算完成后并不会清除窗口中的数据,这些数据存储在内存中,除非调用PURGE或FIRE_AND_PURGE,否则数据将一直存在内存中。实际上,Flink中提供的Trigger类,除了PurgingTrigger类,其他的都不会对窗口中的数据进行清除。
最后
以上就是温柔缘分为你收集整理的Flink 窗口触发器的全部内容,希望文章能够帮你解决Flink 窗口触发器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复