我是靠谱客的博主 欢喜紫菜,最近开发中收集的这篇文章主要介绍手写一个简单的RPC框架,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

最近在学习Netty框架,为了学以致用,决定自己实现一个简单的RPC框架,该项目采用Netty进行通信,使用Spring进行搭建,目前已经实现了RPC框架的基本功能,其余的改进尚在进一步完善中,github地址:https://github.com/13350747533/HahaRpc

在这里记录一下简单的思路:
服务注册与发现:
首先是服务的注册与发现,本项目使用的是zookeeper来作为注册中心,使用ZKClient来对zookeeper的节点进行管理。

服务发现的代码:

public String discovery(String serviceName) {
        //创建zookeeper节点
        ZkClient zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT);
        LOGGER.debug("connect zookeeper");
        try{
            //获取service节点
            String servicePath = Constant.ZK_REGISTRY_PATH + "/" + serviceName;
            if(!zkClient.exists(servicePath)){
                throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
            }
            List<String> addressList = zkClient.getChildren(servicePath);
            if(CollectionUtil.isEmpty(addressList)){
                throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
            }
            //获取 address 节点

            String address;
            int size = addressList.size();
            if(size == 1) {
                //若只有一个地址
                address = addressList.get(0);
                LOGGER.debug("get only address node: {}", address);
            }else{
                address = addressList.get(ThreadLocalRandom.current().nextInt(size));
                LOGGER.debug("get random address node: {}", address);
            }
            //获取address节点的值
            String addressPath = servicePath + "/" + address;
            return zkClient.readData(addressPath);
        }finally {
            zkClient.close();
        }

    }

服务注册的代码:

private final ZkClient zkClient;

    public ZooKeeperServiceRegistry(String zkAddress) {
        zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT,Constant.ZK_CONNECTION_TIMEOUT);
        LOGGER.debug("connect zookeeper");
    }

    @Override
    public void registry(String serviceName, String serviceAddress) {
        //创建registry节点 (持久)
        String registryPath = Constant.ZK_REGISTRY_PATH;
        if(!zkClient.exists(registryPath)){
            zkClient.createPersistent(registryPath);
            LOGGER.debug("create registry node :{}", registryPath);
        }
        //创建Service节点 (持久)
        String servicePath = registryPath + "/" + serviceName;
        if(!zkClient.exists(servicePath)){
            zkClient.createPersistent(servicePath);
            LOGGER.debug("create service node : {}", servicePath);
        }
        //创建Address节点 (临时)
        String addressPath = servicePath + "/address-";
        String addressNode = zkClient.createEphemeralSequential(addressPath,serviceAddress);
        LOGGER.debug("create address node : {}", addressNode);
    }

处理完了服务的注册与发现以后,我们先来看一下服务端的代码。
首先我们要确定哪些方法需要被远程调用,为此我们自定义一个RPCService注解。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {

    /**
     * 服务接口类
     * @return
     */
    Class<?> value();

    /**
     * 服务版本号
     * @return
     */
    String version() default "";

    /**
     * 序列化方式
     * @return
     */
//    String serlize() default "protostuff";


}

该注解继承了spring的@component,因此在spring容器被初始化的时候所有带有@RPCService注解的方法都会被扫描到。
我们定义一个NettyServer类,它要实现ApplicationContextAware, InitializingBean这两个接口。然后实现setApplicationContext方法,这个方法会完成上面提到的扫描。

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //扫描带有RpcService注解的类并初始化 handleMapduixiang
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
        if(MapUtils.isNotEmpty(serviceBeanMap)){
            for(Object serviceBean : serviceBeanMap.values()) {
                RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
                String serviceName = rpcService.value().getName();
                String serviceVersion = rpcService.version();
//                String serialize = rpcService.serlize();
                if (StringUtil.isNotEmpty(serviceVersion)){
                    serviceName += "-" + serviceVersion;
                }
                handlerMap.put(serviceName, serviceBean);
//                LOGGER.debug("handlerMap is {}", handlerMap);
            }
        }
    }

同时,我们还要实现 afterPropertiesSet()方法,该方法在spring的bean的生命周期中会在属性填充之后调用,我们利用这个方法来开启netty服务端:

public void afterPropertiesSet() throws Exception {
        super.start();
    }

在父类中,有我们的netty服务器的相关代码和zookeeper注册的相关代码:

public void run() {
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                try{
                    //创建并初始化Netty服务器 BootStrap 对象
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup);
                    bootstrap.channel(NioServerSocketChannel.class);
                    bootstrap.childHandler((new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //解码,编码,并处理请求
                            pipeline.addLast(new RpcDecoder(RpcRequest.class, serlizer));
                            pipeline.addLast(new RpcEncoder(RpcResponse.class, serlizer));
                            pipeline.addLast(new RpcServerHandler(handlerMap, threadPoolExecutor));
                        }
                    }));
                    bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                    //获取RPC服务器的IP和端口号
                    String[] addressArray = StringUtil.split(serviceAddress,":");
                    String ip = addressArray[0];
                    int port = Integer.parseInt(addressArray[1]);
                    //启动RPC服务器
                    ChannelFuture future = bootstrap.bind(ip, port).sync();
                    //注册RPC服务器
//                    LOGGER.debug("serviceRegistry is {}", serviceRegistry);
                    if (serviceRegistry != null) {
                        LOGGER.debug("1 handleMap is {}", handlerMap);
                        for(String interfaceName : handlerMap.keySet()){
                            serviceRegistry.registry(interfaceName, serviceAddress);
                            LOGGER.debug("registry service: {} => {}", interfaceName, serviceAddress);
                        }
                    }
                    LOGGER.debug("server started on port {}", port);
                    //关闭 RPC服务器
                    future.channel().closeFuture().sync();
                }finally{
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                }
            }
        });

        thread.start();

这里我们可以看到自定义的解码和编码的类RpcEncoder和RpcDecoder,以及Rpc的响应和请求的自定义类RpcRequest,RpcResponse,现在让我们展时先放一放,稍后再来看看这几个类。
现在服务端收到了来自客户端的请求,我们需要对其进行处理,定义RpcServerHandler类继承自SimpleChannelInboundHandler,在其中重写channelRead0方法:

public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
        threadPoolExecutor.execute(new Runnable() {
            public void run() {
                //创建并初始化RPC响应对象
                RpcResponse response = new RpcResponse();
                LOGGER.debug("cannelRead0start, request is {}", request);
                response.setRequestId(request.getRequestId());
                try{
                    Object result = handle(request);
                    response.setResult(result);
                } catch (Exception e) {
                    LOGGER.error("handler result failuer", e);
                    response.setException(e);
                }
                LOGGER.debug("channelread0 voer, response is {}", response);
                //写入RPC响应对象并自动关闭连接
                ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
            }
        });
    }

在handle方法中,使用反射的方法进行调用:

private Object handle(RpcRequest request) throws Exception {
        //获取服务对象
//        LOGGER.debug("Request is {}", request);
        String serviceName = request.getInterfaceName();
        String serviceVersion = request.getServiceVersion();
        if(StringUtil.isNotEmpty(serviceVersion)) {
            serviceName += "-" + serviceVersion;
        }
        Object serviceBean = handlerMap.get(serviceName);
        if (serviceBean == null) {
            throw new RuntimeException(String.format("can not find service bean by key : %s", serviceName));
        }
        // 获取反射调用所需的参数
        Class<?> serviceClass = serviceBean.getClass();
        String methodName = request.getMethodName();
        Class<?>[] parameterTypes = request.getParametersTypes();
        Object[] parameters = request.getParameters();
        //反射调用方法
        Method method = serviceClass.getMethod(methodName, parameterTypes);
        method.setAccessible(true);
        return method.invoke(serviceBean, parameters);
        //使用cglib
//        FastClass serviceFastClass = FastClass.create(serviceClass);
//        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
//        return serviceFastMethod.invoke(serviceBean, parameters);

    }

服务端的代码到此为止,接下来让我们看看client端的代码。
Client目前只有两个类,首先是比较简单的RpcClient:

public RpcResponse send(RpcRequest request) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //创建并初始化netty客户端Bootstrap对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new RpcEncoder(RpcRequest.class, serializer));    //编码RPC请求
                    pipeline.addLast(new RpcDecoder(RpcResponse.class, serializer));  //解码rpc响应
                    pipeline.addLast(RpcClient.this); //处理RPC响应
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            //连接RPC服务器
            ChannelFuture future = bootstrap.connect(host, port).sync();
            //写入RPC请求数据并关闭连接
            Channel channel = future.channel();
            channel.writeAndFlush(request).sync();
            channel.closeFuture().sync();
            //返回RPC响应对象
            return response;
        } finally {
            group.shutdownGracefully();
        }
    }

就是一个标准的netty客户端的写法,接下来的是RpcProxy:

public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
        //创建动态代理对象

        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 创建RPC请求对象并设置请求属性
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(method.getDeclaringClass().getName());
                        request.setServiceVersion(serviceVersion);
                        request.setMethodName(method.getName());
                        request.setParametersTypes(method.getParameterTypes());
                        request.setParameters(args);

                        //获取 RPC服务地址
                        if (serviceDiscovery != null) {
                            String serviceName = interfaceClass.getName();
                            if (StringUtil.isNotEmpty(serviceVersion)) {
                                serviceName += "-" + serviceVersion;
                            }
                            serviceAddress = serviceDiscovery.discovery(serviceName);
                            LOGGER.debug("discovery service: {} => {}", serviceName, serviceAddress);
                        }
                        if (StringUtil.isEmpty(serviceAddress)) {
                            throw new RuntimeException("server address is empty");
                        }
                        //从RPC服务地址中解析主机名与端口号
                        String[] array = StringUtil.split(serviceAddress,":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        //创建RPC客户端对象并发送RPC请求
                        RpcClient client = new RpcClient(host, port);
                        long time = System.currentTimeMillis();
                        RpcResponse response = client.send(request);
                        LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
                        if(response == null){
                            throw new RuntimeException("response is null");
                        }
                        //返回RPC响应结果
                        if(response.hasException()){
                            return response.getException();
                        }else{
                            return response.getResult();
                        }
                    }
                }
        );
    }

在create方法里面,封装请求体,调用send方法,请求服务提供者。

最后在让我们来看看之前的几个类:

public class RpcRequest {
    private String requestId;
    private String interfaceName;
    private String serviceVersion;
    private String methodName;
    private Class<?>[] parametersTypes;
    private Object[] parameters;
}
public class RpcResponse {

    private String requestId;
    private Exception exception;
    private Object result;

    public boolean hasException(){
        return exception != null;
    }
}
public class RpcDecoder extends ByteToMessageDecoder {
    private Class<?> genericClass;
    Serializer serializer;

    public RpcDecoder(Class<?> genericClass, Serializer serializer){
        this.serializer = serializer;
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        if(byteBuf.readableBytes() < 4){
            return;
        }
        byteBuf.markReaderIndex();
        int dataLength = byteBuf.readInt();
        if(byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);
        list.add(SerializationUtil.deserialize(data, genericClass));

    }
}
public class RpcEncoder extends MessageToByteEncoder {
    private Class<?> genericClass;
    private Serializer serializer;

    public RpcEncoder(Class<?> genericClass, Serializer serializer) {
        this.serializer = serializer;
        this.genericClass = genericClass;
    }


    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        if(genericClass.isInstance(o)) {
            byte[] data = SerializationUtil.serialize(o);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
}

RpcEncoder和RpcDecoder都继承自netty自带的解码编码器,我们可以为其指定自己的序列化方式。

最后

以上就是欢喜紫菜为你收集整理的手写一个简单的RPC框架的全部内容,希望文章能够帮你解决手写一个简单的RPC框架所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部