概述
canal是阿里开源的基于数据库增量日志解析工具,它提供增量数据订阅和消费。目前最新稳定版本是 1.1.0
注意:1.1.1 1.1.2版本的客户端与1.1.0的稍有差别 (2019-02-11更新)
项目地址:https://github.com/alibaba/canal
服务端:https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.deployer-1.1.0.tar.gz
客户端示例:https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.example-1.1.0.tar.gz
卡夫卡:https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.kafka-1.1.0.tar.gz
源码:https://github.com/alibaba/canal/archive/canal-1.1.0.tar.gz
工作原理
canal模拟mysql slave的交互协议,伪装成一个slave,并向mysql master发送dump请求,mysql master收到dump请求后,开始推送binlog给slave(canal),canal解析binlog,并发送给客户端
canal支持statement、row、mixed格式的binlog解析,但statement的只有sql没有数据,不能获取原始变更日志,所以一般建议使用row格式
EventParser过程【解析】
EventSink过程【过滤】
数据过滤支持通配符、表名、字段内容等
EventStore过程【存储】
内存模式、本地文件模式、混合模式等
Put:数据放入位置;Get:数据获取位置;Ack:数据消费成功位置
Instance结构【管理层级】
可见,有两种管理方式,比较简单的方式是spring方式,通过配置文件控制
增量订阅/消费流程
get/ack是异步处理的,如可以连续调用get多次,再按顺序提交ack/rollback
HA机制
多个服务器上有多个instance,但同一时间只能有一个instance处于running状态;一个instance同一时间只能由一个canal client进行get/ack/rollback操作。要保证这两点,需要依赖zookeeper。
快速开始
mysql配置要点
[mysqld]
log-bin=mysql-binlog
binlog-format=ROW
server_id=1234
帐号授权
create user 'canal'@'%' identified by 'canal';
grant select,replication slave,replication client,show view on *.* to 'canal'@'%';
flush privileges;
注意:还需要 SHOW VIEW 权限(2019-02-11更新)
修改配置文件
canal.properties
Apache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#################################################
######### common argument#############
#################################################
canal.id=1
canal.ip=
canal.port=2222
canal.metrics.pull.port=
canal.zkServers=
# flush data to zk
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
# flush meta cursor/parse position to file
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size=16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit=1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode=MEMSIZE
## detecing config
canal.instance.detecting.enable=false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql=select1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size=1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds=60
# network config
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
# binlog filter config
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
# binlog format/image check
canal.instance.binlog.format=ROW,STATEMENT,MIXED
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation=false
# parallel parser config
canal.instance.parser.parallel=true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
canal.instance.parser.parallelThreadSize=16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize=256
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
# rds oss binlog account
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
#################################################
######### destinations#############
#################################################
canal.destinations=example
# conf root dir
canal.conf.dir=../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan=true
canal.auto.scan.interval=5
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
instance.properties
Apache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=12345
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.31.77:3306
canal.instance.master.journal.name=mysql-binlog.000011
canal.instance.master.position=15202619
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
# table regex
canal.instance.filter.regex=.*\..*
# table black regex
canal.instance.filter.black.regex=
#################################################
启动:startup.sh 停止:stop.sh
仅针对阿里云RDS的配置项
# 仅针对阿里云RDS [OSS存储中存放binlog]
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
Prometheus监控
效果图
编辑prometheus.yml
YAML
1
2
3
4
- job_name: 'canal'
static_configs:
- targets: ['localhost:11112']
#端口配置即为canal.properties中的canal.metrics.pull.port
grafana现在使用的是v4.6.3 推荐使用最新版本v5.2
导入模板:https://raw.githubusercontent.com/alibaba/canal/master/deployer/src/main/resources/metrics/Canal_instances_tmpl.json
部分参数:
canal_instance_parser_mode,instance解析模式,是否开启parallel解析
canal_instance_store_produce_mem,instance store接收到的所有events占用的内存问题,单位byte
canal_instance_store_consume_mem,instance store成功消费的所有events占用的内存总量,单位byte
canal_instance_traffic_delay,server与MySQL master的延时
canal_instance_put_rows,store put操作完成的行数
canal_instance_get_rows,client get请求返回的行数
canal_instance_ack_rows,client ack操作释放的行数
AdminGuide
源码编译命令:mvn clean install -Denv=release 会生成canal.deployer-$version.tar.gz包
canal两种管理方式:manager和spring
canal.properties配置文件
common argument,公用参数,多个instance可共享。(instance.properties优化级高于canal.properties)
参数名字
参数说明
默认值
canal.id
每个canal server实例的唯一标识,暂无实际意义
1
canal.ip
canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
无
canal.port
canal server提供socket服务的端口
11111
canal.zkServers
canal server链接zookeeper集群的链接信息
如:192.168.1.10:2181,192.168.1.11:2181
无
canal.zookeeper.flush.period
canal持久化数据到zookeeper上的更新频率,单位毫秒
1000
canal.instance.memory.batch.mode
canal内存store中数据缓存模式
1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小
MEMSIZE
canal.instance.memory.buffer.size
canal内存store中可缓存buffer记录数,需要为2的指数
16384
canal.instance.memory.buffer.memunit
内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小
1024
canal.instance.transaction.size
最大事务完整解析的长度支持
超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性
1024
canal.instance.fallbackIntervalInSeconds
canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒
说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
60
canal.instance.detecting.enable
是否开启心跳检查
false
canal.instance.detecting.sql
心跳检查sql,如select 1
insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time
心跳检查频率,单位秒
3
canal.instance.detecting.retry.threshold
心跳检查失败重试次数
3
canal.instance.detecting.heartbeatHaEnable
心跳检查失败后,是否开启自动mysql自动切换
说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
false
canal.instance.network.receiveBufferSize
网络链接参数,SocketOptions.SO_RCVBUF
16384
canal.instance.network.sendBufferSize
网络链接参数,SocketOptions.SO_SNDBUF
16384
canal.instance.network.soTimeout
网络链接参数,SocketOptions.SO_TIMEOUT
30
canal.instance.filter.druid.ddl
是否使用druid处理所有的ddl解析来获取库和表名
true
canal.instance.filter.query.dcl
是否忽略dcl语句
false
canal.instance.filter.query.dml
是否忽略dml语句
(mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中
false
canal.instance.filter.query.ddl
是否忽略ddl语句
false
canal.instance.filter.table.error
是否忽略binlog表结构获取失败的异常
(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)
false
canal.instance.filter.rows
是否dml的数据变更事件
(主要针对用户只订阅ddl/dcl的操作)
false
canal.instance.filter.transaction.entry
是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件
false
canal.instance.binlog.format
支持的binlog format格式列表
(otter会有支持format格式限制)
ROW,STATEMENT,MIXED
canal.instance.binlog.image
支持的binlog image格式列表
(otter会有支持format格式限制)
FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation
ddl语句是否单独一个batch返回
(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)
false
canal.instance.parser.parallel
是否开启binlog并行解析模式
(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)
true
canal.instance.parser.parallelBufferSize
binlog并行解析的异步ringbuffer队列
(必须为2的指数)
256
canal.instance.tsdb.enable
是否开启tablemeta的tsdb能力
true
canal.instance.tsdb.dir
主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db
${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url
jdbc url的配置
(h2的地址为默认值,如果是mysql需要自行定义)
jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername
jdbc url的配置
(h2的地址为默认值,如果是mysql需要自行定义)
canal
canal.instance.tsdb.dbPassword
jdbc url的配置
(h2的地址为默认值,如果是mysql需要自行定义)
canal
canal.instance.rds.accesskey
aliyun账号的ak信息 (如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)
无
canal.instance.rds.secretkey
aliyun账号的sk信息
(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)
无
没有提到
canal.withoutNetty = false
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
destinations部分,instance列表定义,定义当前server有多少instance,每个instance的加载方式spring/manager等
参数名字
参数说明
默认值
canal.destinations
当前server上部署的instance列表,如example1,example2
无
canal.conf.dir
conf/目录所在的路径
../conf
canal.auto.scan
开启instance自动扫描
如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发:
a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动
b. instance目录删除:卸载对应instance配置,如已启动则进行关闭
c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作
true
canal.auto.scan.interval
instance自动扫描的间隔时间,单位秒
5
canal.instance.global.mode
全局配置加载方式
spring
canal.instance.global.lazy
全局lazy模式
false
canal.instance.global.manager.address
全局的manager配置方式的链接信息
无
canal.instance.global.spring.xml
全局的spring配置方式的组件文件
classpath:spring/memory-instance.xml
(spring目录相对于canal.conf.dir)
canal.instance.example.mode
canal.instance.example.lazy
canal.instance.example.spring.xml
.....
instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式
命名规则:canal.instance.{name}.xxx
无
canal.instance.tsdb.spring.xml
v1.0.25版本新增,全局的tsdb配置方式的组件文件
classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir)
instance.properties配置文件
canal.properties中定义了canal.destinations后,需在canal.conf.dir对应目录下创建以instance名称为名称的子目录,每个目录中各有一份instance.properties
若未定义instance列表,但开启了canal.auto.scan,那么:
发现目录有新增,启动新的instance
发现目录有删除,关闭老的instance
发现对应目录instance.properties有变化,重启该instance
参数名字
参数说明
默认值
canal.instance.mysql.slaveId
mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一
(v1.1.x版本之后canal会自动生成,不需要手工指定)
无
canal.instance.master.address
mysql主库链接地址
127.0.0.1:3306
canal.instance.master.journal.name
mysql主库链接时起始的binlog文件
无
canal.instance.master.position
mysql主库链接时起始的binlog偏移量
无
canal.instance.master.timestamp
mysql主库链接时起始的binlog的时间戳
无
canal.instance.gtidon
是否启用mysql gtid的订阅模式
false
canal.instance.master.gtid
mysql主库链接时对应的gtid位点
无
canal.instance.dbUsername
mysql数据库帐号
canal
canal.instance.dbPassword
mysql数据库密码
canal
canal.instance.defaultDatabaseName
mysql链接时默认schema
canal.instance.connectionCharset
mysql 数据解析编码
UTF-8
canal.instance.filter.regex
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:
1. 所有表:.* or .*\..*
2. canal schema下所有表: canal\..*
3. canal下的以canal打头的表:canal\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
.*\..*
canal.instance.filter.black.regex
mysql 数据解析表的黑名单,表达式规则见白名单的规则
无
canal.instance.rds.instanceId
aliyun rds对应的实例id信息
(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)
无
不指定启动位点时,默认从当前数据库位点进行启动
目前canal仅支持一个数据库一种编码,若存在一个数据库多个编码,可通过filter.regex配置,拆分为多个instance,为每个instance指定不同的编码
默认支持的instance.xml有四种:
spring/memory-instance.xml
spring/default-instance.xml
spring/group-instance.xml
spring/file-instance.xml
模式:
解析位点,上一次解析binlog到了什么位置,对应组件 CanalLogPositionManager
消费位点,客户端ack后,记录客户端提交的最后位点 CanalMetaManager
实现方式:
memory,memory-instance.xml中使用
zookeeper
mixed
period,default-instance.xml中使用,集合zookeeper和memory模式,先写内存,定时刷新数据到zookeeper上
优缺点:
memory-instance.xml
parser,sink,store都使用内存
速度快,不依赖zookeeper; 常用于测试,不应应用于生产环境
default-instance.xml
store使用内存,parser/sink持久化到zookeeper
支持HA,适用生产环境,集群化部署
group-instance.xml
主要针对多库合并,可以将多个物理instance合并为一个逻辑instance
常用于分库业务
HA模式
使用spring/default-instance.xml,需要zookeeper支撑
两台机器上instance目录名字必须完全一致,HA是依赖instance名字进行管理的
两台机器上canal都启动时,只有一台能够连接到mysql上
查看zookeeper可以获取到当前工作的节点:get /otter/canal/destinations/example/running
客户端可以直接指定zookeeper地址和instance名称,client会自动连接到running节点,建立连接;
连接成功后,canal server会记录当前工作的canal client信息,如ip,port等;
数据消费成功后,canal server会在zookeeper中记录当前最后一次消费成功的binlog位点
双主模式
# position info
canal.instance.master.address = 10.20.144.25:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.standby.address = 10.20.144.29:3306
canal.instance.standby.journal.name =
canal.instance.standby.position =
canal.instance.standby.timestamp =
##detecing config
canal.instance.detecting.enable = true ## 需要开启心跳检查
canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() ##心跳检查sql,也可以选择类似select 1的query语句
canal.instance.detecting.interval.time = 3 ##心跳检查频率
canal.instance.detecting.retry.threshold = 3 ## 心跳检查失败次数阀值,超过该阀值后会触发mysql链接切换,比如切换到standby机器上继续消费binlog
canal.instance.detecting.heartbeatHaEnable = true ## 心跳检查超过失败次数阀值后,是否开启master/standby的切换.
Go 客户端
目前测试发现只对1.1.0版本适用,暂不支持1.1.1及1.1.2
项目地址:https://github.com/CanalClient/canal-go (2019-02-11 更新)
喜欢 (3)or分享 (0)
最后
以上就是感性战斗机为你收集整理的canal消耗内存_阿里数据库增量日志解析工具canal资料整理的全部内容,希望文章能够帮你解决canal消耗内存_阿里数据库增量日志解析工具canal资料整理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复