概述
canal-admin单机模式部署安装
一、安装canal-admin
- 解压安装
mkdir /opt/soft/canal-admin115
tar -zxvf canal.admin-1.1.5-SNAPSHOT.tar.gz -C /opt/soft/canal-admin115/
- 修改配置文件
application.yml
vi conf/application.yml
,修改address为本地源数据库,并修改相关用户和密码
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 192.168.56.10:3306
database: canal_manager
username: root
password: ok
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- 使用具有root权限的账户进行初始化,我这里直接使用的root用户
mysql -uroot -pok
source /opt/soft/canal-admin115/conf/canal_manager.sql
- 在bin目录下执行如下命令启动
./startup.sh
此时可以访问8089端口,访问可视化界面,账户admin,密码默认为123456
如果想要关闭,可在bin目录下使用如下命令
./stop.sh
二、 安装canal-server
- 解压安装
mkdir /opt/soft/canal115
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /opt/soft/canal115/
-
配置canal-server端
进入canal目录,编辑canal_local.properties文件
vi conf/canal_local.properties
修改注册机ip和对应admin端口,最后一行不指定即为单机模式
# register ip
canal.register.ip = 192.168.56.10
# canal admin config
canal.admin.manager = 192.168.56.10:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
- 进入bin目录,使用如下命令启动
./startup.sh local
刷新canal-admin的web页面,可以发现server已经添加进去,如下所示
注意使用canal-admin的启动顺序为为:canal-admin —> canal-server(startup.sh local)
三、使用测试
-
检测mysql开启binlog日志
对于自建的mysql需要开启binlog写入功能
配置 binlog-format 为 ROW 模式,my.cnf中配置如下
编辑文件my.cnf,
vi /etc/my.cnf
,在[mysqld]下添加如下内容
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 创建可以用于访问的非root账户
create user canal identified by 'canal';
grant all privileges on *.* to 'canal'@'%';
flush privileges;
- 按照如下步骤,操作----->配置,配置server管理
配置相关同步信息,网页中可以载入相关模板,但是我在实际使用中用该模板投递消息至kafka一直不成功,于是,进入canal-server端查看canal.properties文件,发现网页端的模板和canal-server端的模板不是很一致,但是替换为server端的模板却能成功投递消息至kafka,不知道是什么原因,具体模板如下,可以拷贝至web页面进行修改。
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.parallelThreadSize = 8
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
canal.mq.vhost=
canal.mq.exchange=
canal.mq.username=
canal.mq.password=
canal.mq.aliyunuid=
##################################################
######### Kafka Kerberos Info #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
修改内容如下
# 默认值tcp,这里改为投递到Kafka
canal.serverMode = kafka
# 所使用的的instance
canal.destinations = test01
# Kafka bootstrap.servers
canal.mq.servers = 192.168.56.10:9092
# Kafka batch.size,即producer一个微批次的大小,默认16K,这里加倍
canal.mq.batchSize = 32768
# Kafka max.request.size,即一个请求的最大大小,默认1M,这里也加倍
canal.mq.maxRequestSize = 2097152
# Kafka linger.ms,即sender线程在检查微批次是否就绪时的超时,默认0ms,这里改为200ms
# 满足batch.size和linger.ms其中之一,就会发送消息
canal.mq.lingerMs = 200
# 获取binlog数据的超时时间,默认10,改为200ms
canal.mq.canalGetTimeout = 200
修改后保存
- 配置instance文件
然后再instance中新建instance管理,命名为我们在server配置中添加的test01,然后载入模板
模板中需要填入监控库的日志名称和偏移量
可以在监控库中查询,如下所示
show binary logs;
修改如下内容
## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0
# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=199702
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# table regex .*\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=
定义topic主题,使用动态可以注释
#canal.mq.topic=
动态生成topic主题,注意必须与库名保持一致
canal.mq.dynamicTopic=mytest\..*
- 运行后,kafka中会自动创建topic,可以使用消费命令查看相关测试
最后
以上就是积极月饼为你收集整理的canal-admin单机模式部署安装canal-admin单机模式部署安装的全部内容,希望文章能够帮你解决canal-admin单机模式部署安装canal-admin单机模式部署安装所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复