我是靠谱客的博主 狂野裙子,最近开发中收集的这篇文章主要介绍Dubbo之手写RPC框架█ 总体认识█ 公共代码█ 服务端█ 客户端█ 运行服务端█ 运行客户端█ 总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

建议从Dubbo之@SPI开始看。

关键词:Dubbo    RPC

 


纯手写实现一个简单的RPC调用,帮助更好地学习和理解Dubbo

RPC-远程过程调用,我感觉可以理解成客户端(即消费者)通过TCP加上特定的消息协议访问服务端(即提供者),服务端根据消息协议内容调用本地方法并响应给客户端。就好像浏览器采用http协议,通过TCP传输去调用服务端接口一样,只不过http调用的是服务端的接口,接口其实对应着某个特定的方法。而RPC则直接调用服务端的方法。

关于协议:协议就是双方约定好的一种消息格式,这样发送消息的人发送出去的消息,接收消息的人才能够认识。就好比,两个人通信,发信人用的是中文,收信人呢就去查看新华字典来一个个读取信件中的内容,然后便知道和理解发信人的目的和行为了。你想想,如果此时的收信人拿出一本牛津词典查来查去,他是如论如何都不会理解中文的信件的。这里的新华字典就好比协议。

通过简单的代码来实现简单的RPC调用,这样更有助于理解和使用Dubbo,Dubbo包装了很多功能,理解起来还是蛮困难的。我感觉就像上面说的那样,本质还是TCP调用。TCP是传输层的协议了,比较底层,在JAVA中对应着就是Socket和ServerSocket。


█ 总体认识

  • 我代码都放在一个模块里面了,实际的项目中,客户端代码和服务端代码是分开的,有可能就是放在不同的服务器上。公共代码是客户端和服务端都需要的。
  • 客户端代码就是消费者,也就是服务的调用方;服务端代码就是提供者,也就是服务的提供者

█ 公共代码

package common;

/**
 *
 * 接口,面向接口调用。客户端调用的是接口的代理,
 * 服务端真正调用接口的实现类(即CatService或DogService)
 *
 */
public interface AnimalService {

    String say();

    int age(int age);

}

协议类,客户端和服务端都需要使用。客户端根据协议的规定格式创建请求信息,服务端根据协议的规定格式解析请求内容。我这里的协议比较简单,就是字符串拼接,包含了请求的接口,方法和参数等信息(这样都是定位到调用的具体方法必不可少的条件)。dubbo-start的作用是用来标识一段请求消息的内容的。当多个客户端同时请求了服务端,服务端获取到的请求内容可能会被放到同一个字节数组里面,这样dubbo-start方面拆分每一条请求。请求的唯一标识的作用差不多,能够方便客户端解析响应消息的时候,对应上自己的哪一次请求。

package common;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 协议类,用于组装、拆解交流的信息
 *
 * 协议规定:一个请求必须以dubbo-start开头,第二个是请求的唯一标识 第三个是类的全限定名,第四个是方法名 第五个是参数类型列表
 * 第六个是参数值,(参数类型和参数值,多个用逗号分割)
 * 值与值中间用一个空行分割
 *
 */
public class Protocol {

    /**
     * 构建请求信息,即客户端按照协议规定的格式构建请求信息
     * @param requestId 请求唯一标识
     * @param targetClass 请求的目标类
     * @param methodName 请求的目标方法
     * @param paramTypes 目标方法的参数类型
     * @param paramValues 传给目标方法的参数
     * @return
     */
    public byte[] packRequest(String requestId, Class targetClass, String methodName, Class[] paramTypes, Object[] paramValues) {
        StringBuilder sb = new StringBuilder();
        // 拼接协议信息
        sb.append("dubbo-start").append(" ").append(requestId).append(" ").append(targetClass.getName()).append(" ").
                append(methodName).append(" ");

        // 处理参数类型
        if (paramTypes!=null && paramTypes.length>0) {
            StringBuilder paramType = new StringBuilder();
            for (Class type : paramTypes) {
                paramType.append(type.getName()).append(",");
            }
            // 去掉结尾的逗号
            String substring = paramType.toString().substring(0, paramType.toString().length() - 1);
            sb.append(substring).append(" ");
        }
        // 处理参数值
        if (paramValues!=null && paramValues.length>0) {
            StringBuilder paramValue = new StringBuilder();
            for (Object value : paramValues) {
                paramValue.append(value).append(",");
            }
            // 去掉结尾的逗号
            String substring = paramValue.toString().substring(0, paramValue.toString().length() - 1);
            sb.append(substring);
        }
        return sb.toString().getBytes();
    }

    /**
     * 拆解请求信息,即服务端解析客户端的请求信息
     * @param bytes
     * @param len
     * @return
     */
    public Map<String, Object> unpackRequest(byte[] bytes, int len) {
        Map<String, Object> map = new HashMap<>();
        String recv = new String(bytes, 0, len);
        // 根据协议约定,按照空格分隔各个请求信息
        String[] split = recv.split(" ");
        System.out.println("请求信息:"+split);
        // 请求ID
        map.put(Const.REQUEST_ID, split[1]);
        // 接口名
        map.put(Const.INTERFACE_NAME, split[2]);
        // 方法名
        map.put(Const.METHOD_NAME, split[3]);
        // 参数类型
        List<Class> paramTypelist = new ArrayList<>();
        if (split.length>4 && split[4]!=null) {
            String[] types = split[4].split(",");
            for (String type : types) {
                paramTypelist.add(convertParamType(type));
            }
            map.put(Const.PARAM_TYPES, paramTypelist);
        }
        // 参数值
        if (split.length>5 && split[5]!=null) {
            String[] values = split[5].split(",");
            List<Object> valueList = new ArrayList<>(values.length);
            for (int i=0; i<values.length; i++) {
                valueList.add(convertParamValue(paramTypelist.get(i), values[i]));
            }
            map.put(Const.PARAM_VALUES, valueList);
        }

        return map;
    }

    /**
     * 参数类型转换
     * @param type
     * @return
     */
    private Class convertParamType(String type) {
        // 简单地举了几个例子
        if ("int".equals(type)) {
            return int.class;
        } else if ("java.util.ArrayList".equals(type) || "ArrayList".equals(type)) {
            return ArrayList.class;
        }
        return String.class;
    }

    /**
     * 参数值转换。解析的时候都转成了字符串,
     * 这里需要根据参数具体的类型转换
     * @param type
     * @param value
     * @return
     */
    private Object convertParamValue(Class type, String value) {
        // 简单的举了几个例子
        if (type==int.class) {
            return Integer.parseInt(value);
        } else if (type==double.class) {
            return Double.parseDouble(value);
        }
        return value;
    }

}
package common;

/**
 *
 * 常量类,抽出字符串
 *
 */
public class Const {

    // 请求ID
    public static final String REQUEST_ID = "requestId";

    // 请求目标接口类型
    public static final String INTERFACE_NAME = "interfaceName";

    // 请求目标方法
    public static final String METHOD_NAME = "methodName";

    // 请求方法参数类型
    public static final String PARAM_TYPES = "paramTypes";

    // 请求方法参数值
    public static final String PARAM_VALUES = "paramValues";

}

█ 服务端

Service-包装了服务发布的功能,即通过创建的ServerSocket来接收和响应客户端的请求。

package server;

import common.Const;
import common.Protocol;

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 *
 * 包装服务端服务发布的功能
 * 
 */
public class Service<T> {
    // 用于存放接口和实现类对象的关系,作为服务端服务字典,方便查找
    private static ConcurrentHashMap<String, Object> beanMap = new ConcurrentHashMap<>();
    // 协议,在服务端,协议的作用就是解析消费端发送过来的消息
    private Protocol protocol = new Protocol();

    // 接口的实现类
    private T obejct;
    // 接口类型
    private Class interfaceClass;

    public Service(Class interfaceClass, T object) {
        // 前置校验,借实现类是否实现了接口
        boolean assignableFrom = interfaceClass.isAssignableFrom(object.getClass());
        if (!assignableFrom) {
            throw new IllegalArgumentException("object 必须实现interfaceClass接口");
        }
        this.interfaceClass = interfaceClass;
        this.obejct = object;
        // 缓存起来,方便客户端调用的时候,查找目标类
        beanMap.put(interfaceClass.getName(), object);
    }

    public void export() {
        try {
            ServerSocket serverSocket = ServiceServer.getServerSocket();
            while (true) {
                // 这里会阻塞,等待客户端的连接
                Socket socket = serverSocket.accept();
                // 响应结果
                byte[] result = new byte[1];
                // 接收消费者的请求
                InputStream inputStream = socket.getInputStream();
                byte[] recv = new byte[1024];
                // 这里会阻塞,等待客户端的请求信息
                int len = inputStream.read(recv);
                // 根据协议,转换请求信息
                Map<String, Object> requestMap = protocol.unpackRequest(recv, len);
                // 获取调用的接口
                Object interfaceName = requestMap.get(Const.INTERFACE_NAME);
                // 获取接口实现类对象
                Object object = beanMap.get(interfaceName);
                if (object==null) {
                    result = "请求的接口不存在".getBytes();
                } else {
                    // 方法名
                    Object methodName = requestMap.get(Const.METHOD_NAME);
                    // 参数类型
                    Class[] paramTypes = null;
                    List<Class> paramTypeList = (List<Class>)requestMap.get(Const.PARAM_TYPES);
                    if (paramTypeList!=null && paramTypeList.size()>0) {
                        paramTypes = paramTypeList.toArray(new Class[paramTypeList.size()]);
                    }
                    // 参数值
                    Object[] paramValues = null;
                    List<Object> paramValueList = (List<Object>)requestMap.get(Const.PARAM_VALUES);
                    if (paramValueList!=null && paramValueList.size()>0) {
                        paramValues = paramValueList.toArray();
                    }
                    // 获取到调用的方法
                    Method method = object.getClass().getMethod(methodName.toString(), paramTypes);
                    // 方法调用结果
                    Object invoke = method.invoke(object, paramValues);
                    if (invoke!=null) {
                        result = invoke.toString().getBytes();
                    }
                }
                // 将结果返回给客户端
                OutputStream os = socket.getOutputStream();
                os.write(result);
                os.flush();
                os.close();
                inputStream.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package server;

import java.net.ServerSocket;

/**
 * 创建ServerSocket,暴露服务
 * 多次服务暴露,都使用一个ServerSocket
 * 这里的端口号就固定写了8899
 *
 */
public class ServiceServer {

    private static volatile ServerSocket serverSocket;

    private ServiceServer() {
        throw new IllegalStateException();
    }

    /**
     * 创建一个ServerSocket,用于暴露服务。
     * @return
     */
    public static ServerSocket getServerSocket() {
        if (serverSocket==null) {
            synchronized (ServiceServer.class) {
                if (serverSocket==null) {
                    try {
                        // 因为没有注册中心的功能,没法让消费者去感知服务的端口,这里端口就写死了
                        serverSocket = new ServerSocket(8899);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return serverSocket;
    }

}
package server;

import common.AnimalService;

public class DogService implements AnimalService {

    @Override
    public String say() {
        return "this is a dog";
    }

    @Override
    public int age(int age) {
        return age;
    }
    
}

 

package server;

import common.AnimalService;

public class CatService implements AnimalService {

    @Override
    public String say() {
        return "this is a cat";
    }

    @Override
    public int age(int age) {
        return age + 10;
    }
    
}

█ 客户端

package client;

import common.Protocol;

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
import java.util.UUID;

/**
 * 服务端调用代理类
 * 客户端并不知道在服务端上接口具体的实现类是哪个,只能通过调用接口来获取。
 * 创建接口的代理,主要工作就是创建Socket去连接服务端,并按照协议格式发送请求
 *
 */
public class Reference implements InvocationHandler {

    // 协议
    private Protocol protocol = new Protocol();

    private Class interfaceClass;

    public Reference(Class interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public Object getReference() {
        return Proxy.newProxyInstance(Reference.class.getClassLoader(), new Class[]{interfaceClass}, this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result = null;

        String name = method.getName();
        // 如果是父类Object中的方法就直接返回方法名,这里举几个例子,不全
        if ("equals".equals(name) || "toString".equals(name)) {
            return name;
        }
        Socket socket = new Socket("localhost", 8899);
        // 客户端发送请求
        OutputStream os = socket.getOutputStream();
        // 通过约定好的协议格式构建请求信息
        byte[] requestByte = protocol.packRequest(UUID.randomUUID().toString(), interfaceClass, name, method.getParameterTypes(), args);
        os.write(requestByte);
        os.flush();
        os.close();
        // 客户端接收响应结果
        InputStream is = socket.getInputStream();
        byte[] recv = new byte[1024];
        int read = is.read(recv);
        is.close();
        // 这里应该还有解码器的,我就简单地写了一下
        Class<?> returnType = method.getReturnType();
        if (String.class==returnType) {
            result = new String(recv, 0, read);
        } else if (int.class==returnType || Integer.class==returnType) {
            return Integer.parseInt(new String(recv, 0, read));
        }

        return result;
    }
}

█ 运行服务端

package server;

import common.AnimalService;

public class Main {

    public static void main(String[] args) {
        // 暴露DogService
        Service service = new Service(AnimalService.class, new DogService());
        // 启动一个ServerSocket,等待客户端的连接和接收消息
        service.export();
    }

}

█ 运行客户端

package client;

import common.AnimalService;

public class Main {

    public static void main(String[] args) {
        // 客户端创建代理对象,代理去实现请求服务端的逻辑
        Reference reference = new Reference(AnimalService.class);
        AnimalService animalService = (AnimalService)reference.getReference();
        // 当调用say方法,实际会调用Reference的invoke方法
        String say = animalService.say();
        System.out.println(say);
        System.out.println("获取到的age="+animalService.age(10));

        // 输出结果:
        // this is a dog
        //获取到的age=10
    }

}

█ 总结

上面代码写的粗糙,功能也粗糙,只是简单地模拟了一下RPC调用,实现了服务端和客户端,根据协议构建和解析协议的功能。希望看完能够帮助更好的去理解Dubbo。在Dubbo中还实现了注册中心,路由选择,负载均衡,容错等等强大的功能,这些功能有机会的话会在后面介绍(希望我能坚持写下去)。我想,只有理解了RPC的工作原理,才能更好的去学习和理解Dubbo的功能。

关于Dubbo的使用、服务暴露、服务注册、负载均衡等,官网已经有很详细的介绍了。我就不重复写了,写了也就是分析源码。具体请移步Dubbo官网慢慢品味:我简述一下自己关于Dubbo的理解:

服务端:

  • 服务暴露:将ServiceConfig对应的接口实现类对外提供调用服务,让客户端(消费者)能够请求调用。
  • 服务注册:将ServiceConfig对应的接口实现类注册到注册中心上,这样是对客户端透明,即提供了一个公示栏,让客户端调用者查看其可以调用哪些服务。
  • 一个<dubbo:service />或@Service,就对应一个ServiceConfig实例。ServiceConfig是Dubbo服务暴露和注册的起点。开始于export方法
  • ServiceConfig在暴露和注册服务时,使用了Invoker这样的代理类,能够灵活地调用具体的接口实现类,而不用写死代码。
  • 服务暴露时,一个Invoker又会被封装成一个Exporter,由这个Exporter负责具体的服务暴露和引用工作。
  • 所有被暴露的服务都会缓存在Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap(),exporterMap之中,key是服务的标识信息(端口,版本号,分组名)。这样当消费者待着请求信息过来请求时,Dubbo提取一些参数值,组装成key,去本地缓存中查找对应的Exporter,而Exporter中又包含了Invoker,这样自然就调用了实际的接口实现类方法了。
  • 真正做服务暴露和注册的是协议Protocol,不如DubboProtocol,RegistryProtocol。

Service -----> ServiceConfig -------> Invoker -----> Exporter

消费端:

  • 一个<dubbo:reference />或@Reference对应一个ReferenceConfig。起点是get方法。
  • 一个ReferenceConfig也是对应一个Invoker,通过Invoker代理服务端的调用。
  • Invoker外包装了一层Directory。这个可以理解成消费者端的本地缓存,缓存了服务端的服务调用。当发起远程调用时,要去Directory这个缓存中去查找对应的Invoker,由Invoker完成具体的调用过程。
  • Directory又被Cluster所有。Cluster根据Directory中缓存的Invoker列表完成路由、负载均衡和容错功能

Reference -----> ReferenceConfig ------> Cluster -------> Directory -----> Invoker------->(Exporter ------>Invoker----->Service)

最后

以上就是狂野裙子为你收集整理的Dubbo之手写RPC框架█ 总体认识█ 公共代码█ 服务端█ 客户端█ 运行服务端█ 运行客户端█ 总结的全部内容,希望文章能够帮你解决Dubbo之手写RPC框架█ 总体认识█ 公共代码█ 服务端█ 客户端█ 运行服务端█ 运行客户端█ 总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(77)

评论列表共有 0 条评论

立即
投稿
返回
顶部