我是靠谱客的博主 迷你发箍,最近开发中收集的这篇文章主要介绍Docker部署Canal并将消息推送到RabbitMQ1 Docker部署MySQL2 Docker部署RabbitMQ3 Docker部署Canal4 JAVA代码读取,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
- GitHub:https://github.com/alibaba/canal
- Docker:https://blog.csdn.net/qq_40794973/category_9639934.html
- MySQL读写分离:https://blog.csdn.net/qq_40794973/article/details/106320629
- https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
- https://github.com/alibaba/canal/wiki/Docker-QuickStart
1 Docker部署MySQL
创建mysql日志目录和配置文件
###
rm -rf /mydata/mysql
###
mkdir -p /mydata/mysql/log /mydata/mysql/data /mydata/mysql/conf
###
vi /mydata/mysql/conf/my.cnf
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
init_connect='SET collation_connection = utf8_unicode_ci'
init_connect='SET NAMES utf8'
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve
### 开启 binlog
log-bin=mysql-bin
### 选择 ROW 模式
binlog-format=ROW
### 配置MySQL replaction需要定义,不要和canal的 slaveId重复
server_id=1
docker启动mysql
docker run
-p 33060:3306
--name mysql_canal
-v /mydata/mysql/log:/var/log/mysql
-v /mydata/mysql/data:/var/lib/mysql
-v /mydata/mysql/conf:/etc/mysql
--env MYSQL_ROOT_HOST=%.%.%.%
--env MYSQL_ROOT_PASSWORD=123456
-d mysql:5.7
连接mysql
jdbc:mysql://121.36.33.154:33060/?serverTimezone=UTC
创建canal账号
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
创建测试数据库
-- 创建数据库
CREATE DATABASE user_db;
USE user_db;
-- 创建测试表
CREATE TABLE `tb_user`
(
`id` INT(10) UNSIGNED AUTO_INCREMENT COMMENT '主键ID',
`username` VARCHAR(20) NOT NULL COMMENT '用户名',
`password` VARCHAR(64) NOT NULL COMMENT '密码',
`name` VARCHAR(20) COMMENT '姓名',
`age` TINYINT UNSIGNED COMMENT '年龄',
`email` VARCHAR(50) COMMENT '邮箱',
PRIMARY KEY `id` (`id`),
UNIQUE KEY `uk_user_username` (`username`)
);
-- 插入测试数据
INSERT INTO `tb_user` (`id`, `username`, `password`, `name`, `age`, `email`)
VALUES ('1', 'zhangsan', '123456', '张三', '18', 'zhangsan@bug.cn'),
('2', 'lisi', '123456', '李四', '20', 'lisi@bug.cn'),
('3', 'wangwu', '123456', '王五', '28', 'wangwu@bug.cn'),
('4', 'zhaoliu', '123456', '赵六', '21', 'zhaoliu@bug.cn'),
('5', 'sunqi', '123456', '孙七', '24', 'sunqi@bug.cn');
2 Docker部署RabbitMQ
docker run
--name rabbitmq_canal
-p 5672:5672
-p 15672:15672
-d rabbitmq:management
默认的登录账户用户名和密码都guest
http://121.36.33.154:15672/
创建Exchanges
创建Queues
绑定
查看
3 Docker部署Canal
注意1.1.5版本后支持rabbitmq
https://hub.docker.com/r/canal/canal-server/tags?page=1&ordering=last_updated
先启动下canal,从里面copy出配置文件
docker run
--name canal_temp
--rm
-d canal/canal-server:latest
拷贝出来的配置文件目录
mkdir -p /mydata/canal/conf
从docker里面拷贝
docker cp canal_temp:/home/admin/canal-server/conf/canal.properties /mydata/canal/conf/
docker cp canal_temp:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf
关闭
docker stop canal_temp
修改instace.properties的这几处位置
vim /mydata/canal/conf/instance.properties
key | value | 例子 |
---|---|---|
canal.instance.master.journal.name | mysql主库链接时起始的binlog文件 | mysql-bin.000001 |
canal.instance.master.position | mysql主库链接时起始的binlog偏移量 | 155 |
dbMessage
修改canal.properties配置
vim /mydata/canal/conf/canal.properties
### tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 121.36.33.154
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = guest
rabbitmq.password = guest
docker启动canal
docker run
--name canal
-p 11111:11111
-v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
-v /mydata/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
-d canal/canal-server:latest
4 JAVA代码读取
/**
* <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
* <dependency>
* <groupId>com.rabbitmq</groupId>
* <artifactId>amqp-client</artifactId>
* <version>5.6.0</version>
* </dependency>
*/
public class CanalConsumer {
public static Connection connection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.4.42.32");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
return factory.newConnection();
}
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = connection();
// 创建Channel
Channel channel = connection.createChannel();
String queue = "dbMessage";
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后,会自动执行该方法
*
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由key...
* @param properties 配置信息
* @param body 数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
JSONObject jsonObject = JSONObject.parseObject(new String(body));
String pretty = JSON.toJSONString(jsonObject, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat);
System.out.println(pretty);
}
};
/**
* @param queue 队列名称
* @param autoAck 是否自动确认
* @param callback 回调对象
*/
channel.basicConsume(queue, true, consumer);
}
}
{
"data": [
{
"id": "5",
"username": "sunqi55",
"password": "123456",
"name": "孙七",
"age": "24",
"email": "sunqi@bug.cn"
}
],
"database": "user_db",
"es": 1611419308000,
"id": 5,
"isDdl": false,
"mysqlType": {
"id": "int(10) unsigned",
"username": "varchar(20)",
"password": "varchar(64)",
"name": "varchar(20)",
"age": "tinyint(3) unsigned",
"email": "varchar(50)"
},
"old": null,
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"username": 12,
"password": 12,
"name": 12,
"age": -6,
"email": 12
},
"table": "tb_user",
"ts": 1611419308082,
"type": "DELETE"
}
- https://www.cnblogs.com/liuxuebagaomizhe/p/13809317.html
- https://www.cnblogs.com/stayed/p/13542069.html
- https://blog.csdn.net/zyx6a/article/details/111415184
TODO 测试不行,后面完善
public class CanalConsumer {
/**
* 队列
*/
private static final String CANAL_QUEUE_NAME = "channel_sync_user_queue";
/**
* 交换机
*/
public static final String CANAL_EXCHANGE_NAME = "channel_sync_user_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
//创建频道
Channel channel = connection.createChannel();
//声明(创建)队列
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接,只能有一个Consumer监听这个队列
* 参数4:是否在不使用的时候自动删除队列,当没有Consumer时,自动删除
* 参数5:队列其它参数
*/
channel.queueDeclare(CANAL_QUEUE_NAME, true, false, false, null);
/**声明交换机
* 参数1:交换机名称
* 参数2:交换机类型,fanout, topic, direct, headers
*/
channel.exchangeDeclare(CANAL_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//队列绑定交换机
channel.queueBind(CANAL_QUEUE_NAME, CANAL_EXCHANGE_NAME, "user_backup");
// 接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后,会自动执行该方法
*
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由key...
* @param properties 配置信息
* @param body 数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
JSONObject jsonObject = JSONObject.parseObject(new String(body));
String pretty = JSON.toJSONString(jsonObject, SerializerFeature.PrettyFormat, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteDateUseDateFormat);
System.out.println(pretty);
}
};
/**
* @param queue 队列名称
* @param autoAck 是否自动确认
* @param callback 回调对象
*/
channel.basicConsume(CANAL_QUEUE_NAME, true, consumer);
}
}
最后
以上就是迷你发箍为你收集整理的Docker部署Canal并将消息推送到RabbitMQ1 Docker部署MySQL2 Docker部署RabbitMQ3 Docker部署Canal4 JAVA代码读取的全部内容,希望文章能够帮你解决Docker部署Canal并将消息推送到RabbitMQ1 Docker部署MySQL2 Docker部署RabbitMQ3 Docker部署Canal4 JAVA代码读取所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复