概述
原来用windows本地运行,在本地模拟客户端和服务端,鼓捣了一天都没运行成功。。。最后放弃了,改用
windows的客户端+ubuntu的服务端(客户端程序运行在window,canal监听运行在Ubuntu中)
用到:windows+ubuntu+eclipse+mysql+canal
1、客户端(windows)
1)用eclipse建一个java项目,建类
public class ClientTest { public static void main(String args[]) { // 创建链接 这个ip是你虚拟机的ip
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.168.12.43",11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
2)引入依赖(这里我用的1.0.24最新的是1.1.3)
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.24</version> </dependency>
简单吧!!!
2、服务端
主要工作在这里了!!!
1、确定你的虚拟机安装了java环境和mysql数据库
2、查看是否启用了日志:
mysql>show variables like 'log_bin';
3、开启binlog
如果log_bin关闭,需要在etc下面找到my.cnf,开启binlog:
server_id=1 log-bin=/var/lib/mysql/mysql-bin
4、添加canal mysql数据库账号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
5、下载1.0.24的(https://github.com/alibaba/canal/releases/tag/canal-1.0.24)这个包 canal.deployer-1.0.24.tar.gz
6、修改 vi canal/conf/example/instance.properties
## mysql serverId canal.instance.mysql.slaveId = 1234 # position info canal.instance.master.address = 10.168.12.43:3306 canal.instance.master.journal.name =mysql-bin.000003 canal.instance.master.position = canal.instance.master.timestamp = …… canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName =testcanal canal.instance.connectionCharset = UTF-8 # table regex canal.instance.filter.regex = .*\..*
ip不要写错了,这个是虚拟机的ip,因为要监听的是运行与Ubuntu上的数据库(这里用的mysql)的操作。
testcanal是默认数据库,这个是你要监听的那个表所属的数据库
7、启动canal
sh bin/startup.sh
8、查看日志
vi logs/canal/canal.log vi logs/example/example.log
看看日志是否启动成功没报错
9、测试
1)确保startup.sh 文件运行了
2)在windows上运行上面的ClientTest 程序
3)在mysql的testcanal数据库中建立表,进行增删改查
4)会在eclipse上打印操作数据库的信息
参考:https://www.cnblogs.com/janes/p/9318576.html
转载于:https://www.cnblogs.com/51python/p/10855907.html
最后
以上就是生动保温杯为你收集整理的canal使用入坑,亲测 !!!!的全部内容,希望文章能够帮你解决canal使用入坑,亲测 !!!!所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复