将canal采集mysql的日志文件存储于Kafka中,获取日志文件并解析
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41object Demo02Canal { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "master:9092") properties.setProperty("group.id", "test1") //创建kafka的消费者 val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("student.order", new SimpleStringSchema(), properties) flinkKafkaCusumor.setStartFromEarliest() val canalDS: DataStream[String] = env.addSource(flinkKafkaCusumor) val orderDS: DataStream[(String, Double)] = canalDS.map(line => { //将一个json的字符串转成JSON的对象数据 val jsonOBJ: JSONObject = JSON.parseObject(line) val datas: JSONArray = jsonOBJ.getJSONArray("data") val data = datas.getJSONObject(0) val id = data.getString("id") val order_id = data.getString("order_id") val amount = data.getString("amount").toDouble val create_time = data.getString("create_time") //获取类型 val t = jsonOBJ.getString("type") var acc = 0.0 t match { case "UPDATE" => val oldJson = jsonOBJ.getJSONArray("old").getJSONObject(0) val oldAmount = oldJson.getString("amount").toDouble acc = amount - oldAmount case "INSERT" => acc = amount case "DELETE" => acc = -amount } (id, amount) }) orderDS.keyBy(_._1) .sum(1) .print() env.execute() } }
最后
以上就是积极灰狼最近收集整理的关于11.5.2、Canal__json解析的全部内容,更多相关11内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复