概述
我主要依照以下问题点及顺序来依次讲解:
- Slave角色配置以及基于配置下如何实现不同于Master的运行机制
- Master的数据传输以及Slave的进度上报
首先看第一点,Slave角色的关键配置:brokerId和brokerRole,brokerId指定此Slave在Broker中的序号,0表示Master,1及之后的表示Slave,Broker中Slave可以有多个,当然一般一个就够了,所以brokerId的值一般为1。brokerRole表示其在Broker中的角色定位,有3个值可选:
public enum BrokerRole {
ASYNC_MASTER,
SYNC_MASTER,
SLAVE;
}
ASYNC_MASTER:异步Master,也就是新的消息存储时不需要等Slave同步好;
SYNC_MASTER:同步Master,新消息存现时需要等Slave同步好,也就是返回的 Ack Offset >= 当前消息的CommitLog Offset
Slave角色其brokerRole=SLAVE。
brokerId和brokerRole是关键配置,其他的配置和Master一样,brokerName,brokerClusterName,namesrvAddr等相应配置都不能少。
当Slave启动时:
public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
......
//当当前角色是Slave时
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
// 如果当前Broker配置中指定了haMasterAddress,则赋值 HAClient 的 masterAddress
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
// 将haMasterAddress的值设置到 HAService 的 HAClient 的masterAddress中
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true; //如果配置中未指定Master的IP,则定期从Namesrv处更新获取
}
//Slave每隔60S从Master处同步TopicConfig,ConsumerOffset,DelayOffset,SubscriptionGroupConfig
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
......
}
}
如果在配置文件中指定了haMasterAddress,那么这个就会作为Master的IP+端口,Slave会向此地址发送请求,建立长连接。
如果未指定,那么看如下:
public class BrokerController {
/**
* 向Namesrv注册Broker信息,每隔30S执行一次
*
* @param checkOrderConfig
* @param oneway
*/
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
......
if (registerBrokerResult != null) {
//如果Slave在配置时没有指定Master的IP
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
// Slave在注册Broker时,Namesrv会将Master的BrokerAddr返回来,更新HAClient上的 masterAddress
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
......
}
}
}
Namesrv:
public class RouteInfoManager {
/**
* broker向Namesrv 注册自身信息
*
* 当 broker 为主节点时:不返回 haServerAddr、masterAddr
* 当 broker 为从节点时:返回 haServerAddr、masterAddr
*/
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
......
// 当注册Slave时,返回Master的addr作为高可用的地址
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
// 获取master的brokerLiveInfo,将其haServerAddr(addr)赋值给Slave的haServerAddr
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
}
}
若Slave未显式指定haMasterAddress,则在向Namesrv注册Broker信息时,Namesrv会以brokerName相同为依据将此Broker下的Master Adress返回给Slave。
通过以上两种手段指定MasterAdress后,将其值赋给HAService里的HAClient的 masterAddress属性。Slave在start时,调用HAClient的start( ),当发现masterAddress有值时,就明白当前是Slave角色,就会启动Slave的运行流程。
Broker启动时,会相应启动HAService,用于主从复制,同时相应启动其内部的HAClient,下面看HAClient的启动过程:
class HAClient extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//当存在masterAdress != null && 连接Master成功
if (this.connectMaster()) {
// 若距离上次上报时间超过5S,上报到Master进度
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
//最多阻塞1S,直到Master有数据同步于过来。若1S满了还是没有接受到数据,中断阻塞,
// 执行processReadEvent(),但结果读入byteBufferRead的大小为0,然后循环到这步
this.selector.select(1000);
// 处理读取事件
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// 若进度有变化,上报到Master进度
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// Master超过20S未返回数据,关闭连接
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
}
先看connectMaster( ) 方法,此方法就是向Master发送请求,通了之后建立SocketChannel:
/**
* 连接Master节点
*
* @return 是否连接成功
* @throws ClosedChannelException 当注册读事件失败时
*/
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
//当自身角色是Slave时,会将配置中的MessageStoreConfig中的haMasterAdress赋值给masterAddress,或者在registerBroker时的返回值赋值
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress); // 连接Master节点。如果连接失败,直接关闭,不抛出异常。
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
//更新最近上报Offset,也就是当前CommitLog的maxOffset
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
//更新最近写入时间
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
Slave向Master发送请求,那就需要去Master看看处理这个请求的方式,Broker在启动时会启动HAService内的AcceptSocketService,顾名思义,这个对象就是用来接收Socket请求的服务对象,看其对Slave的请求处理形式:
/**
* Listens to slave connections to create {@link HAConnection}.
*/
class AcceptSocketService extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel)k.channel()).accept();
//接收到Slave的请求,生成一个HAConnection,用户向Channle写数据以及读取Slave传过来的数据
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
}
AcceptSocketService在启动时会监听SelectionKey.OP_ACCEPT事件,当接收到请求时,创建一个SocketChannel长连接对象,然后用HAConnection去封装SocketChannel,之后启动HAConnection,可以先看看HAConnection的start( ) 方法做了什么:
public class HAConnection {
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
this.socketChannel.configureBlocking(false);
this.socketChannel.socket().setSoLinger(false, -1);
this.socketChannel.socket().setTcpNoDelay(true);
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
this.socketChannel.socket().setSendBufferSize(1024 * 64);
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
/**
* 启动{@link #readSocketService}读取Slave传过来的数据
* 启动{@link #writeSocketService}向Channel写入消息,同步给Slave
*/
public void start() {
this.readSocketService.start();
this.writeSocketService.start();
}
}
HAConnection 在实例化时生成了writeSocketService 和readSocketService 两个对象。
writeSocketService:负责向SocketChannel写入CommitLog的消息数据;
readSocketService:负责从SocketChannel中读取Slave传递的ACK进度,也就是其CommitLog的maxPhyOffset。
Master在接收到Slave的请求后,创建SocketChannel,封装成一个HAConnection ,执行start( )方法,也就是执行writeSocketService和readSocketService。但是仅仅是start( ),不会主动向Slave发送数据,Slave和Master之间的互相传输的第一步是Slave迈出的。下面我们看Slave迈出的这一步:
class HAClient extends ServiceThread {
//Slave当前进度默认是0
private long currentReportedOffset = 0;
public void run() {
......
//当存在masterAdress != null && 连接Master成功
if (this.connectMaster()) {
// 若距离上次上报时间超过5S,上报到Master进度
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
......
// 若进度有变化,上报到Master进度
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
......
}
}
/**
* 是否满足上报进度的时间
* 距离上一次上报进度时间超过5S , 也就是每5S上报一次进度
*
* @return 是否
*/
private boolean isTimeToReportOffset() {
long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
return interval > HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
}
/**
* 上报Master进度。
* 如果上报失败,关闭连接
*
* @return 是否上报成功
*/
private boolean reportSlaveMaxOffsetPlus() {
boolean result = true;
long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (currentPhyOffset > this.currentReportedOffset) {
this.currentReportedOffset = currentPhyOffset;
result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
}
}
return result;
}
/**
* 上报进度,传递进度的时候仅传递一个Long类型的Offset,8个字节,没有其他数据
*
* @param maxOffset 进度
* @return 是否上报成功
*/
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
return !this.reportOffset.hasRemaining();
}
}
Slave在连接Master成功后,生成SocketChannel,注册SelectionKey.OP_READ事件,当Master有数据传输过来,
触发 SelectionKey.OP_READ,此时就能读取其数据。但Master启动时是不会主动传输数据的,因为其不知道Slave的CommitLog的maxPhyOffset,也就是不知道从哪个位置开始同步,需要Slave先上报当前CommitLog的maxPhyOffset。
reportSlaveMaxOffsetPlus( ) :Slave在启动时如果其maxPhyOffset > currentReportedOffset ,也就是大于0,也就是当前Slave是中途宕机了后再次启动的,那么马上向Master上报进度,从maxPhyOffset 处开始同步数据
isTimeToReportOffset() :Slave在启动时如果maxPhyOffset == 0 ,也就是说当前Slave是第一次启动,那么其会一直等待,等待5S中,触发isTimeToReportOffset(),然后向Master同步进度,从0开始同步数据。
为什么会有这种区别,在之后的代码中我会说说我自己的理解。
Slave将maxPhyOffset 传输给了Master,接下来看看Master收到这个进度后的处理流程。
/**
* 读取Slave进度线程服务
*/
class ReadSocketService extends ServiceThread {
public ReadSocketService(final SocketChannel socketChannel) throws IOException {
this.selector = RemotingUtil.openSelector();
this.socketChannel = socketChannel;
//向SocketChannle注册OP_READ事件
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
this.thread.setDaemon(true);
}
public void run() {
while (!this.isStopped()) {
try {
//最多阻塞1S钟,当有一个Channel发送了数据后,立即中断阻塞
//若1S时间内也没有Channel发送了数据,则processReadEvent()会立即返回true,然后再次阻塞,循环如此
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
// Slave超过20S没有返回数据,断开连接
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
//超过20S时间没有收到Slave传递的消息,断开Slave连接
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
......
}
}
当Slave传递了自身的maxPhyOffset时,马上中断 selector.select(1000),执行processReadEvent()方法。
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
// 重置读取缓冲区以及processPosition
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPostion = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
//从Channel里读取数据,可能Slave没有发送过来进度
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
// 设置最后读取时间
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
//传递过来的数据大于8字节,有效的同步,8个字节代表着有一个有效的CommitLog Offset
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
// 读取Slave 请求来的CommitLog的最大位置
// position减去8的余数,这么做也是是为了防止流传输数据的粘包问题
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 读取ByteBuffer中position内最大的8字节的倍数,比如 position = 571,则 571-(571%8)= 571-3 = 568
// 也就是读取 560-568位置的字节,因为Slave可能发送了多个进度过来,Master只读最末尾也就是最大的那个
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
// 设置Slave CommitLog的最大位置
HAConnection.this.slaveAckOffset = readOffset;
// 设置Slave 第一次请求的位置,初始化 slaveRequestOffset = Slave的请求位置
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 通知目前Slave进度。主要用于Master节点为同步类型的。
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) { //连续读取3此没有数据,break,返回true,休眠
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
HAConnection.this.slaveRequestOffset的默认值是-1,当Slave第一次同步进度时,更新其值为Slave的原始maxPhyOffset,writeSocketService启动后会定期循环,当发现slaveRequestOffset != -1 时,就知道从哪里开始将数据发送给Slave了,然后就会马上从此进度开始向SocketChannel写入数据。
接下来看WriteSocketService,也就是向SocketChannel写入数据的服务:
/**
* 写入Message线程服务
*/
class WriteSocketService extends ServiceThread {
......
private final int headerSize = 8 + 4;
private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
/**
* Slave下一次同步CommigLog的Offset
*/
private long nextTransferFromWhere = -1;
public void run() {
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 从未获得Slave读取进度请求,sleep等待。Slave第一次请求时会将初始化 slaveRequestOffset = Slave原始进度,可能为0,也可能是宕机重启,大于0
// 只有Slave同步了其原始的maxPhyOffset时,Master才知道从哪里开始将数据同步给Slave
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 计算初始化nextTransferFromWhere
if (-1 == this.nextTransferFromWhere) {
//Slave的CommitLog里没有任何数据,当前请求是其第一次请求
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
//从lastMappedFile的0位开始,也就是会忽略之前的MappedFile,只从最后一个文件开始同步
masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
//如果CommitLog是中途宕机了,重启,那么会接着上次的位置继续传输
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
if (this.lastWriteOver) {
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
//如果距离上次同步(写数据给Slave)超过5S,也就是5S内没有新的消息,发送一个空包过去,就是没有实际数据的,刷新Slave的最后写入时间
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver) {
continue;
}
}
} else { // 上次传输未完成,,继续传输
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver) {
continue;
}
}
// 选择新的CommitLog内容进行传输
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
//一次最多同步传输32KB
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size; //更新下次同步的起始位置,当前位置 + 修正过的size
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize); // 头部大小12字节,前8 Offset ; 后4 是 totalSize
this.byteBufferHeader.putLong(thisOffset); // 将同步的CommitLog的Offset存入
this.byteBufferHeader.putInt(size); // 将同步的数据size存入
this.byteBufferHeader.flip(); //转为写模式
this.lastWriteOver = this.transferData();
} else { // 没新的消息,挂起等待
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
......
}
}
以上代码的注释挺详细的了,看注释加代码基本能看明白,有几点需要额外讲解下:
- 当Slave是第一次启动时,也就是其CommitLog里没有任何数据,发送的maxPhyOffset==0,那么Master在同步数据时会仅同步lastMappedFile,也就是从最后一个MappedFile的起始位置开始同步,忽略其前面的所有MappedFile。如果是中途宕机后的再次启动,那么会接着其CommitLog的进度继续同步,否则其CommitLog就会出现数据断层的情况。也正是基于这种机制,当Slave再次启动时,马上就上报进度,开始数据同步,因为其进度与Master的maxPhyOffset可能间隔了一个或多个MappedFile了,要抓紧时间跟上。
- Master每次同步CommitLog的数据大小最大为32KB,不包括Header,也就是8byte的offset,4byte的bodySize。
- 由以上可知,Master传输数据并不是按消息整体来传输的,也就是说很多时候会传递消息的一部分
- Master会在一次数据写入SocketChannel后紧接着就进行下次写入,而不需要等待Slave返回ACK,也就是说不需要等待Slave返回确认收到
- 当没有新消息时,WriteSocketService每休眠100ms运行一次
下面看WriteSocketService 的transferData( )方法,也就是时间数据传输方法:
/**
* 传输数据
*/
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
第一次传输byteBufferHeader,也就是8个字节的起始offset,加上4个字节的bodySize。如果bodySize == 0 ( 也就是 selectMappedBufferResult == null ) ,也就是Slave的进度等于Master的进度了,此时仅传递一个offset,当长时间没有消息生成时需要这样,为了更新Master和Slave的最后同步时间。当Slave进度落后于Master进度,提取Slave的maxPhyOffset到Master的maxPhyOffset之间的数据,写入SocketChannel。
下篇博客我们接着讲Slave接收到数据的处理过程以及SYNC_MASTER下的数据同步流程。
最后
以上就是鲤鱼哈密瓜为你收集整理的RocketMQ主从同步机制源码解析及关键点总结(一)的全部内容,希望文章能够帮你解决RocketMQ主从同步机制源码解析及关键点总结(一)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复