概述
单节点canal-server+canal-admin的介绍和搭建(对接mysql和rocketMQ)
- 一、简介
- 1、Canal
- 1、工作原理
- 2、MySQL主从复制实现
- 3、canal架构
- 4、binarylog
- 1、新增binlog
- 2、更新binglog
- 3、增加字段bin-log
- 4、删除字段bin-log
- 5、修改字段bin-log
- 二、使用
- 2.1 安装
- 1、本地安装
- 2、docker安装
- canal-admin
- canal-server
- 3、使用docker-compose搭建
- 4、配置需要监控的mysql
一、简介
通过数据源的事务日志抓取数据源变更,这能解决一致性问题(只要下游能保证变更应用到新库上)。它的问题在于各种数据源的变更抓取没有统一的协议,如,MySQL用binlog,MongoDB用oplog。
1、Canal
纯java开发,蛀牙用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费,目前canal只能支持row模式的增量订阅(statement只有sql,没有数据,所以无法获取原始的变更日志)
1、工作原理
1、canal模拟MySQL salve的交互协议,伪装自己为MySQL salve,向MySQL master发送dump协议
2、MySQL master收到dump请求,开始推送binary log给salve(即canal)
3、canal解析binary log对象(原始为byte流)
2、MySQL主从复制实现
从上图可知,复制可分为三步:
- master将改变记录到二进制日志(binary log)中(这些记录叫二进制日志时间,binary log events,可以通过show binlog events进行查看);
- salve将master的binary log events拷贝到它的中继日志(relay log);
- salve重做中继日志的时间,将改变反映它自己的数据;
3、canal架构
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
通过deployer模块,启动一个canal-server,一个canal-server内部包含多个instance,每个instance都会伪装成一个Mysql实例的salve.client与server之间的通信协议有protocol模块定义。client订阅binlog信息时,需要传递一个destination参数,server会根据这个destination确定由那一个instance尾气提供服务。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
说明:
-
server代表一个canal运行实例,对应一个jvm
-
instance对应一个数据队列(1个server对应1…n个instance)
instance模块:
- eventParser(数据接入,模拟salve协议和master进行交互,协议解析)
- eventSink(Parser和Store链接器,进行数据过滤,加工和分发的工作)
- eventStore(数据存储,目前只存在内存里)
- metaManager(增量订阅&消费信息管理器)
canal假装成MySQL Salve步骤:
- 与MySQL Mater服务器建立Socket链接;
- 根据MySQL协议规范发送身份认证数据包进行身份认证;
- 根据MySQL协议规范发送salve注册数据包将自己伪装成MySQL Salve;
- 根据MySQL协议规范发送Dump请求,让Master给自己推送Binglog日志;
4、binarylog
一个binlog包含一个四字节的模数和一系列描述数据变更的Event,没一个Event又包含header和data两部分,大致结构如下:
基于Row的binlog主要包括以下几个Event:
TABLE_MAP_EVENT:描述变更的数据库表
WRITE_ROWS_EVENT:描述插入数据变更
UPDATE_ROWS_EVENT:描述修改数据变更
DELETE_ROWS_EVENT:描述删除数据变更
1、新增binlog
{"data":[{"BRAND_ID":"39103535-2d93-425b-a85e-c9709abf4c15","BRAND_NAME":"体验配方","BRAND_CODE":"5464365","BRAND_FLAG":"0","STATE":"0","CREATE_DATE":"2021-12-03","REMARK":"!","TS":"2021-12-03 12:04:13","CORP_ID":"50c43444-004c-44b9-a2d3-25c91a8b04c3"}],"database":"visdb_20210729","es":1638504356000,"id":43,"isDdl":false,"mysqlType":{"BRAND_ID":"varchar(36)","BRAND_NAME":"varchar(36)","BRAND_CODE":"varchar(36)","BRAND_FLAG":"int(2)","STATE":"int(2)","CREATE_DATE":"char(10)","REMARK":"varchar(200)","TS":"char(19)","CORP_ID":"varchar(36)"},"old":null,"pkNames":["BRAND_ID"],"sql":"","sqlType":{"BRAND_ID":12,"BRAND_NAME":12,"BRAND_CODE":12,"BRAND_FLAG":4,"STATE":4,"CREATE_DATE":1,"REMARK":12,"TS":1,"CORP_ID":12},"table":"bdp_brand","ts":1638504272407,"type":"INSERT"}
2、更新binglog
{"data":[{"BRAND_ID":"39103535-2d93-425b-a85e-c9709abf4c15","BRAND_NAME":"体验配方","BRAND_CODE":"5464365","BRAND_FLAG":"0","STATE":"0","CREATE_DATE":"2021-12-03","REMARK":"!!","TS":"2021-12-03 12:06:25","CORP_ID":"50c43444-004c-44b9-a2d3-25c91a8b04c3"}],"database":"visdb_20210729","es":1638504488000,"id":44,"isDdl":false,"mysqlType":{"BRAND_ID":"varchar(36)","BRAND_NAME":"varchar(36)","BRAND_CODE":"varchar(36)","BRAND_FLAG":"int(2)","STATE":"int(2)","CREATE_DATE":"char(10)","REMARK":"varchar(200)","TS":"char(19)","CORP_ID":"varchar(36)"},"old":[{"REMARK":"!","TS":"2021-12-03 12:04:13"}],"pkNames":["BRAND_ID"],"sql":"","sqlType":{"BRAND_ID":12,"BRAND_NAME":12,"BRAND_CODE":12,"BRAND_FLAG":4,"STATE":4,"CREATE_DATE":1,"REMARK":12,"TS":1,"CORP_ID":12},"table":"bdp_brand","ts":1638504404514,"type":"UPDATE"}
3、增加字段bin-log
{"data":null,"database":"visdb20210119","es":1638518233000,"id":15,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE `visdb20210119`.`bdp_produce` rnADD COLUMN `linshi` varchar(255) NULL AFTER `production_form`","sqlType":null,"table":"bdp_produce","ts":1638518234990,"type":"ALTER"}
4、删除字段bin-log
{"data":null,"database":"villagesale","es":1632384119000,"id":5041,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE `villagesale`.`bdp_brand` rnDROP COLUMN `earn`","sqlType":null,"table":"bdp_brand","ts":1632384062153,"type":"ALTER"}
5、修改字段bin-log
{"data":null,"database":"villagesale","es":1632623334000,"id":508,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"ALTER TABLE `villagesale`.`bdp_specification` rnMODIFY COLUMN `spec_alias_tonnage` decimal(10, 6) NULL COMMENT '辅计量单位换算率' AFTER `spec_alias`","sqlType":null,"table":"bdp_specification","ts":1632623275956,"type":"ALTER"}
二、使用
2.1 安装
1、本地安装
1、下载控制台安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
注:使用本地安装需要jdk1.8环境
2、解压
tar -zvxf canal.admin-1.1.5.tar.gz
tar -zvxf canal.deployer-1.1.5.tar.gz
3、修改配置文件
#canal-server配置文件
vim canal.properties #具体配置内容参考docker安装配置
#canal-admin配置文件
vim application.yml #具体配置内容参考docker安装配置
4、启动
cd bin
./startup
2、docker安装
1、拉取镜像
docker pull canal/canal-server
docker pull canal/canal-admin
2、创建映射目录
mkdir -p /u01/canal/canal-server/conf
mkdir -p /u01/canal/canal-server/conf/instance
mkdir -p /u01/canal/canal-server/logs
mkdir -p /u01/canal/canalAdmin/conf
mkdir -p /u01/canal/canalAdmin/logs
3、创建配置文件
canal-admin
在映射目录/u01/canal/canalAdmin/conf下
application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: XXX:3306
database: canal_manager
username: root
password: 960626
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
# canal-admin登录用户名和密码,注意,密码长度要大于等于6,否则登录会报错
canal:
adminUser: admin
adminPasswd: 960626
注:
- spring.datasource配置可以选择自己的mysql,也可以选择canan-admin自带的,也可以选择canan-admin自带的,选择自带的操作如下
- 进入自带的mysql,进入docker容器,然后进入mysql,用户名和密码都是canal
mysql -u canal -p- 执行conf文件下的canal.manager.sql文件
source canal_manager.sql- 当使用canal-admin自带mysql的时候每次重新创建canan-admin,其中的数据库信息可能会丢失,所以建议canal-admin的数据库信息做映射或者使用自己的mysql
canal-server
在映射目录/u01/canal/canal-server/conf下
canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip,canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务,服务器本地ip,非公网和内网
canal.ip =
# register ip to zookeeper,canal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip),当外部网路访问该服务,可通过该ip
canal.register.ip = XXX
# canal server提供socket服务的端口
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd,canal数据端口订阅的ACL配置 (v1.1.4新增)如果为空,代表不开启
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
# canal链接canal-admin的地址 (v1.1.4新增)
canal.admin.manager = XXX:8089
# admin管理指令链接端口 (v1.1.4新增)
canal.admin.port = 11110
canal.admin.user = admin
#canal-admin登录密码加密之后的密码
canal.admin.passwd = 9a32691507290514f5c52a6f2bb0e2a5e2eaafec
# admin auto register,该服务是否自动注册到admin上
canal.admin.register.auto = true
# 不填则默认为单节点
canal.admin.register.cluster =
canal.admin.register.name =
#canal server链接zookeeper集群的链接信息
canal.zkServers =
# flush data to zk,canal持久化数据到zookeeper上的更新频率,单位毫秒
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ,对接该服务的订阅类型
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file,主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n),canal内存store中可缓存buffer记录数,需要为2的指数
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb,内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE,canal内存store中数据缓存模式
#1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
#2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config,是否开启心跳检查
canal.instance.detecting.enable = false
# canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
# 心跳检查频率,单位秒
canal.instance.detecting.interval.time = 3
# 心跳检查失败重试次数
canal.instance.detecting.retry.threshold = 3
# 心跳检查失败后,是否开启自动mysql自动切换,说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times,canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒
# 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
canal.instance.fallbackIntervalInSeconds = 60
# network config,网络链接参数,SocketOptions.SO_RCVBUF
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
# 是否使用druid处理所有的ddl解析来获取库和表名
canal.instance.filter.druid.ddl = true
# 是否忽略dcl语句
canal.instance.filter.query.dcl = false
# 是否忽略dml语句,(mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)
canal.instance.filter.query.dml = false
# 是否忽略ddl语句
canal.instance.filter.query.ddl = false
# 是否忽略binlog表结构获取失败的异常,(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)
canal.instance.filter.table.error = false
# 是否dml的数据变更事件(主要针对用户只订阅ddl/dcl的操作)
canal.instance.filter.rows = false
# 是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check,支持的binlog format格式列表(otter会有支持format格式限制)
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation,ddl语句是否单独一个batch返回
# (比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)
canal.instance.get.ddl.isolation = false
# parallel parser config,是否开启binlog并行解析模式
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2,binlog并行解析的异步ringbuffer队列(必须为2的指数)
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info,是否开启tablemeta的tsdb能力
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
# 当前server上部署的instance列表
canal.destinations = example
# conf root dir,conf/目录所在的路径
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance,开启instance自动扫描,如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发
canal.auto.scan = true
# instance自动扫描的间隔时间,单位秒
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.v1.0.25版本新增,全局的tsdb配置方式的组件文件
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# 全局配置加载方式
canal.instance.global.mode = spring
# 全局lazy模式
canal.instance.global.lazy = false
# 全局的manager配置方式的链接信息
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml,全局的spring配置方式的组件文件
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq,如果消息队列为阿里云的,则须填写阿里云秘钥
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.如果你的队列是阿里云的则需要改成cloud
canal.mq.accessChannel = cloud
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
# canal默认的生产者组,即传到rocketmq形成的生产者
rocketmq.producer.group = GID_P_PRODUCE
# 是否开启消息轨迹,如果rocketMQ为阿里云的建议开启
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
# rocketMQ地址
rocketmq.namesrv.addr = http://XXX:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
注:canal.admin.passwd里的密码是通过加密之后的密码,如果想看自己设的加密之后是多少使用以下方法:
进入MySQL,执行
select password(“密码”);
*号之后的部分即为加密后的密码,可放进canal.admin.passwd
instance.properties部分配置
#数据库匹配配置,将会匹配下列配置的表
canal.instance.filter.regex=villagesale.bdp_costobject
#数据库与rocketMQ的topic的配置
canal.mq.dynamicTopic=examp2:.*;exmaple3:mytest\..*,mytest2\..*;example4:mytest3.user
dynamicTopic规则: 表达式如果只有库名则匹配库名的数据都会发送到对应名称topic, 如果是库名.表名则匹配的数据会发送到以’库名_表名’为名称的topic。如要指定topic名称,则可以配置
4、创建容器
canal-server
docker run --name canal-server -p 11110:11110 -p 11111:11111 -p 11112:11112 -d -v /u01/canal/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties -v /u01/canal/canal-server/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /u01/canal/canal-server/logs:/home/admin/canal-server/logs canal/canal-server:latest
注:如果有canal-admin的搭配使用的话,先配置canal_local.properties的内容,然后再用canal_local.properties的内容覆盖canal.properties文件,因为如果canal.properties中的配置优先级高于数据库中的配置,如果通过canal.properties配置之后,在canal.admin中修改就不生效了
canal-admin
docker run -it --name canal-admin -p 8089:8089 -d -v /u01/canal/canalAdmin/conf/application.yml:/home/admin/canal-admin/conf/application.yml -v /u01/canal/canalAdmin/logs:/home/admin/canaladmin/logs canal/canal-admin:latest
3、使用docker-compose搭建
1、创建数据映射
同docker搭建
2、创建docker-compose.yml文件
version: '3.0'
services:
canal-admin:
image: canal/canal-admin
container_name: canal-admin
ports:
- 8089:8089
volumes:
- /opt/local/canal/canal-admin/conf/application.yml:/home/admin/canal-admin/conf/application.yml:rw
- /opt/local/canal/canal-admin/logs:/home/admin/canal-admin/logs
networks:
- canalnet
canal-server:
image: canal/canal-server
container_name: canal-server
ports:
- 11110:11110
- 11111:11111
- 11112:11112
volumes:
- /opt/local/canal/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties:rw
- /opt/local/canal/canal-server/conf/example/instance.properties:/home/admin/canal-server/conf/example/instance.properties:rw
- /opt/local/canal/canal-server/logs:/home/admin/canal-server/logs
networks:
- canalnet
networks:
canalnet:
3、启动
# 在docker-compose.yml文件所在目录执行下面指令启动
docker-compose up
4、配置需要监控的mysql
1、启动bin-log日志
#进入mysql的my.cnf配置文件中
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2、创建canal操作mysql的用户
-- 新建用户
CREATE USER canal IDENTIFIED BY 'canal';
-- 赋予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新权限
FLUSH PRIVILEGES;
3、配置instance.properties
进入canal-admin界面instance管理,点击新建instance
选择集群和服务,点击载入模板
案例配置如下
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info,监听的数据库配置
# 地址
canal.instance.master.address=192.168.0.57:3306
# 具体的log偏移量,可帮助你快速定位到你所需要监听的位置,
#监听的binlog文件
canal.instance.master.journal.name= mysql-bin.000099
# 监听的位置
canal.instance.master.position= 604054467
# 时间戳
canal.instance.master.timestamp= 1638497701000
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password,canal服务连接mysql所需要的用户,即mysql配置创建的用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex,监听的表,可多选,模糊匹配
canal.instance.filter.regex=visdb_20210729.bdp_produce
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config,binlog进入队列所属的主题
canal.mq.topic=vis-produce
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\..*
#################################################
点击保存之后即会自动监听mysql数据
注:如果instance.properties没有设置偏移量,那么canal自动找到偏移量需要一段时间,请耐心等待或者设置偏移量,即instance.properties开头的position info
最后
以上就是糟糕御姐为你收集整理的单节点canal的介绍和搭建(对接mysql和rocketMQ)一、简介二、使用的全部内容,希望文章能够帮你解决单节点canal的介绍和搭建(对接mysql和rocketMQ)一、简介二、使用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复