概述
目录
什么是RPC
了解Java RMI
Java RMI代码实践
JavaRMI源码分析
远程对象发布-类图
远程引用层-类图
源码解读
发布远程对象
来个手写RPC框架show一波吧
什么是RPC
RPC(Remote Procedure Call,远程过程调用),一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源;对于客户端来说,传输层用什么协议,序列化、反序列化都是透明的。
了解Java RMI
RMI全称是Remote Method Invovation远程方法调用,一种用于远程过程调用的应用程序编程接口,是纯Java的网络分布式应用系统的核心解决方案之一。
RMI目前使用Java远程消息交换协议JRMP(Java Remote Messageing Protocol)进行通信,由于JRMP是专为Java对象制定的,是分布式应用系统的百分之百纯Java解决方案,用Java RMI开发的应用系统可以部署在任何支持JRE的平台上,缺点是,由于JRMP是专门为Java对象指定的,因此RMI对于非Java语言开发的应用系统的支持不足,不能与非Java语言书写的对象进行通信
Java RMI代码实践
远程对象必须实现UnicastRemoteObject,这样才能保证客户端访问获得远程对象时,该远程对象会把自身的一个拷贝以Socket形式传输给客户端,客户端获得的拷贝称为“stub”,而服务器端本身已经存在的远程对象称为“skeleton”,此时客户端的stub是客户端的一个代理,用于与服务器端进行通信,而skeleton是服务器端的一个代理,用于接收客户端的请求之后调用远程方法来相应客户端的请求
JavaRMI源码分析
这里以RMI的一个简单实现Demo,来进入RMI框架源码的神奇世界吧。
/**
* @author King Chen
* @Date: 2019/3/20 20:30
*/
public interface IHelloWorldService extends Remote {
String sayHello(String name) throws RemoteException;
}
/**
* @author King Chen
* @Date: 2019/3/20 20:31
*/
public class HelloWorldServiceImpl extends UnicastRemoteObject implements IHelloWorldService {
protected HelloWorldServiceImpl() throws RemoteException {
super();
}
@Override
public String sayHello(String name) throws RemoteException {
return "Hello " + name;
}
}
/**
* @author King Chen
* @Date: 2019/3/20 20:34
*/
public class Server {
public static void main(String[] args) {
try {
IHelloWorldService service = new HelloWorldServiceImpl();
LocateRegistry.createRegistry(1099);
Naming.rebind("helloWorld", service);
} catch (RemoteException e) {
e.printStackTrace();
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
}
/**
* @author King Chen
* @Date: 2019/3/20 20:39
*/
public class Client {
public static void main(String[] args) {
try {
IHelloWorldService service = (IHelloWorldService) Naming.lookup("helloWorld");
System.out.println(service.sayHello("King"));
} catch (NotBoundException e) {
e.printStackTrace();
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (RemoteException e) {
e.printStackTrace();
}
}
}
远程对象发布-类图
远程引用层-类图
源码解读
发布远程对象
发布远程对象,看到上面的类图可以知道,这个地方会发布两个远程对象,一个是RegistryImpl、另外一个是我们自己写的RMI实现类对象;
从HelloServiceImpl的构造函数看起。调用了父类UnicastRemoteObject的构造方法,追溯到UnicastRemoteObject的私有方法exportObject()。这里做了一个判断,判断服务的实现是不是UnicastRemoteObject的子类,如果是,则直接赋值其ref(RemoteRef)对象为传入的UnicastServerRef对象。反之则调用UnicastRef的exportObject()方法。
因为HelloServiceImpl继承了UnicastRemoteObject,所以在服务启动的时候,会通过UnicastRemoteObject的构造方法把该对象进行发布
protected UnicastRemoteObject(int port) throws RemoteException
{
this.port = port;
exportObject((Remote) this, port);
}
服务端启动Registry服务
LocateRegistry.createRegistry(1099);
从上面这段代码入手,采用神奇的Debug大法,开始往下看。可以发现服务端创建了一个RegistryImpl对象,这里做了一个判断,如果服务端制定的端口是1099并且系统开启了安全管理器,那么就可以在限定的权限集内绕过系统的安全校验。这里纯粹是为了提高效率,真正的逻辑在setup方法中
public RegistryImpl(final int var1) throws RemoteException {
this.bindings = new Hashtable(101);
if (var1 == 1099 && System.getSecurityManager() != null) {
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
public Void run() throws RemoteException {
LiveRef var1x = new LiveRef(RegistryImpl.id, var1);
RegistryImpl.this.setup(new UnicastServerRef(var1x, (var0) -> {
return RegistryImpl.registryFilter(var0);
}));
return null;
}
}, (AccessControlContext)null, new SocketPermission("localhost:" + var1, "listen,accept"));
} catch (PrivilegedActionException var3) {
throw (RemoteException)var3.getException();
}
} else {
LiveRef var2 = new LiveRef(id, var1);
this.setup(new UnicastServerRef(var2, RegistryImpl::registryFilter));
}
}
setup方法将指向正在初始化的RegistryImpl对象的远程引用ref(RemoteRef)赋值为传入的UnicastServerRef对象,这里涉及到向上转型,然后继续执行UnicastServerRef的exportObject方法
private void setup(UnicastServerRef var1) throws RemoteException {
this.ref = var1;
var1.exportObject(this, (Object)null, true);
}
进入UnicastServerRef的exportObject()方法。可以看到,这里首先为传入的RegistryImpl创建一个代理,这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub(RegistryImpl_Stub)对象。然后将UnicastServerRef的skel(skeleton)对象设置为当前RegistryImpl对象。最后用skeleton、stub、UnicastServerRef对象、id和一个boolean值构造了一个Target对象,也就是这个Target对象基本上包含了全部的信息,等待TCP调用。调用UnicastServerRef的ref(LiveRef)变量的exportObject()方法。
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
Class var4 = var1.getClass();
Remote var5;
try {
var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
} catch (IllegalArgumentException var7) {
throw new ExportException("remote object implements illegal remote interface", var7);
}
if (var5 instanceof RemoteStub) {
this.setSkeleton(var1);
}
Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
this.ref.exportObject(var6);
this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
return var5;
}
注:上述参数类型说明【var1=RegistryImpl ; var2=null ; var3=true】,LiveRef与TCP通信的类
到上面为止,我们看到的都是一些变量的赋值和创建工作,还没有到连接层,这些引用对象将会被Stub和Skeleton对象使用。接下来就是连接层上的了。追溯LiveRef的exportObject()方法,很容易找到了TCPTransport的exportObject()方法。这个方法做的事情就是将上面构造的Target对象暴露出去。调用TCPTransport的listen()方法,listen()方法创建了一个ServerSocket,并且启动了一条线程等待客户端的请求。接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中。
public void exportObject(Target var1) throws RemoteException {
synchronized(this) {
this.listen();
++this.exportCount;
}
boolean var2 = false;
boolean var12 = false;
try {
var12 = true;
super.exportObject(var1);
var2 = true;
var12 = false;
} finally {
if (var12) {
if (!var2) {
synchronized(this) {
this.decrementExportCount();
}
}
}
}
if (!var2) {
synchronized(this) {
this.decrementExportCount();
}
}
}
到此,我们已经将RegistryImpl对象创建并起了服务等待客户端的请求。
客户端获取服务端Registry代理
IHelloWorldService service = (IHelloWorldService) Naming.lookup("helloWorld");
从上面代码看起,容易追溯到LocateRegistry的getRegistry()方法。这个方法做的事情是通过传入的host和port构造RemoteRef对象,并创建一个本地代理。这个代理对象其实是RegistryImpl_Stub对象。这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量)。但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联,毕竟是两个VM上面的对象,这里我们也可以猜测,代理和远程的Registry对象之间是通过sokcet消息来完成的。
public static Registry getRegistry(String host, int port,
RMIClientSocketFactory csf)
throws RemoteException
{
Registry registry = null;
//下面两个if代码块,目的是获取仓库地址
if (port <= 0)
port = Registry.REGISTRY_PORT;
if (host == null || host.length() == 0) {
// If host is blank (as returned by "file:" URL in 1.0.2 used in
// java.rmi.Naming), try to convert to real local host name so
// that the RegistryImpl's checkAccess will not fail.
try {
host = java.net.InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
// If that failed, at least try "" (localhost) anyway...
host = "";
}
}
//与TCP通信的类
LiveRef liveRef =
new LiveRef(new ObjID(ObjID.REGISTRY_ID),
new TCPEndpoint(host, port, csf, null),
false);
RemoteRef ref =
(csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);
//创建远程代理类,并引用LiveRef,好让动态代理时,能进行TCP通信
return (Registry) Util.createProxy(RegistryImpl.class, ref, false);
}
调用RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法,将RegistryImpl_Stub对象传了进行,不要忘了构造它的时候,我们将服务器的主机端口等信息传了进去,也就是我们把服务器相关的信息也传进了newCall()方法。newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接。
public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {
clientRefLog.log(Log.BRIEF, "get connection");
Connection var6 = this.ref.getChannel().newConnection();
try {
clientRefLog.log(Log.VERBOSE, "create call context");
if (clientCallLog.isLoggable(Log.VERBOSE)) {
this.logClientCall(var1, var2[var3]);
}
StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);
try {
this.marshalCustomCallData(var7.getOutputStream());
} catch (IOException var9) {
throw new MarshalException("error marshaling custom call data");
}
return var7;
} catch (RemoteException var10) {
this.ref.getChannel().free(var6, false);
throw var10;
}
}
连接建立之后自然就是发送请求了。我们知道客户端终究只是拥有Registry对象的代理,而不是真正的位于服务端的Registry对象本身,他们位于不同的虚拟机实例中,无法直接调用。必然时通过消息进行交互的。看看this.ref.invoke(var2);这里做了什么吧。容易追溯到StreamRemoteCall的executeCall()方法。看似本地调用,但其实很容易从代码中看出是通过tcp连接发送消息到服务端的。由服务端解析并处理调用。
public void executeCall() throws Exception {
DGCAckHandler var2 = null;
byte var1;
try {
if (this.out != null) {
var2 = this.out.getDGCAckHandler();
}
this.releaseOutputStream();
DataInputStream var3 = new DataInputStream(this.conn.getInputStream());
byte var4 = var3.readByte();
if (var4 != 81) {
if (Transport.transportLog.isLoggable(Log.BRIEF)) {
Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4);
}
throw new UnmarshalException("Transport return code invalid");
}
this.getInputStream();
var1 = this.in.readByte();
this.in.readID();
} catch (UnmarshalException var11) {
throw var11;
} catch (IOException var12) {
throw new UnmarshalException("Error unmarshaling return header", var12);
} finally {
if (var2 != null) {
var2.release();
}
}
switch(var1) {
case 1:
return;
case 2:
Object var14;
try {
var14 = this.in.readObject();
} catch (Exception var10) {
throw new UnmarshalException("Error unmarshaling return", var10);
}
if (!(var14 instanceof Exception)) {
throw new UnmarshalException("Return type not Exception");
} else {
this.exceptionReceivedFromServer((Exception)var14);
}
default:
if (Transport.transportLog.isLoggable(Log.BRIEF)) {
Transport.transportLog.log(Log.BRIEF, "return code invalid: " + var1);
}
throw new UnmarshalException("Return code invalid");
}
}
至此,我们已经将客户端的服务查询请求发出了。服务端接收客户端的服务查询请求并返回给客户端结果。
这里我们继续跟踪server端代码的服务发布代码,一步步往上翻。
private void listen() throws RemoteException {
assert Thread.holdsLock(this);
TCPEndpoint var1 = this.getEndpoint();
int var2 = var1.getPort();
if (this.server == null) {
if (tcpLog.isLoggable(Log.BRIEF)) {
tcpLog.log(Log.BRIEF, "(port " + var2 + ") create server socket");
}
try {
this.server = var1.newServerSocket();
Thread var3 = (Thread)AccessController.doPrivileged(new NewThreadAction(new TCPTransport.AcceptLoop(this.server), "TCP Accept-" + var2, true));
var3.start();
} catch (BindException var4) {
throw new ExportException("Port already in use: " + var2, var4);
} catch (IOException var5) {
throw new ExportException("Listen failed on port: " + var2, var5);
}
} else {
SecurityManager var6 = System.getSecurityManager();
if (var6 != null) {
var6.checkListen(var2);
}
}
}
在TCP协议层发起socket监听,并采用多线程循环接收请求:TCPTransport.AcceptLoop(this.server);
再往下走,一堆判断。。。。。。然后我们直接跳过了,不知道怎么跳可以根据我上面的demo用debug大法走一下。下面直接贴核心代码serviceCall()方法
public boolean serviceCall(final RemoteCall var1) {
try {
ObjID var39;
try {
var39 = ObjID.read(var1.getInputStream());
} catch (IOException var33) {
throw new MarshalException("unable to read objID", var33);
}
Transport var40 = var39.equals(dgcID) ? null : this;
Target var5 = ObjectTable.getTarget(new ObjectEndpoint(var39, var40));
final Remote var37;
if (var5 != null && (var37 = var5.getImpl()) != null) {
final Dispatcher var6 = var5.getDispatcher();
var5.incrementCallCount();
boolean var8;
try {
transportLog.log(Log.VERBOSE, "call dispatcher");
final AccessControlContext var7 = var5.getAccessControlContext();
ClassLoader var41 = var5.getContextClassLoader();
ClassLoader var9 = Thread.currentThread().getContextClassLoader();
try {
setContextClassLoader(var41);
currentTransport.set(this);
try {
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
public Void run() throws IOException {
Transport.this.checkAcceptPermission(var7);
var6.dispatch(var37, var1);
return null;
}
}, var7);
return true;
} catch (PrivilegedActionException var31) {
throw (IOException)var31.getException();
}
} finally {
setContextClassLoader(var9);
currentTransport.set((Object)null);
}
} catch (IOException var34) {
transportLog.log(Log.BRIEF, "exception thrown by dispatcher: ", var34);
var8 = false;
} finally {
var5.decrementCallCount();
}
return var8;
}
throw new NoSuchObjectException("no such object in table");
} catch (RemoteException var36) {
RemoteException var2 = var36;
if (UnicastServerRef.callLog.isLoggable(Log.BRIEF)) {
String var3 = "";
try {
var3 = "[" + RemoteServer.getClientHost() + "] ";
} catch (ServerNotActiveException var30) {
}
String var4 = var3 + "exception: ";
UnicastServerRef.callLog.log(Log.BRIEF, var4, var36);
}
try {
ObjectOutput var38 = var1.getResultStream(false);
UnicastServerRef.clearStackTraces(var2);
var38.writeObject(var2);
var1.releaseOutputStream();
} catch (IOException var29) {
transportLog.log(Log.BRIEF, "exception thrown marshalling exception: ", var29);
return false;
}
}
return true;
}
来个手写RPC框架show一波吧
最后
以上就是花痴鲜花为你收集整理的分布式-通信框架什么是RPC来个手写RPC框架show一波吧的全部内容,希望文章能够帮你解决分布式-通信框架什么是RPC来个手写RPC框架show一波吧所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复