概述
Canal+Kafka实现mysql与Redis数据同步
一、Canal简介
canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
github:https://github.com/alibaba/canal
二、Canal实现数据同步流程
三、效果演示
数据库db1有张user表。
在数据库中添加数据。
查看redis中的数据。
更改mysql中的数据。
再查看redis中的数据。
删除mysql中的数据。
再查看redis中的数据。
三、开始搭建
需在服务器端搭建好mysql,redis,zookeeper,kafka。
可参考下面我的博客:
mysql:https://blog.csdn.net/qq_43692950/article/details/107731431
redis:https://blog.csdn.net/qq_43692950/article/details/107443155
zookeeper&kafka:https://blog.csdn.net/qq_43692950/article/details/110648852
也可以使用docker快速安装并配置上述软件,可参考:
https://blog.csdn.net/qq_43692950/article/details/111084773
四、开启Mysql binlog文件
修改 my.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=100 #不能和canal的slaveId重复
重启mysql
service mysqld restart
验证是否已经配置成功
show variables like 'log_bin';
开启之后是ON,否则为OFF。
为Canal创建一个账号。
drop user 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal';
flush privileges;
五、配置CanalServer
- 下载canalserver
https://download.csdn.net/download/qq_43692950/13675809
- 上传至服务器,并解压
- 进入canal根目录,修改 conf/example/instance.properties
修改数据库地址
canal.instance.master.address= 192.168.40.129:3306
修改用户名密码:
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
指定kafka topic
canal.mq.topic=dbtopic
- 修改 conf/canal.properties
指定类型为kafka
canal.serverMode = kafka
指定mq的地址
canal.mq.servers = 192.168.40.129:9092
- 启动canalserver
进入bin
启动:
./startup.sh
查看/logs/example/example.log 日志文件,出现下面效果便启动成功:
六、编写Kafka客户端并自定义添加redis
- 新建SpringBoot项目,添加maven依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
- application.yml
server:
port: 80
spring:
kafka:
bootstrap-servers: 192.168.40.129:9092
consumer:
group-id: kafkaGroup
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
redis:
host: 192.168.40.129
port: 6379
timeout: 10000
jedis:
pool:
max-active: 100
max-idle: 10
max-wait: 100000
min-idle: 0
- 监听Kafka
@Slf4j
@Component
public class CanalKafkaClient {
@Autowired
StringRedisTemplate redisTemplate;
@KafkaListener(topics = "dbtopic")
public void receive(ConsumerRecord<?, ?> consumer) {
pross(consumer.value().toString());
}
private void pross(String json) {
log.info(json);
if (StringUtils.isEmpty(json)) {
return;
}
JSONObject jsonObject = JSONObject.parseObject(json);
String type = jsonObject.getString("type");
JSONArray datas = jsonObject.getJSONArray("data");
String database = jsonObject.getString("database");
String table = jsonObject.getString("table");
for (int i = 0; i < datas.size(); i++) {
String key = database + ":" + table + ":" + datas.getJSONObject(i).get("id");
String value = datas.getJSONObject(i).toString();
log.info(type);
switch (type) {
case "INSERT":
case "UPDATE":
update(key, value);
break;
case "DELETE":
delete(key);
break;
}
}
}
private void update(String key, String value) {
redisTemplate.opsForValue().set(key, value);
}
private void delete(String key) {
redisTemplate.delete(key);
}
}
最后
以上就是懵懂豌豆为你收集整理的Canal+Kafka实现mysql与Redis数据同步的全部内容,希望文章能够帮你解决Canal+Kafka实现mysql与Redis数据同步所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复