概述
1、组件结构图
HAService:主从同步入口类
AcceptSockerService:HA Master端监听客户端连接实现类
GroupTransferService:主从同步通知实现类
HAClient:HA Client端实现类
HAConnection:HA Master服务端HA连接对象的封装,与Broker从服务器的网络读写交互
ReadSocketService:HA Master网络读实现类
WriteSocketService:HA Master网络写实现类
2、原理
主服务器启动,并在特定端口上监听从服务器的连接,从服务器主动连接主服务器,主服务器接收客户端的连接,并建立相关TCP连接。从服务器主动向主服务器发送待拉取消息偏移量,主服务器解析请求并返回消息给从服务器。从服务器保存消息并继续发送新的消息同步请求。
2.1 启动
监听从服务器的连接。启动socket监听线程及组数据转移线程,启动ha客户端。
public void start() throws Exception {
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
this.groupTransferService.start();
this.haClient.start();
}
2.2 AccpetSocketServcie
socketAddressListen:监听套接字
serverSocketChannel:服务端Socket通道
selector:事件选择器
2.2.1 beginAccept
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
创建ServerSocketChannel,selector,设置地址重用,绑定监听器端口,设置为非阻塞,注册ACCEPT事件
2.2.2 run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
选择器每1s处理一次连接就绪事件,连接事件就绪后,调用ServerSocketChannel的accept方法创建SocketChannel。然后为每个连接创建一个HAConnection对象,负责主从数据同步。
2.3 GroupTransferService
2.3.1 putRequest
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
this.wakeup();
}
将请求放入到requestWrite列表中,同时唤醒当前线程
2.3.2 run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
每隔10ms,执行等待复制完成。如果push2SlaveMaxOffset大于等于请求的偏移,则说明复制已经完成。如果小于并且当前时间差小于存储配置的同步刷新超时时间,则等待直到超时或者复制完成。
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
}
}
}
2.4 HAClient
masterAddress:Master broker的地址
reportOffset:从向主发起主从同步的拉取偏移量
socketChannel:网络传输通道
selector:NIO事件选择器
lastWriteTimestamp:上次上报是写入传输通道的时间
currentReportedOffset:反馈slave当前的复制进度,commitlog文件最大偏移量
dipatchPosition:本次已处理读缓存的指针
byteBufferRead:读缓存区,大小为4M
byteBufferBackup:读缓存区备份,与byteBufferRead进行交换。
2.4.1 connectMaster
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
slave服务器连接master服务器,如果socketChannel为空,则尝试连接master。如果master地址为空,则返回false。如果master地址不为空,则建立到master的TCP连接,然后注册OP_READ,初始化currentReportedOffset为commitlog文件的最大偏移量,lastWriteTimestamp为当前时间戳。
2.4.2 isTimeToReportOffset
private boolean isTimeToReportOffset() {
long interval =
HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
.getHaSendHeartbeatInterval();
return needHeart;
}
判断是否需要向master反馈当前待拉取偏移量,master与slave的ha心跳发送间隔为5s。
2.4.3 reportSlaveMaxOffset
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
向master服务器反馈拉取偏移量。
2.4.4 processReadEvent
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
处理网络请求,从master服务器传加的消息数据,判断readByteBuffer是否有剩余空间,如果有,则调用read,将通道中的数据读入到缓存区中。如果读取到的字节数大于0,重置读取到0字节的次数,调用dispatchReadRequest将读取到的所有消息全部追加到消息内存映射文件 中,然后再次返回拉取进度给服务器。如果连续3次从网络通道读取到0个字节,则结束本次读,返回true。如果读取到的字节数小于0或发生io异常,则返回false.
2.5 HAConnection
slaveRequestOffset:从服务器请求拉取数据的偏移量
slaveAckOffset:从服务器反馈已拉取完成的数据偏移量。
byteBufferRead:网络读写缓存区,默认为1M
processPosition:当前处理指针
lastReadTimestamp:上次读取数据的时间戳。
最后
以上就是无辜万宝路为你收集整理的RocketMQ中的主从复制1、组件结构图2、原理的全部内容,希望文章能够帮你解决RocketMQ中的主从复制1、组件结构图2、原理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复