概述
建议从Dubbo之@SPI开始看。
关键词:Dubbo RPC
纯手写实现一个简单的RPC调用,帮助更好地学习和理解Dubbo
RPC-远程过程调用,我感觉可以理解成客户端(即消费者)通过TCP加上特定的消息协议访问服务端(即提供者),服务端根据消息协议内容调用本地方法并响应给客户端。就好像浏览器采用http协议,通过TCP传输去调用服务端接口一样,只不过http调用的是服务端的接口,接口其实对应着某个特定的方法。而RPC则直接调用服务端的方法。
关于协议:协议就是双方约定好的一种消息格式,这样发送消息的人发送出去的消息,接收消息的人才能够认识。就好比,两个人通信,发信人用的是中文,收信人呢就去查看新华字典来一个个读取信件中的内容,然后便知道和理解发信人的目的和行为了。你想想,如果此时的收信人拿出一本牛津词典查来查去,他是如论如何都不会理解中文的信件的。这里的新华字典就好比协议。
通过简单的代码来实现简单的RPC调用,这样更有助于理解和使用Dubbo,Dubbo包装了很多功能,理解起来还是蛮困难的。我感觉就像上面说的那样,本质还是TCP调用。TCP是传输层的协议了,比较底层,在JAVA中对应着就是Socket和ServerSocket。
█ 总体认识
- 我代码都放在一个模块里面了,实际的项目中,客户端代码和服务端代码是分开的,有可能就是放在不同的服务器上。公共代码是客户端和服务端都需要的。
- 客户端代码就是消费者,也就是服务的调用方;服务端代码就是提供者,也就是服务的提供者
█ 公共代码
package common;
/**
*
* 接口,面向接口调用。客户端调用的是接口的代理,
* 服务端真正调用接口的实现类(即CatService或DogService)
*
*/
public interface AnimalService {
String say();
int age(int age);
}
协议类,客户端和服务端都需要使用。客户端根据协议的规定格式创建请求信息,服务端根据协议的规定格式解析请求内容。我这里的协议比较简单,就是字符串拼接,包含了请求的接口,方法和参数等信息(这样都是定位到调用的具体方法必不可少的条件)。dubbo-start的作用是用来标识一段请求消息的内容的。当多个客户端同时请求了服务端,服务端获取到的请求内容可能会被放到同一个字节数组里面,这样dubbo-start方面拆分每一条请求。请求的唯一标识的作用差不多,能够方便客户端解析响应消息的时候,对应上自己的哪一次请求。
package common;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 协议类,用于组装、拆解交流的信息
*
* 协议规定:一个请求必须以dubbo-start开头,第二个是请求的唯一标识 第三个是类的全限定名,第四个是方法名 第五个是参数类型列表
* 第六个是参数值,(参数类型和参数值,多个用逗号分割)
* 值与值中间用一个空行分割
*
*/
public class Protocol {
/**
* 构建请求信息,即客户端按照协议规定的格式构建请求信息
* @param requestId 请求唯一标识
* @param targetClass 请求的目标类
* @param methodName 请求的目标方法
* @param paramTypes 目标方法的参数类型
* @param paramValues 传给目标方法的参数
* @return
*/
public byte[] packRequest(String requestId, Class targetClass, String methodName, Class[] paramTypes, Object[] paramValues) {
StringBuilder sb = new StringBuilder();
// 拼接协议信息
sb.append("dubbo-start").append(" ").append(requestId).append(" ").append(targetClass.getName()).append(" ").
append(methodName).append(" ");
// 处理参数类型
if (paramTypes!=null && paramTypes.length>0) {
StringBuilder paramType = new StringBuilder();
for (Class type : paramTypes) {
paramType.append(type.getName()).append(",");
}
// 去掉结尾的逗号
String substring = paramType.toString().substring(0, paramType.toString().length() - 1);
sb.append(substring).append(" ");
}
// 处理参数值
if (paramValues!=null && paramValues.length>0) {
StringBuilder paramValue = new StringBuilder();
for (Object value : paramValues) {
paramValue.append(value).append(",");
}
// 去掉结尾的逗号
String substring = paramValue.toString().substring(0, paramValue.toString().length() - 1);
sb.append(substring);
}
return sb.toString().getBytes();
}
/**
* 拆解请求信息,即服务端解析客户端的请求信息
* @param bytes
* @param len
* @return
*/
public Map<String, Object> unpackRequest(byte[] bytes, int len) {
Map<String, Object> map = new HashMap<>();
String recv = new String(bytes, 0, len);
// 根据协议约定,按照空格分隔各个请求信息
String[] split = recv.split(" ");
System.out.println("请求信息:"+split);
// 请求ID
map.put(Const.REQUEST_ID, split[1]);
// 接口名
map.put(Const.INTERFACE_NAME, split[2]);
// 方法名
map.put(Const.METHOD_NAME, split[3]);
// 参数类型
List<Class> paramTypelist = new ArrayList<>();
if (split.length>4 && split[4]!=null) {
String[] types = split[4].split(",");
for (String type : types) {
paramTypelist.add(convertParamType(type));
}
map.put(Const.PARAM_TYPES, paramTypelist);
}
// 参数值
if (split.length>5 && split[5]!=null) {
String[] values = split[5].split(",");
List<Object> valueList = new ArrayList<>(values.length);
for (int i=0; i<values.length; i++) {
valueList.add(convertParamValue(paramTypelist.get(i), values[i]));
}
map.put(Const.PARAM_VALUES, valueList);
}
return map;
}
/**
* 参数类型转换
* @param type
* @return
*/
private Class convertParamType(String type) {
// 简单地举了几个例子
if ("int".equals(type)) {
return int.class;
} else if ("java.util.ArrayList".equals(type) || "ArrayList".equals(type)) {
return ArrayList.class;
}
return String.class;
}
/**
* 参数值转换。解析的时候都转成了字符串,
* 这里需要根据参数具体的类型转换
* @param type
* @param value
* @return
*/
private Object convertParamValue(Class type, String value) {
// 简单的举了几个例子
if (type==int.class) {
return Integer.parseInt(value);
} else if (type==double.class) {
return Double.parseDouble(value);
}
return value;
}
}
package common;
/**
*
* 常量类,抽出字符串
*
*/
public class Const {
// 请求ID
public static final String REQUEST_ID = "requestId";
// 请求目标接口类型
public static final String INTERFACE_NAME = "interfaceName";
// 请求目标方法
public static final String METHOD_NAME = "methodName";
// 请求方法参数类型
public static final String PARAM_TYPES = "paramTypes";
// 请求方法参数值
public static final String PARAM_VALUES = "paramValues";
}
█ 服务端
Service-包装了服务发布的功能,即通过创建的ServerSocket来接收和响应客户端的请求。
package server;
import common.Const;
import common.Protocol;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* 包装服务端服务发布的功能
*
*/
public class Service<T> {
// 用于存放接口和实现类对象的关系,作为服务端服务字典,方便查找
private static ConcurrentHashMap<String, Object> beanMap = new ConcurrentHashMap<>();
// 协议,在服务端,协议的作用就是解析消费端发送过来的消息
private Protocol protocol = new Protocol();
// 接口的实现类
private T obejct;
// 接口类型
private Class interfaceClass;
public Service(Class interfaceClass, T object) {
// 前置校验,借实现类是否实现了接口
boolean assignableFrom = interfaceClass.isAssignableFrom(object.getClass());
if (!assignableFrom) {
throw new IllegalArgumentException("object 必须实现interfaceClass接口");
}
this.interfaceClass = interfaceClass;
this.obejct = object;
// 缓存起来,方便客户端调用的时候,查找目标类
beanMap.put(interfaceClass.getName(), object);
}
public void export() {
try {
ServerSocket serverSocket = ServiceServer.getServerSocket();
while (true) {
// 这里会阻塞,等待客户端的连接
Socket socket = serverSocket.accept();
// 响应结果
byte[] result = new byte[1];
// 接收消费者的请求
InputStream inputStream = socket.getInputStream();
byte[] recv = new byte[1024];
// 这里会阻塞,等待客户端的请求信息
int len = inputStream.read(recv);
// 根据协议,转换请求信息
Map<String, Object> requestMap = protocol.unpackRequest(recv, len);
// 获取调用的接口
Object interfaceName = requestMap.get(Const.INTERFACE_NAME);
// 获取接口实现类对象
Object object = beanMap.get(interfaceName);
if (object==null) {
result = "请求的接口不存在".getBytes();
} else {
// 方法名
Object methodName = requestMap.get(Const.METHOD_NAME);
// 参数类型
Class[] paramTypes = null;
List<Class> paramTypeList = (List<Class>)requestMap.get(Const.PARAM_TYPES);
if (paramTypeList!=null && paramTypeList.size()>0) {
paramTypes = paramTypeList.toArray(new Class[paramTypeList.size()]);
}
// 参数值
Object[] paramValues = null;
List<Object> paramValueList = (List<Object>)requestMap.get(Const.PARAM_VALUES);
if (paramValueList!=null && paramValueList.size()>0) {
paramValues = paramValueList.toArray();
}
// 获取到调用的方法
Method method = object.getClass().getMethod(methodName.toString(), paramTypes);
// 方法调用结果
Object invoke = method.invoke(object, paramValues);
if (invoke!=null) {
result = invoke.toString().getBytes();
}
}
// 将结果返回给客户端
OutputStream os = socket.getOutputStream();
os.write(result);
os.flush();
os.close();
inputStream.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package server;
import java.net.ServerSocket;
/**
* 创建ServerSocket,暴露服务
* 多次服务暴露,都使用一个ServerSocket
* 这里的端口号就固定写了8899
*
*/
public class ServiceServer {
private static volatile ServerSocket serverSocket;
private ServiceServer() {
throw new IllegalStateException();
}
/**
* 创建一个ServerSocket,用于暴露服务。
* @return
*/
public static ServerSocket getServerSocket() {
if (serverSocket==null) {
synchronized (ServiceServer.class) {
if (serverSocket==null) {
try {
// 因为没有注册中心的功能,没法让消费者去感知服务的端口,这里端口就写死了
serverSocket = new ServerSocket(8899);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return serverSocket;
}
}
package server;
import common.AnimalService;
public class DogService implements AnimalService {
@Override
public String say() {
return "this is a dog";
}
@Override
public int age(int age) {
return age;
}
}
package server;
import common.AnimalService;
public class CatService implements AnimalService {
@Override
public String say() {
return "this is a cat";
}
@Override
public int age(int age) {
return age + 10;
}
}
█ 客户端
package client;
import common.Protocol;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
import java.util.UUID;
/**
* 服务端调用代理类
* 客户端并不知道在服务端上接口具体的实现类是哪个,只能通过调用接口来获取。
* 创建接口的代理,主要工作就是创建Socket去连接服务端,并按照协议格式发送请求
*
*/
public class Reference implements InvocationHandler {
// 协议
private Protocol protocol = new Protocol();
private Class interfaceClass;
public Reference(Class interfaceClass) {
this.interfaceClass = interfaceClass;
}
public Object getReference() {
return Proxy.newProxyInstance(Reference.class.getClassLoader(), new Class[]{interfaceClass}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object result = null;
String name = method.getName();
// 如果是父类Object中的方法就直接返回方法名,这里举几个例子,不全
if ("equals".equals(name) || "toString".equals(name)) {
return name;
}
Socket socket = new Socket("localhost", 8899);
// 客户端发送请求
OutputStream os = socket.getOutputStream();
// 通过约定好的协议格式构建请求信息
byte[] requestByte = protocol.packRequest(UUID.randomUUID().toString(), interfaceClass, name, method.getParameterTypes(), args);
os.write(requestByte);
os.flush();
os.close();
// 客户端接收响应结果
InputStream is = socket.getInputStream();
byte[] recv = new byte[1024];
int read = is.read(recv);
is.close();
// 这里应该还有解码器的,我就简单地写了一下
Class<?> returnType = method.getReturnType();
if (String.class==returnType) {
result = new String(recv, 0, read);
} else if (int.class==returnType || Integer.class==returnType) {
return Integer.parseInt(new String(recv, 0, read));
}
return result;
}
}
█ 运行服务端
package server;
import common.AnimalService;
public class Main {
public static void main(String[] args) {
// 暴露DogService
Service service = new Service(AnimalService.class, new DogService());
// 启动一个ServerSocket,等待客户端的连接和接收消息
service.export();
}
}
█ 运行客户端
package client;
import common.AnimalService;
public class Main {
public static void main(String[] args) {
// 客户端创建代理对象,代理去实现请求服务端的逻辑
Reference reference = new Reference(AnimalService.class);
AnimalService animalService = (AnimalService)reference.getReference();
// 当调用say方法,实际会调用Reference的invoke方法
String say = animalService.say();
System.out.println(say);
System.out.println("获取到的age="+animalService.age(10));
// 输出结果:
// this is a dog
//获取到的age=10
}
}
█ 总结
上面代码写的粗糙,功能也粗糙,只是简单地模拟了一下RPC调用,实现了服务端和客户端,根据协议构建和解析协议的功能。希望看完能够帮助更好的去理解Dubbo。在Dubbo中还实现了注册中心,路由选择,负载均衡,容错等等强大的功能,这些功能有机会的话会在后面介绍(希望我能坚持写下去)。我想,只有理解了RPC的工作原理,才能更好的去学习和理解Dubbo的功能。
关于Dubbo的使用、服务暴露、服务注册、负载均衡等,官网已经有很详细的介绍了。我就不重复写了,写了也就是分析源码。具体请移步Dubbo官网慢慢品味:我简述一下自己关于Dubbo的理解:
服务端:
- 服务暴露:将ServiceConfig对应的接口实现类对外提供调用服务,让客户端(消费者)能够请求调用。
- 服务注册:将ServiceConfig对应的接口实现类注册到注册中心上,这样是对客户端透明,即提供了一个公示栏,让客户端调用者查看其可以调用哪些服务。
- 一个<dubbo:service />或@Service,就对应一个ServiceConfig实例。ServiceConfig是Dubbo服务暴露和注册的起点。开始于export方法
- ServiceConfig在暴露和注册服务时,使用了Invoker这样的代理类,能够灵活地调用具体的接口实现类,而不用写死代码。
- 服务暴露时,一个Invoker又会被封装成一个Exporter,由这个Exporter负责具体的服务暴露和引用工作。
- 所有被暴露的服务都会缓存在Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap(),exporterMap之中,key是服务的标识信息(端口,版本号,分组名)。这样当消费者待着请求信息过来请求时,Dubbo提取一些参数值,组装成key,去本地缓存中查找对应的Exporter,而Exporter中又包含了Invoker,这样自然就调用了实际的接口实现类方法了。
- 真正做服务暴露和注册的是协议Protocol,不如DubboProtocol,RegistryProtocol。
Service -----> ServiceConfig -------> Invoker -----> Exporter
消费端:
- 一个<dubbo:reference />或@Reference对应一个ReferenceConfig。起点是get方法。
- 一个ReferenceConfig也是对应一个Invoker,通过Invoker代理服务端的调用。
- Invoker外包装了一层Directory。这个可以理解成消费者端的本地缓存,缓存了服务端的服务调用。当发起远程调用时,要去Directory这个缓存中去查找对应的Invoker,由Invoker完成具体的调用过程。
- Directory又被Cluster所有。Cluster根据Directory中缓存的Invoker列表完成路由、负载均衡和容错功能
Reference -----> ReferenceConfig ------> Cluster -------> Directory -----> Invoker------->(Exporter ------>Invoker----->Service)
最后
以上就是狂野裙子为你收集整理的Dubbo之手写RPC框架█ 总体认识█ 公共代码█ 服务端█ 客户端█ 运行服务端█ 运行客户端█ 总结的全部内容,希望文章能够帮你解决Dubbo之手写RPC框架█ 总体认识█ 公共代码█ 服务端█ 客户端█ 运行服务端█ 运行客户端█ 总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复