概述
文章目录
- 一、前言
- 二、Transformations on DStreams
- 三、 Window Operations(窗口操作)
- 四、Output Operations on DStreams(输出操作)
一、前言
Spark Streaming是核心Spark API的扩展,它支持对实时数据流进行可伸缩、高吞吐量和容错的流处理。数据可以从Kafka、Flume、Kinesis或TCP套接字等多个源获取,也可以使用map、reduce、join和window等高级函数表示的复杂算法进行处理。最后,可以将处理过的数据推送到文件系统、数据库和实时仪表板。事实上,您可以将Spark的机器学习和图形处理算法应用于数据流。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
离散流或DStream是Spark Streaming提供的基本抽象。它表示连续的数据流,无论是从源接收的输入数据流,还是通过转换输入流生成的经过处理的数据流。在内部,DStream由一系列连续的RDDs表示,RDDs是Spark对不可变的分布式数据集的抽象。DStream中的每个RDD都包含来自某个时间间隔的数据。在内部,DStream表示为RDDs序列。
DStream中的算子分为两大类:Transformations 和Output
二、Transformations on DStreams
与RDDs类似,转换允许修改来自输入DStream的数据。DStreams支持普通Spark RDD上可用的许多转换。下面是一些常见的算子。
Transformation | 含义 |
---|---|
map(func) | 通过函数func传递源DStream的每个元素,返回一个新的DStream。 |
flatMap(func) | 类似于map,但是每个输入项可以映射到0或多个输出项。 |
filter(func) | 通过只选择func返回true的源DStream的记录来返回一个新的DStream。 |
repartition(numPartitions) | 重分区,通过创建或多或少的分区来更改此DStream中的并行度级别。 |
union(otherStream) | 返回一个新的DStream,它包含源DStream和其他DStream中的元素的联合。 |
count() | 通过计算源DStream的每个RDD中的元素数量,返回一个新的单元素RDD DStream。 |
reduce(func) | 使用func函数(函数接受两个参数并返回一个参数)聚合源DStream的每个RDD中的元素,从而返回单元素RDDs的新DStream。这个函数应该是结合律和交换律的,这样才能并行计算。 |
countByValue() | 当对K类型的元素的DStream调用时,返回一个新的(K, Long)对的DStream,其中每个键的值是它在源DStream的每个RDD中的频率。 |
reduceByKey(func, [numTasks]) | 当对(K, V)对的DStream调用时,返回一个新的(K, V)对的DStream,其中每个键的值使用给定的reduce函数进行聚合。注意:默认情况下,这将使用Spark的默认并行任务数量(本地模式为2,在集群模式下,该数量由config属性Spark .default.parallelism决定)来进行分组。我们可以传递一个可选的numTasks参数来设置不同数量的任务。 |
join(otherStream, [numTasks]) | 当调用两个(K, V)和(K, W)对的DStream时,返回一个新的(K, (V, W))对的DStream,其中包含每个Key的所有元素对。 |
cogroup(otherStream, [numTasks]) | 当调用(K, V)和(K, W)对的DStream时,返回一个新的(K, Seq[V], Seq[W])元组DStream。 |
transform(func) | 通过将RDD-to-RDD函数应用于源DStream的每个RDD,返回一个新的DStream。它可以用于应用DStream API中没有公开的任何RDD操作。例如将数据流中的每个批处理与另一个数据集连接的功能并不直接在DStream API中公开。但是你可以很容易地使用transform来实现这一点。这带来了非常强大的可能性。例如,可以通过将输入数据流与预先计算的垃圾信息(也可能是使用Spark生成的)结合起来进行实时数据清理 |
updateStateByKey(func) | 返回一个新的“state”DStream,其中每个Key的状态通过将给定的函数应用于Key的前一个状态和Key的新值来更新。这可以用于维护每个Key的任意状态数据。要使用它,您需要执行两个步骤:(1).定义状态——状态可以是任意数据类型;(2).定义状态更新函数——用函数指定如何使用输入流中的前一个状态和新值更新状态。 |
三、 Window Operations(窗口操作)
Spark Streaming还提供了窗口计算,允许你在滑动的数据窗口上应用转换。
如图所示:
每当窗口滑过源DStream时,属于该窗口的源RDDs就被组合起来并对其进行操作,从而生成窗口化DStream的RDDs。在这种特定的情况下,操作应用于最后3个时间单位的数据,并以2个时间单位进行幻灯片显示。这表明任何窗口操作都需要指定两个参数:
(1).窗口长度(windowLength)-窗口的持续时间(图中为3)。
(2).滑动间隔(slideInterval)——执行窗口操作的间隔(图中为2)。
这两个参数必须是批处理间隔的倍数.
一些常见的窗口操作如下所示。所有这些操作都使用上述两个参数——windowLength和slideInterval。
Transformation | 含义 |
---|---|
window(windowLength, slideInterval) | 返回一个新的DStream,它是基于源DStream的窗口批次计算的。 |
countByWindow(windowLength, slideInterval) | 返回流中元素的滑动窗口计数。 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素流,该流是使用func在滑动间隔上聚合流中的元素创建的。这个函数应该是结合律和交换律的,这样才能并行地正确计算。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 当对(K, V)对的DStream调用时,返回一个新的(K, V)对的DStream,其中每个Key的值使用给定的reduce函数func在滑动窗口中分批聚合。注意:默认情况下,这将使用Spark的默认并行任务数量(本地模式为2,在集群模式下,该数量由config属性Spark .default.parallelism决定)来进行分组。您可以传递一个可选的numTasks参数来设置不同数量的任务。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上面reduceByKeyAndWindow()的一个更有效的版本,其中每个窗口的reduce值是使用前一个窗口的reduce值增量计算的。这是通过减少进入滑动窗口的新数据和“反向减少”离开窗口的旧数据来实现的。例如,在窗口滑动时“添加”和“减去”键的计数。但是,它只适用于“可逆约简函数”,即具有相应“逆约简”函数的约简函数(取invFunc参数)。与reduceByKeyAndWindow类似,reduce任务的数量可以通过一个可选参数进行配置。注意,必须启用checkpoint才能使用此操作。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当对(K, V)对的DStream调用时,返回一个新的(K, Long)对的DStream,其中每个Key的值是它在滑动窗口中的频率。与reduceByKeyAndWindow类似,reduce任务的数量可以通过一个可选参数进行配置。 |
四、Output Operations on DStreams(输出操作)
输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许外部系统使用转换后的数据,因此它们触发所有DStream转换的实际执行(类似于RDDs的操作)。目前,定义了以下输出操作:
Output Operation | 含义 |
---|---|
print() | 在运行流应用程序的驱动程序节点上打印DStream中每批数据的前10个元素。这对于开发和调试非常有用。这在Python API中称为pprint()。 |
saveAsTextFiles(prefix, [suffix]) | 将此DStream的内容保存为文本文件。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix- time_in_ms [.suffix]”。 |
saveAsObjectFiles(prefix, [suffix]) | 将此DStream的内容保存为序列化Java对象的sequencefile。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix- time_in_ms [.suffix]”。这在Python API中是不可用的。 |
saveAsHadoopFiles(prefix, [suffix]) | 将这个DStream的内容保存为Hadoop文件。每个批处理间隔的文件名是根据前缀和后缀生成的:“prefix- time_in_ms [.suffix]”。这在Python API中是不可用的。 |
foreachRDD(func) | 对流生成的每个RDD应用函数func的最通用输出操作符。这个函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中,或者通过网络将其写入数据库。请注意,函数func是在运行流应用程序的驱动程序进程中执行的,其中通常会有RDD操作,这将强制流RDDs的计算。在func中创建远程连接时可以使用foreachPartition 替换foreach操作以降低系统的总体吞吐量。 |
示例请参考官网:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
最后
以上就是从容羽毛为你收集整理的【Spark Streaming】(三)DStream 算子详解的全部内容,希望文章能够帮你解决【Spark Streaming】(三)DStream 算子详解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复