我是靠谱客的博主 心灵美路灯,最近开发中收集的这篇文章主要介绍Structure Streaming-Kafka source,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

导入依赖:

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
import org.apache.spark.sql.SparkSession

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("KafkaSource")
      .getOrCreate()
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092")
      .option("subscribe","topic1")
      .load


    df.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate",false)
      .start
      .awaitTermination()
  }

}

在这里插入图片描述
在这里插入图片描述

import org.apache.spark.sql.SparkSession

object KafkaSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("KafkaSource")
      .getOrCreate()
    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092")
      .option("subscribe","topic1")
      .load
      //  .select("value")
        .selectExpr("cast(value as string)")
        .as[String]
        .flatMap(_.split(" "))
        .groupBy("value")
        .count()

    df.writeStream
      .format("console")
      .outputMode("update")
      .option("truncate",false)
      .start
      .awaitTermination()
  }

}

在这里插入图片描述
在这里插入图片描述

最后

以上就是心灵美路灯为你收集整理的Structure Streaming-Kafka source的全部内容,希望文章能够帮你解决Structure Streaming-Kafka source所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部