我是靠谱客的博主 辛勤鸭子,最近开发中收集的这篇文章主要介绍Flink学习26:触发器触发器自定义触发器,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

触发器

 作用:决定何时,触发窗口计算函数,开始计算

每个窗口都有一个默认触发器,也可以自定义触发器。

自定义触发器

示例1:

当流中元素达到5个以后,触发窗口计算。

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

//case class StockPrice(stockId:String, timestamp: Long, price:Double)
//defined the dataSource's type
case class StockPrice(stockId:String, timeStamp:Long, price:Double)

object trigger {

  def main(args: Array[String]): Unit = {

    //create env
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //set parallelism
    env.setParallelism(1)

    //set process time
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

        //for kafka connection
        val kafkaProps = new Properties()

        //kafka's attribute
        kafkaProps.setProperty("bootstrap.servers","10.10.10.162:9092")

        //set the consumer's group
        kafkaProps.setProperty("group.id","gksk-bigdata")

        //create the consumer
        val kafkaSource = new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema, kafkaProps)

        //set offset
        kafkaSource.setStartFromEarliest()

        //auto commit offset
        kafkaSource.setCommitOffsetsOnCheckpoints(true)

        //band data source
        val ds = env.addSource(kafkaSource)

        val stockPriceStream = ds.map(s => s.split(","))
          .map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))

//    //create ds
//    val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23), StockPrice("stock3", 10, 888.23))
//
//    val ds = env.fromCollection(pricesList)
//    //    ds.print()


    val sumedStream = stockPriceStream.keyBy(s => s.stockId)
      .timeWindow(Time.seconds(10))
      .trigger(new MyTrigger(5))
      .reduce((s1, s2) => StockPrice(s1.stockId, s1.timeStamp, s1.price + s2.price))

    sumedStream.print()

    env.execute()

  }


  class MyTrigger extends Trigger[StockPrice, TimeWindow] {

    //to receive the para
    def this(maxCount:Int){
      this()
      this.maxCount = maxCount
    }

    //declare ( if reach max num ,then trigger windows)
    private var maxCount:Long = _

    //get trigger's state
    private lazy val countStateDescriptor = new ReducingStateDescriptor[Long]("count", new Sum, classOf[Long])

    //override on element
    override def onElement(t: StockPrice, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {

      //get the trigger's state
      val countState = triggerContext.getPartitionedState(countStateDescriptor)

      //state add
      countState.add(1L)

      //judge state more than max trigger num
      if(countState.get() >= this.maxCount){

        //reach max num,then clear and trigger window compute
        //clear state
        countState.clear()

        //compute
        TriggerResult.FIRE

      }else{
        TriggerResult.CONTINUE
      }
    }

    override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
      //do nothing, cause we don't need deal process Time window, but need to override func
      TriggerResult.CONTINUE
    }

    override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
      //do nothing
      TriggerResult.CONTINUE
    }

    //clear the state, when window reach max num to trigger the compute
    override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
      println("@--now, window is closeed")
      triggerContext.getPartitionedState(countStateDescriptor).clear()
    }

    //update the state,
    class Sum extends ReduceFunction[Long]{
      override def reduce(t: Long, t1: Long): Long = {
        t+t1
      }
    }
  }



}

最后

以上就是辛勤鸭子为你收集整理的Flink学习26:触发器触发器自定义触发器的全部内容,希望文章能够帮你解决Flink学习26:触发器触发器自定义触发器所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部