我是靠谱客的博主 美丽月饼,最近开发中收集的这篇文章主要介绍使用canal同步mysql数据库信息到RabbitMQ1、canal简介2、工作原理3、环境准备4、数据同步测试5、总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1、canal简介

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

github:https://github.com/alibaba/canal/

gitee:https://gitee.com/mirrors/canal

2、工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

3、环境准备

3.1 首先安装好mysql及rabbitmq

mysql比较简单,这里不介绍,rabbitmq安装及使用可以参考:

RabbitMQ安装及简单使用。

3.2 mysql需开启binlog

查看是否开启binlog

SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值为OFF是未开启,为ON是已开启。

未开启的话可以修改/etc/my.cnf 开启binlog

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

配置好后重启mysql。

创建用于同步的mysql账号:

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3.3 rabbitmq配置

在virtualHost:/  下新增Exchanges: canal.topic

 新增队列:test.queue,  绑定canal.topic, RoutingKey:test.routingKey

3.4 canal下载及配置

https://github.com/alibaba/canal/releases/tag/canal-1.1.5

 

目前最新版本为1.1.5,支持将数据发送至rabbitmq。

目录如下:

修改conf目录下的canal.properties 

canal.serverMode = rabbitMQ

rabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
# rabbitmq中新建的 Exchange
rabbitmq.exchange = canal.topic
rabbitmq.username = guest
rabbitmq.password = guest

修改conf/example下的instance.properties

canal.instance.master.address=127.0.0.1:3306

# mysql中配置的用于同步的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

# rabbitmq中配置的 绑定的 routingkey
canal.mq.topic=test.routingKey

修改完成后,启动bin下的startup.bat(windows下,linux下启动startup.sh),查看logs/canal下canal.log, 如下内容说明启动成功:

2021-07-10 18:30:18.088 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2021-07-10 18:30:18.147 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2021-07-10 18:30:18.320 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2021-07-10 18:30:18.899 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.104(192.168.0.104):11111]
2021-07-10 18:30:20.723 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

至此,环境准备完成。

4、数据同步测试

4.1 新建数据库

新建测试数据库:canal

可以看到rabbitmq,test.queue队列中收到数据:

{
    "data": null,
    "database": "`canal`",
    "es": 1625913686000,
    "id": 6,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "CREATE DATABASE `canal` CHARACTER SET utf8",
    "sqlType": null,
    "table": "",
    "ts": 1625913686928,
    "type": "QUERY"
}

4.2 新增表

新建测试表:tb_user

CREATE TABLE `tb_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(30) NOT NULL DEFAULT '',
  `address` varchar(200) DEFAULT NULL,
  `email` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mq中收到数据:

{
    "data": null,
    "database": "canal",
    "es": 1625914138000,
    "id": 7,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "CREATE TABLE `tb_user` (rn`id`  int(11) NOT NULL AUTO_INCREMENT ,rn`name`  varchar(30) NOT NULL DEFAULT '' ,rn`address`  varchar(200) NULL ,rn`email`  varchar(100) NULL ,rnPRIMARY KEY (`id`)rn)",
    "sqlType": null,
    "table": "tb_user",
    "ts": 1625914138420,
    "type": "CREATE"
}

4.3 新增数据记录

在表中插入一条数据:

INSERT INTO `canal`.`tb_user` (`id`, `name`, `address`, `email`) VALUES ('1', 'jason', '安徽合肥', 'jason@qq.com');

{
    "data": [
        {
            "id": "1",
            "name": "jason",
            "address": "安徽合肥",
            "email": "jason@qq.com"
        }
    ],
    "database": "canal",
    "es": 1625914305000,
    "id": 8,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "name": "varchar(30)",
        "address": "varchar(200)",
        "email": "varchar(100)"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "address": 12,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1625914305197,
    "type": "INSERT"
}

通过 "type": "INSERT" 及data可以获取到新增的数据

4.4 更新数据

更新 jason 的 address 信息

{
    "data": [
        {
            "id": "1",
            "name": "jason",
            "address": "安徽合肥高新区",
            "email": "jason@qq.com"
        }
    ],
    "database": "canal",
    "es": 1625914494000,
    "id": 9,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "name": "varchar(30)",
        "address": "varchar(200)",
        "email": "varchar(100)"
    },
    "old": [
        {
            "address": "安徽合肥"
        }
    ],
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "address": 12,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1625914494755,
    "type": "UPDATE"
}

通过 "type": "UPDATE",data及old中的信息 可以获取到更新的数据

4.5 删除数据

{
    "data": [
        {
            "id": "1",
            "name": "jason",
            "address": "安徽合肥高新区",
            "email": "jason@qq.com"
        }
    ],
    "database": "canal",
    "es": 1625914783000,
    "id": 10,
    "isDdl": false,
    "mysqlType": {
        "id": "int(11)",
        "name": "varchar(30)",
        "address": "varchar(200)",
        "email": "varchar(100)"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "address": 12,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1625914783109,
    "type": "DELETE"
}

通过 "type": "DELETE",data 就可以判断删除的数据

4.6 新增字段

新增remark字段

{
    "data": null,
    "database": "canal",
    "es": 1625914881000,
    "id": 11,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `tb_user`rnADD COLUMN `remark`  varchar(100) NULL AFTER `email`",
    "sqlType": null,
    "table": "tb_user",
    "ts": 1625914881179,
    "type": "ALTER"
}

通过  "type": "ALTER" 及 sql即可知道 新增了remark列。

4.7 删除字段

删除email字段

{
    "data": null,
    "database": "canal",
    "es": 1625915072000,
    "id": 12,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "ALTER TABLE `tb_user`rnDROP COLUMN `email`",
    "sqlType": null,
    "table": "tb_user",
    "ts": 1625915072617,
    "type": "ALTER"
}

通过 "type": "ALTER",sql即可知道删除了email列。

5、总结

mysql中变更的数据都可以通过canal发送到rabbitmq,只要监听rabbitmq队列,就可以获取到变更的数据,进行业务处理,可以写入redis,刷新缓存,也可以同步到elasticsearch等等。

最后

以上就是美丽月饼为你收集整理的使用canal同步mysql数据库信息到RabbitMQ1、canal简介2、工作原理3、环境准备4、数据同步测试5、总结的全部内容,希望文章能够帮你解决使用canal同步mysql数据库信息到RabbitMQ1、canal简介2、工作原理3、环境准备4、数据同步测试5、总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部