概述
背景:一般情况下, 窗口操作都有默认的窗口触发器,有时候默认的trigger不满足条件,就需要我们自己去定义相应的trigger 去决定处理窗口数据的时间。
Trigger抽象类
触发器接口有五种方法,允许触发器对不同的事件作出反应
onElement()
添加到每个窗口的元素都会调用此方法。
onEventTime()
当注册的事件时间计时器触发时,将调用此方法。
onProcessingTime()
当注册的处理时间计时器触发时,将调用此方法。
onMerge()
与有状态触发器相关,并在两个触发器对应的窗口合并时合并它们的状态,例如在使用会话窗口时。(目前没使用过,了解不多)
clear()
执行删除相应窗口时所需的任何操作。(一般是删除定义的状态、定时器等)
这些trigger都是flink 提供的一些比较常用的触发器
TriggerResult
onElement(),onEventTime(),onProcessingTime()
都要求返回一个TriggerResult
TriggerResult包含以下内容
- CONTINUE:表示啥都不做。
- FIRE:表示触发计算,同时保留窗口中的数据
- PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。
- FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。)
自定义UserCountTrigger,可以参考flink 提供的CountTrigger
/**
* @Description 自定义count触发器
* @auther eamon.yu
* @date 2020/11/23/023 15:29
*/
public class UserCountTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
//定义多少条记录就触发一次,windown的聚合函数
private final long maxCount;
//定义状态变量 用于接受数据
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new UserCountTrigger.Sum(), LongSerializer.INSTANCE);
private UserCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
System.out.println("onElement ....");
count.add(1L);
if (count.get() >= maxCount) {
//当记录条数大于maxCount, 进行一次window的触发函数
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
System.out.println("onProcessingTime ....");
// 当窗口关闭时,会触发一次onProcessingTime 这时候, 可能还没到trigger满足条件,所以需要手动触发一次
long end = ((TimeWindow) window).getEnd();
if (time == end - 1L) {
ctx.getPartitionedState(stateDesc).clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, Trigger.TriggerContext ctx) throws Exception {
System.out.println("clear ....");
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
}
@Override
public String toString() {
return "CountTrigger(" + maxCount + ")";
}
public static <W extends Window> UserCountTrigger<W> of(long maxCount) {
return new UserCountTrigger(maxCount);
}
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}
上面触发器,和flink提供的CountTrigger有点不一样!!!
举个例子,如果数据满3条处理一次,
flink 的CountTrigger, 假设一个窗口内来了5条数据,前三天会当作满足条件进行处理,后面2条由于不满足3条不会进行处理,当到达窗口的关闭时间后,后面两条数据还是不会进行处理,数据就会丢失,下一个窗口又是重新开始, 对于我们的业务来说这样是不正确的
所以需要,当窗口关闭的时候,进行一次触发window的聚合函数,由于当前时间是基于processTime,可以在onProcessingTime 可以根据窗口关闭时间,手动去触发一次。
定义一个flink 程序进行消费数据,统计次数,当条数大于3 进行一次window的触发函数操作
/**
* @Description
* @auther eamon.yu
* @date 2020/11/23/023 11:22
*/
public class TestCountTrigger {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Properties propsConsumer = new Properties();
propsConsumer.setProperty("bootstrap.servers", "172.18.11.12:9092");
propsConsumer.setProperty("group.id", "trafficwisdom-streaming");
propsConsumer.put("enable.auto.commit", false);
propsConsumer.put("max.poll.records", 1000);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("test_order1_ymz", new SimpleStringSchema(), propsConsumer);
consumer.setStartFromLatest();
DataStream<String> stream = env.addSource(consumer);
stream.print();
DataStream<Tuple2<String, Integer>> exposure = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
try {
String[] split = value.split(",");
return Tuple2.of(split[0],Integer.valueOf(split[1]) );
}catch (Exception e){
return null;
}
}
}).filter(Objects::nonNull);
DataStream<Tuple2<String, Integer>> result = exposure.keyBy(0).
timeWindow(Time.minutes(1)).
trigger(UserCountTrigger.of(3)).sum(1);
result.print();
env.execute("test trigger");
}
}
往kafka中插入数据 a,1 五次,运行结果如下
最后
以上就是善良高跟鞋为你收集整理的flink 自定义trigger的全部内容,希望文章能够帮你解决flink 自定义trigger所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复