我是靠谱客的博主 霸气手机,最近开发中收集的这篇文章主要介绍Canal初探 (mysql8),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1. binlog开启, 并设置为ROW格式

修改my.cnf, 重点关注两个参数

  • log_bin = mysql-bin (该参数的值是 binlog 文件名前缀, 不是 ON/OFF 开关; MySQL 8.0 默认已开启 binlog)
  • binlog_format = ROW
select version();
# mysql version (8.0)
show master logs;
# master binlog
show master status;
# master 状态
show global variables like '%log_bin%';
# binlog 是否开启
show global variables like 'log_bin_basename';
# binlog 地址
show global variables like 'max_binlog_size';
# binlog 最大单文件大小 (默认1G)
show global variables like 'binlog_format';
# binlog 格式(STATEMENT, ROW, MIXED), 默认ROW
show global variables like 'binlog_expire_logs_seconds';
# binlog 过期时间 (默认30天)
show global variables like 'expire_logs_days';
# binlog 过期时间 (mysql 8.0 已废弃, 改用 binlog_expire_logs_seconds)

2. 创建canal用户, 并赋予复制权限

CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

3. 部署canal-server

安装:
cd /opt/
mkdir canal-server
tar -zxvf canal.deployer-1.1.5.tar.gz -C canal-server
配置(据实际情况调整):
conf/canal.properties
canal.port = 11111
conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=.*\\..*
启停:
bin/startup.sh
bin/stop.sh
日志:
logs/canal/canal.log
logs/example/example.log
logs/example/meta.log

4. 编写canal-demo

  • pom.xml

    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.5</version>
    </dependency>
    <dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.5</version>
    </dependency>
    
  • application.yml

    canal:
      server:
        ip: 127.0.0.1
        port: 11111
        destination: example
        filter: '.*\..*'
        username:
        password:
    
  • CanalProperties.java

    package demo.springboot.canal;
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    /**
     * Canal server settings, bound from configuration properties under the
     * {@code canal.server} prefix.
     *
     * @author liuxianqiang
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "canal.server")
    public class CanalProperties {

        /** Host of the Canal server; used to build the socket address the client connects to. */
        private String ip;

        /** Port of the Canal server; used together with {@link #ip}. */
        private Integer port;

        /** Canal destination (instance name) passed when creating the connector. */
        private String destination;

        /** Subscription filter expression passed to the connector's {@code subscribe} call. */
        private String filter;

        /** User name passed when creating the connector. */
        private String username;

        /** Password passed when creating the connector. */
        private String password;
    }
    
  • CanalData.java

    package demo.springboot.canal;
    import lombok.Data;
    /**
     * One row-level change captured from Canal, together with the SQL
     * statement rebuilt for it.
     *
     * @author liuxianqiang
     */
    @Data
    public class CanalData {

        /** Value of the changed row's {@code id} column. */
        private String dataId;

        /** Name of the table the change belongs to. */
        private String dataTable;

        /** SQL statement rebuilt from the change. */
        private String dataSql;

        /** Kind of statement held in {@link #dataSql}; see {@code CanalDataSqlType}. */
        private String dataSqlType;

        /**
         * Creates a fully populated instance.
         *
         * @param dataId      value of the row's id column
         * @param dataTable   table name
         * @param dataSql     rebuilt SQL statement
         * @param dataSqlType statement kind
         */
        public CanalData(String dataId, String dataTable, String dataSql, String dataSqlType) {
            this.dataSqlType = dataSqlType;
            this.dataSql = dataSql;
            this.dataTable = dataTable;
            this.dataId = dataId;
        }

        /** Creates an empty instance; all fields start as {@code null}. */
        public CanalData() {
        }
    }
    
  • CanalDataSqlType.java

    package demo.springboot.canal;
    /**
     * String constants naming the kinds of SQL statement rebuilt from Canal
     * row changes.
     *
     * <p>This is a constants holder only: it is final and cannot be instantiated.
     *
     * @author liuxianqiang
     */
    public final class CanalDataSqlType {

        /** Row deletion. */
        public static final String DELETE = "DELETE";

        /** Row insertion. */
        public static final String INSERT = "INSERT";

        /** Row update. */
        public static final String UPDATE = "UPDATE";

        /** Not instantiable: the class only exposes constants. */
        private CanalDataSqlType() {
        }
    }
    
  • CanalRunner.java

    package demo.springboot.canal;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.StringEscapeUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    import java.net.InetSocketAddress;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;
    @Component
    @Slf4j
    public class CanalRunner implements CommandLineRunner {
    @Autowired
    private CanalProperties canalProperties;
    @Override
    public void run(String... args) throws Exception {
    log.info("canal start");
    // 创建连接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getIp(), canalProperties.getPort()),
    canalProperties.getDestination(), canalProperties.getUsername(), canalProperties.getPassword());
    while (true) {
    try {
    // 打开连接
    connector.connect();
    // 订阅数据库的全部表
    connector.subscribe(canalProperties.getFilter());
    // 回滚到未进行ack的地方, 下次fetch的时候, 以从最后一个没有ack的地方开始拿
    connector.rollback();
    while (true) {
    // 批量获取数据
    Message message = connector.getWithoutAck(1000);
    long batchId = message.getId();
    int size = message.getEntries().size();
    // 无数据
    if (batchId == -1 || size == 0) {
    log.info("canal sleep 3S");
    TimeUnit.SECONDS.sleep(3);
    }
    // 有数据
    else {
    log.info("canal run size:{}", size);
    processEntries(message.getEntries());
    }
    // 确认batchId, 确认之后, 小于等于此 batchId 的 Message 都会被确认.
    if (batchId != -1) {
    connector.ack(batchId);
    }
    }
    } catch (Exception e) {
    log.error("canal connect error", e);
    /*
    * 没有连接上canal时, 直接rollback会抛异常
    *
    * connector.rollback(), 放到打开连接时.
    */
    // connector.rollback();
    } finally {
    connector.disconnect();
    }
    // canal 重连
    log.info("canal sleep 10S to reconnect");
    try {
    TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
    log.error("sleep error", e);
    }
    }
    }
    private void processEntries(List<CanalEntry.Entry> entries) {
    for (CanalEntry.Entry entry : entries) {
    // 过滤非ROWDATA
    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
    log.info("过滤非ROWDATA, {}", entry.getEntryType());
    continue;
    }
    // 解析RowChange
    CanalEntry.RowChange rowChange;
    try {
    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
    } catch (Exception e) {
    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
    }
    // 解析RowChange 的操作类型
    CanalEntry.EventType eventType = rowChange.getEventType();
    // 过滤DDL语句
    if (rowChange.getIsDdl()) {
    log.info("filter ddl sql : {}", rowChange.getSql());
    continue;
    }
    // 构建SQL (目前只关注 delete, insert, update)
    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
    CanalData canalData = null;
    if (eventType == CanalEntry.EventType.DELETE) {
    canalData = buildDeleteSql(entry.getHeader().getTableName(), rowData.getBeforeColumnsList());
    } else if (eventType == CanalEntry.EventType.INSERT) {
    canalData = buildInsertSql(entry.getHeader().getTableName(), rowData.getAfterColumnsList());
    } else if (eventType == CanalEntry.EventType.UPDATE) {
    canalData = buildUpdateSql(entry.getHeader().getTableName(), rowData.getAfterColumnsList());
    } else {
    continue;
    // ingore
    }
    log.info("canal data : {}", canalData);
    }
    }
    }
    /**
    * 构建删除语句
    *
    * @param tableName
    * @param columns
    */
    private CanalData buildDeleteSql(String tableName, List<CanalEntry.Column> columns) {
    for (int i = 0; i < columns.size(); i++) {
    CanalEntry.Column column = columns.get(i);
    if (column.getIsKey() && column.getName().equals("id")) {
    String sql = "delete from " + tableName + " where id = " + column.getValue();
    return new CanalData(column.getValue(), tableName, sql, CanalDataSqlType.DELETE);
    }
    }
    return null;
    }
    /**
    * 构建插入语句
    *
    * @param tableName
    * @param columns
    */
    private CanalData buildInsertSql(String tableName, List<CanalEntry.Column> columns) {
    StringBuffer sql = new StringBuffer("insert into " + tableName + " (");
    String idVal = null;
    for (int i = 0; i < columns.size(); i++) {
    if (columns.get(i).getIsKey() && columns.get(i).getName().equals("id")) {
    idVal = columns.get(i).getValue();
    }
    sql.append(columns.get(i).getName());
    if (i != columns.size() - 1) {
    sql.append(",");
    }
    }
    sql.append(") VALUES (");
    for (int i = 0; i < columns.size(); i++) {
    sql.append(getVal(columns.get(i)));
    if (i != columns.size() - 1) {
    sql.append(",");
    }
    }
    sql.append(")");
    if (Objects.isNull(idVal)) {
    return null;
    }
    return new CanalData(idVal, tableName, sql.toString(), CanalDataSqlType.INSERT);
    }
    /**
    * 构建更新语句
    *
    * @param tableName
    * @param columns
    */
    private CanalData buildUpdateSql(String tableName, List<CanalEntry.Column> columns) {
    StringBuilder sql = new StringBuilder("update " + tableName + " set ");
    String idVal = null;
    boolean isFirst = true;
    for (int i = 0; i < columns.size(); i++) {
    CanalEntry.Column column = columns.get(i);
    if (column.getIsKey() && column.getName().equals("id")) {
    idVal = column.getValue();
    } else {
    // 全量更新所有字段
    if (isFirst) {
    isFirst = false;
    } else {
    sql.append(", ");
    }
    sql.append(column.getName()
    + " = " + getVal(column));
    }
    }
    if (Objects.isNull(idVal)) {
    return null;
    }
    sql.append(" where id = " + idVal);
    return new CanalData(idVal, tableName, sql.toString(), CanalDataSqlType.UPDATE);
    }
    private String getVal(CanalEntry.Column column) {
    if (column.getIsNull()) {
    return "null";
    }
    String value = column.getValue();
    if (Objects.isNull(value)) {
    return "null";
    } else {
    return "'" + StringEscapeUtils.escapeSql(value) + "'";
    }
    }
    }
    

5. 参考

  • Github Canal
  • 超详细的Canal入门,看这篇就够了!
  • 使用Canal作为mysql的数据同步工具
  • Canal客户端覆盖服务端Subscribe | 只有TRANSACTIONBEGIN和TRANSACTIONEND日志

最后

以上就是霸气手机为你收集整理的Canal初探 (mysql8)的全部内容,希望文章能够帮你解决Canal初探 (mysql8)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部