概述
kafka依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.10.1</version>
</dependency>
核心代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties
/**
* @author shkstart
* @create 2021-07-23 8:06
*/
object Test {
def main(args: Array[String]): Unit = {
//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.读取kafka中的数据
//2.1创建配置对象,这里的对象为java的对象
val properties = new Properties()
properties.setProperty("bootstrap.servers","hadoop101:9092") //hadoop101为自己的kafka地址
properties.setProperty("group.id","consumer-group")
val kafavalue = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
//打印
kafavalue.print()
//执行
env.execute()
}
}
最后
以上就是大方大碗为你收集整理的Flink消费kafka中的数据(scala版)的全部内容,希望文章能够帮你解决Flink消费kafka中的数据(scala版)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复