我是靠谱客的博主 迷你发箍,最近开发中收集的这篇文章主要介绍Docker部署Canal并将消息推送到RabbitMQ1 Docker部署MySQL2 Docker部署RabbitMQ3 Docker部署Canal4 JAVA代码读取,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

  • GitHub:https://github.com/alibaba/canal
  • Docker:https://blog.csdn.net/qq_40794973/category_9639934.html
  • MySQL读写分离:https://blog.csdn.net/qq_40794973/article/details/106320629
  • https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart 
  • https://github.com/alibaba/canal/wiki/Docker-QuickStart

1 Docker部署MySQL

创建mysql日志目录和配置文件

###
rm -rf /mydata/mysql
###
mkdir -p /mydata/mysql/log /mydata/mysql/data /mydata/mysql/conf
###
vi /mydata/mysql/conf/my.cnf
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
init_connect='SET collation_connection = utf8_unicode_ci' 
init_connect='SET NAMES utf8' 
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve

### 开启 binlog
log-bin=mysql-bin
### 选择 ROW 模式
binlog-format=ROW
### 配置MySQL replaction需要定义,不要和canal的 slaveId重复
server_id=1

docker启动mysql

docker run 
-p 33060:3306 
--name mysql_canal 
-v /mydata/mysql/log:/var/log/mysql 
-v /mydata/mysql/data:/var/lib/mysql 
-v /mydata/mysql/conf:/etc/mysql 
--env MYSQL_ROOT_HOST=%.%.%.% 
--env MYSQL_ROOT_PASSWORD=123456 
-d mysql:5.7

连接mysql 

jdbc:mysql://121.36.33.154:33060/?serverTimezone=UTC

创建canal账号  

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

创建测试数据库

-- 创建数据库
CREATE DATABASE user_db;
USE user_db;
-- 创建测试表
CREATE TABLE `tb_user`
(
    `id`       INT(10) UNSIGNED AUTO_INCREMENT COMMENT '主键ID',
    `username` VARCHAR(20) NOT NULL COMMENT '用户名',
    `password` VARCHAR(64) NOT NULL COMMENT '密码',
    `name`     VARCHAR(20) COMMENT '姓名',
    `age`      TINYINT UNSIGNED COMMENT '年龄',
    `email`    VARCHAR(50) COMMENT '邮箱',
    PRIMARY KEY `id` (`id`),
    UNIQUE KEY `uk_user_username` (`username`)
);
-- 插入测试数据
INSERT INTO `tb_user` (`id`, `username`, `password`, `name`, `age`, `email`)
VALUES ('1', 'zhangsan', '123456', '张三', '18', 'zhangsan@bug.cn'),
       ('2', 'lisi', '123456', '李四', '20', 'lisi@bug.cn'),
       ('3', 'wangwu', '123456', '王五', '28', 'wangwu@bug.cn'),
       ('4', 'zhaoliu', '123456', '赵六', '21', 'zhaoliu@bug.cn'),
       ('5', 'sunqi', '123456', '孙七', '24', 'sunqi@bug.cn');

2 Docker部署RabbitMQ

docker run 
--name rabbitmq_canal 
-p 5672:5672 
-p 15672:15672 
-d rabbitmq:management

默认的登录账户用户名和密码都guest 

http://121.36.33.154:15672/

创建Exchanges

创建Queues

绑定

查看


3 Docker部署Canal

注意1.1.5版本后支持rabbitmq

 https://hub.docker.com/r/canal/canal-server/tags?page=1&ordering=last_updated

先启动下canal,从里面copy出配置文件 

docker run 
--name canal_temp 
--rm 
-d canal/canal-server:latest 

拷贝出来的配置文件目录 

mkdir -p /mydata/canal/conf

从docker里面拷贝 

docker cp canal_temp:/home/admin/canal-server/conf/canal.properties /mydata/canal/conf/
docker cp canal_temp:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf

关闭

docker stop canal_temp

修改instace.properties的这几处位置

vim /mydata/canal/conf/instance.properties

keyvalue例子
canal.instance.master.journal.namemysql主库链接时起始的binlog文件mysql-bin.000001
canal.instance.master.positionmysql主库链接时起始的binlog偏移量155

dbMessage

修改canal.properties配置

vim /mydata/canal/conf/canal.properties
### tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ

##################################################
#########         RabbitMQ           #############
##################################################
rabbitmq.host = 121.36.33.154
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = guest
rabbitmq.password = guest

docker启动canal 

docker run 
--name canal 
-p 11111:11111 
-v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties 
-v /mydata/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties 
-d canal/canal-server:latest 

4 JAVA代码读取

/**
 * <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
 * <dependency>
 *     <groupId>com.rabbitmq</groupId>
 *     <artifactId>amqp-client</artifactId>
 *     <version>5.6.0</version>
 * </dependency>
 */
public class CanalConsumer {
    public static Connection connection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("10.4.42.32");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory.newConnection();
    }
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = connection();
        // 创建Channel
        Channel channel = connection.createChannel();
        String queue = "dbMessage";
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后,会自动执行该方法
             *
             * @param consumerTag  标识
             * @param envelope     获取一些信息,交换机,路由key...
             * @param properties   配置信息
             * @param body         数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                JSONObject jsonObject = JSONObject.parseObject(new String(body));
                String pretty = JSON.toJSONString(jsonObject, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat);
                System.out.println(pretty);
            }
        };
        /**
         * @param queue    队列名称
         * @param autoAck  是否自动确认
         * @param callback 回调对象
         */
        channel.basicConsume(queue, true, consumer);
    }
}
{
    "data": [
        {
            "id": "5",
            "username": "sunqi55",
            "password": "123456",
            "name": "孙七",
            "age": "24",
            "email": "sunqi@bug.cn"
        }
    ],
    "database": "user_db",
    "es": 1611419308000,
    "id": 5,
    "isDdl": false,
    "mysqlType": {
        "id": "int(10) unsigned",
        "username": "varchar(20)",
        "password": "varchar(64)",
        "name": "varchar(20)",
        "age": "tinyint(3) unsigned",
        "email": "varchar(50)"
    },
    "old": null,
    "pkNames": [
        "id"
    ],
    "sql": "",
    "sqlType": {
        "id": 4,
        "username": 12,
        "password": 12,
        "name": 12,
        "age": -6,
        "email": 12
    },
    "table": "tb_user",
    "ts": 1611419308082,
    "type": "DELETE"
}

  • https://www.cnblogs.com/liuxuebagaomizhe/p/13809317.html
  • https://www.cnblogs.com/stayed/p/13542069.html
  • https://blog.csdn.net/zyx6a/article/details/111415184

TODO 测试不行,后面完善 

public class CanalConsumer {
    /**
     * 队列
     */
    private static final String CANAL_QUEUE_NAME = "channel_sync_user_queue";
    /**
     * 交换机
     */
    public static final String CANAL_EXCHANGE_NAME = "channel_sync_user_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明(创建)队列
        /**
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接,只能有一个Consumer监听这个队列
         * 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除
         * 参数5:队列其它参数
         */
        channel.queueDeclare(CANAL_QUEUE_NAME, true, false, false, null);
        /**声明交换机
         * 参数1:交换机名称
         * 参数2:交换机类型,fanout, topic, direct, headers
         */
        channel.exchangeDeclare(CANAL_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //队列绑定交换机
        channel.queueBind(CANAL_QUEUE_NAME, CANAL_EXCHANGE_NAME, "user_backup");
        // 接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法,当收到消息后,会自动执行该方法
             *
             * @param consumerTag  标识
             * @param envelope     获取一些信息,交换机,路由key...
             * @param properties   配置信息
             * @param body         数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                JSONObject jsonObject = JSONObject.parseObject(new String(body));
                String pretty = JSON.toJSONString(jsonObject, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat);
                System.out.println(pretty);
            }
        };
        /**
         * @param queue    队列名称
         * @param autoAck  是否自动确认
         * @param callback 回调对象
         */
        channel.basicConsume(CANAL_QUEUE_NAME, true, consumer);
    }
}

 

最后

以上就是迷你发箍为你收集整理的Docker部署Canal并将消息推送到RabbitMQ1 Docker部署MySQL2 Docker部署RabbitMQ3 Docker部署Canal4 JAVA代码读取的全部内容,希望文章能够帮你解决Docker部署Canal并将消息推送到RabbitMQ1 Docker部署MySQL2 Docker部署RabbitMQ3 Docker部署Canal4 JAVA代码读取所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部