概述
1、需求:统计每天的人员出现次数,按天开窗,并且每10s进行一次输出。
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;
import java.util.Random;
public class TriggerTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setAutoWatermarkInterval(100l);
env.addSource(new SourceFunction<Tuple2<String, Long>>() {
boolean flag = true;
@Override
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
String[] s = {"张三", "王五", "李四", "秋英"};
while (flag) {
Thread.sleep(1000);
int i = new Random().nextInt(4);
ctx.collect(new Tuple2<String, Long>(s[i], System.currentTimeMillis()));
}
}
@Override
public void cancel() {
flag = false;
}
}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String,Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
return stringLongTuple2.f1;
}
})).map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> map(Tuple2<String, Long> stringLongTuple2) throws Exception {
System.out.println(stringLongTuple2.f0 + stringLongTuple2.f1);
return new Tuple3<String, Long, Integer>(stringLongTuple2.f0,stringLongTuple2.f1,1);
}
}).keyBy(new KeySelector<Tuple3<String, Long, Integer>, String>() {
@Override
public String getKey(Tuple3<String, Long, Integer> stringLongIntegerTuple3) throws Exception {
return stringLongIntegerTuple3.f0;
}
}).window(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8)))
.trigger(new Trigger<Tuple3<String, Long, Integer>, TimeWindow>() {
// 10s触发一次
private final long interval = 10 * 1000l;
//每次调用add()方法,就调用一次MinTime.reduce()方法,ReducingState也可以换成ValueState,但是要把MinTime.reduce()方法体写到onElement()和onEventTime()方法体中
private final ReducingStateDescriptor<Long> lastTimeStateDescoriptor = new ReducingStateDescriptor<Long>("lastTime",new MinTime(),Long.class);
// 每个元素都执行该方法
@Override
public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
// 水位大于等于该窗口最后时间戳时触发计算
if(window.maxTimestamp() <= ctx.getCurrentWatermark()){
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
}
ReducingState<Long> lastTimeState = ctx.getPartitionedState(lastTimeStateDescoriptor);
Long lastTime = lastTimeState.get();
// 第一条数据进入时执行
if(lastTime == null){
// 当前10s触发时段的开始时间戳
Long start = timestamp - (timestamp % interval);
// 当前10s触发时段的触发时间
Long nextTimer = start + interval;
ctx.registerEventTimeTimer(nextTimer);
// 这里触发ReducingState -> MinTime.class -> reduce()
lastTimeState.add(nextTimer);
}
return TriggerResult.CONTINUE;
}
// 处理时间触发
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
// 事件时间触发
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
// 判断是否最后触发窗口计算
if(time == window.maxTimestamp()){
return TriggerResult.FIRE;
}
// 10s触发时
ReducingState<Long> lastTimeState = ctx.getPartitionedState(lastTimeStateDescoriptor);
Long lastTime = lastTimeState.get();
Long nextTimer = lastTime + interval;
if(time == lastTime){
// 此处可以保证如果10s内无数据,依然10s触发一次
lastTimeState.clear();
lastTimeState.add(nextTimer);
ctx.registerEventTimeTimer(nextTimer);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> lastTimeState = ctx.getPartitionedState(lastTimeStateDescoriptor);
Long lastTime = lastTimeState.get();
if(lastTime != null){
ctx.deleteEventTimeTimer(lastTime);
lastTimeState.clear();
}
}
}).reduce(new ReduceFunction<Tuple3<String, Long, Integer>>() {
@Override
public Tuple3<String, Long, Integer> reduce(Tuple3<String, Long, Integer> stringLongIntegerTuple3, Tuple3<String, Long, Integer> t1) throws Exception {
return new Tuple3<String, Long, Integer>(stringLongIntegerTuple3.f0,stringLongIntegerTuple3.f1,stringLongIntegerTuple3.f2 + t1.f2);
}
}).print();
env.execute("trigger");
}
private static class MinTime implements ReduceFunction<Long>{
@Override
public Long reduce(Long aLong, Long t1) throws Exception {
// 合并时,将所有fire时间戳中最低的一个作为新的fire时间戳
return Math.min(aLong,t1);
}
}
}
以上代码可以使用.trigger(ContinuousEventTimeTrigger.of(Time.seconds(10))),效果一样。
最后
以上就是糟糕星星为你收集整理的Flink Trigger实例的全部内容,希望文章能够帮你解决Flink Trigger实例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复