概述
Flink学习笔记-Trigger窗口触发
- Flink提供的Triggers
- 自定义Trigger
窗口的计算触发依赖于窗口触发器,每种类型的窗口都有对应的窗口触发机制,都有一个默认的窗口触发器,触发器的作用就是去控制什么时候来触发我们的聚合方法。Flink内部定义实现了如EventTimeTrigger、ProcessTimeTrigger、CountTrigger以及DeltaTrigger等等。一般地,每种触发器对应于不同的Window Assigner,例如EventTime类型的Windows对应的触发器是EventTimeTrigger,工作原理是判断当前的Watermark是否超过了窗口的EndTime,如果超过则触发对窗口内数据的计算,否则不触发计算。
Flink提供的Triggers
- EventTimeTrigger:通过对比Watermark和窗口的Endtime确定是否触发窗口计算,如果Watermark大于Window EndTime则触发,否则不触发,窗口将继续等待。
- ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
- ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
- ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
- CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
- DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
- PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
自定义Trigger
继承实现Trigger抽象类去完成自定义触发器,然后在DataStream中调用trigger方法传入自定义的Trigger即可。
package myLearn;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
* @author qingh.yxb
* @since 2019/8/10
*/
public class MyTrigger extends Trigger<Tuple2<String, Integer>, TimeWindow> {
private static final long serialVersionUID = 1L;
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("total", Integer.class);
@Override
public TriggerResult onElement(Tuple2<String, Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ValueState<Integer> sumState = ctx.getPartitionedState(stateDescriptor);
if (null == sumState.value()) {
sumState.update(0);
}
sumState.update(element.f1 + sumState.value());
if (sumState.value() >= 2) {
//这里可以选择手动处理状态
// 默认的trigger发送是TriggerResult.FIRE 不会清除窗口数据
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
System.out.println("清理窗口状态 窗口内保存值为" + ctx.getPartitionedState(stateDescriptor).value());
ctx.getPartitionedState(stateDescriptor).clear();
}
}
上述自定义Trigger例子会在Tuple f1数据叠加到100后触发计算并清除状态数据。
自定义Trigger需要实现的方法说明:
- OnElement :每一个数据进入窗口都会触发。
- OnEventTime :根据接入窗口的EventTime进行触发操作
- OnProcessTime : 根据接入窗口的ProcessTime进行触发操作
- Clear : 执行窗口及状态数据的清除方法。
窗口触发方法返回结果的类型:
- CONTINUE : 不进行操作,等待。
- FIRE : 触发计算且数据保留。
- PRUGE : 窗口内部数据清除且不触发计算。
- FIRE_AND_PURGE : 触发计算并清除对应的数据。
最后
以上就是细心长颈鹿为你收集整理的Flink学习笔记-Trigger窗口触发的全部内容,希望文章能够帮你解决Flink学习笔记-Trigger窗口触发所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复