我是靠谱客的博主 如意水蜜桃,最近开发中收集的这篇文章主要介绍【Canal源码分析】数据传输协议一、EntryProtocal二、CanalProtocal,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Canal的数据传输有两块,一块是进行binlog订阅时,binlog转换为我们所定义的Message,第二块是client与server进行TCP交互时,传输的TCP协议。

一、EntryProtocal

这块是binlog的一个存储。主要的格式如下:

Entry
Header
version
[协议的版本号,default = 1]
logfileName
[binlog文件名]
logfileOffset
[binlog position]
serverId
[服务端serverId]
serverenCode
[变更数据的编码]
executeTime
[变更数据的执行时间]
sourceType
[变更数据的来源,default = MYSQL]
schemaName
[变更数据的schemaname]
tableName
[变更数据的tablename]
eventLength
[每个event的长度]
eventType
[insert/update/delete类型,default = UPDATE]
props
[预留扩展]
gtid
[当前事务的gitd]
entryType
[事务头BEGIN/事务尾END/数据ROWDATA/HEARTBEAT/GTIDLOG]
storeValue
[byte数据,可展开,对应的类型为RowChange]
RowChange
tableId
[tableId,由数据库产生]
eventType
[数据变更类型,default = UPDATE]
isDdl
[标识是否是ddl语句,比如create table/drop table]
sql
[ddl/query的sql语句]
rowDatas
[具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns
[字段信息,增量数据(修改前,删除前),Column类型的数组]
afterColumns
[字段信息,增量数据(修改后,新增后),Column类型的数组]
props
[预留扩展]
props
[预留扩展]
ddlSchemaName
[ddl/query的schemaName,会存在跨库ddl,需要保留执行ddl的当前schemaName]
Column
index
[字段下标]
sqlType
[jdbc type]
name
[字段名称(忽略大小写),在mysql中是没有的]
isKey
[是否为主键]
updated
[是否发生过变更]
isNull
[值是否为null]
props
[预留扩展]
value
[字段值,timestamp,Datetime是一个时间格式的文本]
length
[对应数据对象原始长度]
mysqlType
[字段mysql类型]

二、CanalProtocal

这块主要定义了client和server交互的协议。

Packet
magic_number
[default = 17]
version
[default = 1]
type
[PacketType,类型]
compression
[压缩,default = NONE]
body
[具体内容]

主要的类型和对应的body,都可以在CanalProtocal.proto里面查看到。

enum PacketType {
HANDSHAKE = 1;
CLIENTAUTHENTICATION = 2;
ACK = 3;
SUBSCRIPTION = 4;
UNSUBSCRIPTION = 5;
GET = 6;
MESSAGES = 7;
CLIENTACK = 8;
// management part
SHUTDOWN = 9;
// integration
DUMP = 10;
HEARTBEAT = 11;
CLIENTROLLBACK = 12;
}
//心跳
message HeartBeat {
optional int64 send_timestamp = 1;
optional int64 start_timestamp = 2;
}
//握手
message Handshake {
optional string communication_encoding = 1 [default = "utf8"];
optional bytes seeds = 2;
repeated Compression supported_compressions = 3;
}
// client authentication
message ClientAuth {
optional string username = 1;
optional bytes password = 2; // hashed password with seeds from Handshake message
optional int32 net_read_timeout = 3 [default = 0]; // in seconds
optional int32 net_write_timeout = 4 [default = 0]; // in seconds
optional string destination = 5;
optional string client_id = 6;
optional string filter = 7;
optional int64 start_timestamp = 8;
}
//服务端响应
message Ack {
optional int32 error_code = 1 [default = 0];
optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it.
}
//客户端提交
message ClientAck {
optional string destination = 1;
optional string client_id = 2;
optional int64 batch_id = 3;
}
// subscription
message Sub {
optional string destination = 1;
optional string client_id = 2;
optional string filter = 7;
}
// Unsubscription
message Unsub {
optional string destination = 1;
optional string client_id = 2;
optional string filter = 7;
}
//
PullRequest
message Get {
optional string destination = 1;
optional string client_id = 2;
optional int32 fetch_size = 3;
optional int64 timeout = 4 [default = -1]; // 默认-1时代表不控制
optional int32 unit = 5 [default = 2];// 数字类型,0:纳秒,1:毫秒,2:微秒,3:秒,4:分钟,5:小时,6:天
optional bool auto_ack = 6 [default = false]; // 是否自动ack
}
//消息
message Messages {
optional int64 batch_id = 1;
repeated bytes messages = 2;
}
// TBD when new packets are required
message Dump{
optional string journal = 1;
optional int64
position = 2;
optional int64 timestamp = 3 [default = 0];
}
// 客户端回滚
message ClientRollback{
optional string destination = 1;
optional string client_id = 2;
optional int64 batch_id = 3;
}

转载于:https://www.cnblogs.com/f-zhao/p/9110575.html

最后

以上就是如意水蜜桃为你收集整理的【Canal源码分析】数据传输协议一、EntryProtocal二、CanalProtocal的全部内容,希望文章能够帮你解决【Canal源码分析】数据传输协议一、EntryProtocal二、CanalProtocal所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部