我是靠谱客的博主 花痴鲜花,这篇文章主要介绍分布式-通信框架什么是RPC来个手写RPC框架show一波吧,现在分享给大家,希望可以做个参考。

目录

 

什么是RPC

了解Java RMI

Java RMI代码实践

JavaRMI源码分析

 远程对象发布-类图

远程引用层-类图

源码解读

发布远程对象

来个手写RPC框架show一波吧


什么是RPC

RPC(Remote Procedure Call,远程过程调用),一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源;对于客户端来说,传输层用什么协议,序列化、反序列化都是透明的。

了解Java RMI

RMI全称是Remote Method Invovation远程方法调用,一种用于远程过程调用的应用程序编程接口,是纯Java的网络分布式应用系统的核心解决方案之一。

RMI目前使用Java远程消息交换协议JRMP(Java Remote Messageing Protocol)进行通信,由于JRMP是专为Java对象制定的,是分布式应用系统的百分之百纯Java解决方案,用Java RMI开发的应用系统可以部署在任何支持JRE的平台上,缺点是,由于JRMP是专门为Java对象指定的,因此RMI对于非Java语言开发的应用系统的支持不足,不能与非Java语言书写的对象进行通信

Java RMI代码实践

远程对象必须实现UnicastRemoteObject,这样才能保证客户端访问获得远程对象时,该远程对象会把自身的一个拷贝以Socket形式传输给客户端,客户端获得的拷贝称为“stub”,而服务器端本身已经存在的远程对象称为“skeleton”,此时客户端的stub是客户端的一个代理,用于与服务器端进行通信,而skeleton是服务器端的一个代理,用于接收客户端的请求之后调用远程方法来相应客户端的请求

JavaRMI源码分析

这里以RMI的一个简单实现Demo,来进入RMI框架源码的神奇世界吧。

复制代码
1
2
3
4
5
6
7
8
/** * @author King Chen * @Date: 2019/3/20 20:30 */ public interface IHelloWorldService extends Remote { String sayHello(String name) throws RemoteException; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** * @author King Chen * @Date: 2019/3/20 20:31 */ public class HelloWorldServiceImpl extends UnicastRemoteObject implements IHelloWorldService { protected HelloWorldServiceImpl() throws RemoteException { super(); } @Override public String sayHello(String name) throws RemoteException { return "Hello " + name; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** * @author King Chen * @Date: 2019/3/20 20:34 */ public class Server { public static void main(String[] args) { try { IHelloWorldService service = new HelloWorldServiceImpl(); LocateRegistry.createRegistry(1099); Naming.rebind("helloWorld", service); } catch (RemoteException e) { e.printStackTrace(); } catch (MalformedURLException e) { e.printStackTrace(); } } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** * @author King Chen * @Date: 2019/3/20 20:39 */ public class Client { public static void main(String[] args) { try { IHelloWorldService service = (IHelloWorldService) Naming.lookup("helloWorld"); System.out.println(service.sayHello("King")); } catch (NotBoundException e) { e.printStackTrace(); } catch (MalformedURLException e) { e.printStackTrace(); } catch (RemoteException e) { e.printStackTrace(); } } }

 远程对象发布-类图

远程引用层-类图

源码解读

发布远程对象

发布远程对象,看到上面的类图可以知道,这个地方会发布两个远程对象,一个是RegistryImpl、另外一个是我们自己写的RMI实现类对象;

从HelloServiceImpl的构造函数看起。调用了父类UnicastRemoteObject的构造方法,追溯到UnicastRemoteObject的私有方法exportObject()。这里做了一个判断,判断服务的实现是不是UnicastRemoteObject的子类,如果是,则直接赋值其ref(RemoteRef)对象为传入的UnicastServerRef对象。反之则调用UnicastRef的exportObject()方法。

因为HelloServiceImpl继承了UnicastRemoteObject,所以在服务启动的时候,会通过UnicastRemoteObject的构造方法把该对象进行发布

复制代码
1
2
3
4
5
protected UnicastRemoteObject(int port) throws RemoteException { this.port = port; exportObject((Remote) this, port); }

服务端启动Registry服务

复制代码
1
LocateRegistry.createRegistry(1099);

从上面这段代码入手,采用神奇的Debug大法,开始往下看。可以发现服务端创建了一个RegistryImpl对象,这里做了一个判断,如果服务端制定的端口是1099并且系统开启了安全管理器,那么就可以在限定的权限集内绕过系统的安全校验。这里纯粹是为了提高效率,真正的逻辑在setup方法中

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public RegistryImpl(final int var1) throws RemoteException { this.bindings = new Hashtable(101); if (var1 == 1099 && System.getSecurityManager() != null) { try { AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { public Void run() throws RemoteException { LiveRef var1x = new LiveRef(RegistryImpl.id, var1); RegistryImpl.this.setup(new UnicastServerRef(var1x, (var0) -> { return RegistryImpl.registryFilter(var0); })); return null; } }, (AccessControlContext)null, new SocketPermission("localhost:" + var1, "listen,accept")); } catch (PrivilegedActionException var3) { throw (RemoteException)var3.getException(); } } else { LiveRef var2 = new LiveRef(id, var1); this.setup(new UnicastServerRef(var2, RegistryImpl::registryFilter)); } }

setup方法将指向正在初始化的RegistryImpl对象的远程引用ref(RemoteRef)赋值为传入的UnicastServerRef对象,这里涉及到向上转型,然后继续执行UnicastServerRef的exportObject方法

复制代码
1
2
3
4
private void setup(UnicastServerRef var1) throws RemoteException { this.ref = var1; var1.exportObject(this, (Object)null, true); }

进入UnicastServerRef的exportObject()方法。可以看到,这里首先为传入的RegistryImpl创建一个代理,这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub(RegistryImpl_Stub)对象。然后将UnicastServerRef的skel(skeleton)对象设置为当前RegistryImpl对象。最后用skeleton、stub、UnicastServerRef对象、id和一个boolean值构造了一个Target对象,也就是这个Target对象基本上包含了全部的信息,等待TCP调用。调用UnicastServerRef的ref(LiveRef)变量的exportObject()方法。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException { Class var4 = var1.getClass(); Remote var5; try { var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse); } catch (IllegalArgumentException var7) { throw new ExportException("remote object implements illegal remote interface", var7); } if (var5 instanceof RemoteStub) { this.setSkeleton(var1); } Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3); this.ref.exportObject(var6); this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4); return var5; }

注:上述参数类型说明【var1=RegistryImpl ; var2=null ; var3=true】,LiveRef与TCP通信的类

到上面为止,我们看到的都是一些变量的赋值和创建工作,还没有到连接层,这些引用对象将会被Stub和Skeleton对象使用。接下来就是连接层上的了。追溯LiveRef的exportObject()方法,很容易找到了TCPTransport的exportObject()方法。这个方法做的事情就是将上面构造的Target对象暴露出去。调用TCPTransport的listen()方法,listen()方法创建了一个ServerSocket,并且启动了一条线程等待客户端的请求。接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void exportObject(Target var1) throws RemoteException { synchronized(this) { this.listen(); ++this.exportCount; } boolean var2 = false; boolean var12 = false; try { var12 = true; super.exportObject(var1); var2 = true; var12 = false; } finally { if (var12) { if (!var2) { synchronized(this) { this.decrementExportCount(); } } } } if (!var2) { synchronized(this) { this.decrementExportCount(); } } }

到此,我们已经将RegistryImpl对象创建并起了服务等待客户端的请求。

客户端获取服务端Registry代理

复制代码
1
IHelloWorldService service = (IHelloWorldService) Naming.lookup("helloWorld");

从上面代码看起,容易追溯到LocateRegistry的getRegistry()方法。这个方法做的事情是通过传入的host和port构造RemoteRef对象,并创建一个本地代理。这个代理对象其实是RegistryImpl_Stub对象。这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量)。但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联,毕竟是两个VM上面的对象,这里我们也可以猜测,代理和远程的Registry对象之间是通过sokcet消息来完成的。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static Registry getRegistry(String host, int port, RMIClientSocketFactory csf) throws RemoteException { Registry registry = null; //下面两个if代码块,目的是获取仓库地址 if (port <= 0) port = Registry.REGISTRY_PORT; if (host == null || host.length() == 0) { // If host is blank (as returned by "file:" URL in 1.0.2 used in // java.rmi.Naming), try to convert to real local host name so // that the RegistryImpl's checkAccess will not fail. try { host = java.net.InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { // If that failed, at least try "" (localhost) anyway... host = ""; } } //与TCP通信的类 LiveRef liveRef = new LiveRef(new ObjID(ObjID.REGISTRY_ID), new TCPEndpoint(host, port, csf, null), false); RemoteRef ref = (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef); //创建远程代理类,并引用LiveRef,好让动态代理时,能进行TCP通信 return (Registry) Util.createProxy(RegistryImpl.class, ref, false); }

调用RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法,将RegistryImpl_Stub对象传了进行,不要忘了构造它的时候,我们将服务器的主机端口等信息传了进去,也就是我们把服务器相关的信息也传进了newCall()方法。newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException { clientRefLog.log(Log.BRIEF, "get connection"); Connection var6 = this.ref.getChannel().newConnection(); try { clientRefLog.log(Log.VERBOSE, "create call context"); if (clientCallLog.isLoggable(Log.VERBOSE)) { this.logClientCall(var1, var2[var3]); } StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4); try { this.marshalCustomCallData(var7.getOutputStream()); } catch (IOException var9) { throw new MarshalException("error marshaling custom call data"); } return var7; } catch (RemoteException var10) { this.ref.getChannel().free(var6, false); throw var10; } }

连接建立之后自然就是发送请求了。我们知道客户端终究只是拥有Registry对象的代理,而不是真正的位于服务端的Registry对象本身,他们位于不同的虚拟机实例中,无法直接调用。必然时通过消息进行交互的。看看this.ref.invoke(var2);这里做了什么吧。容易追溯到StreamRemoteCall的executeCall()方法。看似本地调用,但其实很容易从代码中看出是通过tcp连接发送消息到服务端的。由服务端解析并处理调用。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public void executeCall() throws Exception { DGCAckHandler var2 = null; byte var1; try { if (this.out != null) { var2 = this.out.getDGCAckHandler(); } this.releaseOutputStream(); DataInputStream var3 = new DataInputStream(this.conn.getInputStream()); byte var4 = var3.readByte(); if (var4 != 81) { if (Transport.transportLog.isLoggable(Log.BRIEF)) { Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4); } throw new UnmarshalException("Transport return code invalid"); } this.getInputStream(); var1 = this.in.readByte(); this.in.readID(); } catch (UnmarshalException var11) { throw var11; } catch (IOException var12) { throw new UnmarshalException("Error unmarshaling return header", var12); } finally { if (var2 != null) { var2.release(); } } switch(var1) { case 1: return; case 2: Object var14; try { var14 = this.in.readObject(); } catch (Exception var10) { throw new UnmarshalException("Error unmarshaling return", var10); } if (!(var14 instanceof Exception)) { throw new UnmarshalException("Return type not Exception"); } else { this.exceptionReceivedFromServer((Exception)var14); } default: if (Transport.transportLog.isLoggable(Log.BRIEF)) { Transport.transportLog.log(Log.BRIEF, "return code invalid: " + var1); } throw new UnmarshalException("Return code invalid"); } }

至此,我们已经将客户端的服务查询请求发出了。服务端接收客户端的服务查询请求并返回给客户端结果。

这里我们继续跟踪server端代码的服务发布代码,一步步往上翻。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void listen() throws RemoteException { assert Thread.holdsLock(this); TCPEndpoint var1 = this.getEndpoint(); int var2 = var1.getPort(); if (this.server == null) { if (tcpLog.isLoggable(Log.BRIEF)) { tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket"); } try { this.server = var1.newServerSocket(); Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true)); var3.start(); } catch (BindException var4) { throw new ExportException("Port already in use: " + var2, var4); } catch (IOException var5) { throw new ExportException("Listen failed on port: " + var2, var5); } } else { SecurityManager var6 = System.getSecurityManager(); if (var6 != null) { var6.checkListen(var2); } } }

在TCP协议层发起socket监听,并采用多线程循环接收请求:TCPTransport.AcceptLoop(this.server);

再往下走,一堆判断。。。。。。然后我们直接跳过了,不知道怎么跳可以根据我上面的demo用debug大法走一下。下面直接贴核心代码serviceCall()方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public boolean serviceCall(final RemoteCall var1) { try { ObjID var39; try { var39 = ObjID.read(var1.getInputStream()); } catch (IOException var33) { throw new MarshalException("unable to read objID", var33); } Transport var40 = var39.equals(dgcID) ? null : this; Target var5 = ObjectTable.getTarget(new ObjectEndpoint(var39, var40)); final Remote var37; if (var5 != null && (var37 = var5.getImpl()) != null) { final Dispatcher var6 = var5.getDispatcher(); var5.incrementCallCount(); boolean var8; try { transportLog.log(Log.VERBOSE, "call dispatcher"); final AccessControlContext var7 = var5.getAccessControlContext(); ClassLoader var41 = var5.getContextClassLoader(); ClassLoader var9 = Thread.currentThread().getContextClassLoader(); try { setContextClassLoader(var41); currentTransport.set(this); try { AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { public Void run() throws IOException { Transport.this.checkAcceptPermission(var7); var6.dispatch(var37, var1); return null; } }, var7); return true; } catch (PrivilegedActionException var31) { throw (IOException)var31.getException(); } } finally { setContextClassLoader(var9); currentTransport.set((Object)null); } } catch (IOException var34) { transportLog.log(Log.BRIEF, "exception thrown by dispatcher: ", var34); var8 = false; } finally { var5.decrementCallCount(); } return var8; } throw new NoSuchObjectException("no such object in table"); } catch (RemoteException var36) { RemoteException var2 = var36; if (UnicastServerRef.callLog.isLoggable(Log.BRIEF)) { String var3 = ""; try { var3 = "[" + RemoteServer.getClientHost() + "] "; } catch (ServerNotActiveException var30) { } String var4 = var3 + "exception: "; UnicastServerRef.callLog.log(Log.BRIEF, var4, var36); } try { ObjectOutput var38 = var1.getResultStream(false); UnicastServerRef.clearStackTraces(var2); var38.writeObject(var2); var1.releaseOutputStream(); } catch (IOException var29) { transportLog.log(Log.BRIEF, "exception thrown marshalling exception: ", var29); return false; } } return true; }

来个手写RPC框架show一波吧

最后

以上就是花痴鲜花最近收集整理的关于分布式-通信框架什么是RPC来个手写RPC框架show一波吧的全部内容,更多相关分布式-通信框架什么是RPC来个手写RPC框架show一波吧内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(92)

评论列表共有 0 条评论

立即
投稿
返回
顶部