1. Trigger的定义
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
* Called for every element that gets added to a pane. The result of this will determine whether
* the pane is evaluated to emit results.
* 添加到窗口的每个元素都会调用该方法
* @param element The element that arrived.
* @param timestamp The timestamp of the element that arrived.
* @param window The window to which the element is being added.
* @param ctx A context object that can be used to register timer callbacks.
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
throws Exception;
* Called when a processing-time timer that was set using the trigger context fires.
* 当注册的ProcessTime的定时器触发时调用该方法
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
throws Exception;
* Called when an event-time timer that was set using the trigger context fires.
* 当注册的EventTime的定时器触发时调用该方法
* @param time The timestamp at which the timer fired.
* @param window The window for which the timer fired.
* @param ctx A context object that can be used to register timer callbacks.
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception;
* Returns true if this trigger supports merging of trigger state and can therefore be used with
* a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
* 判断该Trigger是否支持合并触发器状态,如果支持则返回true,否则返回false
* <p>If this returns {@code true} you must properly implement {@link #onMerge(Window,
* OnMergeContext)}
public boolean canMerge() {
return false;
* Called when several windows have been merged into one window by the {@link
* org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
* 与有状态触发器相关,并在两个触发器对应的窗口合并时合并它们的状态,例如在使用会话窗口时
* @param window The new window that results from the merge.
* @param ctx A context object that can be used to register timer callbacks and access state.
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
* Clears any state that the trigger might still hold for the given window. This is called when
* a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} and
* {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as well as
* state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}.
* 执行删除相应窗口时所需的任何操作(一般是删除定义的状态、定时器等)
public abstract void clear(W window, TriggerContext ctx) throws Exception;
:基于DeltaFunction和阈值Trigger的触发器,此触发器计算最后触发的数据点与当前到达的数据点之间的增量。如果 delta 高于指定的阈值,它就会触发。
2. EventTimeTrigger的实现
* A {@link Trigger} that fires once the watermark passes the end of the window to which a pane
* belongs.
* @see org.apache.flink.streaming.api.watermark.Watermark
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
return TriggerResult.CONTINUE;
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
public boolean canMerge() {
return true;
public void onMerge(TimeWindow window, OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
public String toString() {
return "EventTimeTrigger()";
* Creates an event-time trigger that fires once the watermark passes the end of the window.
* <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately
* trigger window evaluation with just this one element.
public static EventTimeTrigger create() {
return new EventTimeTrigger();
public enum TriggerResult {
* No action is taken on the window.
* 对窗口不做任何处理
CONTINUE(false, false),
* {@code FIRE_AND_PURGE} evaluates the window function and emits the window result.
* 对窗口进行计算,并清除窗口中的数据
FIRE_AND_PURGE(true, true),
* On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,
* though, all elements are retained.
* 对窗口进行计算,但不会清除数据
FIRE(true, false),
* All elements in the window are cleared and the window is discarded, without evaluating the
* window function or emitting any elements.
* 对窗口不做计算,但是会清除窗口中的数据
PURGE(false, true);
// ------------------------------------------------------------------------
private final boolean fire;
private final boolean purge;
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
public boolean isFire() {
return fire;
public boolean isPurge() {
return purge;
3. 简单案例
1. 数据格式
2. UserBehavior
* @author Yankee
* @program IntelliJ IDEA
* @description JavaBean
* @since 2021/7/16
public class UserBehavior {
* 用户Id
private Long userId;
* itemId
private Long itemId;
* 分类Id
private Integer categoryId;
* 用户行为
private String behavior;
* 时间戳
private Long timestamp;
public UserBehavior() {}
public UserBehavior(
Long userId,
Long itemId,
Integer categoryId,
String behavior,
Long timestamp) {
this.userId = userId;
this.itemId = itemId;
this.categoryId = categoryId;
this.behavior = behavior;
this.timestamp = timestamp;
public Long getUserId() {
return userId;
public void setUserId(Long userId) {
this.userId = userId;
public Long getItemId() {
return itemId;
public void setItemId(Long itemId) {
this.itemId = itemId;
public Integer getCategoryId() {
return categoryId;
public void setCategoryId(Integer categoryId) {
this.categoryId = categoryId;
public String getBehavior() {
return behavior;
public void setBehavior(String behavior) {
this.behavior = behavior;
public Long getTimestamp() {
return timestamp;
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
public String toString() {
return "UserBehavior{" +
"userId=" + userId +
", itemId=" + itemId +
", categoryId=" + categoryId +
", behavior='" + behavior + ''' +
", timestamp=" + timestamp +
3. UserVisitorCount
* @author Yankee
* @version 1.0
* @description JavaBean
* @date 2021/12/29 11:24
public class UserVisitorCount {
private String uv;
private String time;
private Integer count;
public UserVisitorCount() {}
public UserVisitorCount(String uv, String time, Integer count) {
this.uv = uv;
this.time = time;
this.count = count;
public String getUv() {
return uv;
public void setUv(String uv) {
this.uv = uv;
public String getTime() {
return time;
public void setTime(String time) {
this.time = time;
public Integer getCount() {
return count;
public void setCount(Integer count) {
this.count = count;
public String toString() {
return "UserVisitorCount{" +
"uv='" + uv + ''' +
", time='" + time + ''' +
", count=" + count +
4. 自定义ProcessWindowFunction实现计算逻辑
public class UservistorProcessWindowFunction extends ProcessWindowFunction<UserBehavior, UserVisitorCount, String, TimeWindow> {
public void process(
String s,
ProcessWindowFunction<UserBehavior, UserVisitorCount, String, TimeWindow>.Context context,
Iterable<UserBehavior> elements,
Collector<UserVisitorCount> out) throws Exception {
// 定义HashSet用于去重,这块可以考虑使用bloomfilter代替
HashSet<Long> uids = new HashSet<>();
// 获取数据
for (UserBehavior element : elements) {
// 输出
out.collect(new UserVisitorCount("uv",
new Timestamp(context.window().getEnd()).toString(), uids.size()));
5. 自定义触发器实现需求逻辑
* @Description 自定义Trigger
* @Date 2022/4/17 22:44
* @Author yankee
public class UservistorTrigger<W extends Window> extends Trigger<Object, W> {
private static final Logger LOG = LoggerFactory.getLogger(UservistorTrigger.class);
// 定义时间时间
private final long interval;
private final long maxCount;
// 定义一个状态描述器:时间和数量
private final ReducingStateDescriptor<Long> intervalDesc =
new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
private final ReducingStateDescriptor<Long> maxCountDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private UservistorTrigger(long interval, long maxCount) {
this.interval = interval;
this.maxCount = maxCount;
* 创建自定义触发器
* @param interval 时间间隔
* @param maxCount 数量间隔
* @return {@link UservistorTrigger}
public static <W extends Window> UservistorTrigger<W> of(Time interval, long maxCount) {
return new UservistorTrigger<>(interval.toMilliseconds() - 1, maxCount);
public TriggerResult onElement(
Object element,
long timestamp,
W window,
TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// 当watermark越过窗口结束时间,触发计算
return TriggerResult.FIRE_AND_PURGE;
} else {
// 注册窗口关闭的定时器
// 获取状态
ReducingState<Long> intervalState = ctx.getPartitionedState(intervalDesc);
ReducingState<Long> maxCountState = ctx.getPartitionedState(maxCountDesc);
// 数量间隔的状态直接+1
// 判断存储时间间隔的状态是否为空,如果为空则注册下一个定时器
if (intervalState.get() == null) {
registerNextTimer(timestamp, window, ctx, intervalState);
if (maxCountState.get() >= maxCount) {
// 清除状态并触发计算
return TriggerResult.FIRE;
return TriggerResult.CONTINUE;
public TriggerResult onProcessingTime(
long time,
W window,
TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
public TriggerResult onEventTime(
long time,
W window,
TriggerContext ctx) throws Exception {
if (time == window.maxTimestamp()) {
// 触发计算并清除窗口数据
return TriggerResult.FIRE_AND_PURGE;
// 获取maxCount状态
ReducingState<Long> maxCountState = ctx.getPartitionedState(maxCountDesc);
Long count = maxCountState.get();
// 获取interval状态
ReducingState<Long> intervalState = ctx.getPartitionedState(intervalDesc);
Long interval = intervalState.get();
if (interval != null && interval == time) {
// 注册下一个定时器
registerNextTimer(time, window, ctx, intervalState);
// 判断如果此时count已经触发,则定时器不触发计算
if (count != null) {
LOG.info("定时器: {}, 触发计算。", new Timestamp(time));
return TriggerResult.FIRE;
return TriggerResult.CONTINUE;
public void clear(W window, TriggerContext ctx) throws Exception {
// 清除数量间隔的状态
// 时间间隔的状态涉及定时器,所以也要删除定时器
ReducingState<Long> intervalState = ctx.getPartitionedState(intervalDesc);
Long timer = intervalState.get();
if (timer != null) {
public boolean canMerge() {
return true;
public void onMerge(W window, OnMergeContext ctx) throws Exception {
// 合并数量间隔的状态
// 合并时间间隔的状态
Long interval = ctx.getPartitionedState(intervalDesc).get();
if (interval != null) {
public String toString() {
return "UservistorTrigger(" + interval + ", " + maxCount + ")";
private static class Min implements ReduceFunction<Long> {
public Long reduce(Long value1, Long value2) throws Exception {
return Math.min(value1, value2);
private static class Sum implements ReduceFunction<Long> {
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
* 注册下一个定时器
* @param time 时间
* @param window 窗口类型
* @param ctx {@link TriggerContext}
* @param intervalState {@link ReducingState}状态
* @throws Exception
private void registerNextTimer(
long time, W window, TriggerContext ctx,
ReducingState<Long> intervalState) throws Exception {
// 在下一个timer时间和窗口结束时间取最小值
long nextTimer = Math.min(time + interval, window.maxTimestamp());
LOG.info("注册定时器:{}", new Timestamp(nextTimer));
6. 总体实现逻辑
* @Description 计算24小时内UserVisitor,每隔1000条或者30分钟输出一次
* @Date 2022/4/15 09:12
* @Author yankee
public class UserVistorInOneDayByCountAndTime {
private static final Logger LOG = LoggerFactory.getLogger(UserVistorInOneDayByCountAndTime.class);
public static void main(String[] args) throws Exception {
// 获取流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
// kafka配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "uservistor");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 从kafka读取数据
DataStreamSource<String> uservistor = env.addSource(new FlinkKafkaConsumer<String>(
new SimpleStringSchema(),
// 提取数据中的时间作为EventTime
WatermarkStrategy<UserBehavior> userBehaviorWatermarkStrategy = WatermarkStrategy
.withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
public long extractTimestamp(UserBehavior element, long recordTimestamp) {
return element.getTimestamp() * 1000;
// 加工数据并输出
.map(new MapFunction<String, UserBehavior>() {
public UserBehavior map(String value) throws Exception {
String[] datas = value.split(",");
return new UserBehavior(Long.parseLong(datas[0]), Long.parseLong(datas[1]),
Integer.parseInt(datas[2]), datas[3], Long.parseLong(datas[4]));
}).filter(data -> "pv".equals(data.getBehavior()))
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(UservistorTrigger.of(Time.minutes(30), 10000L))
.process(new UservistorProcessWindowFunction())
// 提交执行
发表评论 取消回复