概述
触发器定义了window何时会被求值以及何时发送求值结果。触发器可以到了特定的时间触发也可以碰到特定的事件触发。例如:观察到事件数量符合一定条件或者观察到了特定的事件。
默认的触发器将会在两种情况下触发
处理时间:机器时间到达处理时间
事件时间:水位线超过了窗口的结束时间
触发器可以访问流的时间属性以及定时器,还可以对state状态编程。所以触发器和process function一样强大。例如我们可以实现一个触发逻辑:当窗口接收到一定数量的元素时,触发器触发。再比如当窗口接收到一个特定元素时,触发器触发。还有就是当窗口接收到的元素里面包含特定模式(5秒钟内接收到了两个同样类型的事件),触发器也可以触发。在一个事件时间的窗口中,一个自定义的触发器可以提前(在水位线没过窗口结束时间之前)计算和发射计算结果。这是一个常见的低延迟计算策略,尽管计算不完全,但不像默认的那样需要等待水位线没过窗口结束时间。
每次调用触发器都会产生一个TriggerResult来决定窗口接下来发生什么。TriggerResult可以取以下结果:
CONTINUE:什么都不做
FIRE:如果window operator有ProcessWindowFunction这个参数,将会调用这个ProcessWindowFunction。如果窗口仅有增量聚合函数(ReduceFunction或者AggregateFunction)作为参数,那么当前的聚合结果将会被发送。窗口的state不变。
PURGE:窗口所有内容包括窗口的元数据都将被丢弃。
FIRE_AND_PURGE:先对窗口进行求值,再将窗口中的内容丢弃。
TriggerResult可能的取值使得我们可以实现很复杂的窗口逻辑。一个自定义触发器可以触发多次,可以计算或者更新结果,可以在发送结果之前清空窗口。
接下来我们看一下Trigger API:
public abstract class Trigger<T, W extends Window>
implements Serializable {
TriggerResult onElement(
long timestamp,
W window,
TriggerContext ctx);
public abstract TriggerResult onProcessingTime(
long timestamp,
W window,
TriggerContext ctx);
public abstract TriggerResult onEventTime(
long timestamp,
W window,
TriggerContext ctx);
public boolean canMerge();
public void onMerge(W window, OnMergeContext ctx);
public abstract void clear(W window, TriggerContext ctx);
}
public interface TriggerContext {
long getCurrentProcessingTime();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(
StateDescriptor<S, ?> stateDescriptor);
}
public interface OnMergeContext extends TriggerContext {
void mergePartitionedState(
StateDescriptor<S, ?> stateDescriptor
);
}
这里要注意两个地方:清空state和merging合并触发器。
当在触发器中使用per-window state时,这里我们需要保证当窗口被删除时state也要被删除,否则随着时间的推移,window operator将会积累越来越多的数据,最终可能使应用崩溃。
当窗口被删除时,为了清空所有状态,触发器的clear()方法需要需要删掉所有的自定义per-window state,以及使用TriggerContext对象将处理时间和事件时间的定时器都删除。
下面的例子展示了一个触发器在窗口结束时间之前触发。当第一个事件被分配到窗口时,这个触发器注册了一个定时器,定时时间为水位线之前一秒钟。当定时事件执行,将会注册一个新的定时事件,这样,这个触发器每秒钟最多触发一次。
自定义实现: 通过触发器在15秒的窗口内每秒触发一次计算
import com.atguigu.StreamingJob.{SensorReading, SensorSource}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class SensorReading(id:String,timestamp :Long,temperature : Double)
object TriggersTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度
env.setParallelism(1)
//设置事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取事件源
val stream = env.addSource(new SensorSource)
val setTime = stream.assignAscendingTimestamps(_.timestamp)
//设置事件时间的获取方式
val result = setTime.keyBy(_.id).timeWindow(Time.seconds(15)).trigger(new OneSecondIntervalTrigger ).process(new AllWindom)
result.print()
env.execute()
}
//创建窗口的全量函数 in out key windom
class AllWindom extends ProcessWindowFunction[SensorReading,(String,Double,Double,Long),String,TimeWindow]{
override def process(key: String,
context: Context,
elements: Iterable[SensorReading],
out: Collector[(String, Double, Double,Long)]): Unit = {
var doubles: Iterable[Double] = elements.map(_.temperature)
out.collect( (key,doubles.max,doubles.min,context.window.getEnd))
}
}
}
//设置一秒钟一次的触发器
class OneSecondIntervalTrigger extends Trigger[SensorReading , TimeWindow]{
//回调函数
override def onEventTime(l: Long, //触发定时器的时间,即前文设置的定时时间,默认窗口结束时会调用一次
w: TimeWindow, //窗口
triggerContext: Trigger.TriggerContext): TriggerResult = {
//判断l是否为窗口的结束时间
if(l==w.getEnd){
//触发窗口的计算,并且清空数据
print("=============================")
TriggerResult.FIRE_AND_PURGE
}else{
val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000))
if(t<w.getEnd){
triggerContext.registerEventTimeTimer(t)
}
}
//支触发计算
TriggerResult.FIRE
}
//这是系统时间的 ,不执行业务逻辑
override def onProcessingTime(l: Long,
w: TimeWindow,
triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
//每个窗口的结束时调用
override def clear(w: TimeWindow,
triggerContext: Trigger.TriggerContext): Unit = {
val firstSeen: ValueState[Boolean] = triggerContext
.getPartitionedState(
new ValueStateDescriptor[Boolean](
"firstSeen", classOf[Boolean]
)
)
//注销之前设置的事件
firstSeen.clear()
}
//每个数据调用一次
override def onElement(t: SensorReading,
l: Long,
w: TimeWindow,
triggerContext: Trigger.TriggerContext): TriggerResult = {
//获取分区的状态变量
var firstSeen: ValueState[Boolean] = triggerContext.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen", Types.of[Boolean]))
//每个窗口的第一个数据才会进入到里面设置回调事件的事件
if(!firstSeen.value()){
//获取当前水位线
val t = triggerContext.getCurrentWatermark+(1000-(triggerContext.getCurrentWatermark%1000))
//注册事件时间的回调事件,注册下一秒的事件
triggerContext.registerEventTimeTimer(t)
//注册窗口结束时的事件
triggerContext.registerEventTimeTimer(w.getEnd)
//关闭时间的注册,保证每一秒内的事件不重复注册
firstSeen.update(true)
}
TriggerResult.CONTINUE
}
最后
以上就是喜悦人生为你收集整理的Flink的触发器Trigger介绍和使用的全部内容,希望文章能够帮你解决Flink的触发器Trigger介绍和使用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复