概述
1,Trigger基础:
Trigger窗口触发器自带的类型有:
EventTimeTrigger
ProcessingTimeTrigger
CountTrigger 等等
如果内置触发器无法满足业务需求,可以继承 Trigger 自定义触发器:
简单案例:
package com.learning.window; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.AllWindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import java.util.Properties; /** * todo TriggerResult 的几个操作 CONTINUE(false, false),FIRE_AND_PURGE(true, true),FIRE(true, false), PURGE(false, true); * <p> * todo 1, CONTINUE 不触发,等待 * todo 2, FIRE 触发 数据保留 * todo 3,PURGE 不触发,数据清空 * todo 4,FIRE_AND_PURGE 触发 清除数据 */ public class TriggerDemo extends Trigger<Object, TimeWindow> { private static int flag = 0; // todo 进入窗口每个元素都触发 @Override public TriggerResult onElement(Object element, long l, TimeWindow timeWindow, TriggerContext ctx) throws Exception { ctx.registerProcessingTimeTimer(timeWindow.maxTimestamp()); if (flag > 9) { flag = 0; return TriggerResult.FIRE; //统计数量为10的时候 触发 } else { flag++; } //进入的元素 System.out.println("onElement : " + element); return TriggerResult.CONTINUE; } // todo 针对ProcessingTime进行触发操作 @Override public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { return TriggerResult.FIRE; } // todo 针对EventTime进行触发操作 @Override public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(timeWindow.maxTimestamp()); } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception { long windowMaxTimestamp = window.maxTimestamp(); if 
(windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } public static TriggerDemo create() { return new TriggerDemo(); } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9093"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<>("test", new SimpleStringSchema(), properties); AllWindowedStream<String, TimeWindow> stream = env .addSource(kafkaConsumer010) .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(20)) .trigger(TriggerDemo.create()); stream.sum(0).print(); env.execute("Flink Streaming Java API Skeleton"); } }
对了,这里做个笔记:
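window.getEnd() 与 window.maxTimestamp() 的区别:getEnd() 返回窗口的结束时间戳,该时刻本身不属于窗口(窗口区间为 [start, end));maxTimestamp() 返回 getEnd() - 1,即仍属于该窗口的最大时间戳。Flink 内置的 EventTimeTrigger、ProcessingTimeTrigger 都把定时器注册在 maxTimestamp() 上,所以定时器回调收到的 time 通常等于 getEnd() - 1,用 time >= window.getEnd() 来判断窗口是否结束是不成立的。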
2,案例代码:带超时时间的 CountWindow 触发器:
package com.learning.window; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * todo flink 自定义触发器实现带超时时间的 CountWindow * */ public class CountTriggerWithTimeout<T> extends Trigger<T, TimeWindow> { private static Logger logger = LoggerFactory.getLogger(CountTriggerWithTimeout.class); /** * 窗口最大数据量 */ private int maxCount; /** * event time / process time */ private TimeCharacteristic timeType; /** * 用于储存窗口当前数据量的状态对象 */ private ReducingStateDescriptor<Long> countStateDescriptor = new ReducingStateDescriptor("counter", new Sum(), LongSerializer.INSTANCE); public CountTriggerWithTimeout(int maxCount, TimeCharacteristic timeType) { this.maxCount = maxCount; this.timeType = timeType; } private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; } //进入窗口的每个元素都会调用该方法。 @Override public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); countState.add(1L); if (countState.get() >= maxCount) { logger.info("fire with count: " + countState.get()); return fireAndPurge(window, ctx); } if (timestamp >= window.getEnd()) { logger.info("fire with 
tiem: " + timestamp); return fireAndPurge(window, ctx); } else { return TriggerResult.CONTINUE; } } //ProcessingTime触发的时候会被调用。 @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (timeType != TimeCharacteristic.ProcessingTime) { return TriggerResult.CONTINUE; } if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { logger.info("fire with process tiem: " + time); return fireAndPurge(window, ctx); } } //EventTime 事件触发的时候被调用。 @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (timeType != TimeCharacteristic.EventTime) { return TriggerResult.CONTINUE; } if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { logger.info("fire with event tiem: " + time); return fireAndPurge(window, ctx); } } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor); countState.clear(); } /** * 计数方法 */ class Sum implements ReduceFunction<Long> { @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } //todo main方法执行 public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> streams = env.fromElements("aaa"); streams.timeWindowAll(Time.seconds(10)) .trigger( new CountTriggerWithTimeout(1000, TimeCharacteristic.ProcessingTime) ) //todo 做业务操作 .process(new XxxxWindowProcessFunction()) //todo 输出 .addSink(new XxxSinkFunction()) .name("Xxx"); try { env.execute("aaa"); } catch (Exception e) { e.printStackTrace(); } } }
最后
以上就是能干蜗牛为你收集整理的实战理解之Flink Trigger的全部内容,希望文章能够帮你解决实战理解之Flink Trigger所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复