我是靠谱客的博主 大力星星,最近开发中收集的这篇文章主要介绍Zookeeper实现简单的分布式RPC框架,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Zookeeper实现简单的分布式RPC框架

  • 博客分类: 
  • zookeeper
 

      RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现

       服务器端,通过SocketServer,持续接收客户端的请求,并将客户端的请求分发到指定的处理器出去处理。

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ 
  5.  */  
  6. public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {  
  7.   
  8.     /**服务端口号**/  
  9.     private int                port       = 12000;  
  10.   
  11.     private ServerSocket       server;  
  12.   
  13.     //线程池  
  14.     @Autowired  
  15.     private Executor           executorService;  
  16.   
  17.     public Map<String, Object> handlerMap = new ConcurrentHashMap<>();  
  18.   
  19.     private void publishedService() throws Exception {  
  20.   
  21.         server = new ServerSocket(port);  
  22.   
  23.         // 一直服务  
  24.         for (;;) {  
  25.             try {  
  26.                 // 获取socket  
  27.                 final Socket socket = server.accept();  
  28.                 executorService.execute(new Runnable() {  
  29.   
  30.                     @Override  
  31.                     public void run() {  
  32.                         try {  
  33.                             // 获取input  
  34.                             ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
  35.                             ObjectOutputStream output = new ObjectOutputStream(socket  
  36.                                 .getOutputStream());  
  37.                             try {  
  38.                                 // 获取引用  
  39.                                 String interfaceName = input.readUTF();  
  40.                                 //获取 方法名  
  41.                                 String methodName = input.readUTF();  
  42.                                 //  
  43.                                 Class<?>[] parameterTypes = (Class<?>[]) input.readObject();  
  44.                                 Object[] arguments = (Object[]) input.readObject();  
  45.                                 try {  
  46.                                     Object service = handlerMap.get(interfaceName);  
  47.                                     Method method = service.getClass().getMethod(methodName,  
  48.                                         parameterTypes);  
  49.                                     Object result = method.invoke(service, arguments);  
  50.                                     output.writeObject(result);  
  51.                                 } catch (Throwable t) {  
  52.                                     output.writeObject(t);  
  53.                                 } finally {  
  54.                                     input.close();  
  55.                                 }  
  56.                             } finally {  
  57.                                 socket.close();  
  58.                             }  
  59.   
  60.                         } catch (Exception e) {  
  61.   
  62.                         }  
  63.                     }  
  64.                 });  
  65.             } catch (Exception e) {  
  66.   
  67.             }  
  68.         }  
  69.   
  70.     }  
  71.   
  72.     public void init() {  
  73.   
  74.     }  
  75.   
  76.     /** 
  77.      * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 
  78.      */  
  79.     @Override  
  80.     public void afterPropertiesSet() throws Exception {  
  81.         //发布服务  
  82.         publishedService();  
  83.     }  
  84.   
  85.     /** 
  86.      * @see org.springframework.context.Lifecycle#start() 
  87.      */  
  88.     @Override  
  89.     public void start() {  
  90.     }  
  91.   
  92.     /** 
  93.      * @see org.springframework.context.Lifecycle#stop() 
  94.      */  
  95.     @Override  
  96.     public void stop() {  
  97.         if (server != null) {  
  98.             try {  
  99.                 server.close();  
  100.   
  101.             } catch (IOException e) {  
  102.   
  103.             }  
  104.         }  
  105.     }  
  106.   
  107.     /** 
  108.      * @see org.springframework.context.Lifecycle#isRunning() 
  109.      */  
  110.     @Override  
  111.     public boolean isRunning() {  
  112.         return false;  
  113.     }  
  114.   
  115.     /** 
  116.      * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) 
  117.      */  
  118.     @Override  
  119.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {  
  120.         Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);  
  121.         System.out.println(serviceBeanMap);  
  122.         if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {  
  123.             for (Object serviceBean : serviceBeanMap.values()) {  
  124.                 String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()  
  125.                         .getName();  
  126.                 handlerMap.put(interfaceName, serviceBean);  
  127.             }  
  128.         }  
  129.     }  
  130.   
  131.     /** 
  132.      * Setter method for property <tt>executorService</tt>. 
  133.      * 
  134.      * @param executorService value to be assigned to property executorService 
  135.      */  
  136.     public void setExecutorService(Executor executorService) {  
  137.         this.executorService = executorService;  
  138.     }  
  139.   
  140. }  

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $ 
  5.  */  
  6. @Documented  
  7. @Target({ ElementType.TYPE })  
  8. @Retention(RetentionPolicy.RUNTIME)  
  9. @Component  
  10. public @interface SRPC {  
  11.   
  12.     public Class<?> interf();  
  13. }  

 

 至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ 
  5.  */  
  6. public class Client {  
  7.     /** 
  8.      * 引用服务 
  9.      * 
  10.      * @param <T> 接口泛型 
  11.      * @param interfaceClass 接口类型 
  12.      * @param host 服务器主机名 
  13.      * @param port 服务器端口 
  14.      * @return 远程服务 
  15.      * @throws Exception 
  16.      */  
  17.     @SuppressWarnings("unchecked")  
  18.     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)  
  19.                                                                                                throws Exception {  
  20.         if (interfaceClass == null || !interfaceClass.isInterface()) {  
  21.             throw new IllegalArgumentException("必须指定服务接口");  
  22.         }  
  23.   
  24.         if (host == null || host.length() == 0) {  
  25.             throw new IllegalArgumentException("必须指定服务器的地址和端口号");  
  26.         }  
  27.   
  28.         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),  
  29.             new Class<?>[] { interfaceClass }, new InvocationHandler() {  
  30.                 @Override  
  31.                 public Object invoke(Object proxy, Method method, Object[] arguments)  
  32.                                                                                      throws Throwable {  
  33.                     Socket socket = new Socket(host, port);  
  34.                     try {  
  35.                         ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
  36.                         try {  
  37.                             output.writeUTF(interfaceClass.getName());  
  38.                             output.writeUTF(method.getName());  
  39.                             output.writeObject(method.getParameterTypes());  
  40.                             output.writeObject(arguments);  
  41.                             ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
  42.                             try {  
  43.                                 Object result = input.readObject();  
  44.                                 if (result instanceof Throwable) {  
  45.                                     throw (Throwable) result;  
  46.                                 }  
  47.                                 return result;  
  48.                             } finally {  
  49.                                 input.close();  
  50.                             }  
  51.                         } finally {  
  52.                             output.close();  
  53.                         }  
  54.                     } finally {  
  55.                         socket.close();  
  56.                     }  
  57.                 }  
  58.             });  
  59.     }  
  60. }  

    上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ 
  5.  */  
  6. public class SrpcRequest implements Serializable {  
  7.   
  8.     /**  */  
  9.     private static final long serialVersionUID = 6132853628325824727L;  
  10.     // 请求Id  
  11.     private String            requestId;  
  12.     // 远程调用接口名称  
  13.     private String            interfaceName;  
  14.     //远程调用方法名称  
  15.     private String            methodName;  
  16.     // 参数类型  
  17.     private Class<?>[]        parameterTypes;  
  18.     // 参数值  
  19.     private Object[]          parameters;  
  20.   
  21.     /** 
  22.      * Getter method for property <tt>requestId</tt>. 
  23.      * 
  24.      * @return property value of requestId 
  25.      */  
  26.     public String getRequestId() {  
  27.         return requestId;  
  28.     }  
  29.   
  30.     /** 
  31.      * Setter method for property <tt>requestId</tt>. 
  32.      * 
  33.      * @param requestId value to be assigned to property requestId 
  34.      */  
  35.     public void setRequestId(String requestId) {  
  36.         this.requestId = requestId;  
  37.     }  
  38.   
  39.     /** 
  40.      * Getter method for property <tt>interfaceName</tt>. 
  41.      * 
  42.      * @return property value of interfaceName 
  43.      */  
  44.     public String getInterfaceName() {  
  45.         return interfaceName;  
  46.     }  
  47.   
  48.     /** 
  49.      * Setter method for property <tt>interfaceName</tt>. 
  50.      * 
  51.      * @param interfaceName value to be assigned to property interfaceName 
  52.      */  
  53.     public void setInterfaceName(String interfaceName) {  
  54.         this.interfaceName = interfaceName;  
  55.     }  
  56.   
  57.     /** 
  58.      * Getter method for property <tt>methodName</tt>. 
  59.      * 
  60.      * @return property value of methodName 
  61.      */  
  62.     public String getMethodName() {  
  63.         return methodName;  
  64.     }  
  65.   
  66.     /** 
  67.      * Setter method for property <tt>methodName</tt>. 
  68.      * 
  69.      * @param methodName value to be assigned to property methodName 
  70.      */  
  71.     public void setMethodName(String methodName) {  
  72.         this.methodName = methodName;  
  73.     }  
  74.   
  75.     /** 
  76.      * Getter method for property <tt>parameterTypes</tt>. 
  77.      * 
  78.      * @return property value of parameterTypes 
  79.      */  
  80.     public Class<?>[] getParameterTypes() {  
  81.         return parameterTypes;  
  82.     }  
  83.   
  84.     /** 
  85.      * Setter method for property <tt>parameterTypes</tt>. 
  86.      * 
  87.      * @param parameterTypes value to be assigned to property parameterTypes 
  88.      */  
  89.     public void setParameterTypes(Class<?>[] parameterTypes) {  
  90.         this.parameterTypes = parameterTypes;  
  91.     }  
  92.   
  93.     /** 
  94.      * Getter method for property <tt>parameters</tt>. 
  95.      * 
  96.      * @return property value of parameters 
  97.      */  
  98.     public Object[] getParameters() {  
  99.         return parameters;  
  100.     }  
  101.   
  102.     /** 
  103.      * Setter method for property <tt>parameters</tt>. 
  104.      * 
  105.      * @param parameters value to be assigned to property parameters 
  106.      */  
  107.     public void setParameters(Object[] parameters) {  
  108.         this.parameters = parameters;  
  109.     }  
  110.   
  111. }  

 

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ 
  5.  */  
  6. public class SrpcResponse implements Serializable {  
  7.     /**  */  
  8.     private static final long serialVersionUID = -5934073769679010930L;  
  9.     // 请求的Id  
  10.     private String            requestId;  
  11.     // 异常  
  12.     private Throwable         error;  
  13.     // 响应  
  14.     private Object            result;  
  15.   
  16.     /** 
  17.      * Getter method for property <tt>requestId</tt>. 
  18.      * 
  19.      * @return property value of requestId 
  20.      */  
  21.     public String getRequestId() {  
  22.         return requestId;  
  23.     }  
  24.   
  25.     /** 
  26.      * Setter method for property <tt>requestId</tt>. 
  27.      * 
  28.      * @param requestId value to be assigned to property requestId 
  29.      */  
  30.     public void setRequestId(String requestId) {  
  31.         this.requestId = requestId;  
  32.     }  
  33.   
  34.     /** 
  35.      * Getter method for property <tt>error</tt>. 
  36.      * 
  37.      * @return property value of error 
  38.      */  
  39.     public Throwable getError() {  
  40.         return error;  
  41.     }  
  42.   
  43.     /** 
  44.      * Setter method for property <tt>error</tt>. 
  45.      * 
  46.      * @param error value to be assigned to property error 
  47.      */  
  48.     public void setError(Throwable error) {  
  49.         this.error = error;  
  50.     }  
  51.   
  52.     /** 
  53.      * Getter method for property <tt>result</tt>. 
  54.      * 
  55.      * @return property value of result 
  56.      */  
  57.     public Object getResult() {  
  58.         return result;  
  59.     }  
  60.   
  61.     /** 
  62.      * Setter method for property <tt>result</tt>. 
  63.      * 
  64.      * @param result value to be assigned to property result 
  65.      */  
  66.     public void setResult(Object result) {  
  67.         this.result = result;  
  68.     }  
  69.   
  70. }  

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ 
  5.  */  
  6. public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {  
  7.   
  8.     /**服务端口号**/  
  9.     private int                port       = 12000;  
  10.   
  11.     private ServerSocket       server;  
  12.   
  13.     //线程池  
  14.     @Autowired  
  15.     private Executor           executorService;  
  16.   
  17.     public Map<String, Object> handlerMap = new ConcurrentHashMap<>();  
  18.   
  19.     private void publishedService() throws Exception {  
  20.   
  21.         server = new ServerSocket(port);  
  22.   
  23.         // 一直服务  
  24.         for (;;) {  
  25.             try {  
  26.                 // 获取socket  
  27.                 final Socket socket = server.accept();  
  28.                 executorService.execute(new Runnable() {  
  29.   
  30.                     @Override  
  31.                     public void run() {  
  32.                         try {  
  33.                             // 获取input  
  34.                             ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
  35.                             try {  
  36.                                 // 获取RPC请求  
  37.                                 SrpcRequest request = (SrpcRequest) input.readObject();  
  38.                                 ObjectOutputStream output = new ObjectOutputStream(socket  
  39.                                     .getOutputStream());  
  40.                                 try {  
  41.                                     SrpcResponse response = doHandle(request);  
  42.                                     output.writeObject(response);  
  43.                                 } finally {  
  44.                                     input.close();  
  45.                                 }  
  46.                             } finally {  
  47.                                 socket.close();  
  48.                             }  
  49.   
  50.                         } catch (Exception e) {  
  51.   
  52.                         }  
  53.                     }  
  54.                 });  
  55.             } catch (Exception e) {  
  56.   
  57.             }  
  58.         }  
  59.   
  60.     }  
  61.   
  62.     private SrpcResponse doHandle(SrpcRequest request) {  
  63.         SrpcResponse response = new SrpcResponse();  
  64.         response.setRequestId(request.getRequestId());  
  65.         try {  
  66.             Object service = handlerMap.get(request.getInterfaceName());  
  67.             Method method = service.getClass().getMethod(request.getMethodName(),  
  68.                 request.getParameterTypes());  
  69.             response.setResult(method.invoke(service, request.getParameters()));  
  70.   
  71.         } catch (Exception e) {  
  72.             response.setError(e);  
  73.         }  
  74.         return response;  
  75.     }  
  76.   
  77.     public void init() {  
  78.   
  79.     }  
  80.   
  81.     /** 
  82.      * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 
  83.      */  
  84.     @Override  
  85.     public void afterPropertiesSet() throws Exception {  
  86.         //发布  
  87.         publishedService();  
  88.     }  
  89.   
  90.     /** 
  91.      * @see org.springframework.context.Lifecycle#start() 
  92.      */  
  93.     @Override  
  94.     public void start() {  
  95.     }  
  96.   
  97.     /** 
  98.      * @see org.springframework.context.Lifecycle#stop() 
  99.      */  
  100.     @Override  
  101.     public void stop() {  
  102.         if (server != null) {  
  103.             try {  
  104.                 server.close();  
  105.   
  106.             } catch (IOException e) {  
  107.   
  108.             }  
  109.         }  
  110.     }  
  111.   
  112.     /** 
  113.      * @see org.springframework.context.Lifecycle#isRunning() 
  114.      */  
  115.     @Override  
  116.     public boolean isRunning() {  
  117.         return false;  
  118.     }  
  119.   
  120.     /** 
  121.      * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) 
  122.      */  
  123.     @Override  
  124.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {  
  125.         Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);  
  126.         System.out.println(serviceBeanMap);  
  127.         if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {  
  128.             for (Object serviceBean : serviceBeanMap.values()) {  
  129.                 String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()  
  130.                         .getName();  
  131.                 handlerMap.put(interfaceName, serviceBean);  
  132.             }  
  133.         }  
  134.     }  
  135.   
  136.     /** 
  137.      * Setter method for property <tt>executorService</tt>. 
  138.      * 
  139.      * @param executorService value to be assigned to property executorService 
  140.      */  
  141.     public void setExecutorService(Executor executorService) {  
  142.         this.executorService = executorService;  
  143.     }  
  144.   
  145. }  

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ 
  5.  */  
  6. public class Client {  
  7.     /** 
  8.      * 引用服务 
  9.      * 
  10.      * @param <T> 接口泛型 
  11.      * @param interfaceClass 接口类型 
  12.      * @param host 服务器主机名 
  13.      * @param port 服务器端口 
  14.      * @return 远程服务 
  15.      * @throws Exception 
  16.      */  
  17.     @SuppressWarnings("unchecked")  
  18.     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)  
  19.             throws Exception {  
  20.         if (interfaceClass == null || !interfaceClass.isInterface()) {  
  21.             throw new IllegalArgumentException("必须指定服务接口");  
  22.         }  
  23.   
  24.         if (host == null || host.length() == 0) {  
  25.             throw new IllegalArgumentException("必须指定服务器的地址和端口号");  
  26.         }  
  27.   
  28.         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),  
  29.             new Class<?>[] { interfaceClass }, new InvocationHandler() {  
  30.             @Override  
  31.             public Object invoke(Object proxy, Method method, Object[] arguments)  
  32.                     throws Throwable {  
  33.                 Socket socket = new Socket(host, port);  
  34.                 try {  
  35.                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
  36.                     try {  
  37.                         SrpcRequest request = new SrpcRequest();  
  38.                         request.setRequestId(UUID.randomUUID().toString());  
  39.                         request.setInterfaceName(interfaceClass.getName());  
  40.                         request.setMethodName(method.getName());  
  41.                         request.setParameterTypes(method.getParameterTypes());  
  42.                         request.setParameters(arguments);  
  43.                         output.writeObject(request);  
  44.                         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
  45.                         try {  
  46.                             SrpcResponse response = (SrpcResponse) input.readObject();  
  47.                             if (response.getError() != null  
  48.                                     && response.getError() instanceof Throwable) {  
  49.                                 throw response.getError();  
  50.                             }  
  51.                             return response.getResult();  
  52.                         } finally {  
  53.                             input.close();  
  54.                         }  
  55.                     } finally {  
  56.                         output.close();  
  57.                     }  
  58.                 } finally {  
  59.                     socket.close();  
  60.                 }  
  61.             }  
  62.         });  
  63.     }  
  64. }  

     后续继续优化序列化和NIO优化

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ 
  5.  */  
  6. public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {  
  7.   
  8.     /**服务端口号**/  
  9.     private int                 port       = 12000;  
  10.   
  11.     private Selector            selector;  
  12.   
  13.     private ServerSocketChannel serverSocketChannel;  
  14.   
  15.     public Map<String, Object>  handlerMap = new ConcurrentHashMap<>();  
  16.   
  17.     private void publishedService() throws Exception {  
  18.   
  19.         // 一直服务  
  20.         for (;;) {  
  21.             try {  
  22.                 //超时1s  
  23.                 selector.select(1000);  
  24.                 Set<SelectionKey> selectionKeys = selector.selectedKeys();  
  25.                 Iterator<SelectionKey> it = selectionKeys.iterator();  
  26.                 SelectionKey key = null;  
  27.                 while (it.hasNext()) {  
  28.                     key = it.next();  
  29.                     it.remove();  
  30.                     try {  
  31.                         handleInput(key);  
  32.                     } catch (Exception e) {  
  33.                         e.printStackTrace();  
  34.                     }  
  35.                 }  
  36.             } catch (Exception e) {  
  37.                 e.printStackTrace();  
  38.             }  
  39.         }  
  40.   
  41.     }  
  42.   
  43.     private void handleInput(SelectionKey key) throws IOException {  
  44.         if (key.isValid()) {  
  45.             if (key.isAcceptable()) {  
  46.                 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();  
  47.                 SocketChannel sc = ssc.accept();  
  48.                 sc.configureBlocking(false);  
  49.                 sc.register(selector, SelectionKey.OP_READ);  
  50.             }  
  51.             if (key.isReadable()) {  
  52.                 SocketChannel sc = (SocketChannel) key.channel();  
  53.                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);  
  54.   
  55.                 int readBytes = sc.read(readBuffer);  
  56.                 if (readBytes > 0) {  
  57.                     readBuffer.flip();  
  58.                     byte[] bytes = new byte[readBuffer.remaining()];  
  59.                     readBuffer.get(bytes);  
  60.                     SrpcRequest request = SerializationUtil.deserializer(bytes, SrpcRequest.class);  
  61.                     SrpcResponse response = doHandle(request);  
  62.                     doWriteResponse(sc, response);  
  63.                 } else if (readBytes < 0) {  
  64.                     key.cancel();  
  65.                     sc.close();  
  66.                 }  
  67.             }  
  68.         }  
  69.     }  
  70.   
  71.     private void doWriteResponse(SocketChannel channel, SrpcResponse response) throws IOException {  
  72.         if (response == null) {  
  73.             return;  
  74.         }  
  75.         byte[] bytes = SerializationUtil.serializer(response);  
  76.         ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);  
  77.         writeBuffer.put(bytes);  
  78.         writeBuffer.flip();  
  79.         channel.write(writeBuffer);  
  80.     }  
  81.   
  82.     /** 
  83.      * 
  84.      * @throws IOException 
  85.      * @throws ClosedChannelException 
  86.      */  
  87.     private void init() throws IOException, ClosedChannelException {  
  88.         // 打开socketChannel  
  89.         serverSocketChannel = ServerSocketChannel.open();  
  90.         //设置非阻塞模式  
  91.         serverSocketChannel.configureBlocking(false);  
  92.         // 绑定端口  
  93.         serverSocketChannel.socket().bind(new InetSocketAddress(port));  
  94.         // 创建selector  
  95.         selector = Selector.open();  
  96.   
  97.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
  98.     }  
  99.   
  100.     private SrpcResponse doHandle(SrpcRequest request) {  
  101.         SrpcResponse response = new SrpcResponse();  
  102.         if (StringUtils.isBlank(request.getRequestId())) {  
  103.             response.setError(new IllegalArgumentException("request id must be not null"));  
  104.         }  
  105.         response.setRequestId(request.getRequestId());  
  106.         try {  
  107.             Object service = handlerMap.get(request.getInterfaceName());  
  108.             Method method = service.getClass().getMethod(request.getMethodName(),  
  109.                 request.getParameterTypes());  
  110.             response.setResult(method.invoke(service, request.getParameters()));  
  111.   
  112.         } catch (Exception e) {  
  113.             response.setError(e);  
  114.         }  
  115.         return response;  
  116.     }  
  117.   
  118.     /** 
  119.      * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 
  120.      */  
  121.     @Override  
  122.     public void afterPropertiesSet() throws Exception {  
  123.         init();  
  124.         //发布  
  125.         publishedService();  
  126.     }  
  127.   
  128.     /** 
  129.      * @see org.springframework.context.Lifecycle#start() 
  130.      */  
  131.     @Override  
  132.     public void start() {  
  133.     }  
  134.   
  135.     /** 
  136.      * @see org.springframework.context.Lifecycle#stop() 
  137.      */  
  138.     @Override  
  139.     public void stop() {  
  140.   
  141.     }  
  142.   
  143.     /** 
  144.      * @see org.springframework.context.Lifecycle#isRunning() 
  145.      */  
  146.     @Override  
  147.     public boolean isRunning() {  
  148.         return false;  
  149.     }  
  150.   
  151.     /** 
  152.      * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) 
  153.      */  
  154.     @Override  
  155.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {  
  156.         Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);  
  157.         System.out.println(serviceBeanMap);  
  158.         if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {  
  159.             for (Object serviceBean : serviceBeanMap.values()) {  
  160.                 String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf()  
  161.                     .getName();  
  162.                 handlerMap.put(interfaceName, serviceBean);  
  163.             }  
  164.         }  
  165.     }  
  166.   
  167. }  

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ 
  5.  */  
  6. public class Client {  
  7.     /** 
  8.      * 引用服务 
  9.      * 
  10.      * @param <T> 接口泛型 
  11.      * @param interfaceClass 接口类型 
  12.      * @param host 服务器主机名 
  13.      * @param port 服务器端口 
  14.      * @return 远程服务 
  15.      * @throws Exception 
  16.      */  
  17.     @SuppressWarnings("unchecked")  
  18.     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)  
  19.                                                                                                throws Exception {  
  20.         if (interfaceClass == null || !interfaceClass.isInterface()) {  
  21.             throw new IllegalArgumentException("必须指定服务接口");  
  22.         }  
  23.   
  24.         if (host == null || host.length() == 0) {  
  25.             throw new IllegalArgumentException("必须指定服务器的地址和端口号");  
  26.         }  
  27.   
  28.         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),  
  29.             new Class<?>[] { interfaceClass }, new InvocationHandler() {  
  30.                 @Override  
  31.                 public Object invoke(Object proxy, Method method, Object[] arguments)  
  32.                                                                                      throws Throwable {  
  33.                     //创建请求  
  34.                     SrpcRequest request = new SrpcRequest();  
  35.                     request.setRequestId(UUID.randomUUID().toString());  
  36.                     request.setInterfaceName(interfaceClass.getName());  
  37.                     request.setMethodName(method.getName());  
  38.                     request.setParameterTypes(method.getParameterTypes());  
  39.                     request.setParameters(arguments);  
  40.                     SrpcResponse response = sendReqeust(request, host, port);  
  41.                     if (response == null  
  42.                         || !StringUtils.equals(request.getRequestId(), response.getRequestId())) {  
  43.                         return null;  
  44.                     }  
  45.                     if (response.getError() != null) {  
  46.                         throw response.getError();  
  47.                     }  
  48.                     return response.getResult();  
  49.   
  50.                 }  
  51.             });  
  52.   
  53.     }  
  54.   
  55.     public static SrpcResponse sendReqeust(SrpcRequest request, String host, int port)  
  56.                                                                                       throws IOException {  
  57.         SocketChannel socketChannel = connect(host, port);  
  58.         byte[] requestBytes = SerializationUtil.serializer(request);  
  59.         ByteBuffer writeBuffer = ByteBuffer.allocate(requestBytes.length);  
  60.         writeBuffer.put(requestBytes);  
  61.         writeBuffer.flip();  
  62.         socketChannel.write(writeBuffer);  
  63.         return readResoponse(socketChannel);  
  64.     }  
  65.   
  66.     /** 
  67.      * 
  68.      * @return 
  69.      * @throws IOException 
  70.      */  
  71.     private static SrpcResponse readResoponse(SocketChannel socketChannel) throws IOException {  
  72.         try {  
  73.             ByteBuffer readBuffer = ByteBuffer.allocate(1024);  
  74.             while (socketChannel.read(readBuffer) != -1) {  
  75.                 readBuffer.flip();  
  76.                 byte[] bytes = new byte[readBuffer.remaining()];  
  77.                 readBuffer.get(bytes);  
  78.                 return SerializationUtil.deserializer(bytes, SrpcResponse.class);  
  79.             }  
  80.             return null;  
  81.         } finally {  
  82.             socketChannel.close();  
  83.         }  
  84.   
  85.     }  
  86.   
  87.     private static SocketChannel connect(String host, int port) throws IOException {//连接到CSDN  
  88.         InetSocketAddress socketAddress = new InetSocketAddress(host, port);  
  89.         return SocketChannel.open(socketAddress);  
  90.   
  91.     }  
  92. }  

 

 

     在分布式系统中,为了提供系统的可用性和稳定性一般都会将服务部署在多台服务器上,为了实现自动注册自动发现远程服务,通过ZK,和ProtocolBuffe 以及Netty实现一个简单的分布式RPC框架。

   首先简单介绍一下Zookeeper和ProtocalBuffer

   Zookeeper 是由Apache Handoop的子项目发展而来。是知名的互联网公司Yahoo创建的。Zookeeper为分布式应用提供了高效且可靠的分布式协调服务。

  ProtocolBuffer是用于结构化数据串行化的灵活、高效、自动的方法,有如XML,不过它更小、更快、也更简单。你可以定义自己的数据结构,然后使用代码生成器生成的代码来读写这个数据结构。你甚至可以在无需重新部署程序的情况下更新数据结构。

   RPC 就是Remote Procedure Call Protocol 远程过程调用协议。

    JAVA对象要能够在网络上传输都必须序列化,使用高效的序列化框架ProtocolBuffer实现序列化。

Java代码   收藏代码
  1. /** 
  2.  * 序列化工具 
  3.  * @author zhangwei_david 
  4.  * @version $Id: SerializationUtil.java, v 0.1 2014年12月31日 下午5:41:35 zhangwei_david Exp $ 
  5.  */  
  6. public class SerializationUtil {  
  7.   
  8.     private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();  
  9.   
  10.     private static Objenesis                objenesis    = new ObjenesisStd(true);  
  11.   
  12.     private static <T> Schema<T> getSchema(Class<T> clazz) {  
  13.         @SuppressWarnings("unchecked")  
  14.         Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);  
  15.         if (schema == null) {  
  16.             schema = RuntimeSchema.getSchema(clazz);  
  17.             if (schema != null) {  
  18.                 cachedSchema.put(clazz, schema);  
  19.             }  
  20.         }  
  21.         return schema;  
  22.     }  
  23.   
  24.     /** 
  25.      * 序列化 
  26.      * 
  27.      * @param obj 
  28.      * @return 
  29.      */  
  30.     public static <T> byte[] serializer(T obj) {  
  31.         @SuppressWarnings("unchecked")  
  32.         Class<T> clazz = (Class<T>) obj.getClass();  
  33.         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);  
  34.         try {  
  35.             Schema<T> schema = getSchema(clazz);  
  36.             return ProtostuffIOUtil.toByteArray(obj, schema, buffer);  
  37.         } catch (Exception e) {  
  38.             throw new IllegalStateException(e.getMessage(), e);  
  39.         } finally {  
  40.             buffer.clear();  
  41.         }  
  42.     }  
  43.   
  44.     /** 
  45.      * 反序列化 
  46.      * 
  47.      * @param data 
  48.      * @param clazz 
  49.      * @return 
  50.      */  
  51.     public static <T> T deserializer(byte[] data, Class<T> clazz) {  
  52.         try {  
  53.             T obj = objenesis.newInstance(clazz);  
  54.             Schema<T> schema = getSchema(clazz);  
  55.             ProtostuffIOUtil.mergeFrom(data, obj, schema);  
  56.             return obj;  
  57.         } catch (Exception e) {  
  58.             throw new IllegalStateException(e.getMessage(), e);  
  59.         }  
  60.     }  
  61. }  

 

远程调用的请求对象

Java代码   收藏代码
  1. /** 
  2.  *Rpc 请求的主体 
  3.  * @author zhangwei_david 
  4.  * @version $Id: SrRequest.java, v 0.1 2014年12月31日 下午6:06:25 zhangwei_david Exp $ 
  5.  */  
  6. public class RpcRequest {  
  7.     // 请求Id  
  8.     private String     requestId;  
  9.     // 远程调用类名称  
  10.     private String     className;  
  11.     //远程调用方法名称  
  12.     private String     methodName;  
  13.     // 参数类型  
  14.     private Class<?>[] parameterTypes;  
  15.     // 参数值  
  16.     private Object[]   parameters;  
  17.   
  18.     /** 
  19.      * Getter method for property <tt>requestId</tt>. 
  20.      * 
  21.      * @return property value of requestId 
  22.      */  
  23.     public String getRequestId() {  
  24.         return requestId;  
  25.     }  
  26.   
  27.     /** 
  28.      * Setter method for property <tt>requestId</tt>. 
  29.      * 
  30.      * @param requestId value to be assigned to property requestId 
  31.      */  
  32.     public void setRequestId(String requestId) {  
  33.         this.requestId = requestId;  
  34.     }  
  35.   
  36.     /** 
  37.      * Getter method for property <tt>className</tt>. 
  38.      * 
  39.      * @return property value of className 
  40.      */  
  41.     public String getClassName() {  
  42.         return className;  
  43.     }  
  44.   
  45.     /** 
  46.      * Setter method for property <tt>className</tt>. 
  47.      * 
  48.      * @param className value to be assigned to property className 
  49.      */  
  50.     public void setClassName(String className) {  
  51.         this.className = className;  
  52.     }  
  53.   
  54.     /** 
  55.      * Getter method for property <tt>methodName</tt>. 
  56.      * 
  57.      * @return property value of methodName 
  58.      */  
  59.     public String getMethodName() {  
  60.         return methodName;  
  61.     }  
  62.   
  63.     /** 
  64.      * Setter method for property <tt>methodName</tt>. 
  65.      * 
  66.      * @param methodName value to be assigned to property methodName 
  67.      */  
  68.     public void setMethodName(String methodName) {  
  69.         this.methodName = methodName;  
  70.     }  
  71.   
  72.     /** 
  73.      * Getter method for property <tt>parameterTypes</tt>. 
  74.      * 
  75.      * @return property value of parameterTypes 
  76.      */  
  77.     public Class<?>[] getParameterTypes() {  
  78.         return parameterTypes;  
  79.     }  
  80.   
  81.     /** 
  82.      * Setter method for property <tt>parameterTypes</tt>. 
  83.      * 
  84.      * @param parameterTypes value to be assigned to property parameterTypes 
  85.      */  
  86.     public void setParameterTypes(Class<?>[] parameterTypes) {  
  87.         this.parameterTypes = parameterTypes;  
  88.     }  
  89.   
  90.     /** 
  91.      * Getter method for property <tt>parameters</tt>. 
  92.      * 
  93.      * @return property value of parameters 
  94.      */  
  95.     public Object[] getParameters() {  
  96.         return parameters;  
  97.     }  
  98.   
  99.     /** 
  100.      * Setter method for property <tt>parameters</tt>. 
  101.      * 
  102.      * @param parameters value to be assigned to property parameters 
  103.      */  
  104.     public void setParameters(Object[] parameters) {  
  105.         this.parameters = parameters;  
  106.     }  
  107.   
  108.     /** 
  109.      * @see java.lang.Object#toString() 
  110.      */  
  111.     @Override  
  112.     public String toString() {  
  113.         return "RpcRequest [requestId=" + requestId + ", className=" + className + ", methodName="  
  114.                 + methodName + ", parameterTypes=" + Arrays.toString(parameterTypes)  
  115.                 + ", parameters=" + Arrays.toString(parameters) + "]";  
  116.     }  
  117.   
  118. }  

 远程调用的响应对象

Java代码   收藏代码
  1. /** 
  2.  *Rpc 响应的主体 
  3.  * @author zhangwei_david 
  4.  * @version $Id: SrResponse.java, v 0.1 2014年12月31日 下午6:07:27 zhangwei_david Exp $ 
  5.  */  
  6. public class RpcResponse {  
  7.     // 请求的Id  
  8.     private String    requestId;  
  9.     // 异常  
  10.     private Throwable error;  
  11.     // 响应  
  12.     private Object    result;  
  13.   
  14.     /** 
  15.      * Getter method for property <tt>requestId</tt>. 
  16.      * 
  17.      * @return property value of requestId 
  18.      */  
  19.     public String getRequestId() {  
  20.         return requestId;  
  21.     }  
  22.   
  23.     /** 
  24.      * Setter method for property <tt>requestId</tt>. 
  25.      * 
  26.      * @param requestId value to be assigned to property requestId 
  27.      */  
  28.     public void setRequestId(String requestId) {  
  29.         this.requestId = requestId;  
  30.     }  
  31.   
  32.     /** 
  33.      * Getter method for property <tt>error</tt>. 
  34.      * 
  35.      * @return property value of error 
  36.      */  
  37.     public Throwable getError() {  
  38.         return error;  
  39.     }  
  40.   
  41.     /** 
  42.      * Setter method for property <tt>error</tt>. 
  43.      * 
  44.      * @param error value to be assigned to property error 
  45.      */  
  46.     public void setError(Throwable error) {  
  47.         this.error = error;  
  48.     }  
  49.   
  50.     /** 
  51.      * Getter method for property <tt>result</tt>. 
  52.      * 
  53.      * @return property value of result 
  54.      */  
  55.     public Object getResult() {  
  56.         return result;  
  57.     }  
  58.   
  59.     /** 
  60.      * Setter method for property <tt>result</tt>. 
  61.      * 
  62.      * @param result value to be assigned to property result 
  63.      */  
  64.     public void setResult(Object result) {  
  65.         this.result = result;  
  66.     }  
  67.   
  68.     /** 
  69.      *如果有异常则表示失败 
  70.      * @return 
  71.      */  
  72.     public boolean isError() {  
  73.         return error != null;  
  74.     }  
  75.   
  76.     /** 
  77.      * @see java.lang.Object#toString() 
  78.      */  
  79.     @Override  
  80.     public String toString() {  
  81.         return "RpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result  
  82.                 + "]";  
  83.     }  
  84.   
  85. }  

 RPC编码与解码

Java代码   收藏代码
  1. /** 
  2.  *RPC 解码 
  3.  * @author zhangwei_david 
  4.  * @version $Id: RpcDecoder.java, v 0.1 2014年12月31日 下午8:53:16 zhangwei_david Exp $ 
  5.  */  
  6. public class RpcDecoder extends ByteToMessageDecoder {  
  7.   
  8.     private Class<?> genericClass;  
  9.   
  10.     public RpcDecoder(Class<?> genericClass) {  
  11.         this.genericClass = genericClass;  
  12.     }  
  13.   
  14.     @Override  
  15.     public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  
  16.                                                                                      throws Exception {  
  17.         if (in.readableBytes() < 4) {  
  18.             return;  
  19.         }  
  20.         in.markReaderIndex();  
  21.         int dataLength = in.readInt();  
  22.         if (dataLength < 0) {  
  23.             ctx.close();  
  24.         }  
  25.         if (in.readableBytes() < dataLength) {  
  26.             in.resetReaderIndex();  
  27.         }  
  28.         byte[] data = new byte[dataLength];  
  29.         in.readBytes(data);  
  30.   
  31.         Object obj = SerializationUtil.deserializer(data, genericClass);  
  32.         out.add(obj);  
  33.     }  
  34. }  

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: RpcEncoder.java, v 0.1 2014年12月31日 下午8:55:25 zhangwei_david Exp $ 
  5.  */  
  6. @SuppressWarnings("rawtypes")  
  7. public class RpcEncoder extends MessageToByteEncoder {  
  8.   
  9.     private Class<?> genericClass;  
  10.   
  11.     public RpcEncoder(Class<?> genericClass) {  
  12.         this.genericClass = genericClass;  
  13.     }  
  14.   
  15.     @Override  
  16.     public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {  
  17.         if (genericClass.isInstance(in)) {  
  18.             byte[] data = SerializationUtil.serializer(in);  
  19.             out.writeInt(data.length);  
  20.             out.writeBytes(data);  
  21.         }  
  22.     }  
  23. }  

 RPC的请求处理器

Java代码   收藏代码
  1. /** 
  2.  *RPC请求处理器 
  3.  * @author zhangwei_david 
  4.  * @version $Id: RpcHandler.java, v 0.1 2014年12月31日 下午9:04:52 zhangwei_david Exp $ 
  5.  */  
  6. public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {  
  7.   
  8.     private static final Logger       logger = LogManager.getLogger(RpcHandler.class);  
  9.   
  10.     private final Map<String, Object> handlerMap;  
  11.   
  12.     public RpcHandler(Map<String, Object> handlerMap) {  
  13.         this.handlerMap = handlerMap;  
  14.     }  
  15.   
  16.     @Override  
  17.     public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {  
  18.         RpcResponse response = new RpcResponse();  
  19.         // 将请求的Id写入Response  
  20.         response.setRequestId(request.getRequestId());  
  21.         try {  
  22.             LogUtils.info(logger, "处理请求:{0}", request);  
  23.             Object result = handle(request);  
  24.             response.setResult(result);  
  25.         } catch (Throwable t) {  
  26.             response.setError(t);  
  27.         }  
  28.         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
  29.     }  
  30.   
  31.     /** 
  32.      * 请求的处理主体 
  33.      * 
  34.      * @param request 
  35.      * @return 
  36.      * @throws Throwable 
  37.      */  
  38.     private Object handle(RpcRequest request) throws Throwable {  
  39.         String className = request.getClassName();  
  40.         Object serviceBean = handlerMap.get(className);  
  41.   
  42.         Class<?> serviceClass = serviceBean.getClass();  
  43.         String methodName = request.getMethodName();  
  44.         Class<?>[] parameterTypes = request.getParameterTypes();  
  45.         Object[] parameters = request.getParameters();  
  46.   
  47.         FastClass serviceFastClass = FastClass.create(serviceClass);  
  48.         FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);  
  49.         return serviceFastMethod.invoke(serviceBean, parameters);  
  50.     }  
  51.   
  52.     @Override  
  53.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
  54.         ctx.close();  
  55.     }  
  56. }  

 为了方便实现服务的注册,定义一个注解

Java代码   收藏代码
  1. /** 
  2.  * 简单的RPC协议的方法的注解 
  3.  * @author zhangwei_david 
  4.  * @version $Id: STRService.java, v 0.1 2014年12月31日 下午4:33:14 zhangwei_david Exp $ 
  5.  */  
  6. @Target({ ElementType.TYPE })  
  7. @Retention(RetentionPolicy.RUNTIME)  
  8. @Component  
  9. public @interface RpcService {  
  10.   
  11.     String value() default "";  
  12.   
  13.     Class<?> inf();  
  14. }  

 

将远程服务注册到ZK

Java代码   收藏代码
  1. /** 
  2.  * 简单RPC服务注册 
  3.  * <ul> 
  4.  * 注册方法是register(),该方法的主要功能如下: 
  5.  * <li> 对目标服务器创建一个ZooKeeper实例</li> 
  6.  * <li> 如果可以成功创建ZooKeeper实例,则创建一个节点</li> 
  7.  * </ul> 
  8.  * @author zhangwei_david 
  9.  * @version $Id: ServiceRegistry.java, v 0.1 2014年12月31日 下午6:08:47 zhangwei_david Exp $ 
  10.  */  
  11. public class ServiceRegistry {  
  12.     // 日期记录器  
  13.     private static final Logger logger       = LogManager.getLogger(ServiceRegistry.class);  
  14.   
  15.     // 使用计数器实现同步  
  16.     private CountDownLatch      latch        = new CountDownLatch(1);  
  17.   
  18.     private int                 timeout      = Constant.DEFAULT_ZK_SESSION_TIMEOUT;  
  19.   
  20.     private String              registerPath = Constant.DEFAULT_ZK_REGISTRY_PATH;  
  21.   
  22.     private String              registerAddress;  
  23.   
  24.     public void register(String data) {  
  25.         LogUtils.debug(logger, "注册服务{0}", data);  
  26.         if (data != null) {  
  27.             ZooKeeper zk = connectServer();  
  28.             if (zk != null) {  
  29.                 // 创建节点  
  30.                 createNode(zk, data);  
  31.             }  
  32.         }  
  33.     }  
  34.   
  35.     /** 
  36.      * 
  37.      *创建zooKeeper 
  38.      * @return 
  39.      */  
  40.     private ZooKeeper connectServer() {  
  41.         ZooKeeper zk = null;  
  42.         try {  
  43.             LogUtils.info(logger, "创建zk,参数是:address:{0},timeout:{1}", registerAddress, timeout);  
  44.             // 创建一个zooKeeper实例,第一个参数是目标服务器地址和端口,第二个参数是session 超时时间,第三个参数是节点发生变化时的回调方法  
  45.             zk = new ZooKeeper(registerAddress, timeout, new Watcher() {  
  46.   
  47.                 public void process(WatchedEvent event) {  
  48.                     if (event.getState() == Event.KeeperState.SyncConnected) {  
  49.                         // 计数器减一  
  50.                         latch.countDown();  
  51.                     }  
  52.                 }  
  53.             });  
  54.             // 阻塞到计数器为0,直到节点的变化回调方法执行完成  
  55.             latch.await();  
  56.   
  57.         } catch (Exception e) {  
  58.             LogUtils.error(logger, "connectServer exception", e);  
  59.         }  
  60.         // 返回ZooKeeper实例  
  61.         return zk;  
  62.     }  
  63.   
  64.     /** 
  65.      * 
  66.      * 
  67.      * @param zk ZooKeeper的实例 
  68.      * @param data 注册数据 
  69.      */  
  70.     private void createNode(ZooKeeper zk, String data) {  
  71.         try {  
  72.             byte[] bytes = data.getBytes();  
  73.             /** 
  74.              * 创建一个节点,第一个参数是该节点的路径,第二个参数是该节点的初始化数据,第三个参数是该节点的ACL,第四个参数指定节点的创建策略 
  75.              */  
  76.             String createResult = zk.create(registerPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,  
  77.                 CreateMode.EPHEMERAL_SEQUENTIAL);  
  78.             LogUtils.info(logger, "创建的结果是:{0}", createResult);  
  79.         } catch (Exception e) {  
  80.             LogUtils.error(logger, "createNode exception", e);  
  81.         }  
  82.     }  
  83.   
  84.     /** 
  85.      * Getter method for property <tt>timeout</tt>. 
  86.      * 
  87.      * @return property value of timeout 
  88.      */  
  89.     public int getTimeout() {  
  90.         return timeout;  
  91.     }  
  92.   
  93.     /** 
  94.      * Setter method for property <tt>timeout</tt>. 
  95.      * 
  96.      * @param timeout value to be assigned to property timeout 
  97.      */  
  98.     public void setTimeout(int timeout) {  
  99.         this.timeout = timeout;  
  100.     }  
  101.   
  102.     /** 
  103.      * Getter method for property <tt>registerPath</tt>. 
  104.      * 
  105.      * @return property value of registerPath 
  106.      */  
  107.     public String getRegisterPath() {  
  108.         return registerPath;  
  109.     }  
  110.   
  111.     /** 
  112.      * Setter method for property <tt>registerPath</tt>. 
  113.      * 
  114.      * @param registerPath value to be assigned to property registerPath 
  115.      */  
  116.     public void setRegisterPath(String registerPath) {  
  117.         this.registerPath = registerPath;  
  118.     }  
  119.   
  120.     /** 
  121.      * Getter method for property <tt>registerAddress</tt>. 
  122.      * 
  123.      * @return property value of registerAddress 
  124.      */  
  125.     public String getRegisterAddress() {  
  126.         return registerAddress;  
  127.     }  
  128.   
  129.     /** 
  130.      * Setter method for property <tt>registerAddress</tt>. 
  131.      * 
  132.      * @param registerAddress value to be assigned to property registerAddress 
  133.      */  
  134.     public void setRegisterAddress(String registerAddress) {  
  135.         this.registerAddress = registerAddress;  
  136.     }  
  137.   
  138. }  

 至此在服务启动时就可以方便地注册到ZK

RPC调用客户端

Java代码   收藏代码
  1. /** 
  2.  *RPC客户端 
  3.  * @author zhangwei_david 
  4.  * @version $Id: RpcClient.java, v 0.1 2014年12月31日 下午9:18:34 zhangwei_david Exp $ 
  5.  */  
  6. public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {  
  7.   
  8.     private String       host;  
  9.     private int          port;  
  10.   
  11.     private RpcResponse  response;  
  12.   
  13.     private final Object obj = new Object();  
  14.   
  15.     public RpcClient(String host, int port) {  
  16.         this.host = host;  
  17.         this.port = port;  
  18.     }  
  19.   
  20.     @Override  
  21.     public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {  
  22.         this.response = response;  
  23.   
  24.         synchronized (obj) {  
  25.             obj.notifyAll(); // 收到响应,唤醒线程  
  26.         }  
  27.     }  
  28.   
  29.     @Override  
  30.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
  31.         ctx.close();  
  32.     }  
  33.   
  34.     public RpcResponse send(RpcRequest request) throws Exception {  
  35.         EventLoopGroup group = new NioEventLoopGroup();  
  36.         try {  
  37.             Bootstrap bootstrap = new Bootstrap();  
  38.             bootstrap.group(group).channel(NioSocketChannel.class)  
  39.                 .handler(new ChannelInitializer<SocketChannel>() {  
  40.                     @Override  
  41.                     public void initChannel(SocketChannel channel) throws Exception {  
  42.                         channel.pipeline().addLast(new RpcEncoder(RpcRequest.class)) // 将 RPC 请求进行编码(为了发送请求)  
  43.                             .addLast(new RpcDecoder(RpcResponse.class)) // 将 RPC 响应进行解码(为了处理响应)  
  44.                             .addLast(RpcClient.this); // 使用 RpcClient 发送 RPC 请求  
  45.                     }  
  46.                 }).option(ChannelOption.SO_KEEPALIVE, true);  
  47.   
  48.             ChannelFuture future = bootstrap.connect(host, port).sync();  
  49.             future.channel().writeAndFlush(request).sync();  
  50.   
  51.             synchronized (obj) {  
  52.                 obj.wait(); // 未收到响应,使线程等待  
  53.             }  
  54.   
  55.             if (response != null) {  
  56.                 future.channel().closeFuture().sync();  
  57.             }  
  58.             return response;  
  59.         } finally {  
  60.             group.shutdownGracefully();  
  61.         }  
  62.     }  
  63. }  

 RPC服务发现:

Java代码   收藏代码
  1. /** 
  2.  *Rpc 服务发现 
  3.  * @author zhangwei_david 
  4.  * @version $Id: ServiceDiscovery.java, v 0.1 2014年12月31日 下午9:10:23 zhangwei_david Exp $ 
  5.  */  
  6. public class ServiceDiscovery {  
  7.   
  8.     // 日志  
  9.     private static final Logger   logger   = LogManager.getLogger(ServiceDiscovery.class);  
  10.   
  11.     private CountDownLatch        latch    = new CountDownLatch(1);  
  12.   
  13.     private volatile List<String> dataList = new ArrayList<String>();  
  14.   
  15.     private String                registryAddress;  
  16.   
  17.     public void init() {  
  18.         LogUtils.debug(logger, "Rpc 服务发现初始化...");  
  19.         ZooKeeper zk = connectServer();  
  20.         if (zk != null) {  
  21.             watchNode(zk);  
  22.         }  
  23.     }  
  24.   
  25.     public String discover() {  
  26.         String data = null;  
  27.         int size = dataList.size();  
  28.         if (size > 0) {  
  29.             if (size == 1) {  
  30.                 data = dataList.get(0);  
  31.   
  32.             } else {  
  33.                 data = dataList.get(ThreadLocalRandom.current().nextInt(size));  
  34.   
  35.             }  
  36.         }  
  37.         return data;  
  38.     }  
  39.   
  40.     private ZooKeeper connectServer() {  
  41.         ZooKeeper zk = null;  
  42.         try {  
  43.             zk = new ZooKeeper(registryAddress, Constant.DEFAULT_ZK_SESSION_TIMEOUT, new Watcher() {  
  44.                 public void process(WatchedEvent event) {  
  45.                     if (event.getState() == Event.KeeperState.SyncConnected) {  
  46.                         latch.countDown();  
  47.                     }  
  48.                 }  
  49.             });  
  50.             latch.await();  
  51.         } catch (Exception e) {  
  52.         }  
  53.         LogUtils.debug(logger, "zk 是{0}", zk);  
  54.         return zk;  
  55.     }  
  56.   
  57.     private void watchNode(final ZooKeeper zk) {  
  58.         try {  
  59.             List<String> nodeList = zk.getChildren(Constant.ROOT, new Watcher() {  
  60.                 public void process(WatchedEvent event) {  
  61.                     if (event.getType() == Event.EventType.NodeChildrenChanged) {  
  62.                         watchNode(zk);  
  63.                     }  
  64.                 }  
  65.             });  
  66.             LogUtils.debug(logger, "zk 节点有  {0}", nodeList);  
  67.             List<String> dataList = new ArrayList<String>();  
  68.             for (String node : nodeList) {  
  69.                 byte[] bytes = zk.getData(Constant.ROOT + node, falsenull);  
  70.                 dataList.add(new String(bytes));  
  71.             }  
  72.             this.dataList = dataList;  
  73.             if (dataList.isEmpty()) {  
  74.                 throw new RuntimeException("尚未注册任何服务");  
  75.             }  
  76.         } catch (Exception e) {  
  77.             LogUtils.error(logger, "发现节点异常", e);  
  78.         }  
  79.     }  
  80.   
  81.     /** 
  82.      * Setter method for property <tt>registryAddress</tt>. 
  83.      * 
  84.      * @param registryAddress value to be assigned to property registryAddress 
  85.      */  
  86.     public void setRegistryAddress(String registryAddress) {  
  87.         this.registryAddress = registryAddress;  
  88.     }  
  89.   
  90. }  

 

测试:

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: HelloService.java, v 0.1 2014年12月31日 下午9:27:28 zhangwei_david Exp $ 
  5.  */  
  6. public interface HelloService {  
  7.   
  8.     String hello();  
  9. }  

 

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: HelloServiceImpl.java, v 0.1 2014年12月31日 下午9:28:02 zhangwei_david Exp $ 
  5.  */  
  6. @RpcService(value = "helloService", inf = HelloService.class)  
  7. public class HelloServiceImpl implements HelloService {  
  8.   
  9.     public String hello() {  
  10.         return "Hello! ";  
  11.     }  
  12. }  

 服务端配置:

Java代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"  
  5.     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"  
  6.     xmlns:task="http://www.springframework.org/schema/task"  
  7.     xsi:schemaLocation="  
  8.         http://www.springframework.org/schema/beans  
  9.         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  10.         http://www.springframework.org/schema/context  
  11.         http://www.springframework.org/schema/context/spring-context-3.0.xsd  
  12.         http://www.springframework.org/schema/aop   
  13.         http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
  14.         http://www.springframework.org/schema/tx  
  15.         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
  16.         http://www.springframework.org/schema/jee   
  17.         http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
  18.         http://www.springframework.org/schema/task    
  19.         http://www.springframework.org/schema/task/spring-task-3.1.xsd    
  20.         ">  
  21.     <context:component-scan base-package="com.david.common.test"/>  
  22.    
  23.    
  24.     <!-- 配置服务注册组件 -->  
  25.     <bean id="serviceRegistry" class="com.david.common.rpc.registry.ServiceRegistry">  
  26.         <property name="registerAddress" value="127.0.0.1:2181"/>  
  27.     </bean>  
  28.    
  29.     <!-- 配置 RPC 服务器 -->  
  30.     <bean id="rpcServer" class="com.david.common.rpc.server.RpcServer">  
  31.         <property name="serverAddress" value="127.0.0.1:8000"/>  
  32.         <property name="serviceRegistry" ref="serviceRegistry"/>  
  33.     </bean>  
  34.       
  35.      
  36. </beans>  

 客户端配置:

Java代码   收藏代码
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"  
  5.     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:jee="http://www.springframework.org/schema/jee"  
  6.     xmlns:task="http://www.springframework.org/schema/task"  
  7.     xsi:schemaLocation="  
  8.         http://www.springframework.org/schema/beans  
  9.         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  10.         http://www.springframework.org/schema/context  
  11.         http://www.springframework.org/schema/context/spring-context-3.0.xsd  
  12.         http://www.springframework.org/schema/aop   
  13.         http://www.springframework.org/schema/aop/spring-aop-3.0.xsd  
  14.         http://www.springframework.org/schema/tx  
  15.         http://www.springframework.org/schema/tx/spring-tx-3.0.xsd  
  16.         http://www.springframework.org/schema/jee   
  17.         http://www.springframework.org/schema/jee/spring-jee-3.0.xsd  
  18.         http://www.springframework.org/schema/task    
  19.         http://www.springframework.org/schema/task/spring-task-3.1.xsd    
  20.         ">  
  21.     <context:component-scan base-package="com.david.common.*"/>  
  22.    
  23.   
  24.     <bean id="serviceDiscovery" class="com.david.common.rpc.discovery.ServiceDiscovery" init-method="init">  
  25.        <property name="registryAddress" value="127.0.0.1:2181"/>  
  26.     </bean>  
  27.    
  28.     <!-- 配置 RPC 代理 -->  
  29.     <bean id="rpcProxy" class="com.david.common.rpc.proxy.RpcProxyFactory">  
  30.         <property name="serviceDiscovery" ref="serviceDiscovery"/>  
  31.     </bean>  
  32. </beans>  

 服务端:

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: Server.java, v 0.1 2014年12月31日 下午9:56:37 zhangwei_david Exp $ 
  5.  */  
  6. @RunWith(SpringJUnit4ClassRunner.class)  
  7. @ContextConfiguration(locations = "classpath:spring.xml")  
  8. public class Server {  
  9.     @Test  
  10.     public void helloTest() throws InterruptedException {  
  11.         System.out.println("启动");  
  12.         TimeUnit.HOURS.sleep(1);  
  13.     }  
  14. }  

 客户端:

Java代码   收藏代码
  1. /** 
  2.  * 
  3.  * @author zhangwei_david 
  4.  * @version $Id: MyTest.java, v 0.1 2014年12月31日 下午9:25:49 zhangwei_david Exp $ 
  5.  */  
  6. @RunWith(SpringJUnit4ClassRunner.class)  
  7. @ContextConfiguration(locations = "classpath:client.xml")  
  8. public class HelloServiceTest {  
  9.   
  10.     @Autowired  
  11.     private RpcProxyFactory rpcProxy;  
  12.   
  13.     @Test  
  14.     public void helloTest() {  
  15.         HelloService helloService = rpcProxy.create(HelloService.class);  
  16.         String result = helloService.hello();  
  17.         Assert.assertEquals("Hello! ", result);  
  18.     }  
  19. }  

 

  • cathy-common-util.zip (352.7 KB)
  • 下载次数: 99
  • pom.zip (1.6 KB)
  • 下载次数: 64

最后

以上就是大力星星为你收集整理的Zookeeper实现简单的分布式RPC框架的全部内容,希望文章能够帮你解决Zookeeper实现简单的分布式RPC框架所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部