我是靠谱客的博主 时尚芹菜,最近开发中收集的这篇文章主要介绍Mysql 实时同步到大数据数仓,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

如何能够做到数据的实时同步呢?我们想到了MySQL主从复制时使用的binlog日志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗时间下面来看一下MySQL主从复制的原理,主要有以下几个步骤:

  1. master(主库)在每次准备提交事务完成数据更新前,将改变记录到二进制日志(binary log)中

  2. slave(从库)发起连接,连接到master,请求获取指定位置的binlog文件

  3. master创建dump线程,推送binlog的slave

  4. slave启动一个I/O线程来读取主库上binary log中的事件,并记录到slave自己的中继日志(relay log)中

  5. slave还会起动一个SQL线程,该线程从relay log中读取事件并在备库执行,完成数据同步

  6. slave记录自己的binlog   

binlog记录了Mysql数据的实时变化,是数据同步的基础,服务需要做的就是遵守Mysql的协议,将自己伪装成Mysql的slave来监听业务从库,完成数据实时同步。 MySQL默认没有开启使用binlog,且mac安装默认没有my.cnf文件,因此需要自己在/etc目录下新建文件并添加相应的配置
一、可以查看本地MySQL的binlog开启情况
 
# 登录
mysql -uroot -p

# 查询binlog开启情况

show variables like 'log_bin’;

查询结果如下,是未开启的

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF   |
+---------------+-------+

二、添加my.cnf配置文件,内容如下

[mysqld]
#log_bin
log-bin = mysql-bin #开启binlog,表示mysql的binlog是打开的,并且使用了默认路径
binlog-format = ROW #选择row模式
server_id = 1 #配置mysql replication需要定义,不能和canal的slaveId重复

然后通过系统偏好设置重启MySQL,再使用show variables like 'log_bin';查看开启情况,OK,已经开启

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

三、查看日志文件

mysql > show master logs;
+------------------+------------+
| Log_name         | File_size  |
+------------------+------------+
| mysql-bin.000001 |        177 |
| mysql-bin.000002 |        479 |
| mysql-bin.000003 |       2975 |

注:以上就是日志文件的名字等信息。

 
四、查看默认路径:
 
mysql > show variables like 'log_%';

| Variable_name                          | Value                                  |
+----------------------------------------+----------------------------------------+
| log_bin                                | ON                                     |
| log_bin_basename                       | /usr/local/mysql/data/mysql-bin        |
| log_bin_index                          | /usr/local/mysql/data/mysql-bin.index  

注:可以看出,我们的log文件是存储在/usr/local/mysql/data下的

 
五、查看日志文件
 
执行可进行查看日志:
 
mysqlbinlog --no-defaults mysql-bin.000001

注:如果不加--no-defaults,会提示错误:mysqlbinlog: [ERROR] unknown variable 'default-character-set=utf8',应该是字符集的问题。

 
 六、下载并部署Maxwell
wget https://github.com/zendesk/maxwell/releases/download/v1.14.1/maxwell-1.14.1.tar.gz  
   
tar -zxvf maxwell-1.14.1.tar.gz

七、配置mysql 用户并配置访问权限

mysql> create database maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;

八、启动maxwell (kafka 服务)

 
bin/maxwell --user='maxwell' --password='hge123!@#' --host='huogenet' --producer=kafka --kafka.bootstrap.servers=devbox3:9092,devbox4:9092,devbox5:9092 --kafka_topic=test   --client_id=1 --output_ddl=true

如果kafka 版本与自己集群版本不一致,需要执行以下步骤:

kafka client 对应的jar copy 到 maxwell-1.14.1/lib/kafka-clients 目录下

cp kafka-clients-0.10.2-kafka-2.2.0.jar /home/bigdata/maxwell/maxwell-1.14.1/lib/kafka-clients/
 
使用以下命令启动
bin/maxwell --user='maxwell' --password='hge123!@#' --host='huogenet' --producer=kafka --kafka.bootstrap.servers=devbox3:9092,devbox4:9092,devbox5:9092 --kafka_topic=test --kafka_version=0.10.2-kafka-2.2.0 --client_id=1 --output_ddl=true

九、启动kafka消费

kafka-console-consumer --bootstrap-server devbox3:9092 --topic test

kafka 日志日志如下:

insert 操作
{
    "database": "test",
    "table": "users",
    "type": "insert",
    "ts": 1573813955,
    "xid": 519,
    "commit": true,
    "data": {
        "id": 4,
        "user_name": "小王2",
        "emails": "aa@email.com,bb@email.com,cc@email.com"
    }
}

delete 操作

{
    "database": "test",
    "table": "bb_user",
    "type": "delete",
    "ts": 1573814395,
    "xid": 725,
    "commit": true,
    "data": {
        "id": 1,
        "username": "4",
        "nickname": "5",
        "password": "6",
        "avatar": "7",
        "updatetime": "2019-10-10 18:44:06",
        "timestamp": "2019-10-10 18:44:09"
    }
}

update 操作

{
    "database": "test",
    "table": "bb_user",
    "type": "update",
    "ts": 1573814047,
    "xid": 604,
    "commit": true,
    "data": {
        "id": 1,
        "username": "4",
        "nickname": "5",
        "password": "6",
        "avatar": "7",
        "updatetime": "2019-10-10 18:44:06",
        "timestamp": "2019-10-10 18:44:09"
    },
    "old": {
        "username": "2",
        "nickname": "3",
        "password": "4",
        "avatar": "5"
    }
}

drop table 操作

 
{
    "type": "table-drop",
    "database": "test",
    "table": "bb_user",
    "ts": 1573814543000,
    "sql": "DROP TABLE `bb_user` /* generated by server */"
}

create table 操作

{
    "type": "table-create",
    "database": "test",
    "table": "ppp_copy1",
    "def": {
        "database": "test",
        "charset": "utf8",
        "table": "ppp_copy1",
        "primary-key": [],
        "columns": [
            {
                "type": "int",
                "name": "id",
                "signed": true
            },
            {
                "type": "varchar",
                "name": "pname",
                "charset": "utf8"
            },
            {
                "type": "varchar",
                "name": "subject",
                "charset": "utf8"
            },
            {
                "type": "int",
                "name": "points",
                "signed": true
            },
            {
                "type": "int",
                "name": "age",
                "signed": true
            },
            {
                "type": "int",
                "name": "sex",
                "signed": true
            }
        ]
    },
    "ts": 1573814589000,
    "sql": "CREATE TABLE `ppp_copy1` (n  `id` int(11) DEFAULT NULL,n  `pname` varchar(255) DEFAULT NULL,n  `subject` varchar(255) DEFAULT NULL,n  `points` int(255) DEFAULT NULL,n  `age` int(11) DEFAULT NULL,n  `sex` int(255) DEFAULT NULLn) ENGINE=InnoDB DEFAULT CHARSET=utf8"
}
十、如何保障数据正确性
 
 

>>>>顺序性

在进行消费的时候需要保证同一条mysql记录操作的顺序性,消息队列是无法保证全局消息有序的,只能保证partition内部有序。对于配置分库分表或者多库同步任务的BinlogSource,服务会根据库表信息进行hash,将数据写入相应的partiton,保证同一张表的数据在一个partition中,使得下游消费数据的顺序性;对于单表同步的作业目前使用一个partition保证其数据有序。

集成kafka的时候,发现一个问题,多分区的情况下,跨分区的数据消费是无序的。

这时候就会出现问题,如果消费端消费的更新日志在插入日志之前,就会因为数据缺失导致异常(这样的情况随着并发出现的概率会增大),所以,需要保证新增的日志和更新的日志是有序的被消费。

 

kafka发送数据是支持指定分区的,这时候,只要把同一个表的同一个主键的数据发到同一个分区即可(如果多数据库得加入数据库名)

private int partitionDefine(String keyToPartition) {
    if (keyToPartition == null) {
        return new Random().nextInt(numPartitions);
    } else {
        return Math.abs(keyToPartition.hashCode()) % numPartitions;
    }
}

传入的参数 tableName+主键

这样,消费到的数据就是有序的。不同的场景灵活运用即可.

 

>>>>一致性

如何保证在作业异常退出后,作业重新启动能够完整地将mysql中的数据同步到下游系统,主要依赖于以下三点

  1. 服务会记录作业同步的offset,重启后从上次commit的offset继续消费   

  2. Binlog数据的顺序性保证了即便数据被重复消费(未commit的数据),也能对同一条记录的操作以相同的顺序执行

  3. 下游存储系统kudu,Es ,Redis基于主键的操作能够保证binlog重复回放后数据的最终一致性

最后

以上就是时尚芹菜为你收集整理的Mysql 实时同步到大数据数仓的全部内容,希望文章能够帮你解决Mysql 实时同步到大数据数仓所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(73)

评论列表共有 0 条评论

立即
投稿
返回
顶部