概述
Java打造RPC框架系列第四篇
上一篇文章中 给大家讲了zookeeper作为注册中心的基本原理
http://blog.csdn.net/we_phone/article/details/78993394
这篇文章中 我讲的是RPC框架接入对单点zookeeper的支持的源码实现
详细代码可见:Github MeiZhuoRPC
看懂这篇文章需要的前提
- 看了前面的系列文章
- 熟悉java.util.concurrent包
文章出于详细讲解的目的,篇幅较长 望耐心阅读 有疑问欢迎评论
首先看使用效果
我们启动两个提供者服务
@Test public void multi1and2() throws InterruptedException, IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "file:src/test/java/rpcTest/MultiServer1and2Context.xml" }); context.start(); //启动spring后才可启动 防止容器尚未加载完毕 RPC.start(); } @Test public void multi2() throws InterruptedException, IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "file:src/test/java/rpcTest/MultiServer2Context.xml" }); context.start(); //启动spring后才可启动 防止容器尚未加载完毕 RPC.start(); }
下面是他们对应的spring配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.meizhuo.rpc.server.ServerConfig"> <property name="port" value="9012"></property> <property name="zooKeeperHost" value="127.0.0.1:2181"></property> <property name="serverImplMap"> <map> <!--配置对应的抽象接口及其实现--> <entry key="rpcTest.Service1" value="rpcTest.Service1Impl"></entry> <entry key="rpcTest.Service2" value="rpcTest.Service2Impl"></entry> </map> </property> </bean> </beans>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.meizhuo.rpc.server.ServerConfig"> <property name="port" value="9002"></property> <property name="zooKeeperHost" value="127.0.0.1:2181"></property> <property name="serverImplMap"> <map> <!--配置对应的抽象接口及其实现--> <entry key="rpcTest.Service2" value="rpcTest.Service2Impl"></entry> </map> </property> </bean> </beans>
一个提供者只注册了service1 一个提供者service1 2都注册了
接下来调用消费者
ExecutorService executorService= Executors.newFixedThreadPool(8); for (int i = 0; i <1000 ; i++) { int finalI = i+1; executorService.execute(new Runnable() { @Override public void run() { Service1 service1= (Service1) RPC.call(Service1.class); Service2 service2= (Service2) RPC.call(Service2.class); System.out.println("第"+ finalI +"次发出请求"); service1.count(); service2.count(); } }); }
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.meizhuo.rpc.client.ClientConfig"> <property name="zooKeeperHost" value="127.0.0.1:2181"></property> <property name="serviceInterface"> <set> <value>rpcTest.Service1</value> <value>rpcTest.Service2</value> </set> </property> <!--负载均衡策略--> <property name="loadBalance" ref="Random"></property> </bean> <bean scope="prototype" class="org.meizhuo.rpc.zksupport.LoadBalance.RandomBalance" id="Random"> </bean> </beans>
这里用线程池 发出1000个请求,每个提供者者接收到请求后只是计数后输出,这里我就不贴代码了 我github上的单元测试有
调用效果如下
即注册service1也注册2的提供者输出:Service1 计数:1000 Service2 计数:515
只注册service2的输出:Service2 计数:485
485+515 刚好是1000
也就是service1只有1个提供者 请求就全部落在了一个节点上 service2 就分摊给了两个节点
这就是我们通过zookeeper注册中心实现的负载均衡的RPC调用的效果
首先看MeiZhuoRPC中zookeeper数据模型
路径的常量类如下
public class ZKConst { public static final Integer sessionTimeout=2000; public static final String rootPath="/MeiZhuoRPC"; public static final String providersPath="/providers"; public static final String consumersPath="/consumers"; public static final String servicePath="/service"; }
本例中尚无使用到/consumers节点
每个providers都进行监听 监听提供者IP的变化
zookeeper的操作
我封装了ZKTempZnodes 作为对zookeeper的主要操作
ZKServerService是对ZKTempZnodes的进一步封装
类似与数据库操作Service和DAO的封装
ZKServerService的操作如下
- 初始化一些根节点,例如/MeiZhuo /service这些 不存在则需要创建。
- 生成每个服务的providers和ip节点
- 获得并监听所有的IP
//生成所有注册的服务znode public void createServerService() throws KeeperException, InterruptedException { ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper); Map<String,String> serviceMap= RPC.getServerConfig().getServerImplMap(); String ip=RPC.getServerConfig().getServerHost(); for (Map.Entry<String,String> entry:serviceMap.entrySet()){ //获取配置中设置的IP设置为IP顺序节点的值 默认127.0.0.1:8888 zkTempZnodes.createTempZnode(ZKConst.rootPath+ZKConst.servicePath+"/"+entry.getKey()+ZKConst.providersPath+"/"+ip,null); //创建连接数节点 首次增加时连接数为0 // zkTempZnodes.createTempZnode(ZKConst.rootPath+ZKConst.balancePath+"/"+entry.getKey()+"/"+ip,0+""); } } //获得这个服务所有的提供者 包含监听注册 public List<String> getAllServiceIP(String serviceName) throws KeeperException, InterruptedException { ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper); IPWatcher ipWatcher=new IPWatcher(zooKeeper); return zkTempZnodes.getPathChildren(ZKConst.rootPath+ZKConst.servicePath+"/"+serviceName+ZKConst.providersPath,ipWatcher); } //初始化根节点及服务提供者节点 均为持久节点 public void initZnode() throws KeeperException, InterruptedException { ZKTempZnodes zkTempZnodes=new ZKTempZnodes(zooKeeper); StringBuilder pathBuilder=new StringBuilder(ZKConst.rootPath); // String balancePath=ZKConst.rootPath; zkTempZnodes.createSimpleZnode(pathBuilder.toString(),null); // balancePath=balancePath+ZKConst.balancePath; // zkTempZnodes.createSimpleZnode(balancePath,null); pathBuilder.append(ZKConst.servicePath); zkTempZnodes.createSimpleZnode(pathBuilder.toString(),null); Map<String,String> serverImplMap=RPC.getServerConfig().getServerImplMap(); for (Map.Entry<String,String> entry:serverImplMap.entrySet()){ // zkTempZnodes.createSimpleZnode(balancePath+"/"+entry.getKey(),null); StringBuilder serviceBuilder=new StringBuilder(pathBuilder.toString()); serviceBuilder.append("/"); serviceBuilder.append(entry.getKey()); zkTempZnodes.createSimpleZnode(serviceBuilder.toString(),null); serviceBuilder.append(ZKConst.providersPath); zkTempZnodes.createSimpleZnode(serviceBuilder.toString(),null); } }
负载均衡策略
常见的负载均衡策略,(加权)随机,轮询,最小连接数,一致性Hash
这里我只写好了随机和一致性hash方式的负载均衡
一致性hash原理讲解篇幅较多 读者可以自行查阅和看我github上对应的实现
负载均衡我用一个接口抽象出来
/** * Created by wephone on 18-1-8. * 负载均衡策略抽象接口 * 其他模块不耦合负载均衡代码 */ public interface LoadBalance { /** * 负载均衡选择服务中已选中的IP之一 * @param serviceName * @return */ String chooseIP(String serviceName) throws ProvidersNoFoundException; }
各种负载均衡策略对这个接口进行具体的实现
也就是依赖倒置原则 依赖抽象 不依赖具体实现
随机负载均衡的实现如下
/** * Created by wephone on 18-1-18. */ public class RandomBalance implements LoadBalance { @Override public String chooseIP(String serviceName) throws ProvidersNoFoundException { RPCRequestNet.getInstance().serviceLockMap.get(serviceName).readLock().lock(); Set<String> ipSet=RPCRequestNet.getInstance().serviceNameInfoMap.get(serviceName).getServiceIPSet(); int ipNum=ipSet.size(); if (ipNum==0){ throw new ProvidersNoFoundException(); } RPCRequestNet.getInstance().serviceLockMap.get(serviceName).readLock().unlock(); Random random = new Random(); //生成[0,num)区间的整数: int index = random.nextInt(ipNum); int count = 0; for (String ip : ipSet) { if (count == index) { //返回随机生成的索引位置ip return ip; } count++; } return null; } }
服务端的代码
服务端相对比较简单,就是启动netty服务器,然后向zookeeper注册自己的IP节点
RPC.start方法如下
public static void start() throws InterruptedException, IOException { System.out.println("welcome to use MeiZhuoRPC"); ZooKeeper zooKeeper= new ZKConnect().serverConnect(); ZKServerService zkServerService=new ZKServerService(zooKeeper); try { zkServerService.initZnode(); //创建所有提供者服务的znode zkServerService.createServerService(); } catch (KeeperException e) { e.printStackTrace(); } //阻塞服务端不会退出 RPCResponseNet.connect(); }
服务端的更新基本在这里,读配置把提供的服务注册在zookeeper上(初始化节点,创建服务节点)
服务端如此操作就可以了 其余的和前面1.0版本的一样 等待消费者连接即可
接下来看调用者端
调用者端主要使用的两个类
serviceInfo和IPChannelInfo
/** * Created by wephone on 18-1-8. * 每个服务对应的信息存放类 * 用在一个key为服务名字的serviceNameInfoMap里 */ public class ServiceInfo { //用于轮询负载均衡策略 private AtomicInteger index=new AtomicInteger(0); //这个服务所连接的提供者IP Set 只能由负载均衡类操作 private Set<String> serviceIPSet=new HashSet<>(); // public void setServiceIPSet(Set<String> serviceIPSet) { public void setServiceIPSet(List<String> newIPSet) { Set<String> set=new HashSet<>(); set.addAll(newIPSet); this.serviceIPSet.clear(); this.serviceIPSet.addAll(set); } public Set<String> getServiceIPSet() { return serviceIPSet; } public int getConnectIPSetCount(){ return serviceIPSet.size(); } public void addConnectIP(String IP) { serviceIPSet.add(IP); } public void removeConnectIP(String IP){ serviceIPSet.remove(IP); } }
ServiceInfo类
负载均衡以服务作为单位,因为每个服务的提供者IP都可能不一样。
一个服务(即一个抽象接口,例如上面的service1)对应一个这样的类
保存了该服务提供者的所有IP以及轮询策略的索引
后续可能扩展更多属性
/** * Created by wephone on 18-1-8. * IP对应的channel类 用于一个IP映射的Map IPChannelMap * 存放一个IP对应的channel */ public class IPChannelInfo { private EventLoopGroup group; private Channel channel; // //保证多线程修改时引用计数正确 // private AtomicInteger serviceQuoteNum=new AtomicInteger(0);//原子变量要赋初值 public EventLoopGroup getGroup() { return group; } public void setGroup(EventLoopGroup group) { this.group = group; } public Channel getChannel() { return channel; } public void setChannel(Channel channel) { this.channel = channel; } }
IPChannelInfo类
用于存储一个提供者的IP地址对应NIO通道及其Netty线程组(本例中暂无使用到此属性,之前想过可能提取释放连接资源才放了这个属性进来)
这两个基本类在RPCRequestNet中用一个Map维持 都是全局单例的
public class RPCRequestNet { //全局map 每个请求对应的锁 用于同步等待每个异步的RPC请求 public Map requestLockMap=new ConcurrentHashMap<String,RPCRequest>(); //每个IP对应一个锁 防止重复连接一个IP多次 public Map<String,Lock> connectlock=new ConcurrentHashMap<String,Lock>(); //服务名称 映射 服务信息类 public Map<String,ServiceInfo> serviceNameInfoMap=new ConcurrentHashMap<>(); //IP地址 映射 对应的NIO Channel及其引用次数 public Map<String,IPChannelInfo> IPChannelMap=new ConcurrentHashMap<>(); //全局读写锁 更新ip时为写操作 负载均衡选中IP为读操作 public ConcurrentHashMap<String,ReadWriteLock> serviceLockMap=new ConcurrentHashMap<>(); // public CountDownLatch countDownLatch=new CountDownLatch(1); private LoadBalance loadBalance; private static RPCRequestNet instance;
ServiceInfo的key为服务名称 即一个服务对应一个ServiceInfo
IPChannelInfo的key为ip地址 即一个ip地址对应一个IPChannelInfo
说完基本的类,我们来看看消费者端的启动
clientConfig获得IOC容器后 获得所有可用的提供者IP 并作为serviceInfo的初始值进行初始化
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RPC.clientContext=applicationContext; //获得IOC容器后 读取配置中的服务 try { ZooKeeper zooKeeper= new ZKConnect().clientConnect(); ZKServerService zkServerService=new ZKServerService(zooKeeper); Set<String> services=RPC.getClientConfig().getServiceInterface(); //初始化所有可用IP 初始化读写锁 for (String service:services){ List<String> ips=zkServerService.getAllServiceIP(service); for (String ip:ips){ RPCRequestNet.getInstance().IPChannelMap.putIfAbsent(ip,new IPChannelInfo()); } ServiceInfo serviceInfo=new ServiceInfo(); serviceInfo.setServiceIPSet(ips); ReadWriteLock readWriteLock=new ReentrantReadWriteLock(); RPCRequestNet.getInstance().serviceLockMap.putIfAbsent(service,readWriteLock); RPCRequestNet.getInstance().serviceNameInfoMap.putIfAbsent(service,serviceInfo); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
zookeeper的watcher
这里只使用到了一个watcher IPWatcher
/** * Created by wephone on 18-1-7. * 服务提供者和调用者的IP监控器 即监听服务的可用性 */ public class IPWatcher implements Watcher{ private ZooKeeper zooKeeper; public IPWatcher(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } @Override public void process(WatchedEvent watchedEvent) { /** * 监听到节点提供者IP节点变化时被调用 * 调用后进行平衡操作 */ String path=watchedEvent.getPath(); String[] pathArr=path.split("/"); String serviceName=pathArr[3];//第四个部分则为服务名 RPCRequestNet.getInstance().serviceLockMap.get(serviceName).writeLock().lock(); System.out.println("providers changed...Lock write Lock"); try { List<String> children=zooKeeper.getChildren(path,this); for (String ip:children){ RPCRequestNet.getInstance().IPChannelMap.putIfAbsent(ip,new IPChannelInfo()); } RPCRequestNet.getInstance().serviceNameInfoMap.get(serviceName).setServiceIPSet(children); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } RPCRequestNet.getInstance().serviceLockMap.get(serviceName).writeLock().unlock(); } }
也就是收到watch通知后 重新获取可用IP 写入serviceInfo 并再次注册watcher
这里我用到一种锁机制 读写锁
当IP发生更新时,我们改写ServiceInfo中的IP集合
这时需要阻塞IP的获取操作 以防获取到已经不存在的IP节点
所以采用读写锁,各个RPC获取IP时加读锁,相互不阻塞,当IP发生改变时 上写锁,相互阻塞 直至IP更新完毕
发送RPC请求时操作如下
public void send(RPCRequest request) throws ProvidersNoFoundException { String serviceName=request.getClassName(); String ip=loadBalance.chooseIP(serviceName); Channel channel=connect(ip); // System.out.println("Send RPC Thread:"+Thread.currentThread().getName()); try { //编解码对象为json 发送请求 String requestJson= null; try { requestJson = RPC.requestEncode(request); } catch (JsonProcessingException e) { e.printStackTrace(); } ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes()); channel.writeAndFlush(requestBuf); // System.out.println("调用"+request.getRequestID()+"已发送"); //挂起等待实现端处理完毕返回 TODO 后续配置超时时间 synchronized (request) { //放弃对象锁 并阻塞等待notify request.wait(); } // System.out.println("调用"+request.getRequestID()+"接收完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }
即选择IP 进行连接(如果已连接则在Map中取出Channel) 最后发送 基本和前面的版本一致
最后的补充
随机和轮询每个提供者都要和全部的调用者保持长连接的问题,可能导致提供者维持的长连接数可能过多的问题
例如10调用者 5提供者,最优情况下 一个提供者持有2个调用者连接即可,而上面的负载均衡方式,每个提供者都需要持有10个长连接,即使再加多一个提供者 他依旧得持有10个长连接。
一致性hash策略每个调用者只会映射给一个提供者,在调用者数量远远大于提供者时,大大减少了多余的长连接,但在调用者数小于提供者的时候,会有部分提供者一直没有收到请求的情况。这时候建议使用随机等策略,保证每个提供者都能被负载均衡。
框架使用者可以根据提供者调用者数量比较来选择相应的负载均衡策略。
之前想了一些方式来优化使每个提供者只持有够用的长连接,而且不会出现部分提供者不被请求到的情况,但最后都发现不合适,也比较复杂,因此github上的代码会有一些我注释掉和废弃掉的类和方法,但不影响使用。
以上就是MeiZhuoRPC支持zookeeper注册中心的核心代码
主要是接入封装了zookeeper的API和做了相应的负载均衡基础设施
更详细的代码可以在我的github上clone一份看看
https://github.com/wephone/MeiZhuoRPC
后续会继续完善各种负载均衡策略,逐步支持集群式zookeeper以及性能优化等等
欢迎持续关注我的博客及github
转载于:https://my.oschina.net/u/3768288/blog/1628710
最后
以上就是含蓄丝袜为你收集整理的RPC框架原理及从零实现系列文章(四):支持zookeeper注册中心与负载均衡Java打造RPC框架系列第四篇的全部内容,希望文章能够帮你解决RPC框架原理及从零实现系列文章(四):支持zookeeper注册中心与负载均衡Java打造RPC框架系列第四篇所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复