canal: real-time sync of MySQL data to Kafka
Overview
Contents
- 1. Environment preparation: zk, kafka, mysql-master
- 1.1 Enable binlog in MySQL
- 1.2 Start zk and kafka
- 2. Install and configure canal
- 3. Testing
Official introduction: https://github.com/alibaba/canal/wiki/Introduction
1. Environment preparation: zk, kafka, mysql-master
| Role | IP |
|---|---|
| mysql-master | 192.168.56.1 (windows) |
| zk, kafka, canal1, canal2 | 192.168.56.7 (centos7) |
1.1 Enable binlog in MySQL
canal connects to the master as a MySQL replica and parses its binlog, so the binlog must be enabled in ROW format and the master needs a non-zero server_id:
[root@c7 kafka_2.11-1.1.0-packs]# mysql -uroot -proot -h192.168.56.1
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 50
Server version: 5.7.34-log MySQL Community Server (GPL)
Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set, 1 warning (0.00 sec)
mysql> show variables like 'server_id';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| server_id | 1 |
+---------------+-------+
1 row in set, 1 warning (0.01 sec)
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000007 | 1817 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.01 sec)
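The two checks above can also be scripted before pointing canal at the master. A minimal sketch, assuming the variable values have already been fetched with `SHOW VARIABLES` (here they are hard-coded to the values shown in the session above):

```python
def check_binlog_prereqs(variables):
    """Validate the MySQL settings canal depends on.

    `variables` maps MySQL variable names (as returned by SHOW VARIABLES)
    to their string values. Returns a list of problems; empty means OK.
    """
    problems = []
    if variables.get("binlog_format") != "ROW":
        problems.append("binlog_format must be ROW for canal to parse row events")
    if variables.get("server_id") in (None, "0"):
        problems.append("server_id must be set to a non-zero value")
    return problems

# Values as shown in the session above.
master_vars = {"binlog_format": "ROW", "server_id": "1"}
print(check_binlog_prereqs(master_vars))  # → []
```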
1.2 Start zk and kafka
[root@c7 zookeeper-3.5.5]# cd conf/
[root@c7 conf]# cat zoo.cfg |grep -v ^#
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper/
clientPort=2181
admin.serverPort=8881
[root@c7 conf]# cd ..
[root@c7 zookeeper-3.5.5]# sh bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /root/zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 3108.
[root@c7 kafka_2.11-1.1.0-packs]# cat config/server.properties |grep -v ^# |grep -v ^$
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/common/kafka_2.11-1.1.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@c7 kafka_2.11-1.1.0-packs]# sh bin/kafka-server-start.sh -daemon config/server.properties
[root@c7 kafka_2.11-1.1.0-packs]# tail logs/server.log
[2022-02-14 14:56:44,444] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,444] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,444] INFO Client environment:user.dir=/root/kafka_2.11-1.1.0-packs (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,446] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@6c64cb25 (org.apache.zookeeper.ZooKeeper)
[2022-02-14 14:56:44,883] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2022-02-14 14:56:44,883] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2022-02-14 14:56:44,935] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2022-02-14 14:56:44,943] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100002e9f38000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2022-02-14 14:56:44,945] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
[2022-02-14 14:56:46,492] INFO Cluster ID = Laod0bPIRiCmex0wK3QRjw (kafka.server.KafkaServer)
2. Install and configure canal
For canal2, only the corresponding ports in canal.properties need to be changed; it is then started the same way.
[root@c7 ~]# ll -h canal.deployer-1.1.5.tar.gz
-rw-r--r-- 1 root root 58M Feb 14 11:07 canal.deployer-1.1.5.tar.gz
[root@c7 ~]# mkdir canal
[root@c7 ~]# tar -xf canal.deployer-1.1.5.tar.gz -C canal
[root@c7 ~]# ll canal
total 4
drwxr-xr-x 2 root root 93 Feb 14 14:37 bin
drwxr-xr-x 5 root root 151 Feb 14 14:44 conf
drwxr-xr-x 2 root root 4096 Feb 14 11:08 lib
drwxrwxrwx 4 root root 34 Feb 14 11:18 logs
drwxrwxrwx 2 root root 177 Apr 19 2021 plugin
[root@c7 ~]# cd canal/conf
[root@c7 conf]# vim canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =192.168.56.7
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers = 192.168.56.7:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
....
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
....
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 192.168.56.7:9092
....
[root@c7 conf]# vim example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.56.1:3306
canal.instance.master.journal.name=mysql-bin.000007
canal.instance.master.position=1817
....
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
....
# mq config (create the topic in advance)
canal.mq.topic=canal_test
....
3. Testing
Known issue: during an active/standby switchover, some duplicate messages are sent to Kafka.
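A common mitigation is an idempotent consumer: canal's flat message carries the source binlog timestamp `es` and a batch `id`, so the consumer can skip events it has already applied. A minimal sketch; the dedup key chosen here is an assumption, and in production the seen-set would live in external storage rather than in memory:

```python
import json

def dedup(messages, seen=None):
    """Yield each canal flat-format JSON message at most once.

    Keyed on (database, table, es, id); this key choice is an
    assumption -- adapt it to whatever uniquely identifies an
    event in your pipeline.
    """
    seen = set() if seen is None else seen
    for raw in messages:
        msg = json.loads(raw)
        key = (msg["database"], msg["table"], msg["es"], msg["id"])
        if key in seen:
            continue  # duplicate re-sent after a failover
        seen.add(key)
        yield msg

# The same event delivered twice is emitted once.
batch = [
    '{"database":"t1","table":"per","es":1644819670000,"id":6,"type":"INSERT"}',
    '{"database":"t1","table":"per","es":1644819670000,"id":6,"type":"INSERT"}',
]
print(len(list(dedup(batch))))  # → 1
```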
[root@c7 ~]# sh canal/bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh
[root@c7 ~]# sh canal2/bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh
# check the records registered in zk
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, otter, controller_epoch, clickhouse, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls /otter
[canal]
[zk: localhost:2181(CONNECTED) 2] ls /otter/canal
[cluster, destinations]
[zk: localhost:2181(CONNECTED) 3] ls /otter/canal/cluster
[192.168.56.7:11111, 192.168.56.7:22222]
[zk: localhost:2181(CONNECTED) 4] ls /otter/canal/destinations
[example]
[zk: localhost:2181(CONNECTED) 5] ls /otter/canal/destinations/example
[running, cluster]
[zk: localhost:2181(CONNECTED) 6] ls /otter/canal/destinations/example/running
[]
[zk: localhost:2181(CONNECTED) 7] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.56.7:11111"}
cZxid = 0xa53
ctime = Mon Feb 14 14:19:08 CST 2022
mZxid = 0xa53
mtime = Mon Feb 14 14:19:08 CST 2022
pZxid = 0xa53
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100002e9f380008
dataLength = 46
numChildren = 0
[zk: localhost:2181(CONNECTED) 8]
[zk: localhost:2181(CONNECTED) 8] ls /otter/canal/destinations/example/cluster
[192.168.56.7:11111, 192.168.56.7:22222]
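The ephemeral `running` znode's JSON identifies which of the two instances currently owns the destination; a one-line parse, using the value shown above:

```python
import json

# Payload of /otter/canal/destinations/example/running as read above.
running = '{"active":true,"address":"192.168.56.7:11111"}'
node = json.loads(running)
active_addr = node["address"] if node["active"] else None
print(active_addr)  # → 192.168.56.7:11111
```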
#### Create a database and a table, then insert data
mysql> create database t1;
Query OK, 1 row affected (0.11 sec)
mysql> use t1;
Database changed
mysql> create table per(id int);
Query OK, 0 rows affected (0.25 sec)
mysql> insert into per values(6),(7);
Query OK, 2 rows affected (0.06 sec)
Records: 2 Duplicates: 0 Warnings: 0
mysql> insert into per values(8);
Query OK, 1 row affected (0.16 sec)
########### Consume the kafka topic
[root@c7 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canal_test
{"data":null,"database":"t1","es":1644819646000,"id":4,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database t1","sqlType":null,"table":"","ts":1644819647070,"type":"QUERY"}
{"data":null,"database":"t1","es":1644819657000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table per(id int)","sqlType":null,"table":"per","ts":1644819658225,"type":"CREATE"}
{"data":[{"id":"6"},{"id":"7"}],"database":"t1","es":1644819670000,"id":6,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"per","ts":1644819670986,"type":"INSERT"}
{"data":[{"id":"8"}],"database":"t1","es":1644820545000,"id":7,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"per","ts":1644820546063,"type":"INSERT"}