我是靠谱客的博主 不安超短裙,最近开发中收集的这篇文章主要介绍(7)Foreach详解,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Foreach详解

Foreach操作允许我们对输出数据做任何操作。使用该操作必须要实现ForeachWriter接口,该接口的方法会在每次触发有结果数据输出的时候调用。后面会给出详细的说明。这里先说一下注意事项。

  1. writer必须是可序列化的,因为他会被序列化然后发送到executor执行。
  2. open,process,close这三个方法都会在executor端调用。
  3. 仅当open函数被调用的时候,writer就必须进行初始化(例如,打开链接,开启事务等)。请注意,如果有初始化是在对象创建的时候做的,那么该初始化会在driver端执行,因为对象是在driver端创建的,这个可能会超出你的预期。
  4. version和partition是open的两个参数,该参数唯一代表了一组需要输出的行。Version是一个随着每次triger单调递增的ID。Partition是代表输出分区的id,因为输出是分布式的,会被多个executor执行。
  5. open函数可以根据version和partition来决定是否写改组行序列。相应的他能够返回true(继续写)和false(无需写)。一旦返回false,process函数就不会处理任何行。举个简单的例子,假设某次触发出现了部分分区失败,但是部分分区已经提交了数据到数据库。这个时候就可以根据存储在数据库的元数据,writer就可以识别已经提交的分区然后open函数返回false跳过该次提交。
  6. 无论何时open函数被调用,close也会相应的被调用,除非jvm由于错误异常退出。即使open返回 false,该规则也不变。如果在处理和写数据的时候出现了错误,close也将会出错。清除open中创建的状态,比如链接,事务等就是你的责任了,要保证没有资源泄漏。
7.package bigdata.spark.StructuredStreaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{ForeachWriter, SparkSession}


class TestForeachWriter extends ForeachWriter[String] {

  override def open(partitionId: Long, version: Long): Boolean = {
    println("partitionId ==========> "+partitionId)
    println("version ==============> "+version)
    true
  }

  override def process(value: String): Unit = {
    println(value)
  }

  override def close(errorOrNull: Throwable): Unit = {

  }
}

object TestForeachSink {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
      .set("yarn.resourcemanager.hostname", "mt-mdh.local")
      .set("spark.executor.instances","2")
      .set("spark.default.parallelism","4")
      .set("spark.sql.shuffle.partitions","4")
      .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
        ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
        ,"/opt/jars/kafka-clients-0.10.2.2.jar"
        ,"/opt/jars/kafka_2.11-0.10.2.2.jar"
        ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))


    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .config(sparkConf)
      .getOrCreate()

    val input = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mt-mdh.local:9093")
      .option("subscribe", "StructuredSource1")
      .load()

    import spark.implicits._
    input.groupBy().count().map(_.toString)
      .writeStream
      .outputMode(OutputMode.Update())
      .foreach(new TestForeachWriter())
      .start()
      .awaitTermination()
  }
}

 

最后

以上就是不安超短裙为你收集整理的(7)Foreach详解的全部内容,希望文章能够帮你解决(7)Foreach详解所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部