概述
如何能够做到数据的实时同步呢?我们想到了MySQL主从复制时使用的binlog日志,它记录了所有的 DDL 和 DML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗时间下面来看一下MySQL主从复制的原理,主要有以下几个步骤:
-
master(主库)在每次准备提交事务完成数据更新前,将改变记录到二进制日志(binary log)中
-
slave(从库)发起连接,连接到master,请求获取指定位置的binlog文件
-
master创建dump线程,推送binlog的slave
-
slave启动一个I/O线程来读取主库上binary log中的事件,并记录到slave自己的中继日志(relay log)中
-
slave还会起动一个SQL线程,该线程从relay log中读取事件并在备库执行,完成数据同步
-
slave记录自己的binlog
mysql -uroot -p
# 查询binlog开启情况
show variables like 'log_bin’;
查询结果如下,是未开启的
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
二、添加my.cnf配置文件,内容如下
[mysqld]
#log_bin
log-bin = mysql-bin #开启binlog,表示mysql的binlog是打开的,并且使用了默认路径
binlog-format = ROW #选择row模式
server_id = 1 #配置mysql replication需要定义,不能和canal的slaveId重复
然后通过系统偏好设置重启MySQL,再使用show variables like 'log_bin';查看开启情况,OK,已经开启
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)
三、查看日志文件
mysql > show master logs;
+------------------+------------+
| Log_name | File_size |
+------------------+------------+
| mysql-bin.000001 | 177 |
| mysql-bin.000002 | 479 |
| mysql-bin.000003 | 2975 |
注:以上就是日志文件的名字等信息。
mysql > show variables like 'log_%';
| Variable_name | Value |
+----------------------------------------+----------------------------------------+
| log_bin | ON |
| log_bin_basename | /usr/local/mysql/data/mysql-bin |
| log_bin_index | /usr/local/mysql/data/mysql-bin.index
注:可以看出,我们的log文件是存储在/usr/local/mysql/data下的
mysqlbinlog --no-defaults mysql-bin.000001
注:如果不加--no-defaults,会提示错误:mysqlbinlog: [ERROR] unknown variable 'default-character-set=utf8',应该是字符集的问题。
wget https://github.com/zendesk/maxwell/releases/download/v1.14.1/maxwell-1.14.1.tar.gz
tar -zxvf maxwell-1.14.1.tar.gz
七、配置mysql 用户并配置访问权限
mysql> create database maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;
八、启动maxwell (kafka 服务)
bin/maxwell --user='maxwell' --password='hge123!@#' --host='huogenet' --producer=kafka --kafka.bootstrap.servers=devbox3:9092,devbox4:9092,devbox5:9092 --kafka_topic=test --client_id=1 --output_ddl=true
如果kafka 版本与自己集群版本不一致,需要执行以下步骤:
kafka client 对应的jar copy 到 maxwell-1.14.1/lib/kafka-clients 目录下
cp kafka-clients-0.10.2-kafka-2.2.0.jar /home/bigdata/maxwell/maxwell-1.14.1/lib/kafka-clients/
bin/maxwell --user='maxwell' --password='hge123!@#' --host='huogenet' --producer=kafka --kafka.bootstrap.servers=devbox3:9092,devbox4:9092,devbox5:9092 --kafka_topic=test --kafka_version=0.10.2-kafka-2.2.0 --client_id=1 --output_ddl=true
九、启动kafka消费
kafka-console-consumer --bootstrap-server devbox3:9092 --topic test
kafka 日志日志如下:
{
"database": "test",
"table": "users",
"type": "insert",
"ts": 1573813955,
"xid": 519,
"commit": true,
"data": {
"id": 4,
"user_name": "小王2",
"emails": "aa@email.com,bb@email.com,cc@email.com"
}
}
delete 操作
{
"database": "test",
"table": "bb_user",
"type": "delete",
"ts": 1573814395,
"xid": 725,
"commit": true,
"data": {
"id": 1,
"username": "4",
"nickname": "5",
"password": "6",
"avatar": "7",
"updatetime": "2019-10-10 18:44:06",
"timestamp": "2019-10-10 18:44:09"
}
}
update 操作
{
"database": "test",
"table": "bb_user",
"type": "update",
"ts": 1573814047,
"xid": 604,
"commit": true,
"data": {
"id": 1,
"username": "4",
"nickname": "5",
"password": "6",
"avatar": "7",
"updatetime": "2019-10-10 18:44:06",
"timestamp": "2019-10-10 18:44:09"
},
"old": {
"username": "2",
"nickname": "3",
"password": "4",
"avatar": "5"
}
}
drop table 操作
{
"type": "table-drop",
"database": "test",
"table": "bb_user",
"ts": 1573814543000,
"sql": "DROP TABLE `bb_user` /* generated by server */"
}
create table 操作
{
"type": "table-create",
"database": "test",
"table": "ppp_copy1",
"def": {
"database": "test",
"charset": "utf8",
"table": "ppp_copy1",
"primary-key": [],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "varchar",
"name": "pname",
"charset": "utf8"
},
{
"type": "varchar",
"name": "subject",
"charset": "utf8"
},
{
"type": "int",
"name": "points",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "int",
"name": "sex",
"signed": true
}
]
},
"ts": 1573814589000,
"sql": "CREATE TABLE `ppp_copy1` (n `id` int(11) DEFAULT NULL,n `pname` varchar(255) DEFAULT NULL,n `subject` varchar(255) DEFAULT NULL,n `points` int(255) DEFAULT NULL,n `age` int(11) DEFAULT NULL,n `sex` int(255) DEFAULT NULLn) ENGINE=InnoDB DEFAULT CHARSET=utf8"
}
>>>>顺序性
在进行消费的时候需要保证同一条mysql记录操作的顺序性,消息队列是无法保证全局消息有序的,只能保证partition内部有序。对于配置分库分表或者多库同步任务的BinlogSource,服务会根据库表信息进行hash,将数据写入相应的partiton,保证同一张表的数据在一个partition中,使得下游消费数据的顺序性;对于单表同步的作业目前使用一个partition保证其数据有序。
集成kafka的时候,发现一个问题,多分区的情况下,跨分区的数据消费是无序的。
这时候就会出现问题,如果消费端消费的更新日志在插入日志之前,就会因为数据缺失导致异常(这样的情况随着并发出现的概率会增大),所以,需要保证新增的日志和更新的日志是有序的被消费。
kafka发送数据是支持指定分区的,这时候,只要把同一个表的同一个主键的数据发到同一个分区即可(如果多数据库得加入数据库名)
private int partitionDefine(String keyToPartition) {
if (keyToPartition == null) {
return new Random().nextInt(numPartitions);
} else {
return Math.abs(keyToPartition.hashCode()) % numPartitions;
}
}
传入的参数 tableName+主键
这样,消费到的数据就是有序的。不同的场景灵活运用即可.
>>>>一致性
如何保证在作业异常退出后,作业重新启动能够完整地将mysql中的数据同步到下游系统,主要依赖于以下三点
-
服务会记录作业同步的offset,重启后从上次commit的offset继续消费
-
Binlog数据的顺序性保证了即便数据被重复消费(未commit的数据),也能对同一条记录的操作以相同的顺序执行
-
下游存储系统kudu,Es ,Redis基于主键的操作能够保证binlog重复回放后数据的最终一致性
最后
以上就是时尚芹菜为你收集整理的Mysql 实时同步到大数据数仓的全部内容,希望文章能够帮你解决Mysql 实时同步到大数据数仓所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复