概述
文章目录
- 开发笔记
- 参考文章
- 启动示意图
- 对示意图中出现的单词意义解释
- provider
- consumer
- Zookeeper
- zk节点结构
- step1~6是启动顺序
- 实现细节
- 对于“当server的状态没有还没有同步到consumer的服务器列表里”这种情况该如何获取连接
- 思考第一阶段
- 思考第二阶段
- 思考第三阶段
- PS:增加一个补救措施
- 存在的问题
- 以下是知识笔记
- Netty
- 关于addListener方法
- 关于sync方法
- 关于await()方法
- 关于bootstrap的connect方法
- dubbo
- nicerpc实现的功能
开发笔记
参考文章
- netty源码分析之-Future、ChannelFuture与ChannelPromise详解(3)
- Netty 案例集锦之多线程篇
- Netty Client重连实现
- ChannelFuture的用法
- netty channel详解
- 浅析Netty的异步事件驱动(二)
启动示意图
对示意图中出现的单词意义解释
provider
服务提供者
consumer
服务消费者
Zookeeper
Zookeeper集群,作为服务的注册中心使用
zk节点结构
模仿dubbo
- 根路径:/nicerpc
- 服务路径在根路径下,根据不同的服务名命名:/nicerpc/serviceName,比如/nicerpc/com.w.service.api.UserService,一个服务下有providers,consumers,routers,configurations节点
- 服务提供者都在服务路径下的providers里:/nicerpc/serviceName/providers/providerHost+"#" +providerPort,比如/nicerpc/com.w.service.api.UserService/192.168.12.12#6666;后期框架优化,需要往节点的数据域附带这些host,port等信息。
- consumer类似provider
- 某一个服务的配置信息在/nicerpc/serviceName/configurations下,暂时并未用到
- 路由信息在/nicerpc/serviceName/routers下,暂时并未用到
step1~6是启动顺序
-
step1,provider的服务暴露
- provider启动netty的ServerBootstrap,进入nio的select监听状态
- provider扫描指定包,基于Spring的BeanPostProcessor找到标注了@Remote注解的Bean
- 根据这些Bean实现的接口的名字——ServiceName——去zk路径/nicerpc/serviceName/providers下注册自己为临时带序列号的节点,如果没有这些节点需要进行初始化建立这些节点;
- 同时将这些服务名与其具体实现的映射记录在concurrentHashMap里
-
step2,provider在zk注册完毕后,二者之间就有一条长连接,用于zk监视该机器状态
-
step3,consumer的服务发现
-
consumer启动netty的Bootstrap,进入nio的select监听状态
-
consumer扫描指定包,基于Spring的BeanPostProcessor找到标注了@RemoteInvoke的域,给这些域通过Field.set()方法设置一个基于SpringCGLib的代理类,在该代理类的intercept方法里,我们将会去使用netty进行一次rpc调用
-
consumer通过ServerManager管理与Provider的连接,而在ServerManager内部,通过维护三个ConcurrentHashMap来管理连接
-
realServerPathMap:保存serviceName与当前主机列表的映射
key:com.nicerpc.nicerpc_demo.api.UserService value:set{“192.168.1.2#8081”,“192.168.11.68#8081”}
-
hostAndPortManagerMap:保存serviceName与当前使用的provider所在的主机+端口的映射
key:com.nicerpc.nicerpc_demo.api.UserService
value:host#port
-
connectionManagerMap:保存与某台主机上的一个进程(这个进程可能有多个服务)的一条链接(连接缓存)
key:host#port
value:ChannelFuture实例
-
-
如果是第一次连接,会去将自己注册到zk的consumers下,并进行服务发现,即向zk拉取相应的serviceName/providers下的所有子节点,根据节点信息更新三个Map,并根据realServerPathMap里的内容进行负载均衡选举,使用选举结果进行异步连接,并更新三个Map。
-
-
step4,consumer在zk注册完毕后,二者之间就有一条长连接,用于zk监视该机器状态
-
step5,consumer在拿到了目标provider的地址后,发起rpc调用。
- 通过初始化时,注册的动态代理类的intercept方法实现,在该方法内,封装一个在consumer-provider里传递的媒介——ClientRequest,他封装了一个请求。
- 将clientRequest通过JSON格式化后,编码后发送给provider
- 通过Condition等并发工具类,异步获取服务端返回结果,并进行解码,返回给业务代码
-
step6,provider对consumer的rpc调用的响应
- 接收到新消息后,经过解码,将Json字符串重新格式化为ClientRequest对象
- 拿到请求的serviceName,methodName等,从缓存beanMethodMap中取出bean和method,执行method的invoke方法,将返回值封装进Response内,经过JSON格式化,编码,再异步的发送回consumer
实现细节
对于“当server的状态没有还没有同步到consumer的服务器列表里”这种情况该如何获取连接
一般情况下,因为server的状态通过Zookeeper不能同步的更新到consumer本地的服务器列表(realServerSet)内,所以,当一台甚至几台server同时宕掉而consumer不知道这些server宕掉并且去尝试去调用的时候,可能会造成不应该的消息发送失败。
对于这种情况,我的思考过程如下:
思考第一阶段
当本地对于该主机没有可以用的future缓存的时候,该怎么办?先不考虑真实的server状态是否更新到providers本地缓存表里,也不考虑基于netty的connect操作默认是异步的
我觉得如果没有的话,不管是future == null(的确没连接过)的情况,还是future的channel是isNotActive(可能是provider关闭)的情况,都需要重新建立一个新的连接,原因如下所示:
经过思考,我觉得应该是重新使用负载均衡策略进行选举,因为负载均衡选举是根据最新的providers本地缓存表进行的(至少当时是直接使用providers本地缓存表进行选举的,后面改成了接受一个Set<String>形参,根据这个形参来进行选举),所以他可以保证选举出来的provider都是可用的,假如重连当前的provider的话,如果该provider所在的主机进程关闭,虽然providers本地缓存表会更新,但我们在这里重连还是会失败,所以应该以providers本地缓存表为准,应该直接重新选举,然后根据选举结果进行重新连接,如果连接成功的话就返回,如果连接失败的话就再次递归调用本方法。
思考第二阶段
现在考虑真实的server状态不能同步的更新到consumer的providers本地缓存表里,而且同一台机器被负载均衡选中多次,并且该机器已经下线,该怎么办?
因为providers本地缓存表并不是跟真实的服务器情况同步更新的,所以在真实的服务器情况同步的这段时间内,如果负载均衡每次负载的都是同一台机器(根据负载均衡策略不同是可能发生这种情况的),而这台机器如果是已经下线的(但它的下线状态还没有被更新到providers本地缓存表中),在这段时间内,程序可能会一直递归下去!
所以我们需要在负载均衡前,尝试重连接一次,如果重连接成功的话就返回,如果失败的话,从providers本地缓存表中remove掉,然后进行负载均衡,这样的话,如果所有机器都宕掉,我们也会一台一台的将所有机器删除掉,从而最后在负载均衡的时候发现providers本地缓存表中为空,会抛出异常,终止,就不会无限的递归下去了。
思考第三阶段
考虑基于netty的connect操作默认是异步的
因为netty的connect操作默认是异步的,所以没办法同步的获取到连接是否成功的结果。当然,future支持sync,await等操作(这里,使用netty时,需要注意IO超时,与await超时的区别),但是这些都需要阻塞,都会付出不小的代价,会影响框架的性能。但是,如果不使用这些函数,我们就没办法达到同步获取连接结果的需求,也就没办法通过连接结果来决定是否在providers本地缓存表remove掉当前集群选举出来的这个provider,就还是可能出现思考阶段二出现的问题。
所以,我想到了针对这种想到了名为fastGetConnection的解决方案,中和了以上性能与程序无限递归之间的矛盾。方案如下:
在发现当前主机没有可用的future缓存的时候,首先关闭该future,然后构造一个从providers本地缓存表的副本,从该副本中剔除当前provider,使用这个副本进行fastGetConnection(Set<String> serverSet)操作。
fastGetConnection(Set<String> serverSet):
每次都使用形参serverSet进行负载均衡选举,并使用选举结果进行连接,并使用返回的future.await(long,TimeUnit)设置等待超时时间,当await阻塞完毕后(不管future是否连接完毕,阻塞完毕可能是超时,可能是连接完毕),检查是否连接成功,如果连接成功则返回,否则在传进来的serverSet中remove掉刚刚那个server,并递归调用fastGetConnection,这里我们可以通过对超时时间进行调优,快速的获取一个客户端的连接了,当遇到网络波动的时候,我们选取到的是一个连通最快的provider,而且serverSet.remove(server)这个操作是在副本上进行的,不会影响真实的server列表的更新。
也就是在providers本地缓存表的副本上,进行选举,快速连接,根据连接结果在副本上remove或者返回连接,真实的providers情况不去管它,任其自由地更新,哪怕我们remove掉一个只是网络比较慢的但还可以用的provider也不会影响到它在providers本地缓存表是否存在。
PS:增加一个补救措施
然后又经过思考,决定在加一个补救措施,就是normalGetConnection,如果通过fastGetConnection获取不到(这种情况只会发生在把providers本地缓存表副本掏空后,负载均衡选举发生异常的时候),会接着通过normalGetConnection获取。normalGetConnection就仅仅是根据当前的providers本地缓存表列表做一个负载均衡选举,然后根据选举结果直接进行异步连接,不管连接结果是不是成功
存在的问题
await超时时间的设置多少合适,而且也不清楚使用await带来的性能损耗。
以下是知识笔记
Netty
关于addListener方法
- 经过测试,channelFuture.channel().xxx().addListener(listener)
方法添加的监听器只对单种IO事件的单次操作有效。 - 比如,channelFuture.channel().connect().addListener()添加的监听器只会在connect完成后调用,不会在…channel().writeAndFlush()完成后调用
- 再比如,channelFuture.channel().writeAndFlush().addListener()添加的监听器只会在本次writeAndFlush()完成后执行,假如接着执行了另一次writeAndFlush()操作,该监听器不会再次触发
- ps:监听器会在isDone()返回true后被立即调用
关于sync方法
- sync会阻塞到当前IO操作直到其isDone()返回true。
- isDone()代表着完成了,结果可能是成功,失败,被中断等。
- 该IO操作是否成功还是得看isSuccess()方法。
关于await()方法
- await(),await(long,TimeUnit),awaitUnInterrupt等方法,底层调用了Object的wait方法,他会阻塞到isDone返回true,上面说到了,isDone返回true有多种结果,不一定成功。
- 当设置了超时时间时,只要到达了超时时间,就不管isDone是否返回true了,会直接返回。
关于bootstrap的connect方法
- 经测试,使用同一个bootstrap去连接同一个主机上的同一个进程(ip、port)都相同,返回的是不同的两个future(也就是两个连接),二者可以并发的读写数据。
dubbo
- 自动发现: 基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,使服务提供方可以平滑增加或减少机器。
- 集群容错: 提供基于接口方法的透明远程过程调用,包括多协议支持,以及软负载均衡,失败容错,地址路由,动态配置等集群支持。
- 远程通讯: 提供对多种基于长连接的NIO框架抽象封装,包括多种线程模型,序列化,以及“请求-响应”模式的信息交换方式。
nicerpc实现的功能
- 客户端超时,超时的链接自动关闭
- 分离业务模块
- 增加zk模块,从zk获取服务器列表
- 客户端动态管理连接
- Netty实现RPC服务器
- 定义自己的简单通信协议
- Client与RPC服务器使用长连接进行异步通信
- 客户端动态代理使用SpringCGLib,BeanPostProcessor接口
- 服务器注册到Zookeeper,客户端通过Zookeeper监听服务器状态
最后
以上就是敏感鸭子为你收集整理的基于netty+Spring+Zookeeper的分布式RPC框架开发笔记以下是知识笔记的全部内容,希望文章能够帮你解决基于netty+Spring+Zookeeper的分布式RPC框架开发笔记以下是知识笔记所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复