我是靠谱客的博主 专注曲奇,最近开发中收集的这篇文章主要介绍spark 结构化流,创建流式DataFrame和流式Datasets,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

创建流式DataFrame和流式Datasets

Streaming DataFrames可以通过SparkSession.readStream()返回的DataStreamReader接口(Scala / Java / Python文档)创建。

Input Sources

常见的内置Sources

  • File source : 读取指定目录下的文件作为流数据,支持的文件格式有:text、csv、json、parquet、orc等
  • Kafka source(常用): 从kafka读取数据
  • Socket source (测试使用):从套接字连接读取UTF-8编码的文本数据
SourceOptionsFault-tolerantNotes
File source1. **path:**路径到输入目录,并且与所有文件格式通用。2. **maxFilesPerTrigger:**每个时间间隔触发器中要考虑的最大新文件数(默认值:无最大值)3. **latestFirst:**是否首先处理最新的新文件,当有大量的文件积压(default:false)时很有用。4. **fileNameOnly:**是否仅根据文件名而不是完整路径检查新文件(默认值:false)。将此设置为“true”,以下文件将被视为相同的文件,因为它们的文件名“dataset.txt”是相同的:· “file:///dataset.txt” · “s3://a/dataset.txt”Yes支持glob路径,但不支持多个逗号分隔的poths/ globs。
Socket Source**host:**主机连接,必须指定 **port:**要连接的端口,必须指定No
Kafka Source详细查看kafka整合教程Yes

Some examples:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.4</version>
package example8

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object StructuredStreamingSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("StructuredNetworkWordCount")
    val sc = new SparkContext(conf)
    sc.setLogLevel("FATAL")
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    // =====================================CSV===========================================
    /*
    val df = spark.readStream
      .format("csv")
      //.option("sep",";")
      .schema(new StructType().add("id", "integer").add("name", "string").add("salary", "double"))
      .csv("/Users/gaozhy/data/csv")
     */
    // =====================================CSV===========================================

    // =====================================json===========================================
    /*
    val df = spark.readStream
      .format("json")
      .schema(new StructType().add("id","integer").add("name","string").add("salary","float"))
      .json("/Users/gaozhy/data/json")
    */
    // =====================================json===========================================

    //df.createOrReplaceTempView("t_user")
    /*
    spark.sql("select * from t_user")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
    */
    
    // =====================================kafka===========================================
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "spark:9092")
      .option("startingOffsets", """{"bz":{"0":-2}}""") // 指定偏移量消费
      .option("subscribe", "baizhi")
      .load()

    val kafka = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "partition")
      .as[(String, String, String, Int)]

    kafka.createOrReplaceTempView("t_kafka")
//    spark.sql("select * from t_kafka")
//      .writeStream
//      .format("console")
//      .outputMode("append")
//      .start()
//      .awaitTermination()

    spark.sql("select count(*) from t_kafka group by partition")
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}

Output Modes

下面是几种输出模式:

  • Append mode (default) - 这是默认模式,其中只有从上次触发后添加到结果表的新行将被输出到sink。 只有那些添加到“结果表”中并且从不会更改的行的查询才支持这一点。 因此,该模式保证每行只能输出一次(假定容错sink)。 例如,只有select,where,map,flatMap,filter,join等的查询将支持Append模式。
  • Complete mode -每个触发后,整个结果表将被输出到sink。 聚合查询支持这一点。
  • Update mode - (自Spark 2.1.1以来可用)只有结果表中自上次触发后更新的行才会被输出到sink。

不同类型的流式查询支持不同的输出模式。 以下是兼容性信息。

Query TypeSupported Output Modes备注
没有聚合的查询Append, Update不支持完整模式,因为将所有数据保存在结果表中是不可行的。
有聚合的查询:使用watermark对event-time进行聚合Append, Update, Complete附加模式使用watermark 来降低旧聚合状态。 但是,窗口化聚合的输出会延迟“withWatermark()”中指定的晚期阈值,因为模式语义可以在结果表中定义后才能将结果表添加到结果表中(即在watermark 被交叉之后)。 有关详细信息,请参阅后期数据部分。更新模式使用水印去掉旧的聚合状态。完全模式不会丢弃旧的聚合状态,因为根据定义,此模式保留结果表中的所有数据。
有聚合的查询:其他聚合Complete, Update由于没有定义watermark (仅在其他类别中定义),旧的聚合状态不会被丢弃。不支持附加模式,因为聚合可以更新,从而违反了此模式的语义。

Output Sinks

有几种类型的内置输出sinks

  • File sink - Stores the output to a directory
  • Kafka sink - Stores the output to one or more topics in Kafka.
  • Foreach sink - Runs arbitrary(任意) computation on the records in the output.
  • Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger
sinkSupported Output ModesOptionsFault-tolerantNotes
File SinkAppendpath:输出目录的路径,必须指定。 maxFilesPerTrigger:每个触发器中要考虑的最大新文件数(默认值:无最大值) latestFirst:是否首先处理最新的新文件,当有大量的文件积压(default:false)时很有用 有关特定于文件格式的选项,请参阅DataFrameWriter(Scala / Java / Python)中的相关方法。 例如。 对于“parquet”格式选项请参阅DataFrameWriter.parquet()yes支持对分区表的写入。 按时间划分可能有用。
Foreach SinkAppend, Update, CompeleteNone取决于ForeachWriter的实现更多细节在下一节
Console SinkAppend, Update, CompletenumRows:每次触发打印的行数(默认值:20)truncate:输出太长是否截断(默认值:true)no
Memory SinkAppend, CompleteNone否。但在Complete模式下,重新启动的查询将重新创建整个表。查询名就是表名
package com.baizhi

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.OutputMode
import redis.clients.jedis.Jedis

/**
  * 如何通过不同的sink构建结构化流计算结构的输出
  */
object SparkStructuredStreamingForOutputSink {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("input source")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._


    //--------------------------------kafka【重点】-----------------------------------
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "HadoopNode01:9092,HadoopNode02:9092,HadoopNode03:9092")
      .option("startingOffsets", """{"bz":{"0":-2}}""") // 指定消费baizhi topic的0号分区的earliest消费方式
      // 指定偏移量消费  In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
      .option("subscribe", "bz")
      .load

    // kafka record 转换为要求的类型
    //    val ds = df
    //      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(timestamp AS LONG)")
    //      .as[(String, String, String, Int, Long)] // key value topic partition Long
    //
    //    ds.createOrReplaceTempView("t_kafka")
    //
    //    val text = spark.sql("select key as k,value as v,topic as t,partition as p, timestamp as ts from t_kafka")
    //--------------------------------kafka【重点】-----------------------------------

    //    text
    //      .writeStream
    //      .format("console")
    //      .outputMode(OutputMode.Append())
    //      .start()
    //      .awaitTermination()

    //===================================文件【输出模式:只支持Append】===========================================
    //    text
    //      .writeStream
    //      .format("json") // 文件格式 CSV JSON parquet ORC等
    //      .outputMode(OutputMode.Append())
    //      .option("checkpointLocation","hdfs://Spark:9000/checkpoint1")     // 检查点path 用以故障恢复
    //      .option("path","file:///D://result") // path支持本地和HDFS文件系统路径
    //      .start()
    //      .awaitTermination()

    //    text
    //      .writeStream
    //      .format("csv") // 文件格式 CSV JSON parquet ORC等
    //      .outputMode(OutputMode.Append())
    //      .option("checkpointLocation", "hdfs://Spark:9000/checkpoint2") // 检查点path 用以故障恢复
    //      .option("path", "file:///D://result2") // path支持本地和HDFS文件系统路径
    //      .start()
    //      .awaitTermination()


    //===================================Kafka【输出模式:Append | Updated | Completed】=========================
    //    val ds = df
    //      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(timestamp AS LONG)")
    //      .as[(String, String, String, Int, Long)].flatMap(_._2.split(" ")).map((_, 1)) // key value topic partition Long
    //
    //    ds.createOrReplaceTempView("t_kafka")
    //
    //    val text = spark.sql("select _1 as word,count(_2) as num from t_kafka group by _1")
    //
    //    text
    //      // .selectExpr("CAST(k AS STRING) as key", "CAST(v AS STRING) as value") // 对输出到kafka的数据定义key 和 value信息
    //      .selectExpr("CAST(word AS STRING) as key", "CAST(num AS STRING) as value") // 对输出到kafka的数据定义key 和 value信息
    //      .writeStream
    //      .format("kafka") // 文件格式 CSV JSON parquet ORC等
    //      // .outputMode(OutputMode.Append())
    //      .outputMode(OutputMode.Update())
    //      .option("checkpointLocation", "hdfs://Spark:9000/checkpoint4") // 检查点path 用以故障恢复
    //      .option("kafka.bootstrap.servers", "HadoopNode01:9092,HadoopNode02:9092,HadoopNode03:9092") // kafka集群信息
    //      .option("topic", "result") // 指定计算结果的保存topic
    //      .start()
    //      .awaitTermination()

    /*
      [root@HadoopNode02 kafka_2.11-2.2.0]# bin/kafka-console-consumer.sh --topic result --bootstrap-server HadoopNode01:9092,HadoopNode02:9092,HadoopNode03:9092 --property print.key=true
      */

    //===================================Foreach【输出模式:Append | Updated | Completed】==================================
    // 将计算结果输出到redis中
    //kafka record 转换为要求的类型

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS INT)", "CAST(timestamp AS LONG)")
      .as[(String, String, String, Int, Long)].flatMap(_._2.split(" ")).map((_, 1)) // key value topic partition Long

    ds.createOrReplaceTempView("t_kafka")

    val text = spark.sql("select _1 as word,count(_2) as num from t_kafka group by _1")
    text
      .writeStream
      .outputMode(OutputMode.Update())
      .foreach(new ForeachWriter[Row] {
        /**
          * 打开方法
          *
          * @param partitionId 分区序号
          * @param epochId
          * @return Boolean  true 创建一个连接 对这一行数据进行处理操作
          *         false 不会创建连接 跳过这一行数据
          */
        override def open(partitionId: Long, epochId: Long): Boolean = true

        /**
          * 处理方法
          *
          * @param value resultTable 行对象
          */
        override def process(value: Row): Unit = {
          val word = value.getString(0)
          val count = value.getLong(1).toString

          val jedis = new Jedis("Spark", 6379)
          jedis.set(word, count)
          jedis.close()
        }

        override def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) errorOrNull.printStackTrace()

      }) // 对resultTable中的每一行记录应用写出规则
      .option("checkpointLocation", "hdfs://Spark:9000/checkpoint4") // 检查点path 用以故障恢复
      .start()
      .awaitTermination()
  }
}

最后

以上就是专注曲奇为你收集整理的spark 结构化流,创建流式DataFrame和流式Datasets的全部内容,希望文章能够帮你解决spark 结构化流,创建流式DataFrame和流式Datasets所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部