概述
1. 介绍
本文主要介绍开窗后自定义窗口触发器,触发器触发包括两部分:
1.当窗口内的数据量达到一定数量后触发
2.当事件事件达到窗口最大时间戳后触发
2. 环境
Flink: 1.13.1
java:1.8.251
scala:2.11.12
3.Trigger类中主要方法
1)onElement() 为添加到窗格中的每个元素调用。此方法的结果将决定是否对窗格进行触发。
2)onProcessingTime() Flink使用的是处理时间语义时使用,该方法用来以时间作为条件触发窗口。
3)onEventTime() Flink使用的是事件时间语义时使用,该方法用来以时间作为条件触发窗口。
4)onMerge() 当多个窗口合并为一个窗口时调用(没用过)。
3)clear() 对当前窗口进行一些清理操作,比如状态值,定时器。
4.TriggerResult
1)CONTINUE:跳过,什么操作都不执行
2)FIRE:触发窗口并保留窗口内的数据
3)PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。
4)FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。)
5.代码实现
注册水位线及开窗
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
//此处是加载自己生成的数据源
val dataSource = env.addSource(new MyTradingSource)
//注册水位线:以交易时间作为事件时间
//注意:水位线的时间戳要求是毫秒级的,常见时间戳为秒级,需要乘1000
val keyedStream = dataSource
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[TradingObject] {
override def extractTimestamp(element: TradingObject, recordTimestamp: Long): Long = {
element.tradingTime
}
})
)//按照订单货物的类型进行分组
.keyBy(_.name)
//使用基于EventTime的滚动窗口,窗口间隔为5s
keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(new CustomTrigger)
.process(new MyProcessWindowFunc)
.print()
env.execute()
自定义Trigger及Process
class CustomTrigger extends Trigger[TradingObject, Window] {
//状态值:聚合状态,用来统计创久内的数据量
private val number = new ReducingStateDescriptor[Long]("number", new reduceFunc, classOf[Long])
override def onElement(element: TradingObject, timestamp: Long, window: Window, ctx: Trigger.TriggerContext): TriggerResult = {
//获取状态
val count = ctx.getPartitionedState(number)
//每条数据进来后状态值+1
count.add(1l)
ctx.registerEventTimeTimer(window.maxTimestamp())
//如果该窗口内的数据量>=5 则触发该窗口,状态值清空
if (count.get() >= 5) {
count.clear()
TriggerResult.FIRE_AND_PURGE
}else{
TriggerResult.CONTINUE
}
}
override def onProcessingTime(time: Long, window: Window, ctx: Trigger.TriggerContext): TriggerResult = {
//因为使用的语义时事件时间,所以处理时间语义不做任何处理
TriggerResult.CONTINUE
}
override def onEventTime(time: Long, window: Window, ctx: Trigger.TriggerContext): TriggerResult = {
//当达到窗口最大值时则触发窗口计算
if (time ==window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE else TriggerResult.CONTINUE
}
override def clear(window: Window, ctx: Trigger.TriggerContext): Unit = {
//清除操作
ctx.deleteEventTimeTimer(window.maxTimestamp())
ctx.getPartitionedState(number).clear()
}
}
//聚合状态计算方式
class reduceFunc extends ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = value1 + value2
}
//自定义Process,对该窗口内的数据按照货物类型进行聚合操作
class MyProcessWindowFunc extends ProcessWindowFunction[TradingObject, String, String, TimeWindow] {
override def process(key: String, context: Context, elements: Iterable[TradingObject], out: Collector[String]): Unit = {
val iterator = elements.iterator
var sumPrice = 0.0
while (iterator.hasNext) {
val tradingObject = iterator.next()
sumPrice = sumPrice + tradingObject.price
//println(key, sumPrice)
}
out.collect("商品名称:" + key + " " + "总价格:" + sumPrice)
//out.collect("1111")
}
}
最后
以上就是落寞板栗为你收集整理的Flink自定义Trigger的全部内容,希望文章能够帮你解决Flink自定义Trigger所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复