我是靠谱客的博主 潇洒爆米花,最近开发中收集的这篇文章主要介绍canal 源码解析(2)-数据流转篇(1),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、组装数据


       上一篇 只是正常启动,但是线程是等待中,没有数据接入处理。现在开始模拟同步数据,并分析其中原理


AbstractEventParser类的start方法



2、mysql主从复制重点就在这里了,当前因为 startposition里部位空对象,

EntryPosition[included=false,journalName=mysql-bin.000023,position=123,serverId=1,gtid=<null>,timestamp=1528939448000]

3、

// 重新链接,因为在找position过程中可能有状态,需要断开后重建
erosaConnection.reconnect();

改变一下状态:connect为true、

4、dum数据

// 4. 开始dump数据
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
if (isGTIDMode()) {
erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
} else {
if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);

} else {
erosaConnection.dump(startPosition.getJournalName(),

startPosition.getPosition(),

sinkHandler);

}
}

 4.1 dump详细内容

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
updateSettings();

sendRegisterSlave();

sendBinlogDump(binlogfilename, binlogPosition);

DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());

fetcher.start(connector.getChannel());

LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);

LogContext context = new LogContext();

while (fetcher.fetch()) {
LogEvent event = null;

event = decoder.decode(fetcher, context);


if (event == null) {
throw new CanalParseException("parse failed");

}
if (!func.sink(event)) {
break;

}
if (event.getSemival() == 1) {
sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);

}
}
}

4.1.1  设置mysql的通用参数,心跳间隔时间,内存参数

private void updateSettings() throws IOException {
try {
update("set wait_timeout=9999999");

} catch (Exception e) {
logger.warn("update wait_timeout failed", e);

}
try {
update("set net_write_timeout=1800");

} catch (Exception e) {
logger.warn("update net_write_timeout failed", e);

}

 构造器初始化第一个参数
public QueryCommandPacket(){
setCommand((byte) 0x03);
}
public byte[] toBytes() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();

out.write(getCommand());

out.write(getQueryString().getBytes("UTF-8"));// 链接建立时默认指定编码为UTF-8

return out.toByteArray();
}


二、发送数据


三、接受数据

mysql协议规定了,通用接受数据的每个类型;参考链接:

okpacket: https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html

erropacket https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html

根据mysql协议okpacket的要求,则判断出该命令执行是否成功

组装okpacket,参考上面协议规定字段

然后解析转化

public void fromBytes(byte[] data) throws IOException {
int index = 0;

// 1. read field count

this.fieldCount = data[0];

index++;

// 2. read affected rows

this.affectedRows = ByteHelper.readBinaryCodedLengthBytes(data, index);

index += this.affectedRows.length;

// 3. read insert id

this.insertId = ByteHelper.readBinaryCodedLengthBytes(data, index);

index += this.insertId.length;

// 4. read server status

this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index);

index += 2;

// 5. read warning count

this.warningCount = ByteHelper.readUnsignedShortLittleEndian(data, index);

index += 2;

// 6. read message.

this.message = new String(ByteHelper.readFixedLengthBytes(data, index, data.length - index));

// end read
}
OKPacket [affectedRows=[0], fieldCount=0, insertId=[0], message=, serverStatus=2, warningCount=1]



4.1.2 向mysql发送slave指令

参考mysql event type。https://dev.mysql.com/doc/internals/en/event-meanings.html

private void sendRegisterSlave() throws IOException {
RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket();

cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress();

cmd.reportPasswd = authInfo.getPassword();

cmd.reportUser = authInfo.getUsername();

cmd.reportPort = authInfo.getAddress().getPort(); // 暂时先用master节点的port

cmd.serverId = this.slaveId;

byte[] cmdBody = cmd.toBytes();


logger.info("Register slave {}", cmd);


HeaderPacket header = new HeaderPacket();

header.setPacketBodyLength(cmdBody.length);

header.setPacketSequenceNumber((byte) 0x00);

PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);


header = PacketManager.readHeader(connector.getChannel(), 4);

byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength());

assert body != null;

if (body[0] < 0) {
if (body[0] == -1) {
ErrorPacket err = new ErrorPacket();

err.fromBytes(body);

throw new IOException("Error When doing Register slave:" + err.toString());

} else {
throw new IOException("unpexpected packet with field_count=" + body[0]);

}
}
}

byte[] cmdBody = cmd.toBytes();

public byte[] toBytes() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();

out.write(getCommand());

ByteHelper.writeUnsignedIntLittleEndian(serverId, out);

out.write((byte) reportHost.getBytes().length);

ByteHelper.writeFixedLengthBytesFromStart(reportHost.getBytes(), reportHost.getBytes().length, out);

out.write((byte) reportUser.getBytes().length);

ByteHelper.writeFixedLengthBytesFromStart(reportUser.getBytes(), reportUser.getBytes().length, out);

out.write((byte) reportPasswd.getBytes().length);

ByteHelper.writeFixedLengthBytesFromStart(reportPasswd.getBytes(), reportPasswd.getBytes().length, out);

ByteHelper.writeUnsignedShortLittleEndian(reportPort, out);

ByteHelper.writeUnsignedIntLittleEndian(0, out);// Fake

// rpl_recovery_rank

ByteHelper.writeUnsignedIntLittleEndian(0, out);// master id

return out.toByteArray();
}

RegisterSlaveCommandPacket对象转为byte数组

注意 :平常在使用中,把一个对象转为字节,调用的是对象流,且对象必须实现序列化接口,也就是 先把对象转为流,然后转为byte数组。然后遍历。

但是这里使用的编解码是自定义的编码规则进行编码解码。

mysql客户端命令请求报文

命令:用于标识当前请求消息的类型,例如切换数据库(0x02)、查询命令(0x03)等。命令值的取值范围及说明如下表(参考MySQL源代码/include/mysql_com.h头文件中的定义):

类型值命令功能关联函数
0x00COM_SLEEP(内部线程状态)(无)
0x01COM_QUIT关闭连接mysql_close
0x02COM_INIT_DB切换数据库mysql_select_db
0x03COM_QUERYSQL查询请求mysql_real_query
0x04COM_FIELD_LIST获取数据表字段信息mysql_list_fields
0x05COM_CREATE_DB创建数据库mysql_create_db
0x06COM_DROP_DB删除数据库mysql_drop_db
0x07COM_REFRESH清除缓存mysql_refresh
0x08COM_SHUTDOWN停止服务器mysql_shutdown
0x09COM_STATISTICS获取服务器统计信息mysql_stat
0x0ACOM_PROCESS_INFO获取当前连接的列表mysql_list_processes
0x0BCOM_CONNECT(内部线程状态)(无)
0x0CCOM_PROCESS_KILL中断某个连接mysql_kill
0x0DCOM_DEBUG保存服务器调试信息mysql_dump_debug_info
0x0ECOM_PING测试连通性mysql_ping
0x0FCOM_TIME(内部线程状态)(无)
0x10COM_DELAYED_INSERT(内部线程状态)(无)
0x11COM_CHANGE_USER重新登陆(不断连接)mysql_change_user
0x12COM_BINLOG_DUMP获取二进制日志信息(无)
0x13COM_TABLE_DUMP获取数据表结构信息(无)
0x14COM_CONNECT_OUT(内部线程状态)(无)
0x15COM_REGISTER_SLAVE从服务器向主服务器进行注册(无)
0x16COM_STMT_PREPARE预处理SQL语句mysql_stmt_prepare
0x17COM_STMT_EXECUTE执行预处理语句mysql_stmt_execute
0x18COM_STMT_SEND_LONG_DATA发送BLOB类型的数据mysql_stmt_send_long_data
0x19COM_STMT_CLOSE销毁预处理语句mysql_stmt_close
0x1ACOM_STMT_RESET清除预处理语句参数缓存mysql_stmt_reset
0x1BCOM_SET_OPTION设置语句选项mysql_set_server_option
0x1CCOM_STMT_FETCH获取预处理语句的执行结果mysql_stmt_fetch

参数:内容是用户在MySQL客户端输入的命令(不包括每行命令结尾的";"分号)。另外这个字段的字符串不是以NULL字符结尾,而是通过消息头中的长度值计算而来。


1)向mysql发送数据包:(日常的所有操作都是想mysql发送指令,然后ack,现在就以发送slave指令来解析)



每次向server发送报文,序号都从0开始,且header里包含第一个字段为消息体的长度

组装header



组装header(头部4字节,第一位消息体长度,第四位位序列号)

public byte[] toBytes() {
byte[] data = new byte[4];

data[0] = (byte) (packetBodyLength & 0xFF);

data[1] = (byte) (packetBodyLength >>> 8);

data[2] = (byte) (packetBodyLength >>> 16);

data[3] = getPacketSequenceNumber();

return data;
}


组装为整个报文,

HeaderPacket header = new HeaderPacket();
header.setPacketBodyLength(cmdBody.length);
header.setPacketSequenceNumber((byte) 0x00);
PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);

bio为如下
public void write(byte[]... buf) throws IOException {
OutputStream output = this.output;

if (output != null) {
for (byte[] bs : buf) {
output.write(bs);

}
} else {
throw new SocketException("Socket already closed.");

}
}


nio为

public void write(byte[]... buf) throws IOException {
if (channel != null && channel.isWritable()) {
channel.writeAndFlush(Unpooled.copiedBuffer(buf));

} else {
throw new IOException("write failed ! please checking !");

}
}

下一篇讲如何接收msyql发过来的数据。


最后

以上就是潇洒爆米花为你收集整理的canal 源码解析(2)-数据流转篇(1)的全部内容,希望文章能够帮你解决canal 源码解析(2)-数据流转篇(1)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(67)

评论列表共有 0 条评论

立即
投稿
返回
顶部