概述
Foreach详解
Foreach操作允许我们对输出数据做任何操作。使用该操作必须要实现ForeachWriter接口,该接口的方法会在每次触发有结果数据输出的时候调用。后面会给出详细的说明。这里先说一下注意事项。
- writer必须是可序列化的,因为他会被序列化然后发送到executor执行。
- open,process,close这三个方法都会在executor端调用。
- 仅当open函数被调用的时候,writer就必须进行初始化(例如,打开链接,开启事务等)。请注意,如果有初始化是在对象创建的时候做的,那么该初始化会在driver端执行,因为对象是在driver端创建的,这个可能会超出你的预期。
- version和partition是open的两个参数,该参数唯一代表了一组需要输出的行。Version是一个随着每次triger单调递增的ID。Partition是代表输出分区的id,因为输出是分布式的,会被多个executor执行。
- open函数可以根据version和partition来决定是否写改组行序列。相应的他能够返回true(继续写)和false(无需写)。一旦返回false,process函数就不会处理任何行。举个简单的例子,假设某次触发出现了部分分区失败,但是部分分区已经提交了数据到数据库。这个时候就可以根据存储在数据库的元数据,writer就可以识别已经提交的分区然后open函数返回false跳过该次提交。
- 无论何时open函数被调用,close也会相应的被调用,除非jvm由于错误异常退出。即使open返回 false,该规则也不变。如果在处理和写数据的时候出现了错误,close也将会出错。清除open中创建的状态,比如链接,事务等就是你的责任了,要保证没有资源泄漏。
7.package bigdata.spark.StructuredStreaming
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{ForeachWriter, SparkSession}
class TestForeachWriter extends ForeachWriter[String] {
override def open(partitionId: Long, version: Long): Boolean = {
println("partitionId ==========> "+partitionId)
println("version ==============> "+version)
true
}
override def process(value: String): Unit = {
println(value)
}
override def close(errorOrNull: Throwable): Unit = {
}
}
object TestForeachSink {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname", "mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"
,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar"))
val spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.config(sparkConf)
.getOrCreate()
val input = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mt-mdh.local:9093")
.option("subscribe", "StructuredSource1")
.load()
import spark.implicits._
input.groupBy().count().map(_.toString)
.writeStream
.outputMode(OutputMode.Update())
.foreach(new TestForeachWriter())
.start()
.awaitTermination()
}
}
最后
以上就是不安超短裙为你收集整理的(7)Foreach详解的全部内容,希望文章能够帮你解决(7)Foreach详解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复