概述
创建流式DataFrame和流式Datasets
Streaming DataFrames可以通过SparkSession.readStream()返回的DataStreamReader接口(Scala / Java / Python文档)创建。
Input Sources
常见的内置Sources
- File source : 读取指定目录下的文件作为流数据,支持的文件格式有:text、csv、json、parquet、orc等
- Kafka source(常用): 从kafka读取数据
- Socket source (测试使用):从套接字连接读取UTF-8编码的文本数据
Source | Options | Fault-tolerant | Notes |
---|---|---|---|
File source | 1. **path:**路径到输入目录,并且与所有文件格式通用。2. **maxFilesPerTrigger:**每个时间间隔触发器中要考虑的最大新文件数(默认值:无最大值)3. **latestFirst:**是否首先处理最新的新文件,当有大量的文件积压(default:false)时很有用。4. **fileNameOnly:**是否仅根据文件名而不是完整路径检查新文件(默认值:false)。将此设置为“true”,以下文件将被视为相同的文件,因为它们的文件名“dataset.txt”是相同的:· “file:///dataset.txt” · “s3://a/dataset.txt” | Yes | 支持glob路径,但不支持多个逗号分隔的poths/ globs。 |
Socket Source | **host:**主机连接,必须指定 **port:**要连接的端口,必须指定 | No | |
Kafka Source | 详细查看kafka整合教程 | Yes |
Some examples:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.4</version>
package example8
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
object StructuredStreamingSource {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("StructuredNetworkWordCount")
val sc = new SparkContext(conf)
sc.setLogLevel("FATAL")
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
// =====================================CSV===========================================
/*
val df = spark.readStream
.format("csv")
//.option("sep",";")
.schema(new StructType().add("id", "integer").add("name", "string").add("salary", "double"))
.csv("/Users/gaozhy/data/csv")
*/
// =====================================CSV===========================================
// =====================================json===========================================
/*
val df = spark.readStream
.format("json")
.schema(new StructType().add("id","integer").add("name","string").add("salary","float"))
.json("/Users/gaozhy/data/json")
*/
// =====================================json===========================================
//df.createOrReplaceTempView("t_user")
/*
spark.sql("select * from t_user")
.writeStream
.outputMode("append")
.format("console")
.start()
.awaitTermination()
*/
// =====================================kafka===========================================
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "spark:9092")
.option("startingOffsets", """{"bz":{"0":-2}}""") // 指定偏移量消费
.option("subscribe", "baizhi")
.load()
val kafka = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition")
.as[(String, String, String, Int)]
kafka.createOrReplaceTempView("t_kafka")
// spark.sql("select * from t_kafka")
// .writeStream
// .format("console")
// .outputMode("append")
// .start()
// .awaitTermination()
spark.sql("select count(*) from t_kafka group by partition")
.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
}
}
Output Modes
下面是几种输出模式:
- Append mode (default) - 这是默认模式,其中只有从上次触发后添加到结果表的新行将被输出到sink。 只有那些添加到“结果表”中并且从不会更改的行的查询才支持这一点。 因此,该模式保证每行只能输出一次(假定容错sink)。 例如,只有select,where,map,flatMap,filter,join等的查询将支持Append模式。
- Complete mode -每个触发后,整个结果表将被输出到sink。 聚合查询支持这一点。
- Update mode - (自Spark 2.1.1以来可用)只有结果表中自上次触发后更新的行才会被输出到sink。
不同类型的流式查询支持不同的输出模式。 以下是兼容性信息。
Query Type | Supported Output Modes | 备注 |
---|---|---|
没有聚合的查询 | Append, Update | 不支持完整模式,因为将所有数据保存在结果表中是不可行的。 |
有聚合的查询:使用watermark对event-time进行聚合 | Append, Update, Complete | 附加模式使用watermark 来降低旧聚合状态。 但是,窗口化聚合的输出会延迟“withWatermark()”中指定的晚期阈值,因为模式语义可以在结果表中定义后才能将结果表添加到结果表中(即在watermark 被交叉之后)。 有关详细信息,请参阅后期数据部分。更新模式使用水印去掉旧的聚合状态。完全模式不会丢弃旧的聚合状态,因为根据定义,此模式保留结果表中的所有数据。 |
有聚合的查询:其他聚合 | Complete, Update | 由于没有定义watermark (仅在其他类别中定义),旧的聚合状态不会被丢弃。不支持附加模式,因为聚合可以更新,从而违反了此模式的语义。 |
Output Sinks
有几种类型的内置输出sinks
- File sink - Stores the output to a directory
- Kafka sink - Stores the output to one or more topics in Kafka.
- Foreach sink - Runs arbitrary(任意) computation on the records in the output.
- Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger
sink | Supported Output Modes | Options | Fault-tolerant | Notes |
---|---|---|---|---|
File Sink | Append | path:输出目录的路径,必须指定。 maxFilesPerTrigger:每个触发器中要考虑的最大新文件数(默认值:无最大值) latestFirst:是否首先处理最新的新文件,当有大量的文件积压(default:false)时很有用 有关特定于文件格式的选项,请参阅DataFrameWriter(Scala / Java / Python)中的相关方法。 例如。 对于“parquet”格式选项请参阅DataFrameWriter.parquet() | yes | 支持对分区表的写入。 按时间划分可能有用。 |
Foreach Sink | Append, Update, Compelete | None | 取决于ForeachWriter的实现 | 更多细节在下一节 |
Console Sink | Append, Update, Complete | numRows:每次触发打印的行数(默认值:20)truncate:输出太长是否截断(默认值:true) | no | |
Memory Sink | Append, Complete | None | 否。但在Complete模式下,重新启动的查询将重新创建整个表。 | 查询名就是表名 |
package com.baizhi
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import redis.clients.jedis.Jedis
/**
* 如何通过不同的sink构建结构化流计算结构的输出
*/
object SparkStructuredStreamingForOutputSink {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.appName("input source")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
//--------------------------------kafka【重点】-----------------------------------
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "HadoopNode01:9092,HadoopNode02:9092,HadoopNode03:9092")
.option("startingOffsets", """{"bz":{"0":-2}}""") // 指定消费baizhi topic的0号分区的earliest消费方式
// 指定偏移量消费 In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
.option("subscribe", "bz")
.load
// kafka record 转换为要求的类型
// val ds = df
// .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(timestamp AS LONG)")
// .as[(String, String, String, Int, Long)] // key value topic partition Long
//
// ds.createOrReplaceTempView("t_kafka")
//
// val text = spark.sql("select key as k,value as v,topic as t,partition as p, timestamp as ts from t_kafka")
//--------------------------------kafka【重点】-----------------------------------
// text
// .writeStream
// .format("console")
// .outputMode(OutputMode.Append())
// .start()
// .awaitTermination()
//===================================文件【输出模式:只支持Append】===========================================
// text
// .writeStream
// .format("json") // 文件格式 CSV JSON parquet ORC等
// .outputMode(OutputMode.Append())
// .option("checkpointLocation","hdfs://Spark:9000/checkpoint1") // 检查点path 用以故障恢复
// .option("path","file:///D://result") // path支持本地和HDFS文件系统路径
// .start()
// .awaitTermination()
// text
// .writeStream
// .format("csv") // 文件格式 CSV JSON parquet ORC等
// .outputMode(OutputMode.Append())
// .option("checkpointLocation", "hdfs://Spark:9000/checkpoint2") // 检查点path 用以故障恢复
// .option("path", "file:///D://result2") // path支持本地和HDFS文件系统路径
// .start()
// .awaitTermination()
//===================================Kafka【输出模式:Append | Updated | Completed】=========================
// val ds = df
// .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(timestamp AS LONG)")
// .as[(String, String, String, Int, Long)].flatMap(_._2.split(" ")).map((_, 1)) // key value topic partition Long
//
// ds.createOrReplaceTempView("t_kafka")
//
// val text = spark.sql("select _1 as word,count(_2) as num from t_kafka group by _1")
//
// text
// // .selectExpr("CAST(k AS STRING) as key", "CAST(v AS STRING) as value") // 对输出到kafka的数据定义key 和 value信息
// .selectExpr("CAST(word AS STRING) as key", "CAST(num AS STRING) as value") // 对输出到kafka的数据定义key 和 value信息
// .writeStream
// .format("kafka") // 文件格式 CSV JSON parquet ORC等
// // .outputMode(OutputMode.Append())
// .outputMode(OutputMode.Update())
// .option("checkpointLocation", "hdfs://Spark:9000/checkpoint4") // 检查点path 用以故障恢复
// .option("kafka.bootstrap.servers", "HadoopNode01:9092,HadoopNode02:9092,HadoopNode03:9092") // kafka集群信息
// .option("topic", "result") // 指定计算结果的保存topic
// .start()
// .awaitTermination()
/*
[root@HadoopNode02 kafka_2.11-2.2.0]# bin/kafka-console-consumer.sh --topic result --bootstrap-server HadoopNode01:9092,HadoopNode02:9092,HadoopNode03:9092 --property print.key=true
*/
//===================================Foreach【输出模式:Append | Updated | Completed】==================================
// 将计算结果输出到redis中
//kafka record 转换为要求的类型
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(timestamp AS LONG)")
.as[(String, String, String, Int, Long)].flatMap(_._2.split(" ")).map((_, 1)) // key value topic partition Long
ds.createOrReplaceTempView("t_kafka")
val text = spark.sql("select _1 as word,count(_2) as num from t_kafka group by _1")
text
.writeStream
.outputMode(OutputMode.Update())
.foreach(new ForeachWriter[Row] {
/**
* 打开方法
*
* @param partitionId 分区序号
* @param epochId
* @return Boolean true 创建一个连接 对这一行数据进行处理操作
* false 不会创建连接 跳过这一行数据
*/
override def open(partitionId: Long, epochId: Long): Boolean = true
/**
* 处理方法
*
* @param value resultTable 行对象
*/
override def process(value: Row): Unit = {
val word = value.getString(0)
val count = value.getLong(1).toString
val jedis = new Jedis("Spark", 6379)
jedis.set(word, count)
jedis.close()
}
override def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) errorOrNull.printStackTrace()
}) // 对resultTable中的每一行记录应用写出规则
.option("checkpointLocation", "hdfs://Spark:9000/checkpoint4") // 检查点path 用以故障恢复
.start()
.awaitTermination()
}
}
最后
以上就是专注曲奇为你收集整理的spark 结构化流,创建流式DataFrame和流式Datasets的全部内容,希望文章能够帮你解决spark 结构化流,创建流式DataFrame和流式Datasets所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复