我是靠谱客的博主 粗暴水蜜桃,最近开发中收集的这篇文章主要介绍rocketmq笔记9-主从同步背景实现,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 背景
  • 实现
    • slave
      • 1、连接master
      • 2、判断是否需要上报当前offset
      • 3、上报
      • 4、selector阻塞1s
      • 5、处理master响应
    • master
        • 1、判断slaveRequestOffset状态
        • 2、计算传输offset位置
        • 3、判断上次传输是否成功
        • 4、传输消息

背景

rocketmq的broker架构为n主n从,kafka的broker架构是1主n从

生产者客户端只写入master,由master完成主从同步;消费者客户端可以从master或slave中读取数据。

实现

  1. master启动,默认通过端口10912监听slave的连接
  2. slave主动连接master,master接收客户端的连接,并建立相关TCP连接
  3. slave主动向master发送待拉取offset,master解析请求并返回消息给slave
  4. slave保存消息并继续发送新的消息同步请求

slave

实现类:org.apache.rocketmq.store.ha.HAService.HAClient

1、连接master

从配置文件找有无显式指定haMasterAddress,如果没有后续会从ns根据brokerName获取master的地址,即master配置文件中的brokerIP2。

如果获取到master地址,则建立到master的TCP连接,然后注册OP_READ(网络读事件)、初始化

this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();

2、判断是否需要上报当前offset

根据interval=(当前时间-lastWriteTimestamp)判断是否需要向master上报当前offset,默认5s

3、上报

上报待拉取offset给master

  • 对于slave来说,是对master发送下次待拉取offset的请求
  • 对于master来说,既是slave请求待拉取offset,也是slave的ack

如果上报结果为失败,则关闭与master的连接

4、selector阻塞1s

this.selector.select(1000);

5、处理master响应

处理master返回的响应,更新lastWriteTimestamp

如此循环1~5

master

master在收到slave的连接请求后,会将主从服务器的连接SocketChannel封装成HAConnection对象,实现master与slave的读写操作。

实现类:org.apache.rocketmq.store.ha.HAConnection.ReadSocketService

处理slave传过来的ack,通知阻塞等待结果的生产者

1、判断slaveRequestOffset状态

如果slaveRequestOffset为-1,表示未更新过,不处理,sleep(10)

2、计算传输offset位置

如果nextTransferFromWhere为-1,表示初次进行数据传输,需要计算待传输的offset:

如果slaveRequestOffset为0,表示slave没有任何数据,此时返回的offset=(最大offset-(最大offset%1g)),得到的值为最后一个文件的起始offset。如果结果<0,则从0开始同步。

masterOffset = masterOffset-(masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());

如果slaveRequestOffset不为0,则直接用slaveRequestOffset

3、判断上次传输是否成功

根据标志位判断

  • 如果成功,但interval>5s(默认),发送一个心跳包
  • 如果成功,interval<=5s,do nothing
  • 如果未成功,则先传输上一次的数据再判断一次标志位。如果还是未成功状态,则直接continue

4、传输消息

  1. 根据上面计算的nextTransferFromWhere获取该offset后面的所有消息
  2. haTransferBatchSize为一次传输请求的大小限制,默认32kb,通过设置ByteBuffer的limit来控制(可以看出传输到slave的消息会有不完整的消息)
  3. 更新nextTransferFromWhere、标志位等
this.nextTransferFromWhere += size;

循环1~4

最后

以上就是粗暴水蜜桃为你收集整理的rocketmq笔记9-主从同步背景实现的全部内容,希望文章能够帮你解决rocketmq笔记9-主从同步背景实现所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(74)

评论列表共有 0 条评论

立即
投稿
返回
顶部