概述
一、组装数据
上一篇 只是正常启动,但是线程是等待中,没有数据接入处理。现在开始模拟同步数据,并分析其中原理
AbstractEventParser类的start方法
2、mysql主从复制重点就在这里了,当前因为 startposition里部位空对象,
EntryPosition[included=false,journalName=mysql-bin.000023,position=123,serverId=1,gtid=<null>,timestamp=1528939448000]
3、
// 重新链接,因为在找position过程中可能有状态,需要断开后重建 erosaConnection.reconnect();
改变一下状态:connect为true、
4、dum数据
// 4. 开始dump数据 // 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据 if (isGTIDMode()) { erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler); } else { if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) { erosaConnection.dump(startPosition.getTimestamp(), sinkHandler); } else { erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler); } }
4.1 dump详细内容
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { updateSettings(); sendRegisterSlave(); sendBinlogDump(binlogfilename, binlogPosition); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } if (event.getSemival() == 1) { sendSemiAck(context.getLogPosition().getFileName(), binlogPosition); } } }
4.1.1 设置mysql的通用参数,心跳间隔时间,内存参数
private void updateSettings() throws IOException { try { update("set wait_timeout=9999999"); } catch (Exception e) { logger.warn("update wait_timeout failed", e); } try { update("set net_write_timeout=1800"); } catch (Exception e) { logger.warn("update net_write_timeout failed", e); }
构造器初始化第一个参数
public QueryCommandPacket(){ setCommand((byte) 0x03); }
public byte[] toBytes() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); out.write(getCommand()); out.write(getQueryString().getBytes("UTF-8"));// 链接建立时默认指定编码为UTF-8 return out.toByteArray(); }
二、发送数据
三、接受数据
mysql协议规定了,通用接受数据的每个类型;参考链接:
okpacket: https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
erropacket https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html
根据mysql协议okpacket的要求,则判断出该命令执行是否成功
组装okpacket,参考上面协议规定字段
然后解析转化
public void fromBytes(byte[] data) throws IOException { int index = 0; // 1. read field count this.fieldCount = data[0]; index++; // 2. read affected rows this.affectedRows = ByteHelper.readBinaryCodedLengthBytes(data, index); index += this.affectedRows.length; // 3. read insert id this.insertId = ByteHelper.readBinaryCodedLengthBytes(data, index); index += this.insertId.length; // 4. read server status this.serverStatus = ByteHelper.readUnsignedShortLittleEndian(data, index); index += 2; // 5. read warning count this.warningCount = ByteHelper.readUnsignedShortLittleEndian(data, index); index += 2; // 6. read message. this.message = new String(ByteHelper.readFixedLengthBytes(data, index, data.length - index)); // end read }OKPacket [affectedRows=[0], fieldCount=0, insertId=[0], message=, serverStatus=2, warningCount=1]
4.1.2 向mysql发送slave指令
参考mysql event type。https://dev.mysql.com/doc/internals/en/event-meanings.html
private void sendRegisterSlave() throws IOException { RegisterSlaveCommandPacket cmd = new RegisterSlaveCommandPacket(); cmd.reportHost = authInfo.getAddress().getAddress().getHostAddress(); cmd.reportPasswd = authInfo.getPassword(); cmd.reportUser = authInfo.getUsername(); cmd.reportPort = authInfo.getAddress().getPort(); // 暂时先用master节点的port cmd.serverId = this.slaveId; byte[] cmdBody = cmd.toBytes(); logger.info("Register slave {}", cmd); HeaderPacket header = new HeaderPacket(); header.setPacketBodyLength(cmdBody.length); header.setPacketSequenceNumber((byte) 0x00); PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody); header = PacketManager.readHeader(connector.getChannel(), 4); byte[] body = PacketManager.readBytes(connector.getChannel(), header.getPacketBodyLength()); assert body != null; if (body[0] < 0) { if (body[0] == -1) { ErrorPacket err = new ErrorPacket(); err.fromBytes(body); throw new IOException("Error When doing Register slave:" + err.toString()); } else { throw new IOException("unpexpected packet with field_count=" + body[0]); } } }
byte[] cmdBody = cmd.toBytes();
public byte[] toBytes() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); out.write(getCommand()); ByteHelper.writeUnsignedIntLittleEndian(serverId, out); out.write((byte) reportHost.getBytes().length); ByteHelper.writeFixedLengthBytesFromStart(reportHost.getBytes(), reportHost.getBytes().length, out); out.write((byte) reportUser.getBytes().length); ByteHelper.writeFixedLengthBytesFromStart(reportUser.getBytes(), reportUser.getBytes().length, out); out.write((byte) reportPasswd.getBytes().length); ByteHelper.writeFixedLengthBytesFromStart(reportPasswd.getBytes(), reportPasswd.getBytes().length, out); ByteHelper.writeUnsignedShortLittleEndian(reportPort, out); ByteHelper.writeUnsignedIntLittleEndian(0, out);// Fake // rpl_recovery_rank ByteHelper.writeUnsignedIntLittleEndian(0, out);// master id return out.toByteArray(); }
把RegisterSlaveCommandPacket对象转为byte数组
注意 :平常在使用中,把一个对象转为字节,调用的是对象流,且对象必须实现序列化接口,也就是 先把对象转为流,然后转为byte数组。然后遍历。
但是这里使用的编解码是自定义的编码规则进行编码解码。
命令:用于标识当前请求消息的类型,例如切换数据库(0x02)、查询命令(0x03)等。命令值的取值范围及说明如下表(参考MySQL源代码/include/mysql_com.h
头文件中的定义):
类型值 | 命令 | 功能 | 关联函数 |
---|---|---|---|
0x00 | COM_SLEEP | (内部线程状态) | (无) |
0x01 | COM_QUIT | 关闭连接 | mysql_close |
0x02 | COM_INIT_DB | 切换数据库 | mysql_select_db |
0x03 | COM_QUERY | SQL查询请求 | mysql_real_query |
0x04 | COM_FIELD_LIST | 获取数据表字段信息 | mysql_list_fields |
0x05 | COM_CREATE_DB | 创建数据库 | mysql_create_db |
0x06 | COM_DROP_DB | 删除数据库 | mysql_drop_db |
0x07 | COM_REFRESH | 清除缓存 | mysql_refresh |
0x08 | COM_SHUTDOWN | 停止服务器 | mysql_shutdown |
0x09 | COM_STATISTICS | 获取服务器统计信息 | mysql_stat |
0x0A | COM_PROCESS_INFO | 获取当前连接的列表 | mysql_list_processes |
0x0B | COM_CONNECT | (内部线程状态) | (无) |
0x0C | COM_PROCESS_KILL | 中断某个连接 | mysql_kill |
0x0D | COM_DEBUG | 保存服务器调试信息 | mysql_dump_debug_info |
0x0E | COM_PING | 测试连通性 | mysql_ping |
0x0F | COM_TIME | (内部线程状态) | (无) |
0x10 | COM_DELAYED_INSERT | (内部线程状态) | (无) |
0x11 | COM_CHANGE_USER | 重新登陆(不断连接) | mysql_change_user |
0x12 | COM_BINLOG_DUMP | 获取二进制日志信息 | (无) |
0x13 | COM_TABLE_DUMP | 获取数据表结构信息 | (无) |
0x14 | COM_CONNECT_OUT | (内部线程状态) | (无) |
0x15 | COM_REGISTER_SLAVE | 从服务器向主服务器进行注册 | (无) |
0x16 | COM_STMT_PREPARE | 预处理SQL语句 | mysql_stmt_prepare |
0x17 | COM_STMT_EXECUTE | 执行预处理语句 | mysql_stmt_execute |
0x18 | COM_STMT_SEND_LONG_DATA | 发送BLOB类型的数据 | mysql_stmt_send_long_data |
0x19 | COM_STMT_CLOSE | 销毁预处理语句 | mysql_stmt_close |
0x1A | COM_STMT_RESET | 清除预处理语句参数缓存 | mysql_stmt_reset |
0x1B | COM_SET_OPTION | 设置语句选项 | mysql_set_server_option |
0x1C | COM_STMT_FETCH | 获取预处理语句的执行结果 | mysql_stmt_fetch |
参数:内容是用户在MySQL客户端输入的命令(不包括每行命令结尾的";"分号)。另外这个字段的字符串不是以NULL字符结尾,而是通过消息头中的长度值计算而来。
1)向mysql发送数据包:(日常的所有操作都是想mysql发送指令,然后ack,现在就以发送slave指令来解析)
每次向server发送报文,序号都从0开始,且header里包含第一个字段为消息体的长度
组装header
组装header(头部4字节,第一位消息体长度,第四位位序列号)
public byte[] toBytes() { byte[] data = new byte[4]; data[0] = (byte) (packetBodyLength & 0xFF); data[1] = (byte) (packetBodyLength >>> 8); data[2] = (byte) (packetBodyLength >>> 16); data[3] = getPacketSequenceNumber(); return data; }
组装为整个报文,
HeaderPacket header = new HeaderPacket(); header.setPacketBodyLength(cmdBody.length); header.setPacketSequenceNumber((byte) 0x00); PacketManager.writePkg(connector.getChannel(), header.toBytes(), cmdBody);
bio为如下
public void write(byte[]... buf) throws IOException { OutputStream output = this.output; if (output != null) { for (byte[] bs : buf) { output.write(bs); } } else { throw new SocketException("Socket already closed."); } }
nio为
public void write(byte[]... buf) throws IOException { if (channel != null && channel.isWritable()) { channel.writeAndFlush(Unpooled.copiedBuffer(buf)); } else { throw new IOException("write failed ! please checking !"); } }
下一篇讲如何接收msyql发过来的数据。
最后
以上就是潇洒爆米花为你收集整理的canal 源码解析(2)-数据流转篇(1)的全部内容,希望文章能够帮你解决canal 源码解析(2)-数据流转篇(1)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复