概述
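需求:按 key 做窗口统计(下文示例代码使用的是 timeWindow(Time.minutes(5)),即 5 分钟的滚动窗口):keyBy 之后每到达一条消息就触发一次窗口计算;如果没有新消息到达,则每隔 60s 触发一次窗口计算。
Flink 内置的 Trigger 无法同时满足"按条数触发"和"按时间间隔触发"这两个条件,因此只能自定义 Trigger。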
直接上代码
package com.tc.flink.demo.stream;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tc.flink.conf.KafkaConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
/**
 * Demo job that counts messages per {@code itemId} in a processing-time window.
 *
 * <p>The job reads JSON strings from the Kafka topic {@code topic-test}, maps each message to
 * {@code (itemId, 1)}, keys the stream by the item id, and sums the counts in a 5-minute time
 * window. The window uses {@link TimeCountTrigger} configured with a count of 1 and an interval
 * of 1 minute.
 */
public class WindowCountOrTimeTrigger {

    /**
     * Builds and runs the streaming job in a local execution environment.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        Properties propsConsumer = new Properties();
        propsConsumer.setProperty("bootstrap.servers", KafkaConfig.KAFKA_BROKER_LIST);
        propsConsumer.setProperty("group.id", "trafficwisdom-streaming");
        // Use setProperty with String values: Properties.getProperty() returns null for
        // non-String values inserted through put(), so a Boolean/Integer stored with put()
        // is invisible to any code that reads the setting via getProperty().
        propsConsumer.setProperty("enable.auto.commit", "false");
        propsConsumer.setProperty("max.poll.records", "1000");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("topic-test", new SimpleStringSchema(), propsConsumer);
        consumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(consumer);
        stream.print();

        DataStream<Tuple2<String, Integer>> exposure = stream
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        try {
                            JSONObject jsonObject = JSON.parseObject(value);
                            String itemId = jsonObject.getString("itemId");
                            return Tuple2.of(itemId, 1);
                        } catch (Exception ignored) {
                            // Deliberate best effort: a message that cannot be parsed is turned
                            // into a (null, null) marker and dropped by the filter below.
                            return Tuple2.of(null, null);
                        }
                    }
                })
                // Drops both the unparsable-message marker and messages without an itemId.
                .filter(tuple2 -> tuple2.f0 != null);

        DataStream<Tuple2<String, Integer>> result = exposure
                .keyBy(0)
                .timeWindow(Time.minutes(5))
                .trigger(TimeCountTrigger.of(1, Time.minutes(1)))
                .sum(1);
        result.print();

        env.execute();
    }
}
自定义的Trigger
package com.tc.flink.demo.stream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
 * A {@link Trigger} that fires when either of two conditions is met for a pane:
 *
 * <ul>
 *   <li>the number of elements added since the last count-based firing reaches
 *       {@code maxCount}, or
 *   <li>a processing-time timer, scheduled on multiples of {@code interval}, goes off.
 * </ul>
 *
 * <p>The trigger only ever returns {@link TriggerResult#FIRE} or
 * {@link TriggerResult#CONTINUE}; it never purges the window contents itself.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class TimeCountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    /** Number of elements after which the trigger fires. */
    private final long maxCount;

    /** Distance between two time-based firings, in milliseconds. */
    private final long interval;

    /** Per-pane count of elements seen since the last count-based firing. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    /** Per-pane timestamp of the next time-based firing (the minimum wins when merged). */
    private final ReducingStateDescriptor<Long> timeStateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    /**
     * @param maxCount the count of elements at which to fire
     * @param interval the time between two time-based firings, in milliseconds
     * @throws IllegalArgumentException if {@code interval} is not positive
     */
    private TimeCountTrigger(long maxCount, long interval) {
        // Fail fast: a zero interval would otherwise only surface later as an
        // ArithmeticException from the modulo computation in onElement().
        if (interval <= 0) {
            throw new IllegalArgumentException(
                    "interval must be positive, but was " + interval + " ms");
        }
        this.maxCount = maxCount;
        this.interval = interval;
    }

    /**
     * Registers the first processing-time timer for the pane if none is recorded yet, then
     * counts the element and fires once {@code maxCount} elements have been counted.
     *
     * @return {@link TriggerResult#FIRE} when the count reaches {@code maxCount} (the count is
     *     reset in that case), otherwise {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        System.out.println("onMaxCount ....");

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        if (fireTimestamp.get() == null) {
            // Align the first firing to the next multiple of the interval. The element
            // timestamp is ignored on purpose: scheduling is based on processing time.
            long now = ctx.getCurrentProcessingTime();
            long start = now - (now % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
        }

        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /** This trigger registers no event-time timers, so event time never causes a firing. */
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires when {@code time} is the recorded next-fire timestamp and schedules the following
     * firing one interval later.
     *
     * @return {@link TriggerResult#FIRE} if {@code time} matches the recorded fire timestamp,
     *     otherwise {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("onProcessingTime ....");

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        Long nextFireTimestamp = fireTimestamp.get();
        // The state may be empty when a timer not owned by this trigger's bookkeeping goes
        // off; treat that as "nothing scheduled" instead of failing with an NPE.
        if (nextFireTimestamp != null && nextFireTimestamp.longValue() == time) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /** Clears the count state and removes the pending timer together with its bookkeeping. */
    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        System.out.println("clear ....");

        ctx.getPartitionedState(stateDesc).clear();

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
        Long timestamp = fireTimestamp.get();
        // Guard against an empty state (e.g. a window that never recorded a fire time, or
        // whose state was already moved away): unboxing null here would throw an NPE.
        if (timestamp != null) {
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Merges the count and fire-time state of the merged windows into {@code window} and
     * re-registers the processing-time timer for the merged fire timestamp.
     */
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        ctx.mergePartitionedState(timeStateDesc);

        // Merging only moves state. Register a timer for the resulting window explicitly,
        // otherwise the merged window may have a recorded fire time but no timer behind it.
        Long nextFireTimestamp = ctx.getPartitionedState(timeStateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
    }

    @Override
    public String toString() {
        return "TimeCountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given
     * count, and additionally at every multiple of the given processing-time interval.
     *
     * @param maxCount The count of elements at which to fire.
     * @param interval The time between two time-based firings.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     * @return the configured trigger
     * @throws IllegalArgumentException if {@code interval} is not positive
     */
    public static <W extends Window> TimeCountTrigger<W> of(long maxCount, Time interval) {
        return new TimeCountTrigger<>(maxCount, interval.toMilliseconds());
    }

    /** Adds two counts. */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    /** Keeps the smaller of two timestamps. */
    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
模拟发消息
{"action":"exposure","itemId":"1cdlTUJUCidYgcUQALhCpg==","time":"2019-11-06 16:01:00"}
{"action":"exposure","itemId":"LXFMRmnKqM7JiV75KQt+GQ==","time":"2019-11-06 16:02:00"}
{"action":"exposure","itemId":"LXFMRmnKqM7JiV75KQt+GQ==","time":"2019-11-06 16:03:00"}
统计结果如下(其中 "onMaxCount ...."、"onProcessingTime ...."、"clear ...." 是 Trigger 中打印的调试信息):
3> {"action":"exposure","itemId":"1cdlTUJUCidYgcUQALhCpg==","time":"2019-11-06 16:01:00"}
onMaxCount ....
1> (1cdlTUJUCidYgcUQALhCpg==,1)
onProcessingTime ....
1> (1cdlTUJUCidYgcUQALhCpg==,1)
3> {"action":"exposure","itemId":"LXFMRmnKqM7JiV75KQt+GQ==","time":"2019-11-06 16:02:00"}
onMaxCount ....
4> (LXFMRmnKqM7JiV75KQt+GQ==,1)
onProcessingTime ....
4> (LXFMRmnKqM7JiV75KQt+GQ==,1)
onProcessingTime ....
1> (1cdlTUJUCidYgcUQALhCpg==,1)
3> {"action":"exposure","itemId":"LXFMRmnKqM7JiV75KQt+GQ==","time":"2019-11-06 16:03:00"}
onMaxCount ....
4> (LXFMRmnKqM7JiV75KQt+GQ==,2)
onProcessingTime ....
onProcessingTime ....
1> (1cdlTUJUCidYgcUQALhCpg==,1)
4> (LXFMRmnKqM7JiV75KQt+GQ==,2)
onProcessingTime ....
onProcessingTime ....
clear ....
clear ....
说明:TimeCountTrigger 其实是参考 Flink 内置的 org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger(按处理时间间隔周期性触发)和 org.apache.flink.streaming.api.windowing.triggers.CountTrigger(按元素条数触发)这两个 Trigger 的实现合并而来。
最后
以上就是清脆自行车为你收集整理的Flink 自定义Trigger的全部内容,希望文章能够帮你解决Flink 自定义Trigger所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复