我是靠谱客的博主 自由手套,最近开发中收集的这篇文章主要介绍基于zk和netty的简易版RPC,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

简易版RPC

RPC是什么?

简单的回答 : 远程服务调用.

应用在哪里?

在dubbo中有应用.

为什么要学习它?

为了更好的理解dubbo框架以及其他的东西.

手写RPC并解析相关知识点(这才是重点)

基础知识点
实现架构(从角色角度分析)

引入官网架构图:
链接: link.
在这里插入图片描述

简单版本结构图
在这里插入图片描述
https://www.processon.com/diagraming/601fd9fee0b34d208a6a2924

具体实现

以下会贴入代码及解释及测试
工程结构

在这里插入图片描述
rpc-common : 属于公共依赖(定义公共接口, 定义规范)
rpc-consumer1: 属于消费端实现(主要通过协议与服务端通信)
rpc-provider1: 属于服务端(服务提供者)实现

基本依赖
 <!--引入netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.39.Final</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <!--<version>1.2.47</version>-->
            <version>1.2.71</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.2</version>
        </dependency>
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
         <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.6.RELEASE</version>
        </dependency>
          <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--<version>5.1.46</version>-->
            <version>5.1.20</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.21</version>
        </dependency>

        <!--zkclient客户端-->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>
        <!--zookeeper 案例 -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

以上依赖有许多并不是专门为这个RPC服务的, 但是我这里也贴出来了,主要是这个工程还有其他的功能,这不做介绍.

具体实现

rpc-common工程:
IUserService : 接口定义

    /**
     *
     * @param msg
     * @return
     */
    String sayHello(String msg);
JsonSerializer : 序列化接口实现
public class JsonSerializer implements Serializer {

    @Override
    public byte[] serialize(Object o) throws IOException {
        return JSON.toJSONBytes(o);
    }

    @Override
    public <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException {
        //反序列化
        return JSON.parseObject(bytes, clazz);
    }
}
RpcDecoder : rpc解码器
public class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
    private Class<?> clazz;
    private Serializer serializer;
    public RpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
                          List<Object> out) throws Exception {
        int readInt = byteBuf.readInt();
        byte[] bytes = new byte[readInt];
        byteBuf.readBytes(bytes);
        //反序列化
        Object deserialize = serializer.deserialize(clazz, bytes);
        out.add(deserialize);
    }
}
RpcEncoder : rpc编码器
public class RpcEncoder extends MessageToByteEncoder {
    private Class<?> clazz;
    private Serializer serializer;
    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg,
                          ByteBuf out) throws Exception {
        /*
            将对象 写入到缓存 :
            写入 int + byte
         */
        if(clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            out.writeInt(bytes.length);
            out.writeBytes(bytes);
        }
    }
}
RpcRequest : rpc请求协议对象
  • 传输 协议对象
  • 请求服务id
  • 方法名
  • 类名
  • 形式参数
  • 实际参数
@Data
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = -4172554571135234360L;
    private String requestId;//请求服务id
    private String className;//方法名
    private String methodName;//类名
    private Class<?>[] parameterTypes;//形式参数
    private Object[] parameters;//实际参数
}
RpcResult : rpc返回结果(服务端 -> 客户端)
@Data
public class RpcResult implements Serializable {
    private static final long serialVersionUID = 1561074744495374547L;
    private boolean success;
    private String message;
    private Object data;
}

Serializer : 序列化接口
public interface Serializer {
    /*
    接口 :
     */
    /**
     * 序列化
     * @param o
     * @return
     * @throws IOException
     */
    byte[] serialize(Object o)throws IOException;
    /**
     * 反序列化
     * @param clazz
     * @param bytes
     * @param <T>
     * @return
     * @throws IOException
     */
    <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException;
}

ZookeeperUtils:

public class ZookeeperUtils {
    /**
     *
     */
    private static CuratorFramework client;
    /**
     *
     * @return
     */
    public static CuratorFramework getClient() {
        if(client == null) {
            synchronized (ZookeeperUtils.class) {
                if(null == client) {
                    client = createClient();
                }
            }
        }
        return client;
    }
    /**
     *
     * @return
     */
    private static CuratorFramework createClient() {
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        //使用fluent变成风格
        client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("lagou-servers")  //独立的命名空间
                .build();
        client.start();
        System.out.println("zookeeper会话创建了.");
        return client;
    }

    /**
     * 测试创建会话
     * @param args
     */
    public static void main(String[] args) {
        createClient();
    }
}
以上涉及知识点: 
1: netty的编解码器, 拆包,粘包,自定义协议.
2: netty的服务处理器的入站,出站顺序及配置.
3: netty的ByteBuf.
4: java的反射.
5: netty的事件触发点.
6: alibaba的fastjson序列化反序列化.
7: zookeeper的基础知识,zookeeper节点的特性.
	zookeeper节点的数据结构: 
	zookeeper节点的分类:

rpc-provider1工程:
ServerBoot

@Service
public class ServerBoot implements ApplicationRunner {
    private static String IP = "127.0.0.1";
    private static Integer PORT = 9010;
    /**
     * @param applicationArguments
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        startNettyServer(IP, PORT);
    }
    //*************启动netty
    /**
     * @param ip
     * @param port
     * @throws Exception
     */
    private static void startNettyServer(String ip, int port) throws Exception {
        /*
        1: 引导器
        2: 线程池
        3: 处理器
        4: 监听端口
         */
        //1.创建两个线程池对象
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        //2.创建服务端的启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //3.配置启动引导对象
        serverBootstrap.group(bossGroup, workGroup)
                //设置通道为NIO
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //这个配置要注意: RpcResult + RpcRequest;
                        //我们根据Encoder和Decoder来分入站和出站
                        //根据RpcRequest : 知道这个是要Encoder的
                        //知道RpcResult是要Decoder的.
                        pipeline.addLast(new RpcEncoder(RpcResult.class, new JsonSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JsonSerializer()));
                        //业务处理类
                        pipeline.addLast(new UserServiceHandler(SpringContextHolder.getApplicationContext()));
                    }
                });

        //这里的sync() 
        serverBootstrap.bind(port).sync();
        //链接zk : 创建临时有序节点
        CuratorFramework client = ZookeeperUtils.getClient();
        String path = "/serverNodes/node";
        //在这里创建 lagou-servers
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path, (ip + ":" + port).getBytes());
        //赋值为 当前服务器的 ip和端口 --》 可以和远程客户端进行通信
    }
}
UserServiceHandler
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
    private ApplicationContext applicationContext;
    public UserServiceHandler(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = null;
        if(null == msg) {
            returnFailRst(ctx, "netty服务端: 协议对象不能为空!");
            return;
        }
        rpcRequest = (RpcRequest) msg;

        System.out.println("netty服务端: 服务端接收到的协议对象为: " + rpcRequest);
        if(StringUtils.isEmpty(rpcRequest.getRequestId())) {
            returnFailRst(ctx, "netty服务端: 请求的协议对象ID不能为空!!!");
        }
//        Object bean = applicationContext.getBean(rpcRequest.getRequestId());
        Object bean = SpringContextHolder.getApplicationContext().getBean(rpcRequest.getRequestId());
        if(bean == null) {
            returnFailRst(ctx, "netty服务端: 服务ID" + rpcRequest.getRequestId() + "不存在!");
        }
        //构造反射
        Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethodName(),
                rpcRequest.getParameterTypes());
        Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getParameters());
        returnSuccRst(ctx, result);
        System.out.println("netty服务端: 返回结果 " + result);
    }
    /**
     * 构造成功结果返回
     * @param ctx
     * @param result
     */
    private void returnSuccRst(ChannelHandlerContext ctx, Object result) {
        RpcResult rpcResult = new RpcResult();
        rpcResult.setSuccess(true);
        rpcResult.setData(result);
        rpcResult.setMessage("netty服务端: 处理成功!" + result);
        ctx.writeAndFlush(rpcResult);
    }
    /**
     * 返回 失败的结果
     * @param ctx
     * @param s
     */
    private void returnFailRst(ChannelHandlerContext ctx,String s) {
        RpcResult rpcResult = new RpcResult();
        rpcResult.setSuccess(false);
        rpcResult.setMessage("netty服务端: 处理失败!" + s);
        ctx.writeAndFlush(rpcResult);
    }
}
SpringContextHolder
@Component
public class SpringContextHolder implements ApplicationContextAware,DisposableBean {
    private static ApplicationContext applicationContext = null;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        SpringContextHolder.applicationContext = context;
    }
    /**
     * 提供外部访问
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        if (applicationContext == null) {
            throw new IllegalStateException("applicaitonContext属性未注入, 请在SpringBoot启动类中注册SpringContextHolder.");
        }else {
            return applicationContext;
        }
    }
    @Override
    public void destroy() throws Exception {
        applicationContext = null;
//        SpringContextHolder.applicationContext = null;
    }
    //***********提供一系列 获取 bean的方法
    /**
     * 通过name获取 Bean
     * @param name 类名称
     * @return 实例对象
     */
    public static Object getBean(String name){
        return getApplicationContext().getBean(name);
    }
    /**
     * 通过class获取Bean
     */
    public static <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }
    /**
     * 通过name,以及Clazz返回指定的Bean
     */
    public static <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name, clazz);
    }
}
UserServiceImpl
@Service(value = "IUserService")
public class UserServiceImpl implements IUserService {
    private static Integer PORT = 9010;
    @Override
    public String sayHello(String msg) {
        System.out.println("这是端口("+ PORT +")服务端的第一个方法:sayHello---》 " + msg);
        int i = RandomUtil.randomInt(3);
        ThreadUtil.sleep(i * 1000);
        return "这是端口("+ PORT +")服务器返回数据 : " + msg;
    }
    @Override
    public String sayHelloWorld(String msg) {
        System.out.println("这是端口("+ PORT +")服务端的第二个方法sayHelloWorld---》 " + msg);
        return "这是端口("+ PORT +")服务端的第二个方法返回数据 : " + msg;
    }
}
LaGouV1Application
以上涉及知识点: 
1: springboot
2: springboot的自动装配
3: 其他

rpc-consumer1工程:
RpcConsumer

public class RpcConsumer {
    /**
     * 线程池
     */
    private static ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
    );
    /**
     * 记录 服务端 处理器
     */
    private volatile static Map<String, UserClientHandler> userClientHandlerMap;

    /**
     * 记录最近一次服务处理 时间
     */
    private volatile static Map<String, Long> userClientHandlerTime = new ConcurrentHashMap<>();
    /**
     *
     */
    private static volatile long count = 0;
    /*
    1: 启动所有客户端
    2: 注册监听
     */
    private static void init() throws Exception {
        CuratorFramework client = ZookeeperUtils.getClient();
        String path = "/serverNodes";
        //初始化所有的客户端连接
        initClients(client, path);

        //注册zk 父路径监听
        registerZkPathListener(client, path);
    }
    /**
     * 注册路径监听
     * @param client
     * @param path
     */
    private static void registerZkPathListener(CuratorFramework client, String path) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                ChildData childData = event.getData();
                PathChildrenCacheEvent.Type eventType = event.getType();
                System.out.println("zk节点监控: childData: " + childData);
                System.out.println("zk节点监控: eventType: " + eventType);
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        pathChildrenCache.rebuild();
                        break;
                    case CONNECTION_SUSPENDED:
                        break;
                    case CONNECTION_LOST:
                        System.out.println("Connection lost.");
                        break;
                    case CHILD_ADDED:
                        addClient(client, event.getData());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("Child updated");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("节点移除!");
                        delClient(client, event.getData());
                        break;
                    default:
                            break;
                }
            }
        });
    }
    /**
     * 移除 服务 端连接
     * @param client
     * @param data
     */
    private static void delClient(CuratorFramework client, ChildData data) {
        System.out.println("zk节点监控: 移除childData : " + data);
        byte[] dataData = data.getData();
        UserClientHandler userClientHandler = userClientHandlerMap.get(client);
        if(userClientHandler == null) {
            System.out.println("zk节点监控: 服务节点: " + client + "不存在!");
        }
        String serverKey = new String(dataData);
        if(userClientHandler.isState()) {
            System.out.println("zk节点监控: 服务节点:" + serverKey + "是激活的不能被删除!!!");
        } else {
            userClientHandlerMap.remove(serverKey);
            System.out.println("zk节点监控: delete server:" + serverKey);
        }
    }
    /**
     * 新增 服务 端连接
     * @param client
     * @param data
     */
    private static void addClient(CuratorFramework client, ChildData data) throws InterruptedException {
        System.out.println("zk节点监控: 新增ChildData: " + data);
        byte[] dataData = data.getData();
        String serverKey = new String(dataData);
        System.out.println("zk节点监控: add server :" + serverKey);
        String[] split = serverKey.split(":");
        initClient(split[0], split[1]);
    }

    /**
     * 初始化 和所有 服务端的链接 : netty 链接
     * @param client
     * @param path
     */
    private synchronized static void initClients(CuratorFramework client, String path) {
        if(userClientHandlerMap == null) {
            userClientHandlerMap = new HashMap<>(64);
        }
        try {
            List<String> list = client.getChildren().forPath(path);
            //
            for (String link : list) {
                String allPath = path + "/" + link;
                System.out.println("zk节点监控: 已注册服务节点路径: " + allPath);
                byte[] bytes = client.getData().forPath(allPath);
                String s = new String(bytes);
                System.out.println("zk节点监控: 已注册服务节点路径的值: " + s);
                String[] split = s.split(":");
                //初始化netty客户 端连接
                initClient(split[0], split[1]);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     *
     * @param ip
     * @param port
     */
    private static void initClient(String ip, String port) throws InterruptedException {
        //这儿的 UserClientHandler 将会被设置一个值 存放到容器当中, 然后会被取出发送客户端请求
        UserClientHandler userClientHandler = new UserClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JsonSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcResult.class, new JsonSerializer()));
                        pipeline.addLast(userClientHandler);
                    }
                });
        bootstrap.connect(ip, Integer.parseInt(port)).sync();
        //打印这个存储
        userClientHandlerMap.put(ip + ":" + port, userClientHandler);
        System.out.println("初始化存储: 容器存储: " + (ip + ":" + port) + " = " + userClientHandler);
    }
    /**
     *
     * @return
     */
    private static Integer clearClients() {
        for (Map.Entry<String, UserClientHandler> key : userClientHandlerMap.entrySet()) {
            UserClientHandler userClientHandler = userClientHandlerMap.get(key);
            if(userClientHandler != null) {
                if(!userClientHandler.isState()) {
                    System.out.println("zk节点监控: 从服务列表中移除服务节点:" + key);
                    userClientHandlerMap.remove(key);
                    userClientHandlerTime.remove(key);
                }
            }

        }
        return userClientHandlerMap.size();
    }
    //现在来发请求 --> 获取远程服务响应 (netty + zookeeper)
    //只有接口, 通过代理 + 协议完成
    /**
     *
     * @param proxyClass
     * @return
     */
    public Object createProxy(Class<?> proxyClass) {
        Object proxyInstance = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{proxyClass},
                new RpcInvokeInvocationHandler());
        return proxyInstance;
    }
    /**
     * rpc 远程服务 反射调用类
     */
    private class RpcInvokeInvocationHandler implements InvocationHandler {
        /**
         * 代理类生成后, 在调用原方法的时候会执行这儿的逻辑 todo
         * @param proxy
         * @param method
         * @param args
         * @return
         * @throws Throwable
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            /*
            1: 初始化所有的连接
            2: 初始化处理器容器
             */
            if(userClientHandlerMap == null) {
                init();
            }
            //判断当前节点是否可用 : 判断是否有可用的节点
            Integer availableCount = clearClients();
            if(availableCount == 0) {
                throw new RuntimeException("代理: 无服务节点可用!!!");
            }
            //构造协议对象
            RpcRequest rpcRequest = new RpcRequest();
            //获取服务类名
            //根据方法可以推理出来
            String className = method.getDeclaringClass().getName();
            String packageName = method.getDeclaringClass().getPackage().getName();
            //获取类
            String serviceName = StrUtil.removeAll(className, packageName + '.');
            System.out.println("代理: className: " + className + "tpackageName: " + packageName + "tserviceName: " + serviceName);
            rpcRequest.setRequestId(serviceName);
            rpcRequest.setClassName(className);
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameters(args);
            rpcRequest.setParameterTypes(method.getParameterTypes());
            System.out.println("代理: rpcRequest: " + rpcRequest);
            LoadBalanceService loadBalanceService;
            loadBalanceService = new ServiceLBRandom();
            //获取服务key :
            String serverKey = loadBalanceService.getService();
            System.out.println("代理: serverkey: " + serverKey);
            //这个key 是从服务处理容器中随机获取的
            UserClientHandler userClientHandler = userClientHandlerMap.get(serverKey);
            //发送消息
            userClientHandler.setToSendParam(rpcRequest);
            //使用线程池异步接收
            long begin = DateUtil.current();
            Object result = executorService.submit(userClientHandler).get();
            long spendMs = DateUtil.spendMs(begin);
            //记录耗时
            getUserClientHandlerTime().put(serverKey, spendMs);
            System.out.println("代理: call server:" + serverKey+",耗时:" + spendMs + "ms");
            //返回结果
            return result;
        }
    }

    /**
     *
     * @return
     */
    public static Map<String, Long> getUserClientHandlerTime() {
        return userClientHandlerTime;
    }

    public static Map<String, UserClientHandler> getUserClientHandlerMap() {
        return userClientHandlerMap;
    }
}
UserClientHandler
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    /*
    1: 异步发送
    2: 线程池接收结果
    3: 发送数据
    4: 记录接收结果
    5: 加入等待唤醒机制 wait() notify()
    6: 上下文对象 Ctx ChannelHandlerContext
     */
    private ChannelHandlerContext ctx;
    /**
     * 发送参数
     */
    private Object toSendParam;
    /**
     * 结束结果
     */
    private Object toRecvResult;
    /**
     * 标识当前服务器的状态
     */
    private boolean state = false;
    /**
     * 设置发送 请求信息
     * @param toSendParam
     */
    public void setToSendParam(Object toSendParam) {
        this.toSendParam = toSendParam;
    }
    /**
     * 获取服务器状态 state
     * @return
     */
    public boolean isState() {
        return state;
    }
    /**
     * 阻塞等待结果
     * @return
     * @throws Exception
     */
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("发送对象: " + this.toSendParam.toString());
        this.ctx.writeAndFlush(this.toSendParam);
        System.out.println("netty客户端: 睡眠线程(线程名) : " + Thread.currentThread().getName());
        wait();
        return toRecvResult;
    }
    /**
     * 激活 与服务段连接后发生事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        this.state = true;
    }
    /**
     * 与服务端 断开连接事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("netty客户端: 断开连接.");
        this.state = false;
    }
    /**
     * 读取 服务端发送的数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(null != msg) {
            RpcResult rpcResult = (RpcResult) msg;
            //成功失败判断
            if(rpcResult.isSuccess()) {
                this.toRecvResult = rpcResult.getData();
            } else {
                new RuntimeException(rpcResult.getMessage());
            }
        }
        System.out.println("netty客户端: 唤醒线程(唤醒线程名) : " + Thread.currentThread().getName());
        notify();
    }
}
LoadBalanceService
public interface LoadBalanceService {
    /**
     * 获取服务
     * @return
     */
    String getService();
}
ServiceLBLeastTime
public class ServiceLBLeastTime implements LoadBalanceService {
    @Override
    public String getService() {
        Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
        Map<String, Long> userClientHandlerTime = RpcConsumer.getUserClientHandlerTime();
        Set<String> services = userClientHandlerMap.keySet();
        long dr=-1;
        String serverKey="";
        //循环得到耗时最短的服务节点
        for (String service : services) {
            boolean b = userClientHandlerTime.containsKey(service);
            if(b) {
                Long aLong = userClientHandlerTime.get(service);
                if(aLong < dr || dr < 0) {
                    dr = aLong;
                    serverKey = service;
                }
            } else {
                //一次都没执行过 说明 没有承担负载
                serverKey = service;
                break;
            }
        }
        return serverKey;
    }
}
ServiceLBRandom
public class ServiceLBRandom implements LoadBalanceService {

    @Override
    public String getService() {
        Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
        String[] keys = userClientHandlerMap.keySet().toArray(new String[0]);
        System.out.println("keys: " + keys);
        for (String x : keys) {
            System.out.println("keys : " + x);
        }
        int i = RandomUtil.randomInt(keys.length);
        String key = keys[i];
        System.out.println("随机获得的服务key : " + key);
        return key;
    }

    public static void main(String[] args) {
        HashMap<String, String> map = new HashMap<>();
        map.put("1", "1");
        map.put("3", "3");
        map.put("2", "2");
        map.put("5", "5");
        String[] strings = map.keySet().toArray(new String[0]);
        System.out.println("strings: " + strings);
        for (String x : strings) {
            System.out.println("keys : " + x);
        }
        /*
keys : 1
keys : 2
keys : 3
keys : 5
         */

    }
}

扩展点思考

1: 如何定义一个订单服务呢?
首先我们在rpc-common工程中定义一个订单接口IOrderService, 然后我们在
rpc-provider中写好OrderServiceImpl实现类, 实现订单接口.
最后我们在消费端创建代理对象, 然后传输对应的协议对象进行请求订单服务即可.

2: 待扩展.
3: 关于负载均衡那一块可以使用策略模式进行优化,加入其它的算法,加入权重等值进行判断, 待优化.
4: 关于ip和端口的配置可以引入动态配置

遇到问题

1: netty在连接服务端后,写了如下代码, 出现了发送请求没有收到任何响应的情况
客户端这么写之后发送请求直接没有任何响应,没有任何报错,也没有任何结果.

 ChannelFuture channelFuture = bootstrap.connect().sync();
 channelFuture.channel().closeFuture();

2: notify()和wait()方法的使用必须结合sychronized关键字, 否则线程无法唤醒,或者是报非法的监视器异常: illegalMonitorException.

最后

以上就是自由手套为你收集整理的基于zk和netty的简易版RPC的全部内容,希望文章能够帮你解决基于zk和netty的简易版RPC所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部