我是靠谱客的博主 大方大碗,这篇文章主要介绍Flink消费kafka中的数据(scala版),现在分享给大家,希望可以做个参考。

kafka依赖

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
   <version>1.10.1</version>
</dependency>

核心代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011


import java.util.Properties


/**
* @author shkstart
* @create 2021-07-23 8:06
*/
object Test {
  def main(args: Array[String]): Unit = {
    //1.创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment


    //2.读取kafka中的数据
    //2.1创建配置对象,这里的对象为java的对象
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","hadoop101:9092")    //hadoop101为自己的kafka地址
    properties.setProperty("group.id","consumer-group")
    val kafavalue = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    //打印
    kafavalue.print()

    //执行
    env.execute()
  }
}

最后

以上就是大方大碗最近收集整理的关于Flink消费kafka中的数据(scala版)的全部内容,更多相关Flink消费kafka中内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(119)

评论列表共有 0 条评论

立即
投稿
返回
顶部