Overview
1. Create the three tables
(1) The main table, user_id. Its primary key is user_id.
CREATE TABLE `user_id` (
  `user_id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `user_info_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`user_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
(2) The sub table, user_info.
CREATE TABLE `user_info` (
  `sex` int(1) DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `tele` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `user_info_id` int(11) NOT NULL,
  PRIMARY KEY (`user_info_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
(3) The merged table, user.
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `sex` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `tele` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
2. The functionality to implement
We need to consume the incremental messages pushed to Kafka and write them into the corresponding tables (same table names) of the target database. For example, to sync from a table in database A to the same table in database B: every create/update/delete on the table in A pushes a message to Kafka, and B consumes that message and applies it to the corresponding table in B (this is the incremental sync). In addition, based on the mapping carried with the Kafka configuration, we determine which tables need to be merged into a new, concrete table, which we call C; C's columns include fields coming from both source tables.
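Conceptually, the consuming side of this pipeline is just a Kafka poll loop that hands each binlog message to a parse-and-write step. A minimal sketch with a plain Kafka consumer; applyToTarget is a hypothetical placeholder, not the project's actual API:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BinlogSyncLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx:9011"); // broker address from the config below
        props.put("group.id", "binlog-sync-demo"); // hypothetical consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testtopic3")); // topic from the config below
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    applyToTarget(record.value()); // parse the binlog JSON and write to database B
                }
            }
        }
    }

    static void applyToTarget(String json) { /* see the dispatch sketch in (1) below */ }
}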
In the Test class we hardcode the Kafka messages for testing. (1) below shows the binlog messages as they would be consumed from Kafka: the two pushed messages belong to the user_info table and the user_id table respectively. We now want to write these two messages into the same-named tables of our new database, and additionally merge them into the new user table. The merge rules and the mapping carried with Kafka are shown in (2).
(1) op_type "I" means insert, "U" means update, and "D" means delete; a dispatch sketch follows the two messages below.
messageList.add("{n" +
"t"after": {n" +
"tt"user_info_id": "4",n" +
"tt"sex": "1",n" +
"tt"user_name": "admin",n" +
"tt"tele": "183xxx",n" +
"tt"password": "123"n" +
"t},n" +
"t"before": {},n" +
"t"op_ts": "1",n" +
"t"op_type": "I",n" +
"t"primary_keys": ["user_info_id"],n" +
"t"schema": "crm1",n" +
"t"table": "user_info"n" +
"}");
messageList.add("{n" +
"t"after": {n" +
"tt"user_id": "2",n" +
"tt"name": "rui",n" +
"tt"age": "100",n" +
"tt"user_info_id": "4"n" +
"t},n" +
"t"before": {},n" +
"t"op_ts": "1",n" +
"t"op_type": "I",n" +
"t"primary_keys": ["user_id"],n" +
"t"schema": "crm1",n" +
"t"table": "user_id"n" +
"}");
(2) This mapping from Kafka shows: user_id --> the user_id table, user_info --> the user_info table, and user_id + user_info --> the user table; a routing sketch follows the configuration.
ktssubscriberJsonStr = "{n" +
" "url": "jdbc:mysql://xx:3306/test?rewriteBatchedStatements=true",n" +
" "username": "dataopen",n" +
" "password": "dataopen",n" +
" "type": "rds",n" +
" "kafkaBrokers": "xx:9011",n" +
" "kafkaTopic": "testtopic3",n" +
" "kafkaPartition": "",n" +
" "kafkaOffsetReset": "latest",n" +
" "kafkaConsumerGroup": "",n" +
" "kafkaOffsetsForTimes": "",n" +
" "kafkaUserName": "",n" +
" "kafkaPassword": "",n" +
" "dataFormatType": "JSON",n" +
" "dataFormat": "OGG",n" +
" "relation": [n" +
" {n" +
" "partition": "",n" +
" "sourceSchema": "crm1",n" +
" "sourceTable": "user_id",n" +
" "targetSchema": "test",n" +
" "targetTable": "user_id",n" +
" "operaType": "I,U,D",n" +
" "where": "",n" +
" "tableType": "sub",n" +
" "column": [n" +
" {n" +
" "sourceColumn": "user_id",n" +
" "columnCode": "user_id",n" +
" "columnType": "int",n" +
" "columnValue": "user_id"n" +
" },n" +
" {n" +
" "sourceColumn": "name",n" +
" "columnCode": "name",n" +
" "columnType": "varchar",n" +
" "columnValue": "name"n" +
" },n" +
" {n" +
" "sourceColumn": "age",n" +
" "columnCode": "age",n" +
" "columnType": "int",n" +
" "columnValue": "age"n" +
" },n" +
" {n" +
" "sourceColumn": "user_info_id",n" +
" "columnCode": "user_info_id",n" +
" "columnType": "int",n" +
" "columnValue": "user_info_id"n" +
" }n" +
" ]n" +
" },n" +
" {n" +
" "partition": "",n" +
" "sourceSchema": "crm1",n" +
" "sourceTable": "user_info",n" +
" "targetSchema": "test",n" +
" "targetTable": "user_info",n" +
" "operaType": "I,U,D",n" +
" "where": "",n" +
" "tableType": "sub",n" +
" "column": [n" +
" {n" +
" "sourceColumn": "user_info_id",n" +
" "columnCode": "user_info_id",n" +
" "columnType": "int",n" +
" "columnValue": "user_info_id"n" +
" },n" +
" {n" +
" "sourceColumn": "sex",n" +
" "columnCode": "sex",n" +
" "columnType": "int",n" +
" "columnValue": "sex"n" +
" },n" +
" {n" +
" "sourceColumn": "user_name",n" +
" "columnCode": "user_name",n" +
" "columnType": "varchar",n" +
" "columnValue": "user_name"n" +
" },n" +
" {n" +
" "sourceColumn": "password",n" +
" "columnCode": "password",n" +
" "columnType": "varchar",n" +
" "columnValue": "password"n" +
" },n" +
" {n" +
" "sourceColumn": "tele",n" +
" "columnCode": "tele",n" +
" "columnType": "varchar",n" +
" "columnValue": "tele"n" +
" }n" +
" ]n" +
" },n" +
" {n" +
" "partition": "",n" +
" "sourceSchema": "crm1",n" +
" "sourceTable": "user_id",n" +
" "targetSchema": "test",n" +
" "targetTable": "user",n" +
" "operaType": "I,U,D",n" +
" "where": "",n" +
" "tableType": "main",n" +
" "column": [n" +
" {n" +
" "sourceColumn": "user_id",n" +
" "columnCode": "id",n" +
" "columnType": "int",n" +
" "columnValue": "user_id",n" +
" "isPrimary": "Y"n" +
" },n" +
" {n" +
" "sourceColumn": "name",n" +
" "columnCode": "name",n" +
" "columnType": "varchar",n" +
" "columnValue": "name"n" +
" },n" +
" {n" +
" "sourceColumn": "age",n" +
" "columnCode": "age",n" +
" "columnType": "int",n" +
" "columnValue": "age"n" +
" }n" +
" ]n" +
" },n" +
" {n" +
" "partition": "",n" +
" "sourceSchema": "crm1",n" +
" "sourceTable": "user_info",n" +
" "targetSchema": "test",n" +
" "targetTable": "user",n" +
" "operaType": "I,U,D",n" +
" "where": "",n" +
" "tableType": "sub",n" +
" "column": [n" +
" {n" +
" "sourceColumn": "user_info_id",n" +
" "columnCode": "user_info_id",n" +
" "columnType": "int",n" +
" "isPrimary": "Y",n" +
" "columnValue": "user_info_id"n" +
" },n" +
" {n" +
" "sourceColumn": "sex",n" +
" "columnCode": "sex",n" +
" "columnType": "int",n" +
" "columnValue": "sex"n" +
" },n" +
" {n" +
" "sourceColumn": "user_name",n" +
" "columnCode": "user_name",n" +
" "columnType": "varchar",n" +
" "columnValue": "user_name"n" +
" },n" +
" {n" +
" "sourceColumn": "password",n" +
" "columnCode": "password",n" +
" "columnType": "varchar",n" +
" "columnValue": "password"n" +
" },n" +
" {n" +
" "sourceColumn": "tele",n" +
" "columnCode": "tele",n" +
" "columnType": "varchar",n" +
" "columnValue": "tele"n" +
" }n" +
" ]n" +
" }n" +
" ]n" +
"}n" +
"n";
3. Demo
(1) First, check that all three tables are currently empty:
(2) After running the code:
user table: you can see that user has gained one new row, whose columns come from the user_id table and the user_info table respectively, i.e. the Kafka messages in Section 2 above.
user_id table:
user_info table:
(3) Modify the data in the user table and run again: since a row with id 2 already exists (id corresponds to the user_id primary-key column of the main user_id table), the insert is automatically applied as an update. The user_id and user_info tables are unchanged as well, because their rows already exist; matching is done on the primary key, so even an insert-type message is automatically converted into an update (a minimal upsert sketch follows).
After running the code:
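One way to get this insert-becomes-update behavior in MySQL is an upsert; a minimal JDBC sketch against the merged user table (connection details from the config above, values from the test message; the project may well implement the primary-key check differently):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class UpsertDemo {
    public static void main(String[] args) throws Exception {
        // If a row with the same primary key (id) already exists,
        // the INSERT is applied as an UPDATE instead.
        String sql = "INSERT INTO `user` (id, name, age) VALUES (?, ?, ?) " +
                "ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age)";
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://xx:3306/test?rewriteBatchedStatements=true", "dataopen", "dataopen");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, 2);         // user_id from the message, mapped to column id
            ps.setString(2, "rui");
            ps.setInt(3, 100);
            ps.executeUpdate();
        }
    }
}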
Summary:
That covers writing Kafka messages into the database. Combined with the previous article, this gives a complete Canal + Kafka + Flink/Spark Streaming incremental sync pipeline, with the option of merging the synced data into a specific target table as configured. The code itself will be analyzed in the next part...