我是靠谱客的博主 斯文玫瑰,最近开发中收集的这篇文章主要介绍Trigger(触发器),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Trigger(触发器)

流式查询的触发器定义了流式数据处理的时间, 流式查询根据触发器的不同, 可以是根据固定的批处理间隔进行微批处理查询, 也可以是连续的查询.

Trigger TypeDescription
unspecified (default)没有显示的设定触发器, 表示使用 micro-batch mode, 尽可能快的处理每个批次的数据. 如果无数据可用, 则处于阻塞状态, 等待数据流入
Fixed interval micro-batches 固定周期的微批处理查询会在微批处理模式下执行, 其中微批处理将以用户指定的间隔执行. 1. 如果以前的微批处理在间隔内完成, 则引擎会等待间隔结束, 然后开启下一个微批次 2. 如果前一个微批处理在一个间隔内没有完成(即错过了间隔边界), 则下个微批处理会在上一个完成之后立即启动(不会等待下一个间隔边界) 3. 如果没有新数据可用, 则不会启动微批次. 适用于流式数据的批处理作业
One-time micro-batch 一次性微批次查询将在所有可用数据上执行一次微批次处理, 然后自行停止. 如果你希望定期启动集群, 然后处理集群关闭期间产生的数据, 然后再关闭集群. 这种情况下很有用. 它可以显著的降低成本. 一般用于非实时的数据分析
Continuous with fixed checkpoint interval (experimental 2.3 引入) 连续处理以超低延迟处理数据
// 1. 默认触发器
val query: StreamingQuery = df.writeStream
    .outputMode("append")
    .format("console")
    .start()
// 2. 微批处理模式
val query: StreamingQuery = df.writeStream
        .outputMode("append")
        .format("console")
        .trigger(Trigger.ProcessingTime("2 seconds"))
        .start

// 3. 只处理一次. 处理完毕之后会自动退出
val query: StreamingQuery = df.writeStream
        .outputMode("append")
        .format("console")
        .trigger(Trigger.Once())
        .start()

// 4. 持续处理
val query: StreamingQuery = df.writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.Continuous("1 seconds"))
    .start

连续处理模式(Continuous processing)

连续处理是2.3 引入, 它可以实现低至 1ms 的处理延迟. 并实现了至少一次(at-least-once)的语义.

微批处理模式虽然实现了严格一次(exactly-once)的语义, 但是最低有 100ms 的延迟.

对有些类型的查询, 可以切换到这个模式, 而不需要修改应用的逻辑.(不用更改 df/ds 操作)

若要切换到连续处理模式, 只需要更改触发器即可.

spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("")

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()

连续处理模式支持的查询

  1. 操作: 支持 select, map, flatMap, mapPartitions, etc. 和 selections (where, filter, etc.). 不支持聚合操作
  2. 数据源:
    • kafka 所有选项都支持
    • rate source
  3. sink
    • 所有的 kafka 参数都支持
    • memory sink
    • console sink

最后

以上就是斯文玫瑰为你收集整理的Trigger(触发器)的全部内容,希望文章能够帮你解决Trigger(触发器)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部