I'm 称心百褶裙, a blogger at 靠谱客. This post collects some pitfalls to watch out for in Structured Streaming that I ran into during recent development; I hope it serves as a useful reference.

Overview

Structured Streaming (introduced in Spark 2.0) adds streaming Datasets and DataFrames, but many operations that work on static Datasets/DataFrames cannot be used on the streaming versions.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object structuredStreamingtest {
  def main(args: Array[String]): Unit = {
    // On Windows, point Hadoop at a local winutils installation
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6")

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[3]")
      .getOrCreate()

    import spark.implicits._

    // Read lines from a local socket source (start one with `nc -lk 9999`)
    val lines = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" ")).toDF("value")

    // Generate a running word count; sorting is allowed here only because it
    // follows an aggregation and the query runs in Complete output mode
    val wordCounts = words.groupBy("value").count().sort("count")

    val query = wordCounts.writeStream
      .outputMode("complete")
      .queryName("aggregates")
      .option("checkpointLocation", "D:\\checkpoint")
      .format("memory")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}

1. Operations not supported on streaming DataFrames/Datasets

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations, such as repeated groupBy, on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete output mode.

  • A few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.
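As a sketch of the restrictions above (object name, host, and port are illustrative, not from the original post), calling an unsupported operation such as limit on a streaming DataFrame is rejected with an AnalysisException when the query is started:

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object UnsupportedOpsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("unsupported-ops")
      .master("local[2]")
      .getOrCreate()

    val lines = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .load()

    // limit is not supported on a streaming Dataset; Spark's analyzer
    // rejects the plan as soon as start() is called
    try {
      lines.limit(10).writeStream.format("console").start()
    } catch {
      case e: AnalysisException => println(s"rejected: ${e.getMessage}")
    }

    spark.stop()
  }
}
```

The check happens at analysis time, before any data flows, so these mistakes surface immediately rather than mid-stream.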

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count(), which returns a streaming Dataset containing a running count rather than a static result.

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink, configured via format("console") (see next section).
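The replacements above can be sketched together in one program (object name, host, and port are illustrative assumptions):

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object SupportedAlternativesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("alternatives")
      .master("local[2]")
      .getOrCreate()

    val lines = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .load()

    // Instead of count(): a streaming Dataset carrying a running count
    val running = lines.groupBy().count()

    // Instead of show(): the console sink prints each micro-batch
    val consoleQuery = running.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    // Instead of foreach(): a ForeachWriter applied per row by the sink
    val foreachQuery = lines.writeStream.foreach(new ForeachWriter[Row] {
      def open(partitionId: Long, epochId: Long): Boolean = true
      def process(row: Row): Unit = println(row.getString(0))
      def close(errorOrNull: Throwable): Unit = ()
    }).start()

    consoleQuery.awaitTermination()
  }
}
```

Each alternative returns a streaming query that runs continuously, which is why the eager, one-shot versions (count, show, foreach on the Dataset itself) cannot work on unbounded data.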

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

2. It is best to avoid checkpoints

Forced shutdowns frequently lead to problems such as serialization failures when the query is later restarted from its checkpoint.
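If a checkpoint is needed anyway, one mitigation (a sketch of my own, not from the original post; names and the checkpoint path are illustrative) is to stop the query in a JVM shutdown hook, so a normal process termination commits offsets cleanly instead of leaving half-written checkpoint state:

```scala
import org.apache.spark.sql.SparkSession

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("graceful-stop")
      .master("local[2]")
      .getOrCreate()

    val lines = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .load()

    val query = lines.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/wordcount-checkpoint") // illustrative path
      .start()

    // Stop the streaming query before the JVM exits so the checkpoint
    // is left in a consistent state
    sys.addShutdownHook {
      query.stop()
      spark.stop()
    }

    query.awaitTermination()
  }
}
```

This does not help against a hard kill (kill -9 or a crashed machine), which is the case the warning above is really about.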
