In Spark 2.3, the only sinks Structured Streaming ships with are FileSink, KafkaSink, ConsoleSink, MemorySink and ForeachSink.
For anything else you have to build a custom sink on top of ForeachSink; this post uses writing to Redis and MySQL as examples.
To build a custom sink with ForeachSink you implement ForeachWriter[T], which has three methods: open(), process() and close():
class redisSink extends ForeachWriter[Row]() {
  override def open(partitionId: Long, version: Long): Boolean = {
    // one-off initialization, e.g. obtaining a Redis connection
    // return true to go on and process the rows of this partition
    true
  }
  override def process(value: Row): Unit = {
    // the actual write logic: send the row to the external store
  }
  override def close(errorOrNull: Throwable): Unit = {
    // release the connection
  }
}
For each micro-batch, open() and close() are called once per partition, while process() is called once for every row in that partition. Below are the concrete implementations for Redis and MySQL.
Writing to Redis. First, add the Jedis dependency:
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
import java.sql.Timestamp

import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object writeToRedis {

  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark: SparkSession = SparkSession.builder
      .appName("continuousTrigger")
      .master("local[2]")
      .getOrCreate()
    // Set the log level
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    var batchId: Long = 0

    // Register a listener on the query to capture per-batch progress information
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        println("batchId=" + batchId, " numInputRows=" + numInputRows
          + " inputRowsPerSecond=" + inputRowsPerSecond
          + " processRowsPerSecond=" + processRowsPerSecond)
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    })

    // Use the built-in rate source to generate data; its schema is:
    // |-- timestamp: timestamp (nullable = true)
    // |-- value: long (nullable = true)
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 100)
      .load()

    // Add a batchId column
    val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
      val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
      tuple
    }).toDF("batchId", "timestamp", "num")

    // Filter
    val resultDS: Dataset[Row] = addDF.filter("num%2=0")

    resultDS.writeStream
      .outputMode("append") // append mode: complete mode is only allowed with streaming aggregations
      .foreach(new redisSink())
      // the writer can also be defined inline like this:
      /*.foreach(new ForeachWriter[Row]() {
        override def open(partitionId: Long, version: Long): Boolean = { ... }
        override def process(value: Row): Unit = { ... }
        override def close(errorOrNull: Throwable): Unit = { ... }
      })*/
      .start()
      .awaitTermination()
  }

  class redisSink extends ForeachWriter[Row]() {
    var jedis: Jedis = null

    override def open(partitionId: Long, version: Long): Boolean = {
      // Build a Jedis pool and borrow a connection from it
      val config: JedisPoolConfig = new JedisPoolConfig()
      config.setMaxTotal(20)
      config.setMaxIdle(5)
      config.setMaxWaitMillis(1000)
      config.setMinIdle(2)
      config.setTestOnBorrow(false)
      val jedisPool = new JedisPool(config, "127.0.0.1", 6379)
      jedis = jedisPool.getResource()
      true
    }

    override def process(value: Row): Unit = {
      // Write the row to Redis
      jedis.rpush("rate", value.get(0) + " " + value.get(1) + " " + value.get(2))
    }

    override def close(errorOrNull: Throwable): Unit = {
      // Release the connection
      jedis.close()
    }
  }
}
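One thing to be aware of: the redisSink above builds a brand-new JedisPool every time open() runs, which means once per partition per batch, and never closes the pool itself. A common refinement is to keep a single lazily created pool per executor JVM and only borrow connections from it in open(). The sketch below illustrates that pattern; the RedisConnectionPool object and its settings are illustrative additions, not part of the original post:

import org.apache.spark.sql.{ForeachWriter, Row}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Illustrative helper: one pool per executor JVM, created lazily on first use
object RedisConnectionPool {
  lazy val pool: JedisPool = {
    val config = new JedisPoolConfig()
    config.setMaxTotal(20)
    config.setMinIdle(2)
    new JedisPool(config, "127.0.0.1", 6379)
  }
}

class pooledRedisSink extends ForeachWriter[Row]() {
  var jedis: Jedis = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Borrow a connection from the shared pool instead of building a new pool
    jedis = RedisConnectionPool.pool.getResource()
    true
  }

  override def process(value: Row): Unit = {
    jedis.rpush("rate", value.get(0) + " " + value.get(1) + " " + value.get(2))
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Returns the borrowed connection to the pool
    jedis.close()
  }
}

Because the pool lives in a Scala object, each executor creates it at most once, no matter how many partitions or batches it processes.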
Writing to MySQL:
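The MySQL example relies on the c3p0 connection pool and the MySQL JDBC driver. The original post does not list those dependencies; a rough Maven equivalent would be the following (the versions are assumptions, use whatever matches your environment):

<dependency>
    <groupId>com.mchange</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.5.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>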
import java.sql.{Connection, PreparedStatement, Timestamp}

import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}

object writeToMysql {

  def main(args: Array[String]): Unit = {
    val rows = 100
    // Create the SparkSession
    val spark: SparkSession = SparkSession.builder
      .appName("continuousTrigger")
      .master("local[2]")
      .getOrCreate()
    // Set the log level
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    var batchId: Long = 0

    // Register a listener on the query to capture per-batch progress information
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        println("batchId=" + batchId, " numInputRows=" + numInputRows
          + " inputRowsPerSecond=" + inputRowsPerSecond
          + " processRowsPerSecond=" + processRowsPerSecond)
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    })

    // Read from the built-in rate source
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", rows)
      .load()
    rateSource.printSchema()

    // Add a batchId column
    val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
      val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
      tuple
    }).toDF("batchId", "timestamp", "num")

    // Filter
    val resultDS: Dataset[Row] = addDF.filter("num%2=0")

    resultDS.writeStream
      .foreach(new mysqlSink())
      .start()
      .awaitTermination()
  }

  class mysqlSink extends ForeachWriter[Row]() {
    var conn: Connection = null
    var ps: PreparedStatement = null
    var dataSource: ComboPooledDataSource = _
    val sql = "insert into rate(batchId,InputTimestamp,num) values(?,?,?)"

    override def open(partitionId: Long, version: Long): Boolean = {
      // Set up a c3p0 connection pool and prepare the insert statement
      dataSource = new ComboPooledDataSource()
      dataSource.setDriverClass("com.mysql.jdbc.Driver")
      dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
      dataSource.setUser("root")
      dataSource.setPassword("root")
      dataSource.setInitialPoolSize(40)
      dataSource.setMaxPoolSize(100)
      dataSource.setMinPoolSize(10)
      dataSource.setAcquireIncrement(5)
      conn = dataSource.getConnection
      ps = conn.prepareStatement(sql)
      true
    }

    override def process(value: Row): Unit = {
      // Bind the row values and execute the insert
      ps.setObject(1, value.get(0))
      ps.setObject(2, value.get(1))
      ps.setObject(3, value.get(2))
      val i: Int = ps.executeUpdate()
      println(i + " " + value.get(0) + " " + value.get(1) + " " + value.get(2))
    }

    override def close(errorOrNull: Throwable): Unit = {
      // Shut down the pool (this also releases the connection)
      dataSource.close()
    }
  }
}
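The insert statement assumes a rate table with batchId, InputTimestamp and num columns. The post does not show the table definition; a minimal DDL consistent with the values being written might look like this (the column types are an assumption):

CREATE TABLE rate (
  batchId        BIGINT,
  InputTimestamp DATETIME,
  num            BIGINT
);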
I have not used MySQL for a while, so I am not sure the connection-pool part is written in the best way, but writing the data into MySQL works without problems.