我是靠谱客的博主 懵懂豌豆,最近开发中收集的这篇文章主要介绍Canal+Kafka实现mysql与Redis数据同步,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Canal+Kafka实现mysql与Redis数据同步

一、Canal简介

canal主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括
数据库镜像
数据库实时备份
索引构建和实时维护(拆分异构索引、倒排索引等)
业务 cache 刷新
带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

github:https://github.com/alibaba/canal

二、Canal实现数据同步流程

在这里插入图片描述

三、效果演示

数据库db1有张user表。
在这里插入图片描述
在数据库中添加数据。
在这里插入图片描述

查看redis中的数据。
在这里插入图片描述

更改mysql中的数据。
在这里插入图片描述

再查看redis中的数据。
在这里插入图片描述

删除mysql中的数据。
在这里插入图片描述

再查看redis中的数据。

在这里插入图片描述

三、开始搭建

需在服务器端搭建好mysql,redis,zookeeper,kafka。
可参考下面我的博客:

mysql:https://blog.csdn.net/qq_43692950/article/details/107731431

redis:https://blog.csdn.net/qq_43692950/article/details/107443155

zookeeper&kafka:https://blog.csdn.net/qq_43692950/article/details/110648852

也可以使用docker快速安装并配置上述软件,可参考:

https://blog.csdn.net/qq_43692950/article/details/111084773

四、开启Mysql binlog文件

修改 my.cnf

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=100 #不能和canal的slaveId重复

重启mysql

service mysqld restart

验证是否已经配置成功

show variables like 'log_bin';

开启之后是ON,否则为OFF。

为Canal创建一个账号。

drop user 'canal'@'%'; 
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
grant all privileges on *.* to 'canal'@'%' identified by 'canal'; 
flush privileges;

五、配置CanalServer

  1. 下载canalserver

https://download.csdn.net/download/qq_43692950/13675809

  1. 上传至服务器,并解压
  2. 进入canal根目录,修改 conf/example/instance.properties

修改数据库地址

canal.instance.master.address= 192.168.40.129:3306

修改用户名密码:

canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

指定kafka topic

canal.mq.topic=dbtopic
  1. 修改 conf/canal.properties

指定类型为kafka

canal.serverMode = kafka

指定mq的地址

canal.mq.servers = 192.168.40.129:9092
  1. 启动canalserver
    进入bin
    启动:
./startup.sh

查看/logs/example/example.log 日志文件,出现下面效果便启动成功:
在这里插入图片描述

六、编写Kafka客户端并自定义添加redis

  1. 新建SpringBoot项目,添加maven依赖:
<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <version>1.18.12</version>
 </dependency>

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
     <exclusions>
         <exclusion>
             <groupId>io.lettuce</groupId>
             <artifactId>lettuce-core</artifactId>
         </exclusion>
     </exclusions>
 </dependency>
 
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
  1. application.yml
server:
  port: 80
  
spring:
  kafka:
    bootstrap-servers: 192.168.40.129:9092
    consumer:
      group-id: kafkaGroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      
  redis:
    host: 192.168.40.129
    port: 6379
    timeout: 10000
    jedis:
      pool:
        max-active: 100
        max-idle: 10
        max-wait: 100000
        min-idle: 0
  1. 监听Kafka
@Slf4j
@Component
public class CanalKafkaClient {
    @Autowired
    StringRedisTemplate redisTemplate;

    @KafkaListener(topics = "dbtopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        pross(consumer.value().toString());
    }

    private void pross(String json) {
        log.info(json);
        if (StringUtils.isEmpty(json)) {
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(json);
        String type = jsonObject.getString("type");
        JSONArray datas = jsonObject.getJSONArray("data");
        String database = jsonObject.getString("database");
        String table = jsonObject.getString("table");
        for (int i = 0; i < datas.size(); i++) {
            String key = database + ":" + table + ":" + datas.getJSONObject(i).get("id");
            String value = datas.getJSONObject(i).toString();
            log.info(type);
            switch (type) {
                case "INSERT":
                case "UPDATE":
                    update(key, value);
                    break;
                case "DELETE":
                    delete(key);
                    break;
            }
        }
    }

    private void update(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }

    private void delete(String key) {
        redisTemplate.delete(key);
    }
}

最后

以上就是懵懂豌豆为你收集整理的Canal+Kafka实现mysql与Redis数据同步的全部内容,希望文章能够帮你解决Canal+Kafka实现mysql与Redis数据同步所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部