概述
文章目录
- 背景
- 实现
- slave
- 1、连接master
- 2、判断是否需要上报当前offset
- 3、上报
- 4、selector阻塞1s
- 5、处理master响应
- master
- 读
- 写
- 1、判断slaveRequestOffset状态
- 2、计算传输offset位置
- 3、判断上次传输是否成功
- 4、传输消息
背景
rocketmq的broker架构为n主n从,kafka的broker架构是1主n从
生产者客户端只写入master,由master完成主从同步;消费者客户端可以从master或slave中读取数据。
实现
- master启动,默认通过端口10912监听slave的连接
- slave主动连接master,master接收客户端的连接,并建立相关TCP连接
- slave主动向master发送待拉取offset,master解析请求并返回消息给slave
- slave保存消息并继续发送新的消息同步请求
slave
实现类:org.apache.rocketmq.store.ha.HAService.HAClient
1、连接master
从配置文件找有无显式指定haMasterAddress,如果没有后续会从ns根据brokerName获取master的地址,即master配置文件中的brokerIP2。
如果获取到master地址,则建立到master的TCP连接,然后注册OP_READ(网络读事件)、初始化
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
2、判断是否需要上报当前offset
根据interval=(当前时间-lastWriteTimestamp)判断是否需要向master上报当前offset,默认5s
3、上报
上报待拉取offset给master
- 对于slave来说,是对master发送下次待拉取offset的请求
- 对于master来说,既是slave请求待拉取offset,也是slave的ack
如果上报结果为失败,则关闭与master的连接
4、selector阻塞1s
this.selector.select(1000);
5、处理master响应
处理master返回的响应,更新lastWriteTimestamp
如此循环1~5
master
master在收到slave的连接请求后,会将主从服务器的连接SocketChannel封装成HAConnection对象,实现master与slave的读写操作。
读
实现类:org.apache.rocketmq.store.ha.HAConnection.ReadSocketService
处理slave传过来的ack,通知阻塞等待结果的生产者
写
1、判断slaveRequestOffset状态
如果slaveRequestOffset为-1,表示未更新过,不处理,sleep(10)
2、计算传输offset位置
如果nextTransferFromWhere为-1,表示初次进行数据传输,需要计算待传输的offset:
如果slaveRequestOffset为0,表示slave没有任何数据,此时返回的offset=(最大offset-(最大offset%1g)),得到的值为最后一个文件的起始offset。如果结果<0,则从0开始同步。
masterOffset = masterOffset-(masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());
如果slaveRequestOffset不为0,则直接用slaveRequestOffset
3、判断上次传输是否成功
根据标志位判断
- 如果成功,但interval>5s(默认),发送一个心跳包
- 如果成功,interval<=5s,do nothing
- 如果未成功,则先传输上一次的数据再判断一次标志位。如果还是未成功状态,则直接continue
4、传输消息
- 根据上面计算的nextTransferFromWhere获取该offset后面的所有消息
- haTransferBatchSize为一次传输请求的大小限制,默认32kb,通过设置ByteBuffer的limit来控制(可以看出传输到slave的消息会有不完整的消息)
- 更新nextTransferFromWhere、标志位等
this.nextTransferFromWhere += size;
循环1~4
最后
以上就是粗暴水蜜桃为你收集整理的rocketmq笔记9-主从同步背景实现的全部内容,希望文章能够帮你解决rocketmq笔记9-主从同步背景实现所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复