概述
一、Canal介绍
Canal的原理就是它自己伪装成slave, 向mysql发送dump协议,MySQL master接收到dump请求之后推送binlog文件给slave, 也就是canal。
二、Canal安装
1. 下载Canal
wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz
2. 解压到/opt/softwares/canal目录, 解压完之后如下图所示:
3. 配置instance
4. 修改canal.properties
三、Mysql 安装
1、mysql 安装
yum install mysql
yum install mysql-server
2、启动mysql
/etc/init.d/mysqld start 或者sevice mysqld start
3、设置root用户密码
mysqladmin -u root password '123456'
4、登录msyql
mysql -uroot -p123456
5、检查并开启binlog复制功能及binlog模式是否为ROW模式
参考: binlog详解
四、Canal抽取binlog
Canal只是伪装成slave抽取binlog,Canal拿到binlog之后还需要交给业务方去做响应的处理,那么怎么去交给业务方呢?一般都是Canal获取到binlog之后写到kafka里,业务方订阅kafka topic消费binlog,完成业务逻辑处理。
但是Canal不能直接写Kafka, 所以还需要有个client连接Canal,Canal获取binlog之后交给Client, Client在往Kafka里写binlog消息,Client代码如下:
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class CanalClientExample { public static void main(String[] args) { CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.41.254", 11111), "example", "canal", "canal"); try { int batchSize = 1000; connector.connect(); connector.subscribe("zhengxinv6\..*"); connector.rollback(); while (true) { // 获取指定数量的数据 Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } System.out.println("batchId = [" + batchId + "]"); printEntry(message.getEntries()); connector.ack(batchId); //提交确认 //connector.rollback(batchId); } } finally { connector.disconnect(); } } private static void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException( "ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChange.getEventType(); System.out.println(String.format("================> binlog[%s:%s] ,name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset()+"", entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData: rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
五、Canal使用过程出现的问题及解决方法
参考:canal报错解决方法
参考:https://www.jianshu.com/p/6299048fad66
最后
以上就是甜甜小丸子为你收集整理的Canal的安装与使用一、Canal介绍二、Canal安装三、Mysql 安装四、Canal抽取binlog 五、Canal使用过程出现的问题及解决方法的全部内容,希望文章能够帮你解决Canal的安装与使用一、Canal介绍二、Canal安装三、Mysql 安装四、Canal抽取binlog 五、Canal使用过程出现的问题及解决方法所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复