我是靠谱客的博主 酷炫金毛,最近开发中收集的这篇文章主要介绍StructuredStreaming中的ForeachSink的用法(一),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Spark2.3中,StructuredStreaming目前支持的sink只有FileSink、KafkaSink、ConsoleSink、MemorySink和ForeachSink,

如果有其他的需求,我们只能通过ForeachSink自定义sink,这篇文章主要以写入到Redis和Mysql为例。

       要使用ForeachSink自定义sink,必须实现ForeachWriter[T](),包括open(),process(),close()三个方法:

/**
  * Skeleton of a custom sink built on ForeachWriter.
  *
  * The three callbacks below are the hooks a custom sink has to provide.
  */
class redisSink extends ForeachWriter[Row]() {

  /**
    * Set up whatever the writer needs, e.g. obtain a Redis connection.
    *
    * @param partitionId id of the partition being written
    * @param version     version passed in by Spark for this write
    * @return true so that the rows are handed to `process`
    */
  override def open(partitionId: Long, version: Long): Boolean = {
    // Initialisation goes here, e.g. acquiring a Redis connection.
    true
  }

  /** Handle one row: the actual write to the external store goes here. */
  override def process(value: Row): Unit = {
    // Processing logic: write the row to the database.
  }

  /**
    * Release what `open` acquired.
    *
    * @param errorOrNull the failure that interrupted processing, or null on success
    */
  override def close(errorOrNull: Throwable): Unit = {
    // Close the connection.
  }
}

        在每个batch中,每个分区会调用一次open()和close(),而process()会对该分区内的每一行数据各调用一次。下面是写入到redis和mysql的具体实现:

写入到Redis:     

 

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

 

import java.sql.Timestamp
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

/**
  * Demo job: reads the built-in `rate` source, tags every row with a batch id,
  * keeps the rows whose `num` is even and pushes them onto the Redis list `rate`
  * through a custom [[ForeachWriter]].
  */
object writeToRedis {

  /**
    * Entry point. Builds a local SparkSession, registers a progress listener,
    * starts the streaming query and blocks until it terminates.
    *
    * @param args command line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Obtain the SparkSession.
    val spark: SparkSession = SparkSession.builder
      .appName("continuousTrigger")
      .master("local[2]")
      .getOrCreate()
    // Reduce log noise.
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Last batch id reported by the listener; read by the map closure below to tag rows.
    // NOTE(review): the value is only updated from progress events, so the tag presumably
    // lags behind the batch being processed, and sharing a driver-side var with tasks
    // likely only behaves this way in local mode - confirm before reusing the pattern.
    var batchId: Long = 0
    // Listen to the query to obtain per-batch processing statistics.
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        // One string, not two arguments: a comma here would make println print a tuple.
        println(s"batchId=$batchId  numInputRows=$numInputRows  inputRowsPerSecond=$inputRowsPerSecond" +
          s"  processRowsPerSecond=$processRowsPerSecond")
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    })

    // Generate data with the source that ships with Structured Streaming:
    //  |-- timestamp: timestamp (nullable = true)
    //  |-- value: long (nullable = true)
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 100)
      .load()
    // Add a batchId column in front of the two source columns.
    val addDF: DataFrame = rateSource
      .as[(Timestamp, Long)]
      .map { case (ts, num) => (batchId, ts, num) }
      .toDF("batchId", "timestamp", "num")
    // Keep even numbers only.
    val resultDS: Dataset[Row] = addDF.filter("num%2=0")

    resultDS.writeStream
      // "complete" is only accepted for queries with a streaming aggregation; this query
      // has none, so it must run in append mode.
      .outputMode("append")
      .foreach(new redisSink())
      // An anonymous `new ForeachWriter[Row]() { open / process / close }` can be passed
      // to foreach instead of a named class.
      .start()
      .awaitTermination()
  }

  /**
    * Holder for a single, lazily created connection pool per JVM, so that writers borrow
    * connections instead of building (and leaking) a new pool on every `open`.
    */
  private object RedisPool {
    lazy val pool: JedisPool = {
      val config: JedisPoolConfig = new JedisPoolConfig()
      config.setMaxTotal(20)
      config.setMaxIdle(5)
      config.setMaxWaitMillis(1000)
      config.setMinIdle(2)
      config.setTestOnBorrow(false)
      new JedisPool(config, "127.0.0.1", 6379)
    }
  }

  /** Writes each row as one space-separated string onto the Redis list `rate`. */
  class redisSink extends ForeachWriter[Row]() {
    // Connection borrowed in open() and handed back in close(); null in between.
    var jedis: Jedis = _

    /**
      * Borrows a connection from the shared pool.
      *
      * @param partitionId id of the partition being written
      * @param version     version passed in by Spark for this write
      * @return always true, so every row is passed to `process`
      */
    override def open(partitionId: Long, version: Long): Boolean = {
      jedis = RedisPool.pool.getResource()
      true
    }

    /** Appends "batchId timestamp num" for the given row to the list `rate`. */
    override def process(value: Row): Unit = {
      jedis.rpush("rate", s"${value.get(0)} ${value.get(1)} ${value.get(2)}")
    }

    /**
      * Gives the borrowed connection back; safe to call when none is held.
      *
      * @param errorOrNull the failure that interrupted processing, or null on success
      */
    override def close(errorOrNull: Throwable): Unit = {
      // close() on a pooled Jedis returns it to its pool rather than dropping the socket.
      Option(jedis).foreach(_.close())
      jedis = null
    }
  }
}

写入到Mysql:

import java.sql.{Connection, PreparedStatement, Timestamp}
import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}


/**
  * Demo job: reads the built-in `rate` source, tags every row with a batch id,
  * keeps the rows whose `num` is even and inserts them into the MySQL table `rate`
  * through a custom [[ForeachWriter]].
  */
object writeToMysql {

  /**
    * Entry point. Builds a local SparkSession, registers a progress listener,
    * starts the streaming query and blocks until it terminates.
    *
    * @param args command line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    // Rows generated per second by the rate source.
    val rows = 100
    // Obtain the SparkSession.
    val spark: SparkSession = SparkSession.builder
      .appName("continuousTrigger")
      .master("local[2]")
      .getOrCreate()
    // Reduce log noise.
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // Last batch id reported by the listener; read by the map closure below to tag rows.
    // NOTE(review): the value is only updated from progress events, so the tag presumably
    // lags behind the batch being processed, and sharing a driver-side var with tasks
    // likely only behaves this way in local mode - confirm before reusing the pattern.
    var batchId: Long = 0
    // Listen to the query to obtain per-batch processing statistics.
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        // One string, not two arguments: a comma here would make println print a tuple.
        println(s"batchId=$batchId  numInputRows=$numInputRows  inputRowsPerSecond=$inputRowsPerSecond" +
          s"  processRowsPerSecond=$processRowsPerSecond")
      }

      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    })

    // Read the data source.
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", rows)
      .load()
    rateSource.printSchema()

    // Add a batchId column in front of the two source columns.
    val addDF: DataFrame = rateSource
      .as[(Timestamp, Long)]
      .map { case (ts, num) => (batchId, ts, num) }
      .toDF("batchId", "timestamp", "num")

    // Keep even numbers only.
    val resultDS: Dataset[Row] = addDF.filter("num%2=0")
    resultDS.writeStream
      .foreach(new mysqlSink())
      .start()
      .awaitTermination()
  }

  /**
    * Holder for a single, lazily created c3p0 pool per JVM, so that writers borrow a
    * connection instead of building a whole new pool (40 initial connections) on every `open`.
    */
  private object MysqlPool {
    lazy val dataSource: ComboPooledDataSource = {
      val ds = new ComboPooledDataSource()
      ds.setDriverClass("com.mysql.jdbc.Driver")
      ds.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
      // NOTE(review): credentials are hard-coded for the demo; externalise them for real use.
      ds.setUser("root")
      ds.setPassword("root")
      ds.setInitialPoolSize(40)
      ds.setMaxPoolSize(100)
      ds.setMinPoolSize(10)
      ds.setAcquireIncrement(5)
      ds
    }
  }

  /** Inserts each row into the table `rate` with a prepared statement. */
  class mysqlSink extends ForeachWriter[Row]() {
    // Connection and statement held between open() and close(); null otherwise.
    var conn: Connection = _
    var ps: PreparedStatement = _
    // Reference to the shared pool; assigned in open(), never closed by this writer.
    var dataSource: ComboPooledDataSource = _
    val sql: String = "insert into rate(batchId,InputTimestamp,num) values(?,?,?)"

    /**
      * Borrows a connection from the shared pool and prepares the insert statement.
      *
      * @param partitionId id of the partition being written
      * @param version     version passed in by Spark for this write
      * @return always true, so every row is passed to `process`
      */
    override def open(partitionId: Long, version: Long): Boolean = {
      dataSource = MysqlPool.dataSource
      conn = dataSource.getConnection
      try {
        ps = conn.prepareStatement(sql)
      } catch {
        case scala.util.control.NonFatal(e) =>
          // Do not rely on close() running after a failed open: return the connection now.
          conn.close()
          conn = null
          throw e
      }
      true
    }

    /** Binds the three columns of the row, executes the insert and prints the outcome. */
    override def process(value: Row): Unit = {
      ps.setObject(1, value.get(0))
      ps.setObject(2, value.get(1))
      ps.setObject(3, value.get(2))
      val affected: Int = ps.executeUpdate()
      println(s"$affected ${value.get(0)} ${value.get(1)} ${value.get(2)}")
    }

    /**
      * Closes the statement and gives the connection back to the pool; safe to call
      * when nothing is held.
      *
      * @param errorOrNull the failure that interrupted processing, or null on success
      */
    override def close(errorOrNull: Throwable): Unit = {
      try Option(ps).foreach(_.close())
      finally {
        // Closing a pooled connection checks it back into the pool.
        Option(conn).foreach(_.close())
        ps = null
        conn = null
      }
    }
  }
}

    很久没用mysql数据库了,不知道连接池那块这样写合不合理,不过数据写入到mysql数据库是没问题的。

 

 

 

最后

以上就是酷炫金毛为你收集整理的StructuredStreaming中的ForeachSink的用法(一)的全部内容,希望文章能够帮你解决StructuredStreaming中的ForeachSink的用法(一)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(59)

评论列表共有 0 条评论

立即
投稿
返回
顶部