概述
2019独角兽企业重金招聘Python工程师标准>>>
上一篇大体看了broker是怎么注册到namesrv中的,这篇文章看看namesrv接收到broker的注册请求的后,namesrv是怎么处理的。
注:因为后面准备单独看看RocketMQ的通信这块,所以这里就直接看namesrv处理broker注册请求的方法。
在前面的讲namesrv时,在initialize---->registerProcessor方法中,向remotingServer(NettyRemotingServer通信)注册了DefaultRequestProcessor对象(在Netty接收到请求后,会调用DefaultRequestProcessor类的processRequest方法,对请求进行处理)。这个对象里面中的processRequest方法就是处理各种请求的。
在processRequest方法中,根据request.getCode()来确定是什么请求(注册broker的code:REGISTER_BROKER)。
case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); }
可以看到broker的版本大于3_0_11就使用registerBrokerWithFilterServer进行broker的注册。这里又引出了一个FilterServer的概念,后面我们再单独看看。接着我们分别来看看这2个方法:registerBrokerWithFilterServer和registerBroker(这2个方法注册的过程大体一样,只是在处理FilterServer有稍微的不同)。最终处理这个请求的地方是在RouteInfoManager的registerBroker方法中:
public RegisterBrokerResult registerBroker(// final String clusterName,// 1 final String brokerAddr,// 2 final String brokerName,// 3 final long brokerId,// 4 final String haServerAddr,// 5 final TopicConfigSerializeWrapper topicConfigWrapper,// 6 final List<String> filterServerList, // 7 final Channel channel// 8 ) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); //根据集群名字,获取当前集群下面的所有brokerName //brokerName表示是一组broker(主从):如一个brokerName的值为:broker-a,可能包括一个master跟它的多个slave Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; //根据brokerName从brokerAddrTable中获取brokerDate信息 BrokerData brokerData = this.brokerAddrTable.get(brokerName); // 如果当前不存在brokerDate,即还没有broker向namesrv注册,则直接将当前broker信息put加入 if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(); brokerData.setBrokerName(brokerName); HashMap<Long, String> brokerAddrs = new HashMap<Long, String>(); brokerData.setBrokerAddrs(brokerAddrs); this.brokerAddrTable.put(brokerName, brokerData); } // 保存当前注册broker的brokerAddr地址信息都brokerData中去 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); if (null != topicConfigWrapper // && MixAll.MASTER_ID == brokerId) { //如果topicConfigWrapper不为空,且当前brokerId == 0,即为当前broker为master if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// || registerFirst) {// 如果Topic配置信息发生变更或者该broker为第一次注册 ConcurrentHashMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();// 获取所有topic信息 if (tcTable != null) { for(Map.Entry<String,TopicConfig> entry: tcTable.entrySet()){ this.createAndUpdateQueueData(brokerName, entry.getValue());// 根据brokername及topicconfig(read、write queue数量等)新增或者更新到topicQueueTable中 } } } } BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, // new BrokerLiveInfo(// System.currentTimeMillis(), // topicConfigWrapper.getDataVersion(),// channel, // haServerAddr));// 更新最后变更时间(将brokerLiveTable中保存的对应的broker的更新时间戳,设置为当前时间) if (null == prevBrokerLiveInfo) { log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); } // 更新Filter Server列表 if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } //如果当前broker为slave节点,则将haServerAddr、masterAddr等信息设置到result返回值中 if (MixAll.MASTER_ID != brokerId) { // 通过brokename的brokedate获取当前slave节点的master节点addr String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
- 将当前请求注册的broker信息保存或者更新到clusterAddrTable、brokerAddrTable中
- 将当前请求注册的broker的topic信息,保存或者更新到topicQueueTable中
broker定时上报,namesrv定时更新!
转载于:https://my.oschina.net/u/3134950/blog/1439033
最后
以上就是傻傻雪碧为你收集整理的RocketMQ探索-namesrv接收到broker的注册请求的全部内容,希望文章能够帮你解决RocketMQ探索-namesrv接收到broker的注册请求所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复