Overview
As of Spark 2.3, the only sinks Structured Streaming supports out of the box are FileSink, KafkaSink, ConsoleSink, MemorySink, and ForeachSink.
For anything beyond those, you have to build a custom sink on top of ForeachSink; this article uses writing to Redis and MySQL as examples.
To build a custom sink with ForeachSink, you must implement ForeachWriter[T](), providing its three methods open(), process(), and close():
class redisSink extends ForeachWriter[Row](){
  override def open(partitionId: Long, version: Long): Boolean = {
    //do any initialization here, e.g. obtain a Redis connection
    true //return true to process this partition, false to skip it
  }
  override def process(value: Row): Unit = {
    //the actual processing logic: write the data to the store
  }
  override def close(errorOrNull: Throwable): Unit = {
    //release the connection
  }
}
These three methods run for every batch: open() and close() are invoked once per partition per batch, and process() is invoked once for every row in that partition. A quick logging sketch of this lifecycle follows, and then the concrete Redis and MySQL implementations.
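The sketch below is not from the original post; it only prints when each method fires, which makes the per-partition lifecycle easy to observe when passed to writeStream.foreach(...):

import org.apache.spark.sql.{ForeachWriter, Row}

//minimal lifecycle logger: open()/close() print once per partition per batch
class loggingSink extends ForeachWriter[Row](){
  override def open(partitionId: Long, version: Long): Boolean = {
    println(s"open: partitionId=$partitionId version=$version")
    true //returning false would skip this partition for this batch
  }
  override def process(value: Row): Unit = {
    println(s"process: $value")
  }
  override def close(errorOrNull: Throwable): Unit = {
    println(s"close: errorOrNull=$errorOrNull")
  }
}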
Writing to Redis. First add the Jedis dependency:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
import java.sql.Timestamp
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object writeToRedis {
def main(args:Array[String]):Unit= {
//create the SparkSession
val spark: SparkSession = SparkSession.builder
.appName("continuousTrigger")
.master("local[2]")
.getOrCreate()
//set the log level
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
var batchId: Long = 0
//register a listener on the query to capture each batch's progress info
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val progress: StreamingQueryProgress = event.progress
batchId = progress.batchId
val inputRowsPerSecond: Double = progress.inputRowsPerSecond
val processRowsPerSecond: Double = progress.processedRowsPerSecond
val numInputRows: Long = progress.numInputRows
println("batchId=" + batchId, " numInputRows=" + numInputRows + " inputRowsPerSecond=" + inputRowsPerSecond +
" processRowsPerSecond=" + processRowsPerSecond)
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})
//use Structured Streaming's built-in rate source to generate data
//|-- timestamp: timestamp (nullable = true)
//|-- value: long (nullable = true)
val rateSource: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", 100)
.load()
//add a batchId column (the var above is updated by the progress listener)
val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
tuple
}).toDF("batchId","timestamp","num")
//filter: keep only even values
val resultDS: Dataset[Row] = addDF.filter("num%2=0")
resultDS.writeStream
.outputMode("complete")
.foreach(new redisSink())
//alternatively, pass an anonymous ForeachWriter:
/*.foreach(new ForeachWriter[Row](){
override def open(partitionId: Long, version: Long): Boolean = { ... }
override def process(value: Row): Unit = { ... }
override def close(errorOrNull: Throwable): Unit = { ... }
})*/
.start()
.awaitTermination()
}
class redisSink extends ForeachWriter[Row](){
var jedis:Jedis=null
override def open(partitionId: Long, version: Long): Boolean ={
val config: JedisPoolConfig = new JedisPoolConfig()
config.setMaxTotal(20)
config.setMaxIdle(5)
config.setMaxWaitMillis(1000)
config.setMinIdle(2)
config.setTestOnBorrow(false)
val jedisPool = new JedisPool(config,"127.0.0.1",6379)
jedis=jedisPool.getResource()
true
}
override def process(value: Row): Unit ={
//write the row to Redis by pushing it onto a list
jedis.rpush("rate",value.get(0)+" "+value.get(1)+" "+value.get(2))
}
override def close(errorOrNull: Throwable): Unit = {
//return the connection to the pool
jedis.close()
}
}
}
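One thing worth noting about redisSink above: open() builds a brand-new JedisPool for every partition of every batch and never shuts it down. A common pattern, sketched here as an assumption rather than something from the original post, is to keep a single pool per executor JVM in an object and have open()/close() only borrow and return connections:

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

//hypothetical helper (not in the original code): one shared pool per executor JVM
object RedisPool {
  lazy val pool: JedisPool = {
    val config = new JedisPoolConfig()
    config.setMaxTotal(20)
    config.setMaxIdle(5)
    new JedisPool(config, "127.0.0.1", 6379)
  }
}

With this in place, redisSink.open() would just do jedis = RedisPool.pool.getResource and return true, while close() keeps calling jedis.close(), which returns the connection to the pool instead of destroying it.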
Writing to MySQL:
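This example uses c3p0 for connection pooling plus the MySQL JDBC driver; assuming Maven, the extra dependencies would look roughly like this (the version numbers are illustrative, not from the original post):

<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>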
import java.sql.{Connection, PreparedStatement, Timestamp}
import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
object writeToMysql {
def main(args:Array[String]):Unit= {
val rows = 100
//create the SparkSession
val spark: SparkSession = SparkSession.builder
.appName("continuousTrigger")
.master("local[2]")
.getOrCreate()
//set the log level
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
var batchId: Long = 0
//register a listener on the query to capture each batch's progress info
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val progress: StreamingQueryProgress = event.progress
batchId = progress.batchId
val inputRowsPerSecond: Double = progress.inputRowsPerSecond
val processRowsPerSecond: Double = progress.processedRowsPerSecond
val numInputRows: Long = progress.numInputRows
println("batchId=" + batchId, " numInputRows=" + numInputRows + " inputRowsPerSecond=" + inputRowsPerSecond +
" processRowsPerSecond=" + processRowsPerSecond)
}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})
//read from the built-in rate source
val rateSource: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", rows)
.load()
rateSource.printSchema()
val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
tuple
}).toDF("batchId","timestamp","num")
val resultDS: Dataset[Row] = addDF.filter("num%2=0")
resultDS.writeStream
.foreach(new mysqlSink())
.start()
.awaitTermination()
}
class mysqlSink extends ForeachWriter[Row](){
var conn:Connection=null
var ps:PreparedStatement=null
var dataSource:ComboPooledDataSource=_
val sql="insert into rate(batchId,InputTimestamp,num) values(?,?,?)"
override def open(partitionId: Long, version: Long): Boolean ={
dataSource = new ComboPooledDataSource()
dataSource.setDriverClass("com.mysql.jdbc.Driver")
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
dataSource.setUser("root")
dataSource.setPassword("root")
dataSource.setInitialPoolSize(40)
dataSource.setMaxPoolSize(100)
dataSource.setMinPoolSize(10)
dataSource.setAcquireIncrement(5)
conn= dataSource.getConnection
ps= conn.prepareStatement(sql)
true
}
override def process(value: Row): Unit ={
//bind batchId, timestamp, and num, then insert one row
ps.setObject(1,value.get(0))
ps.setObject(2,value.get(1))
ps.setObject(3,value.get(2))
val i: Int = ps.executeUpdate()
println(i+" "+value.get(0)+" "+value.get(1)+" "+value.get(2))
}
override def close(errorOrNull: Throwable): Unit = {
//shut down the pool that open() created
dataSource.close()
}
}
}
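For reference, a target table matching the INSERT statement above might be created like this (an assumption; the original post does not show the DDL):

CREATE TABLE rate (
batchId BIGINT,
InputTimestamp TIMESTAMP,
num BIGINT
);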
It has been a while since I last used MySQL, so I am not sure the connection-pool code is written the best way, but the data does get written to MySQL without problems.
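The main thing I would tighten up is the pool handling, the same issue as in the Redis example: mysqlSink.open() builds a fresh ComboPooledDataSource for every partition of every batch, and close() tears it down again. A sketch of the usual alternative, under the assumption that one pool per executor JVM is acceptable:

import java.sql.Connection
import com.mchange.v2.c3p0.ComboPooledDataSource

//hypothetical helper (not in the original code): one c3p0 pool per executor JVM
object MysqlPool {
  lazy val dataSource: ComboPooledDataSource = {
    val ds = new ComboPooledDataSource()
    ds.setDriverClass("com.mysql.jdbc.Driver")
    ds.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
    ds.setUser("root")
    ds.setPassword("root")
    ds.setMaxPoolSize(20)
    ds
  }
}

With this, open() would call MysqlPool.dataSource.getConnection, and close() would close only the Connection (returning it to the pool) rather than the pool itself.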