概述
本文总结Spark Structured Streaming Source、Sink。
Source
File Source
从目录中读取文件作为数据流。支持csv
、json
、text
、parquet
、orc
格式。以csv文件为例如下:
// 启动自动推断Schema
spark.conf.set("spark.sql.streaming.schemaInference",true)
// 读取csv文件
spark
.readStream
.format("csv")
// 首行是header
.option("header", "true")
// 丢弃异常列
.option("mode","dropmalformed")
// 每次Trigger,最多读取的新文件的个数为2
.option("maxFilesPerTrigger",2)
// 文件目录
.load("src/main/resources/apps/fileSource")
注意:
-
文件必须被原子性地放入目录,通常可通过mv操作实现。
-
默认需要显式指定Schema,设置
spark.sql.streaming.schemaInference=true
启用自动推断Schema。 -
默认, 当目录下的子目录名为
key=value
格式时,会自动推断分区,即将key
作为分区列列名,value
作为分区列的值添加到DataFrame中。 -
maxFilesPerTrigger
选项: 每次Trigger,最多读取的新文件的个数。 -
每种格式:
csv
、json
、text
、parquet
、orc
可用的选项可从DataStreamReader
中找到。
Kafka Source
从Kafka读取数据,支持Kafka 0.10.0及以上版本。参考: Spark Structured Streaming 读写Kafka与Exactly-once语义。
Socket Source
从TCP Socket读取数据。不保证容错。仅用于测试。如下:
spark
.readStream
.format("socket")
.option("host","localhost")
.option("port",9999)
.load()
Rate Source
以指定的速率(行/秒)生成数据。可用于测试或压测。如下:
spark
.readStream
.format("rate")
// 速率,即每秒数据条数。默认1。
.option("rowsPerSecond","10")
// 多长时间后达到指定速率。默认0。
.option("rampUpTime",50)
// 生成的数据的分区数(并行度)。默认Spark并行度。
.option("numPartitions",5)
.load()
注意:
-
受限于
numPartitions
和CPU核数,生产数据的速度不一定能达到指定速率。 -
生成的DataFrame的Schema,包含两列: timestamp(Timestamp类型,数据产生的时间)、value(Long类型,从0开始递增)。
Sink
File Sink
将结果数据输出到目录。支持csv
、json
、text
、parquet
、orc
格式。以parquet格式为例如下。
resultTable
.writeStream
.format("parquet")
.option("path", "src/main/resources/apps/fileSink/parquet/data")
.option("checkpointLocation","src/main/resources/apps/fileSink/parquet/checkpoint")
.outputMode("append")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
注意:
-
输出的
parquet
、orc
文件默认采用snappy压缩。 -
File Sink只支持Append输出模式。
-
每种格式:
csv
、json
、text
、parquet
、orc
可用的选项可从DataFrameWriter
中找到。
Kafka Sink
将结果数据输出到Kafka一个或多个Topic。参考: Spark Structured Streaming 读写Kafka与Exactly-once语义。
Console Sink
将结果数据输出到控制台。由于每次Trigger后都会收集全部数据并存储在Driver内存中用于输出到控制台,因此只适合少数据量下的测试。如下。
resultTable
.writeStream
.format("console")
.option("truncate","false")
.outputMode("append")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
Memory Sink
将结果数据保存在Driver内存的一个表中。只适合少数据量。如下:
val query=resultTable
.writeStream
.format("memory")
// queryName表名
.option("queryName","t_memory_table")
.outputMode("append")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// 每隔2秒查询一次内存中的表
while (true){
Thread.sleep(2 * 1000)
spark.sql("select * from t_memory_table").show(truncate = false)
}
query.awaitTermination()
Foreach Sink
自定义输出: 以行为单位进行处理,即逐行处理。
注意:
-
用
Foreach Sink
输出数据,主要是实现ForeachWriter
接口。 -
每个Batch的每个Partition都会创建一个
ForeachWriter
实例。 -
当open方法返回false时,该分区中的数据会被跳过。
ForeachWriter
接口如下:
abstract class ForeachWriter[T] extends Serializable {
//获取连接
def open(partitionId: Long, epochId: Long): Boolean
//处理每行
def process(value: T): Unit
//释放连接
def close(errorOrNull: Throwable): Unit
}
以输出到Mysql为例,如下:
resultTable
.writeStream
.foreach(new ForeachWriter[Row] {
val host = "localhost"
val port = 3306
val database = "bigdata"
val user = "bigdata"
val password = "123456"
val url = s"jdbc:mysql://$host:$port/$database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
var connection: Connection = _
var preparedStatement: PreparedStatement = _
var batchCount = 0
// 每个Batch每个Partition打开连接或从连接池获取连接
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(url, user, password)
connection.setAutoCommit(false)
val sql = "INSERT INTO t_result_table (timestamp,value) values (?,?)"
preparedStatement = connection.prepareStatement(sql)
connection !=null && !connection.isClosed && preparedStatement !=null
}
// 处理每一条数据
override def process(row: Row): Unit = {
val timestamp = row.getTimestamp(0)
val value = row.getLong(1)
preparedStatement.setTimestamp(1,timestamp)
preparedStatement.setLong(2,value)
preparedStatement.addBatch()
batchCount +=1
if(batchCount >= 50){
preparedStatement.executeBatch()
connection.commit()
batchCount =0
}
}
// 每个Batch每个Partition关闭连接或将连接归还到连接池
override def close(errorOrNull: Throwable): Unit = {
preparedStatement.executeBatch()
connection.commit()
batchCount = 0
}
})
.outputMode("append")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
ForeachBatch Sink
自定义输出: 以Batch为单位进行处理。只适用于微批模式,不适用于连续处理模式(即Trigger不能为Trigger.Continuous)。
以输出到Mysql为例,如下:
resultTable
.writeStream
.foreachBatch((batchDF:DataFrame,batchID:Long)=>{
if(!batchDF.head(1).isEmpty){
logger.info("BatchID: "+batchID)
batchDF
.write
.format("jdbc")
.option("checkpointLocation", "bigdata/data/spark/foreachBatchSink/checkpoint")
.option("url", "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable", "t_result_table")
.option("user", "bigdata")
.option("password", "123456")
.option("batchsize",50)
.mode(SaveMode.Append)
.save()
}
})
.outputMode("append")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
Sink与容错
Sink | 容错 |
---|---|
File Sink | Exactly Once |
Kafka Sink | At Least Once |
Console Sink | 不具备容错 |
Memory Sink | 不具备容错。当输出模式为Completa时,在重启程序后会基于Checkpoint重建整张表。 |
Foreach Sink | 开发者自己控制容错 |
ForeachBatch Sink | 开发者自己控制容错 |
最后
以上就是平淡秋天为你收集整理的Spark Structured Streaming Source、SinkSourceSink的全部内容,希望文章能够帮你解决Spark Structured Streaming Source、SinkSourceSink所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复