Foreach详解
Foreach操作允许我们对输出数据做任何操作。使用该操作必须要实现ForeachWriter接口,该接口的方法会在每次触发有结果数据输出的时候调用。后面会给出详细的说明。这里先说一下注意事项。
- writer必须是可序列化的,因为他会被序列化然后发送到executor执行。
- open,process,close这三个方法都会在executor端调用。
- 仅当open函数被调用的时候,writer就必须进行初始化(例如,打开链接,开启事务等)。请注意,如果有初始化是在对象创建的时候做的,那么该初始化会在driver端执行,因为对象是在driver端创建的,这个可能会超出你的预期。
- version和partition是open的两个参数,该参数唯一代表了一组需要输出的行。Version是一个随着每次triger单调递增的ID。Partition是代表输出分区的id,因为输出是分布式的,会被多个executor执行。
- open函数可以根据version和partition来决定是否写改组行序列。相应的他能够返回true(继续写)和false(无需写)。一旦返回false,process函数就不会处理任何行。举个简单的例子,假设某次触发出现了部分分区失败,但是部分分区已经提交了数据到数据库。这个时候就可以根据存储在数据库的元数据,writer就可以识别已经提交的分区然后open函数返回false跳过该次提交。
- 无论何时open函数被调用,close也会相应的被调用,除非jvm由于错误异常退出。即使open返回 false,该规则也不变。如果在处理和写数据的时候出现了错误,close也将会出错。清除open中创建的状态,比如链接,事务等就是你的责任了,要保证没有资源泄漏。
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
607.package bigdata.spark.StructuredStreaming import org.apache.spark.SparkConf import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.sql.{ForeachWriter, SparkSession} class TestForeachWriter extends ForeachWriter[String] { override def open(partitionId: Long, version: Long): Boolean = { println("partitionId ==========> "+partitionId) println("version ==============> "+version) true } override def process(value: String): Unit = { println(value) } override def close(errorOrNull: Throwable): Unit = { } } object TestForeachSink { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]") .set("yarn.resourcemanager.hostname", "mt-mdh.local") .set("spark.executor.instances","2") .set("spark.default.parallelism","4") .set("spark.sql.shuffle.partitions","4") .setJars(List("/Users/meitu/Desktop/sparkjar/bigdata.jar" ,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar" ,"/opt/jars/kafka-clients-0.10.2.2.jar" ,"/opt/jars/kafka_2.11-0.10.2.2.jar" ,"/opt/jars/spark-sql-kafka-0-10_2.11-2.0.2.jar")) val spark = SparkSession .builder .appName("StructuredKafkaWordCount") .config(sparkConf) .getOrCreate() val input = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "mt-mdh.local:9093") .option("subscribe", "StructuredSource1") .load() import spark.implicits._ input.groupBy().count().map(_.toString) .writeStream .outputMode(OutputMode.Update()) .foreach(new TestForeachWriter()) .start() .awaitTermination() } }
最后
以上就是不安超短裙最近收集整理的关于(7)Foreach详解的全部内容,更多相关(7)Foreach详解内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复