我是靠谱客的博主 傻傻雪碧,最近开发中收集的这篇文章主要介绍RocketMQ探索-namesrv接收到broker的注册请求,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

上一篇大体看了broker是怎么注册到namesrv中的,这篇文章看看namesrv接收到broker的注册请求的后,namesrv是怎么处理的。

注:因为后面准备单独看看RocketMQ的通信这块,所以这里就直接看namesrv处理broker注册请求的方法。

在前面的讲namesrv时,在initialize---->registerProcessor方法中,向remotingServer(NettyRemotingServer通信)注册了DefaultRequestProcessor对象(在Netty接收到请求后,会调用DefaultRequestProcessor类的processRequest方法,对请求进行处理)。这个对象里面中的processRequest方法就是处理各种请求的。

在processRequest方法中,根据request.getCode()来确定是什么请求(注册broker的code:REGISTER_BROKER)。

case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
else {
return this.registerBroker(ctx, request);
}

可以看到broker的版本大于3_0_11就使用registerBrokerWithFilterServer进行broker的注册。这里又引出了一个FilterServer的概念,后面我们再单独看看。接着我们分别来看看这2个方法:registerBrokerWithFilterServer和registerBroker(这2个方法注册的过程大体一样,只是在处理FilterServer有稍微的不同)。最终处理这个请求的地方是在RouteInfoManager的registerBroker方法中:

public RegisterBrokerResult registerBroker(//
final String clusterName,// 1
final String brokerAddr,// 2
final String brokerName,// 3
final long brokerId,// 4
final String haServerAddr,// 5
final TopicConfigSerializeWrapper topicConfigWrapper,// 6
final List<String> filterServerList, // 7
final Channel channel// 8
) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
//根据集群名字,获取当前集群下面的所有brokerName
//brokerName表示是一组broker(主从):如一个brokerName的值为:broker-a,可能包括一个master跟它的多个slave
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//根据brokerName从brokerAddrTable中获取brokerDate信息
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// 如果当前不存在brokerDate,即还没有broker向namesrv注册,则直接将当前broker信息put加入
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData();
brokerData.setBrokerName(brokerName);
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerData.setBrokerAddrs(brokerAddrs);
this.brokerAddrTable.put(brokerName, brokerData);
}
// 保存当前注册broker的brokerAddr地址信息都brokerData中去
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
if (null != topicConfigWrapper //
&& MixAll.MASTER_ID == brokerId) { //如果topicConfigWrapper不为空,且当前brokerId == 0,即为当前broker为master
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
|| registerFirst) {// 如果Topic配置信息发生变更或者该broker为第一次注册
ConcurrentHashMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();// 获取所有topic信息
if (tcTable != null) {
for(Map.Entry<String,TopicConfig> entry: tcTable.entrySet()){
this.createAndUpdateQueueData(brokerName, entry.getValue());// 根据brokername及topicconfig(read、write queue数量等)新增或者更新到topicQueueTable中
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, //
new BrokerLiveInfo(//
System.currentTimeMillis(), //
topicConfigWrapper.getDataVersion(),//
channel, //
haServerAddr));// 更新最后变更时间(将brokerLiveTable中保存的对应的broker的更新时间戳,设置为当前时间)
if (null == prevBrokerLiveInfo) {
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
}
// 更新Filter Server列表
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
//如果当前broker为slave节点,则将haServerAddr、masterAddr等信息设置到result返回值中
if (MixAll.MASTER_ID != brokerId) {
// 通过brokename的brokedate获取当前slave节点的master节点addr
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
  1. 将当前请求注册的broker信息保存或者更新到clusterAddrTable、brokerAddrTable中
  2. 将当前请求注册的broker的topic信息,保存或者更新到topicQueueTable中

broker定时上报,namesrv定时更新!

 

转载于:https://my.oschina.net/u/3134950/blog/1439033

最后

以上就是傻傻雪碧为你收集整理的RocketMQ探索-namesrv接收到broker的注册请求的全部内容,希望文章能够帮你解决RocketMQ探索-namesrv接收到broker的注册请求所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部