我是靠谱客的博主 细腻未来,这篇文章主要介绍一个简单的rpc框架实现(一),现在分享给大家,希望可以做个参考。

系统长大后为了保证可维护性,拆分系统是一个比较常见的解决方案。系统拆分后,原来的接口直接调用方法不再可行,需要被替换成为远程调用过程。远程调用可以直接使用http协议post 一个请求到服务提供端,然后服务提供端返回一个结果给调用者。这种方案将原本数据service层的DO操作过程上升成为了web服务,我个人并不反感。第二种方案就是使用rmi 实现,但是rmi client 和server的地址耦合到一起,一旦server更换地址client端需要同步修改。 第三种方案是直接读对方的数据库,当然便利性和可维护性更差,需要把server 端的底层DAO业务冗余到client。 最后一种就是rpc 远程调用框架。

rpc框架需要很多组件:调用者,提供者,服务注册中心, 通信总线 和通信协议。 其中每一个组件都有很多技术点要谈。本文在这里只讨论一个最简单的原型:简单分隔符的通信协议,使用socket 实现通信总线。 CS直接耦合绑定在一起,后期再考虑注册中心和服务暴漏的问题。 Socket通信方式也采用最简单的直连接方式,不使用nio, 也不维护连接池。


首先先看一个简单场景

打印接口:

复制代码
1
2
3
4
public interface PrintText { String print(String text); }

接口实现:

复制代码
1
2
3
4
5
6
7
public class SystemPrint implements PrintText { public String print(String text) { System.out.println(text); return "系统已打印:" + text; } }

业务调用方:

复制代码
1
2
3
4
5
6
7
8
9
public class SpringClient { public static void main(String[] args){ BeanFactory apx = new ClassPathXmlApplicationContext("classpath:spring-config.xml"); PrintText pt = (PrintText) apx.getBean("printText"); System.out.println(pt.print("springClient")); } }

其中图1能够很好的描述这个场景, 虚线A表示业务方并不知道实现者是谁,只知道接口的存在。

简单场景
图1

现在业务系统升级了,负责打印的bean复杂到独立成了一个系统,要从业务系统中剥离出去,该怎么办?我们选择使用socket实现远程调用。 由于控制反转的设计,业务方几乎不需要改动任何代码就能实现升级。远程调用方案如图2所示:

这里写图片描述
图2

接口PrintText的实现被代理成为socket client, 将调用请求通过socket发送出去。 服务提供者接受到请求后,解析完毕,调用具体的实现类SystemPrintText, 然后将返回值发送回socket client。 最后返回给业务方。


下面详细讲一下各个组件的实现方案:

1.通讯协议:
使用|##|隔开字段(并不推荐这种方式,后面系列进一步讲协议的设计)

复制代码
1
2
3
4
5
协议设计: version:版本号|##|cypher:加密串|##| interfaceName:接口bean名称|##| interface:接口全路径|##|method:调用方法|##|params:参数1的类+参数1对象,单数2类+参数2对象, Demo: `version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.PrintText|##|method:print|##|params:java.lang.String+演示远程调用`

2. Client客户端: 
动态代理,截获业务方的接口调用后将调用参数组装成为上述协议,发送给服务端,并将服务端的返回结果返回给业务方。代码实现如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class SocketConsumerProxy implements InvocationHandler { private Object target; private RemoteDataSource dataSource; public SocketConsumerProxy(RemoteDataSource dataSource){ this.dataSource = dataSource; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { StringBuffer stream = this.buildRpcRequest(proxy, method, args); Object object = null; try{ long startTime = System.currentTimeMillis(); System.out.println("New rpc client send " + stream.toString() + " time:" + startTime); // socket connect Socket socket=new Socket(dataSource.getIp(), dataSource.getPort()); // request PrintWriter os = new PrintWriter(socket.getOutputStream()); os.println(stream.toString()); os.flush(); // read response BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream())); object = br.readLine(); long endTime = System.currentTimeMillis(); System.out.println("client read from service:" + object + " time:" + (endTime - startTime)); }catch (IOException e){ e.printStackTrace(); object = e.toString(); } return object; } private StringBuffer buildRpcRequest(Object proxy, Method method, Object[] args) { StringBuffer buffer = new StringBuffer(); buffer.append(String.format("version:%s|##|cypher:%s|##|interfaceName:%s|##|interface:%s|##|method:%s|##|params:" , dataSource.getVersion(), dataSource.getCypher(), dataSource.getInterfaceName(), dataSource.getInterfaces(), method.getName())); for (Object obj : args){ buffer.append(obj.getClass().getName() + "+" + obj.toString() + ","); } return buffer; } }

对于动态代理InvocationHander的介绍这里不再论述。

3. 服务端实现: 
服务端监听端口,获取客户端访问。 校验数据有效性:是否为空,加密参数,版本号,调用接口,调用方法。 然后通过spring容器找到最终的实现bean,通过反射的方式调用对应方法。 最后将返回值发送给客户端。代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
public class SocketProvider{ private RemoteDataSource dataSource; public SocketProvider(RemoteDataSource dataSource) { this.dataSource = dataSource; } public Object provide() throws Throwable { ServerSocket serverSocket = new ServerSocket(dataSource.getPort()); while (true) { Socket socket = null; try { //接收客户连接,只要客户进行了连接,就会触发accept();从而建立连接 socket = serverSocket.accept(); this.getRpcRequest(socket); } catch (Exception e) { e.printStackTrace(); } } } private void getRpcRequest(Socket socket) { try { System.out.println("rpc client accepted " + socket.getInetAddress() + ":" + socket.getPort() + " time:" + System.currentTimeMillis()); // 接收服务器的反馈 BufferedReader br = new BufferedReader( new InputStreamReader(socket.getInputStream())); String msg = br.readLine(); System.out.println("读到远程调用请求:" + msg); Object obj = this.parseRpcClientRequest(msg); // 接收服务器的反馈 PrintWriter os = new PrintWriter(socket.getOutputStream()); os.println(obj.toString()); os.flush(); System.out.println("rpc server return " + obj + ":" + socket.getPort() + " time:" + System.currentTimeMillis()); } catch (IOException e) { e.printStackTrace(); } } public Object parseRpcClientRequest(String msg) { Object result = ""; if (StringUtils.isEmpty(msg)){ return result; } String[] infos = msg.split("\|##\|"); Map<String, String> infoMap = new HashMap<String, String>(); for(String info : infos){ String[] pair = info.split(":"); infoMap.put(pair[0], pair[1]); } if (infoMap.isEmpty()){ return "无调用参数"; } if (!dataSource.getCypher().equals(infoMap.get("cypher"))){ return "加密串不对"; } if (!dataSource.getVersion().equals(infoMap.get("version"))){ return "服务版本号不对"; } String interfaces = infoMap.get("interface"); String interfaceName = infoMap.get("interfaceName"); String methodName = infoMap.get("method"); String params = infoMap.get("params"); if(StringUtils.isEmpty(interfaces) || StringUtils.isEmpty(methodName) || StringUtils.isEmpty(interfaceName)){ return "无调用接口或者方法 或者接口beanName"; } // bean Object obj = RemoteDataSource.beanFactory.getBean(interfaceName); if(obj == null){ return "未找到对应的服务"; } // bean 和 interface对应关系 boolean isInterfaceRight = false; Class<?>[] clazzArray = obj.getClass().getInterfaces(); for (Class<?> clazz : clazzArray){ if (clazz.getName().equals(interfaces)){ isInterfaceRight = true; break; } } if (isInterfaceRight == false){ return "错误的bean Name 和 interface对应关系"; } // 参数对应的类和对象 String[] paramsArray = params.split(","); Class<?>[] paramsClazzArray = new Class<?>[paramsArray.length]; Object[] paramObjArray = new Object[paramsArray.length]; try{ for (int i = 0; i < paramsArray.length; i ++){ String paramInfo = paramsArray[i]; if (StringUtils.isEmpty(paramInfo)) { // 过滤掉多余的逗号 continue; } String[] paramsInfos = paramInfo.split("\+"); Class c = Class.forName(paramsInfos[0]); paramsClazzArray[i] = c; paramObjArray[i] = paramsInfos[1]; } }catch (ClassNotFoundException e){ e.printStackTrace(); return "未找到参数对应的类名:" +e.toString(); } try { Method method = obj.getClass().getMethod(methodName, paramsClazzArray); result = method.invoke(obj, paramObjArray); } catch (NoSuchMethodException e) { e.printStackTrace(); return "未找到参数对应的方法名称:" +e.toString(); } catch (IllegalAccessException e) { e.printStackTrace(); return "无效的调用:" +e.toString(); } catch (InvocationTargetException e) { e.printStackTrace(); return "无效的目标地址:" +e.toString(); } result = "我是服务器代理aop, result=" + result; return result; } }

4.中转bean:
通过spring的FactoryBean实现,通过getObject方法能够中转任何接口的调用请求。具体细节请参考spring AOP和动态代理实现方案。Demo版本就非常简单,就是将业务的请求发送给2中的动态代理。 代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CaishengRemoteSimpleConsumerBean implements FactoryBean, InitializingBean{ private RemoteDataSource remoteDataSource = new RemoteDataSource(); // 远程数据源 public void afterPropertiesSet() throws Exception { remoteDataSource.init(); } public Object getObject() throws Exception { return remoteDataSource.getClientObject(); } public Class<?> getObjectType() { return remoteDataSource.getClientObject().getClass(); } ……//一些不重要的get,set操作 }

5.RemoteDataSource:
远程数据源,耦合了客户端和服务端的ip地址、加密信息以及版本号。代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RemoteDataSource { private String ip = "127.0.0.1"; private int port = 9090; // 端口 private String interfaces; // 接口全路径 private String interfaceName; // 接口名 private String version; private String cypher ="default"; // 密码 private Class<?> interfaceClass; // 类文件 private SocketRemotionFactory remotionFactory; public void init(){ remotionFactory = new SocketRemotionFactory(this); try { interfaceClass = Class.forName(interfaces); } catch (ClassNotFoundException e) { } } public Object getClientObject(){ return remotionFactory.getRemoteClientProxy(); } ……//一些不重要的get,set操作 }

6.remotionFactory: 
一个简单工厂,生成以PrintText为接口,2中的SocketConsumerProxy的为实现的代理对象给业务方。具体代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class SocketRemotionFactory { private RemoteDataSource dataSource; public SocketRemotionFactory(RemoteDataSource dataSource) { this.dataSource = dataSource; } public Object getRemoteClientProxy(){ Object result = null; Class<?> clazz = dataSource.getInterfaceClass(); if (clazz == null){ return "错误的client 代理,无对应的class"; } Class<?>[] clazzArray = new Class[1]; clazzArray[0] = clazz; try{ result = Proxy.newProxyInstance(clazz.getClassLoader(), clazzArray, new SocketConsumerProxy(dataSource)); }catch (Exception e){ e.printStackTrace(); } return result; } }

7.服务端: 
需要在spring容器加载bean时提供服务,代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class CaishengRemoteSimpleProviderBean implements FactoryBean, InitializingBean{ private RemoteDataSource remoteDataSource = new RemoteDataSource(); // 远程数据源 private SocketProvider provider; public void init() throws Exception { this.afterPropertiesSet(); } public void afterPropertiesSet() throws Exception { remoteDataSource.init(); provider = new SocketProvider(remoteDataSource); } public Object getObject() throws Exception { try { provider.provide(); } catch (Throwable throwable) { throwable.printStackTrace(); } return null; } ……//一些不重要的get,set操作 }

8.Spring中bean配置:

客户端:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
<bean id="caishengSimpleRemoteProxyPrintText" class="com.tmall.proxy.remotesimple.CaishengRemoteSimpleConsumerBean"> <property name="interfaces"> <value>com.tmall.beans.PrintText</value> </property> <property name="interfaceName"> <value>proxyPrintText</value> </property> <property name="version"> <value>1.0.0</value> </property> </bean>

服务端:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
<bean id="printText" class="com.tmall.beans.impl.SystemPrint"></bean> <bean id="caishengSimpleRemoteProvider" class="com.tmall.proxy.remotesimple.CaishengRemoteSimpleProviderBean" init-method="init"> <property name="interfaces"> <value>com.tmall.beans.PrintText</value> </property> <property name="interfaceName"> <value>proxyPrintText</value> </property> <property name="version"> <value>1.0.0</value> </property> </bean>

9.业务方代码:

复制代码
1
2
3
4
5
6
7
8
public class Client { public static void main(String[] args){ BeanFactory apx = new ClassPathXmlApplicationContext("classpath:spring-config.xml"); PrintText pt = (PrintText) apx.getBean("caishengSimpleRemoteProxyPrintText"); System.out.println(pt.print("远程调用演示")); } }

一个简单的rpc框架就实现完毕

服务方未启动:socket 调用异常

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.PrintText|##|method:print|##|params:java.lang.String+远程调用演示, time:1464108758488 java.net.ConnectException: Connection refused: connect java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.connect0(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:69) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:157) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391) at java.net.Socket.connect(Socket.java:579) at java.net.Socket.connect(Socket.java:528) at java.net.Socket.<init>(Socket.java:425) at java.net.Socket.<init>(Socket.java:208) at com.tmall.proxy.remotesimple.socket.SocketConsumerProxy.invoke(SocketConsumerProxy.java:33) at com.sun.proxy.$Proxy0.print(Unknown Source) at com.tmall.client.simpleRemote.Client.main(Client.java:16) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Process finished with exit code 0

服务方启动:

复制代码
1
2
3
4
5
rpc client accepted /127.0.0.1:54730 time:1464108803359 读到远程调用请求:version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.PrintText|##|method:print|##|params:java.lang.String+远程调用演示, 远程调用演示 rpc server return 我是服务器代理aop, result=系统已打印:远程调用演示:54730 time:1464108803380

服务方日志:

复制代码
1
2
3
4
5
New rpc client send version:1.0.0|##|cypher:default|##|interfaceName:proxyPrintText|##|interface:com.tmall.beans.Print Text|##|method:print|##|params:java.lang.String+远程调用演示, time:1464108803353 client read from service:我是服务器代理aop, result=系统已打印:远程调用演示 time:27 我是服务器代理aop, result=系统已打印:远程调用演示 Process finished with exit code 0

客户端日志:用时27ms, 很多地方需要优化。

最后

以上就是细腻未来最近收集整理的关于一个简单的rpc框架实现(一)的全部内容,更多相关一个简单内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(75)

评论列表共有 0 条评论

立即
投稿
返回
顶部