概述
适用的场景解释:
[1]中有句话是这样的:
"其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现。"
这句话的意思是,默认的自带的trigger一般是基于EventTime的。
那么这1000 个元素可能跨度是一小时,也可能跨度是两小时,对吧
但是显然默认的Trigger只能是盯着EventTime(时间戳)来决定是否触发计算,并不能根据元素个数进行触发。
也就是说,默认的Trigger盯着的跨度是"时间差"。而不是"个数差"
讲人话就是:
①例如Flink的Trigger默认每隔一天输出统计数据,
②但是不支持默认每隔一千个订单输出统计数据。
但是注意这里的一千个统计数据可能超过一天,甚至超过一周,耗时可能不固定。
因为你想啊,代码都是要把逻辑写死的对吧?
一千个订单可能一开始耗时一周,后来耗时一个月。那程序要怎么根据变化的时间来锁定一千个订单触发一次?
显然做不到,这个时候我们就希望锁定"个数间隔"、“个数差”,这个时候就需要自定义Trigger
官方文档说明:
下面是官方文档[4]中Triggers这一节的内容概括
.
需要override的函数 | 函数作用 |
onElement() | 数据(element)被加入window的时候会调用该函数 |
onEventTime() | 当一个注册的Event-Time定时器触发 |
onProcessingTime() | 当一个注册的Processing-Time定时器触发 |
onMerge() | 与有状态触发器(stateful triggers)和当两个窗口整合的时候整合(merge)状态相关。 例如使用session windows |
clear() | window清理数据需要 |
前面三个用来设定调用事件(invocation event)以后如何操作,
所以这些"操作"必须是一个TriggerResult
也就是说,前三个函数返回的TriggerResult可以是下面几种选择:
返回的TriggerResult | 作用 |
CONTINUE | 什么都不做 |
FIRE | 触发计算 |
PURGE | 删除窗口中的所有数据 |
FIRE_AND_PURG | 触发计算后删除窗口中所有数据 |
然后是Fire and Purge这一节的内容:
触发计算时,返回的一定是FIRE或者FIRE_AND_PURG(这个话仅仅是来自官方文档的翻译,其实Intellij提示的选项并不仅仅是上面几个)
具体示范代码参考[5]即可
private static Logger LOG = LoggerFactory.getLogger(CountTriggerWithTimeout.class);
/**
* 窗口最大数据量
*/
private int maxCount;
/**
* event time / process time
*/
private TimeCharacteristic timeType;
/**
* 用于储存窗口当前数据量的状态对象
*/
private ReducingStateDescriptor<Long> countStateDescriptor =
new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE);
public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) {
this.maxCount = maxCount;
this.timeType = timeType;
}
private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception {
clear(window, ctx);
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.add(1L);
if (countState.get() >= maxCount) {
LOG.info("fire with count: " + countState.get());
return fireAndPurge(window, ctx);
}
if (timestamp >= window.getEnd()) {
LOG.info("fire with tiem: " + timestamp);
return fireAndPurge(window, ctx);
} else {
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.ProcessingTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
LOG.info("fire with process tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
if (timeType != TimeCharacteristic.EventTime) {
return TriggerResult.CONTINUE;
}
if (time >= window.getEnd()) {
return TriggerResult.CONTINUE;
} else {
LOG.info("fire with event tiem: " + time);
return fireAndPurge(window, ctx);
}
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);
countState.clear();
}
/**
* 计数方法
*/
class Sum implements ReduceFunction<Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
Reference:
[1]flink自定义trigger-实现窗口随意输出
[2]Flink 自定义Trigger
[3]Flink 自定义trigger
[4]flink官方文档-窗口
[5]Flink 自定义触发器
最后
以上就是糟糕猫咪为你收集整理的flink自定义trigger详解的全部内容,希望文章能够帮你解决flink自定义trigger详解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复