概述
Spark Streaming
def main(args: Array[String]): Unit = {
// 1. 创建 Context
val conf = new SparkConf()
.setAppName("updateStateBykey")
.setMaster("local[6]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.sparkContext.setLogLevel("ERROR")
// 2. 读取tcp数据生成 DStream,
val source = ssc.socketTextStream(
hostname = "192.168.0.242",
port = 9998,
storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
)
// 3. 词频统计
val wordsTuple = source.flatMap(_.split(" "))
.map((_, 1))
// 4. 全局聚合,累计每个批次得结果
ssc.checkpoint("checkpoint")
def updateFunc(newValue: Seq[Int], runningValue: Option[Int]): Option[Int] = {
// newValue : 对应当前批次中 Key 对应的所有 Value
// runningValue : 当前的中间结果
val currBatchValue = newValue.sum
val state = runningValue.getOrElse(0) + currBatchValue
Some(state)
}
val result = wordsTuple.updateStateByKey[Int](updateFunc _)
// 5. 输出
result.print()
ssc.start()
ssc.awaitTermination()
}
/**
* -------------------------------------------
* Time: 1603871685000 ms
* -------------------------------------------
* (hadoop,1)
* (hello,1)
*
* -------------------------------------------
* Time: 1603871686000 ms
* -------------------------------------------
* (hadoop,1)
* (hello,2)
* (spark,1)
*
* -------------------------------------------
* Time: 1603871692000 ms
* -------------------------------------------
* (hadoop,1)
* (ok,1)
* (hello,2)
* (spark,2)
*/
Structured Streaming
最后
以上就是幸福路灯为你收集整理的Spark 流(微批)处理(4)的全部内容,希望文章能够帮你解决Spark 流(微批)处理(4)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复