我是靠谱客的博主 健忘铃铛,最近开发中收集的这篇文章主要介绍基于canal+kafka+flink实现实时增量同步4:kafka消息入库到MySQL功能演示一、新建三个表二、要实现的功能说明三、功能演示,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、新建三个表

(1)主表user_id。它包含有主键user_id

CREATE TABLE `user_id`  (
  `user_id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `user_info_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`user_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

(2)从表。

CREATE TABLE `user_info`  (
  `sex` int(1) DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `tele` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `user_info_id` int(11) NOT NULL,
  PRIMARY KEY (`user_info_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

(3)合并后的表user

CREATE TABLE `user`  (
  `id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `sex` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `tele` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

二、要实现的功能说明

需要接收kafka推送过来的的增量消息。分别入库对应库的表(表名相同),比如我要从A库的表入库到B库的表,A库的表CRUD一条消息后会推动到kafka,B会从kafka消费这条消息。从而入库到B库对用的表(这就是增量同步消息),并且,需要根据kafka上的消息,确定哪些表需要合并成新的具体的表,我们定义为C,C的字段包含了来源于A的和B的

我们在Test里边写死kafka消息进行测试:假设(1)下边是从kafka消费的binlog日志消息。也就是推送过来的两条消息分别是user_info_id表的和user_id表的。我们现在要把这两条消息入库到我们新的库新的表(表名相同)。并且,合并成一个新的表user。这个合并规则以及kafka上的映射关系请看(2)

(1)op_type为“I”代表新增,“U”代表修改 ,“D”代表删除。

        messageList.add("{n" +
                "t"after": {n" +
                "tt"user_info_id": "4",n" +
                "tt"sex": "1",n" +
                "tt"user_name": "admin",n" +
                "tt"tele": "183xxx",n" +
                "tt"password": "123"n" +
                "t},n" +
                "t"before": {},n" +
                "t"op_ts": "1",n" +
                "t"op_type": "I",n" +
                "t"primary_keys": ["user_info_id"],n" +
                "t"schema": "crm1",n" +
                "t"table": "user_info"n" +
                "}");
        messageList.add("{n" +
                "t"after": {n" +
                "tt"user_id": "2",n" +
                "tt"name": "rui",n" +
                "tt"age": "100",n" +
                "tt"user_info_id": "4"n" +
                "t},n" +
                "t"before": {},n" +
                "t"op_ts": "1",n" +
                "t"op_type": "I",n" +
                "t"primary_keys": ["user_id"],n" +
                "t"schema": "crm1",n" +
                "t"table": "user_id"n" +
                "}");

(2)从kafka的这个映射关系可以看到,user_id-->user_id表、user_info-->user_info表、user_id+user_info-->user表

       ktssubscriberJsonStr = "{n" +
               "  "url": "jdbc:mysql://xx:3306/test?rewriteBatchedStatements=true",n" +
               "  "username": "dataopen",n" +
               "  "password": "dataopen",n" +
               "  "type": "rds",n" +
               "  "kafkaBrokers": "xx:9011",n" +
               "  "kafkaTopic": "testtopic3",n" +
               "  "kafkaPartition": "",n" +
               "  "kafkaOffsetReset": "latest",n" +
               "  "kafkaConsumerGroup": "",n" +
               "  "kafkaOffsetsForTimes": "",n" +
               "  "kafkaUserName": "",n" +
               "  "kafkaPassword": "",n" +
               "  "dataFormatType": "JSON",n" +
               "  "dataFormat": "OGG",n" +
               "  "relation": [n" +
               "    {n" +
               "      "partition": "",n" +
               "      "sourceSchema": "crm1",n" +
               "      "sourceTable": "user_id",n" +
               "      "targetSchema": "test",n" +
               "      "targetTable": "user_id",n" +
               "      "operaType": "I,U,D",n" +
               "      "where": "",n" +
               "      "tableType": "sub",n" +
               "      "column": [n" +
               "        {n" +
               "          "sourceColumn": "user_id",n" +
               "          "columnCode": "user_id",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "user_id"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "name",n" +
               "          "columnCode": "name",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "name"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "age",n" +
               "          "columnCode": "age",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "age"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "user_info_id",n" +
               "          "columnCode": "user_info_id",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "user_info_id"n" +
               "        }n" +
               "      ]n" +
               "    },n" +
               "    {n" +
               "      "partition": "",n" +
               "      "sourceSchema": "crm1",n" +
               "      "sourceTable": "user_info",n" +
               "      "targetSchema": "test",n" +
               "      "targetTable": "user_info",n" +
               "      "operaType": "I,U,D",n" +
               "      "where": "",n" +
               "      "tableType": "sub",n" +
               "      "column": [n" +
               "        {n" +
               "          "sourceColumn": "user_info_id",n" +
               "          "columnCode": "user_info_id",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "user_info_id"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "sex",n" +
               "          "columnCode": "sex",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "sex"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "user_name",n" +
               "          "columnCode": "user_name",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "user_name"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "password",n" +
               "          "columnCode": "password",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "password"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "tele",n" +
               "          "columnCode": "tele",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "tele"n" +
               "        }n" +
               "      ]n" +
               "    },n" +
               "    {n" +
               "      "partition": "",n" +
               "      "sourceSchema": "crm1",n" +
               "      "sourceTable": "user_id",n" +
               "      "targetSchema": "test",n" +
               "      "targetTable": "user",n" +
               "      "operaType": "I,U,D",n" +
               "      "where": "",n" +
               "      "tableType": "main",n" +
               "      "column": [n" +
               "        {n" +
               "          "sourceColumn": "user_id",n" +
               "          "columnCode": "id",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "user_id",n" +
               "          "isPrimary": "Y"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "name",n" +
               "          "columnCode": "name",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "name"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "age",n" +
               "          "columnCode": "age",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "age"n" +
               "        }n" +
               "      ]n" +
               "    },n" +
               "    {n" +
               "      "partition": "",n" +
               "      "sourceSchema": "crm1",n" +
               "      "sourceTable": "user_info",n" +
               "      "targetSchema": "test",n" +
               "      "targetTable": "user",n" +
               "      "operaType": "I,U,D",n" +
               "      "where": "",n" +
               "      "tableType": "sub",n" +
               "      "column": [n" +
               "        {n" +
               "          "sourceColumn": "user_info_id",n" +
               "          "columnCode": "user_info_id",n" +
               "          "columnType": "int",n" +
               "          "isPrimary": "Y",n" +
               "          "columnValue": "user_info_id"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "sex",n" +
               "          "columnCode": "sex",n" +
               "          "columnType": "int",n" +
               "          "columnValue": "sex"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "user_name",n" +
               "          "columnCode": "user_name",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "user_name"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "password",n" +
               "          "columnCode": "password",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "password"n" +
               "        },n" +
               "        {n" +
               "          "sourceColumn": "tele",n" +
               "          "columnCode": "tele",n" +
               "          "columnType": "varchar",n" +
               "          "columnValue": "tele"n" +
               "        }n" +
               "      ]n" +
               "    }n" +
               "  ]n" +
               "}n" +
               "n";

三、功能演示

(1)我们先查看目前三个表都为空:

(2)执行代码后

user表:可以看到user新增了一条数分别来自user_id表和user_info表,就是(二)上边的kafka消息

user_id表:

user_info表:

(2)修改一下user表的数据:再次执行,发现原来已经有id为2的数据(对应user_id主表的user_id主键字段),则自动替换为更新。并且user_id表和user_info表的数据不变,因为已经有了原来的数据,会自动根据主键去判断,即使是新增的类型,也会自动替换为修改

执行代码后:

总结:

以上就是kafka消息入库的功能,配合上一篇文章讲的,则可以实现Canal+kafka+flink/SparkStreaming实现一个完整的增量同步功能,并且可以定制同步合并到具体表。后边将分析代码部分......

最后

以上就是健忘铃铛为你收集整理的基于canal+kafka+flink实现实时增量同步4:kafka消息入库到MySQL功能演示一、新建三个表二、要实现的功能说明三、功能演示的全部内容,希望文章能够帮你解决基于canal+kafka+flink实现实时增量同步4:kafka消息入库到MySQL功能演示一、新建三个表二、要实现的功能说明三、功能演示所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部