Background and introduction to canal
For an introduction to the canal framework, its installation, and its source code, refer to the project wiki. The basic installation steps and demos are explained clearly in the official Git repository, and reading that material first is strongly recommended.
MySQL settings required before deploying canal
- For a self-hosted MySQL instance, binlog writing must be enabled first, with binlog-format set to ROW mode. Configure my.cnf as follows:
```ini
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW format
server_id=1         # required for MySQL replication; must not be the same as canal's slaveId
```
- Create a canal account
If MySQL runs in a container, enter the container first to do the setup (docker exec -it mysql bash).
For a self-hosted install, run the commands below directly (a quick verification sketch in Java follows the SQL block):
```sql
mysql -uroot -proot

# create the account (username: canal; password: canal)
CREATE USER canal IDENTIFIED BY 'canal';

# grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

# flush and apply the privileges
FLUSH PRIVILEGES;
```
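Before moving on, it can be worth confirming that the new account works and that binlog is actually enabled. The following is a minimal, optional JDBC sketch that is not part of the original walkthrough; it assumes the mysql-connector-java driver is on the classpath and that MySQL is reachable at 192.168.1.9:3306, the address used later in docker-compose.yml.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CanalAccountCheck {

    public static void main(String[] args) throws Exception {
        // Adjust the address to your own MySQL instance; 192.168.1.9:3306 matches the
        // canal.instance.master.address used later in docker-compose.yml.
        String url = "jdbc:mysql://192.168.1.9:3306/?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "canal", "canal");
             Statement stmt = conn.createStatement()) {

            // Check that binlog is written in ROW format, as canal requires.
            try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
                if (rs.next()) {
                    System.out.println("binlog_format = " + rs.getString("Value"));
                }
            }

            // REPLICATION CLIENT allows SHOW MASTER STATUS; a returned row means binlog is on.
            try (ResultSet rs = stmt.executeQuery("SHOW MASTER STATUS")) {
                if (rs.next()) {
                    System.out.println("current binlog file: " + rs.getString("File")
                            + ", position: " + rs.getLong("Position"));
                } else {
                    System.out.println("No binlog status returned - check the my.cnf settings above.");
                }
            }
        }
    }
}
```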
Deploying canal with docker-compose
This assumes Docker and docker-compose are already installed.
The docker-compose.yml file is as follows:
```yaml
version: '2'
services:
  canal-server:
    image: canal/canal-server:v1.1.3
    container_name: canal-server
    ports:
      - 11111:11111
    environment:
      - canal.instance.mysql.slaveId=12
      - canal.auto.scan=false
      - canal.destinations=test
      - canal.instance.master.address=192.168.1.9:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.mq.topic=test
      - canal.instance.filter.regex=esen_approval.apt_approval
    volumes:
      - ./canal-server/conf/:/admin/canal-server/conf/
      - ./canal-server/logs/:/admin/canal-server/logs/
```
- canal.instance.mysql.slaveId: the slaveId must not be the same as MySQL's server_id
- canal.instance.master.address: MySQL address
- canal.instance.dbUsername: MySQL username
- canal.instance.dbPassword: MySQL password
- See the canal documentation for more configuration options.
Start it with: docker-compose up -d
Create a new test project and add the Maven dependency:
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
```
Test class demo:
```java
/**
 * Copyright @ 2019 by com.esenyun, All rights reserved.
 * <p>
 * Project: demo
 * Package: esenyun.canal.demo
 * File: ClientSample.java
 * Date: 2019/9/3 11:49
 */
package esenyun.canal.demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class ClientSample {

    public static void main(String[] args) {
        // create the connection (canal server address, destination, username, password)
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.1.29", 11111), "test", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // a non-empty client-side filter takes precedence over the server's canal.instance.filter.regex
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // commit ack
                // connector.rollback(batchId); // processing failed, roll back the data
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
```
After starting the test class, run an INSERT or UPDATE statement against the monitored database and the change log will appear in the console.
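In a real integration the printed columns would usually be turned into something structured rather than just logged. The sketch below is an illustrative addition, not part of the original demo: it reuses the same CanalEntry API to collect a row change into a plain Map, which could then be handed to whatever downstream sink you use (cache refresh, search index, another database, and so on). The class name RowDataMapper is made up for illustration.

```java
import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowDataMapper {

    /** Collect canal columns into a column-name -> value map, preserving column order. */
    public static Map<String, String> toMap(List<CanalEntry.Column> columns) {
        Map<String, String> row = new LinkedHashMap<>();
        for (CanalEntry.Column column : columns) {
            row.put(column.getName(), column.getValue());
        }
        return row;
    }

    /** Pick the relevant image of the row for the given event type. */
    public static Map<String, String> extract(CanalEntry.EventType eventType,
                                              CanalEntry.RowData rowData) {
        if (eventType == CanalEntry.EventType.DELETE) {
            // for deletes only the "before" image carries data
            return toMap(rowData.getBeforeColumnsList());
        }
        // inserts and updates: use the "after" image
        return toMap(rowData.getAfterColumnsList());
    }
}
```

Inside printEntry, for example, the printColumn calls could be replaced with RowDataMapper.extract(eventType, rowData) and the resulting map passed on to your own handler.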
Finally
That is the complete walkthrough for deploying canal with docker-compose.