我是靠谱客的博主 虚心项链,最近开发中收集的这篇文章主要介绍【RPC】手写简易 RPC 框架 --重构,实现基于 Netty 通信,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在上一篇文章,我们通过 BIO 实现了一个简易的 RPC 框架,使用 BIO 的优点是编码简单,但是问题也很明显,因为是同步阻塞式 IO,所以为了实现并发处理,需要给每个连接都分配一个线程,这样势必很浪费资源,导致业务体量很容易因为硬件出现瓶颈。

本篇我们就将这个 RPC 框架的底层通信方式升级为 Netty,Netty 是对 NIO 的封装,而 NIO 是基于 IO 多路复用的设计模式,通过轮询处理各个事件,大大节约了系统资源。下面我们就来看看具体该怎么做吧

在这里插入图片描述
可以看到,框架的整体结构并没有改变,还是那3个包,还是那几个类。但在此之上,我还添加了一个 @RpcService 注解,去优化手动发布服务为扫描注解自动发布。

注:如果相对于 version1 没有改变的类,我会在标题后标注上“未变”,看过 version1 的同学可以直接看变化的地方。

1.RpcRequest(未变)

RpcRequest 封装了消费者要调用方法的具体信息,是我们的自定义协议,或者说是消息格式。

涉及两个过程:

  • Consumer 编码:RpcRequest -> 数据流(二进制) => 告诉 Provider 要执行哪个方法
  • Provider 解码:数据流(二进制)-> RpcRequest => 获取 Consumer 要调用哪个方法
// 注:只有实现了序列化接口,才能实现远程传输
public class RpcRequest implements Serializable {

    private String className;  // 类(接口/服务)
    private String methodName;  // 方法
    private Object[] parameters;  // 参数

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }
}

2.RpcService(新增)

用来标识要发布的服务

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {

}

3.RpcServer(修改)*

这里不再是像 version1 的 BIO 那样通过 new ServerSocket() 创建服务端,然后再用 accept() 接受连接了,而是通过 Netty 去进行通信:

public class  RpcServer{

    // 初始化主线程池,Selector
    NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    // 初始化子线程池,对应具体客户端处理逻辑
    NioEventLoopGroup workerGrop = new  NioEventLoopGroup();
	
	/**
	* 发布服务
	* 注:这里因为改造成了@RpcService自动扫描,所以不再需要传入服务实例了,只用传一个端口就行
	*/
    public void publisher(int port) throws InterruptedException {

		// 创建服务端
        ServerBootstrap server = new ServerBootstrap();
        
        // 无锁式串行化编程
        server.group(bossGroup, workerGrop)
                .channel(NioServerSocketChannel.class)
                // 当有请求来的时候会将 Pipeline 中的所有 ChannelHandler 的走一遍
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
						
					   /** 
						 * LengthFieldBasedFrameDecoder入参有5个,分别解释如下
                         * maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                         * lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                         * lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                         * lengthAdjustment:要添加到长度字段值的补偿值
                         * initialBytesToStrip:从解码帧中去除的第一个字节数
                        */
                        // 1.自定义协议(InvokerProtocol)的解码器 
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        pipeline.addLast(new LengthFieldPrepender(4));
                        
                        // 2.对象参数类型的编解码器  
                        pipeline.addLast("encoder",new ObjectEncoder());
                        pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));

                        // 3.处理请求的自定义 Handler
                        pipeline.addLast(new ProcessorHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128) // 最大SelectionKey数量
                .childOption(ChannelOption.SO_KEEPALIVE, true); // 保证所有子线程是长连接,且可以重复利用

        // 绑定端口,并阻塞
        ChannelFuture future = server.bind(port).sync();
        // 正式启动服务,相当于用一个死循环开始轮训
        future.channel().closeFuture().sync();
    }
}

4.ProcessorHandler(修改)*

Provider 线程的具体调用逻辑:

  1. 扫描指定包下的所有所有 Java 文件,获取它们的全类名
  2. 将标有 @RpcService 的服务实现创建实例对象,然后放入map中保存(单例)
  3. 等有请求来到后获取到相应 Method,然后执行方法,写出结果

注:由于在上面已经配置好了编解码器,所以可以直接获取到 RpcRequest 对象,然后在写出时编码器会对内容编码成二进制流;但 BIO 这里就需要 ObjectInputStream 和 ObjectOutputStream 手动编码。

public class ProcessorHandler extends SimpleChannelInboundHandler<RpcRequest> {

    // 扫描目标包后所有 Java 文件的全类名
    private List<String> classNames = new ArrayList<>();
    // 有 @RpcService 注解的服务bean的(serviceName,instance)
    private Map<String, Object> registryMap = new ConcurrentHashMap<>();

    public ProcessorHandler() {
        try {
            scannerClass("com.xupt.yzh.provider");
            doRegistry();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void scannerClass(String packageName) {
        // 根据包名获取绝对路径
        URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\.", "/"));
        File classpath = new File(url.getFile());

        for (File file : classpath.listFiles()) {
            if (file.isDirectory()) {
                scannerClass(packageName + "." + file.getName());
            } else {
                // 将扫描到的类的全类名放入 classNames
                classNames.add(packageName + "." + file.getName().replace(".class", ""));
            }
        }
    }

    private void doRegistry() throws Exception {
        if (classNames.isEmpty()) {
            return;
        }

        for (String className : classNames) {
            Class<?> clazz = Class.forName(className);

            RpcService annotation = clazz.getAnnotation(RpcService.class);
            if (annotation != null) {

                // 注:如果实现了多个接口,只取第一个
                Class<?> i = clazz.getInterfaces()[0];
                String serviceName = i.getName();

                // 注册
                registryMap.put(serviceName, clazz.newInstance());
            }
        }
    }

	// Consumer 发送请求过来时触发回调
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        if (registryMap.containsKey(request.getClassName())) {
            Object service = registryMap.get(request.getClassName());

            // 根据实参获取方法的形参
            // 注:获取形参列表后才能确定一个方法
            Object[] args = request.getParameters();
            Class<?>[] types = new Class[args.length];
            for (int i = 0; i < args.length; i++) {
                types[i] = args[i].getClass();
            }

            // 获取 Method
            Method method = service.getClass().getMethod(request.getMethodName(), types);

            // 执行方法
            Object res = method.invoke(service, args);

            ctx.writeAndFlush(res);
            ctx.close();
        }
    }
}

5.RpcProxyClient(未变)

代理对象,通过 JDK 动态代理生成一个代理对象

PS:这里创建一个代理对象是因为,服务的实现实例在 Provider,但 Consumer 调用服务的具体方法时也需要一个实例,而 Consumer 并没有这个实例。

public class RpcProxyClient {

    public <T>T clientProxy(final  Class<T> interfaceCls, final String host, final int port) {
        return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
                new Class[]{interfaceCls},
                new RemoteInvocationHandler(host, port));
    }
}

6.RemoteInvocationHandler(未变)

代理对象的具体逻辑,核心是 invoke 方法,当 Consumer 调用了服务的方法时,就会走到 invoke():

  1. 构建请求信息 RpcRequest
  2. 发送(编码):将 RpcRequest 转换成二进制流,发送
  3. 等待 Provider 处理结果
  4. 接收(解码):接收 Provider 的处理结果,将二进制流转换成基本类型/Java对象,并返回给上层函数

注:234步的逻辑都是网络 IO 相关,所以后面单独封装了一个 RpcNetTransport 类去实现

public class RemoteInvocationHandler implements InvocationHandler {

    String host;
    int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 构建调用Provider的请求参数
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName()); //
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setParameters(args);

        // 进行远程调用,并返回执行结果
        RpcNetTransport netTransport = new RpcNetTransport(host, port);
        Object res = netTransport.send(rpcRequest);

        return res;
    }
}

7.RpcNetTransport(修改)*

将原来的 BIO 网络通信修改成 Netty 代码,总体与 RpcServer 的代码有点类似

public class RpcNetTransport {

    String host;
    int port;

    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object send(RpcRequest rpcRequest) throws InterruptedException {
		
        NioEventLoopGroup wokergroup = new NioEventLoopGroup();
		
        TransportHandler transportHandler = new TransportHandler();
		
		// 创建客户端
        Bootstrap client = new Bootstrap();
        client.group(wokergroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();

                         // 1.自定义协议(InvokerProtocol)的解码器 
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        pipeline.addLast(new LengthFieldPrepender(4));
                        
                        // 2.对象参数类型的编解码器  
                        pipeline.addLast("encoder",new ObjectEncoder());
                        pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, 

                        // 3.接收 Provider 处理结果的自定义 Channel Handler
                        pipeline.addLast(transportHandler);
                    }
                });

        // 建立连接
        ChannelFuture future = client.connect(host, port).sync();
        // 发送请求信息
        future.channel().writeAndFlush(rpcRequest);
        future.channel().closeFuture().sync();

        // 返回请求信息
        return transportHandler.getResponse();
    }
	
	// 内部类
    static class TransportHandler extends ChannelInboundHandlerAdapter {

        private Object response;

        // 接收到消息时触发该回调
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            response = msg;
        }

        public Object getResponse() {
            return response;
        }
    }
}

好了,到此我们对 BIO 到 Netty 的改造就完成了,由于我们只是做了底层实现的优化,所以测试代码并不用改变,这也是实际开发中项目迭代的一个基本要求,要兼容老版本。

结果测试(未变)

在这里插入图片描述
api

public interface TestService {

    String test(String name);
}

Provider

服务实现类:

@RpcService
public class TestServiceImpl implements TestService {

    @Override
    public String test(String name) {
        System.out.println("new requst coming..." + name);

        Random random = new Random();
        String json = "{"name":" + """ + name + """ + ", "age":" + random.nextInt(40) + "}";
        return json;
    }
}

发布服务:

public class Provider {

    public static void main(String[] args) throws InterruptedException {

        RpcServer proxyServer = new RpcServer();
        // 注:这里不用再传入服务实例
        proxyServer.publisher(8080);
    }
}

Consumer

远程调用服务:

public class Consumer {

    public static void main(String[] args) {

        // 创建代理对象
        RpcProxyClient rpcProxyClient = new RpcProxyClient();

        // 创建一个代理对象
        TestService service = rpcProxyClient.clientProxy(TestService.class, "localhost", 8080);
        // test() 会进行远程调用
        String json = service.test("老五");
        System.out.println(json);
    }
}

结果如下:
在这里插入图片描述

完整代码我放到 GitHub 上了,有需要参考的同学点击这里跳转…

最后

以上就是虚心项链为你收集整理的【RPC】手写简易 RPC 框架 --重构,实现基于 Netty 通信的全部内容,希望文章能够帮你解决【RPC】手写简易 RPC 框架 --重构,实现基于 Netty 通信所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(62)

评论列表共有 0 条评论

立即
投稿
返回
顶部