概述
前言
学习Spark的Structured Streaming(结构化流)。
官网:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
1. 快速示例
博主这里仍然使用的是Spark 集群运行。
假设从侦听TCP套接字的数据服务器接收的文本数据的运行字数。
- 编写代码
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession._
// 创建SparkSession
val spark = SparkSession.builder.appName("StructedStreaming").getOrCreate()
// 创建流式DataFrame
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
// 先转换为字符串数据集,然后分割
val words = lines.as[String].flatMap(_.split(" "))
// 按值分组并计数
val wordsCount = words.groupBy("value").count()
// 输出到控制台, 并启动
val query = wordsCount.writeStream.outputMode("complete").format("console").start()
// 防止查询处于活动状态时退出
query.awaitTermination()
- 先启动传输套接字
在master 节点输入:
nc -lk 9999
3. 启动Spark 应用
终端运行 spark-shell,进入scala的shell,复制代码:
在WebUI 下,也可以看到相应的Job 情况:
4. 测试数据
在nc 中输入数据:
可以看到shell中正在执行计算步骤:
计算结果:
再次输入:
你可以看到输出结果包括了之前的数据,是因为输出时设置了选项为 “complete”。
有如下三种输出模式:
也称为“增量查询”:该查询将先前运行的计数与新数据相结合,以计算更新的计数。
如,官网图例:
2. 创建DataFrame和流式数据集
使用本地文件系统,监控一个本地目录,按照csv格式解析:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.types.StructType
// 创建schema
val userSchema = new StructType().add("name", "string").add("age","integer")
// 创建
val csvDF = spark.readStream.option("sep",";").schema(userSchema).csv("file:///home/hadoop/test_data/")
// 输出模式只能选择append
val query = csvDF.writeStream.outputMode("append").format("console").start()
query.awaitTermination()
在/home/hadoop 下创建 test_data目录:
复制代码到shell,运行:
输入测试数据:
结果:
完!
最后
以上就是繁荣海燕为你收集整理的11-Structured Streaming -- Scala版本的全部内容,希望文章能够帮你解决11-Structured Streaming -- Scala版本所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复