kafka依赖
复制代码
1
2
3
4
5<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.10.1</version> </dependency>
核心代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import java.util.Properties /** * @author shkstart * @create 2021-07-23 8:06 */ object Test { def main(args: Array[String]): Unit = { //1.创建运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.读取kafka中的数据 //2.1创建配置对象,这里的对象为java的对象 val properties = new Properties() properties.setProperty("bootstrap.servers","hadoop101:9092") //hadoop101为自己的kafka地址 properties.setProperty("group.id","consumer-group") val kafavalue = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)) //打印 kafavalue.print() //执行 env.execute() } }
最后
以上就是大方大碗最近收集整理的关于Flink消费kafka中的数据(scala版)的全部内容,更多相关Flink消费kafka中内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复