我是靠谱客的博主 清爽路人,最近开发中收集的这篇文章主要介绍基于netty、zookeeper手写RPC框架之三——接入Spring,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在接入前,先自定义一组传输对象,而不是原来传输一个Invocation,返回则是一个String,这里需要用到netty的编解码器,当然这里可以用netty已经实现好的对象编解码、第三方的Probuff编解码器,而如果想要实现自己的传输协议,就要继承netty的编解码,他可以用来自定义解析对象以及封装对象,这里对对象处理采用本来是采用gson,但是有很多bug,便用了另一种进行序列化,这样自定义协议的话,可以做很多其他的处理,比如协议的魔数、格式校验等等,自定义协议的处理

定一个序列化接口

/**
 * @author: lele
 * @date: 2019/11/19 下午5:15
 * 对象-字节转换接口
 */
public interface RpcSerializer {
    /**
     * 序列化
     * @param target
     * @return
     */
    byte[] serialize(Object target);

    /**
     * 反序列化
     * @param target
     * @param clazz
     * @param <T>
     * @return
     * @throws Exception
     */
    <T> T deserialize(byte[] target,Class<T> clazz) throws Exception;
}

具体的序列化实现类,需要引入protostuff-stuff,protostuff-runtime依赖

public class ProtobufSerializer  {
    private static Map<Class, Schema> schemaMap = new HashMap<Class, Schema>();

    // objenesis是一个小型Java类库用来实例化一个特定class的对象。
    private static Objenesis objenesis = new ObjenesisStd(true);

    // 存储模式对象映射
    private static Schema getSchema(Class cls) {
        Schema schema = schemaMap.get(cls);
        if (null == schema) {
            schema = RuntimeSchema.createFrom(cls);
            if (null != schema) {
                schemaMap.put(cls, schema);
            }
        }
        return schema;
    }

    public byte[] serialize(Object target) {
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        Class cls = target.getClass();

        try {
            Schema schema = getSchema(cls);
            byte[] bytes = ProtobufIOUtil.toByteArray(target, schema, buffer);
            return bytes;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            buffer.clear();
        }
    }

    public <T> T deserialize(byte[] target, Class<T> clazz) throws Exception {
        try {
            T instance = objenesis.newInstance(clazz);
            Schema schema = getSchema(clazz);
            ProtobufIOUtil.mergeFrom(target, instance, schema);
            return instance;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

//单例
    private static class Holder {
        private static final ProtobufSerializer j = new ProtobufSerializer();
    }

    public static ProtobufSerializer getInstance() {
        return ProtobufSerializer.Holder.j;
    }
}

然后定义请求/返回的对象,这里定义的requestID用于标记该次请求,作用往后再谈

/**
 * @author: lele
 * @date: 2019/11/15 下午7:01
 * 封装调用方所想调用的远程方法信息
 */
@Data
@AllArgsConstructor
public class RpcRequest  {

    private String requestId;

    private String interfaceName;

    private String methodName;

    private Object[] params;
    //防止重载
    private Class[] paramsTypes;
}



/**
 * @author: lele
 * @date: 2019/11/19 下午5:12
 */
@Data
public class RpcResponse  {
    private String requestId;

    private Object result;

    private String error;
}

定义处理对象的编解码器

/**
 * @author: lele
 * @date: 2019/11/19 下午5:16
 * 把字节转为实体类,如byte->request供后续处理
 */
public class RpcDecoder extends ByteToMessageDecoder {
    //处理的对象
    private Class<?> target;

    private ProtobufSerializer serializer=ProtobufSerializer.getInstance();

    public RpcDecoder(Class<?> target) {
        this.target = target;
    }

    /**
     * byte转实体
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
        System.out.println("收到字节");
        //如果小于一个int的长度,不作处理
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        //获取数据长度
        int dataLength = byteBuf.readInt();
        //写入byte数组
        byte[] data = new byte[dataLength];
        byteBuf.readBytes(data);
        //解码转成对象
        Object res = serializer.deserialize(data, target);
        //给后面的handler处理
        list.add(res);

    }
}



/**
 * @author: lele
 * @date: 2019/11/19 下午5:16
 * 把实体变为字节,可用于req->byte、response->byte
 */
public class RpcEncoder extends MessageToByteEncoder {
    //处理的对象
    private Class<?> entity;
    ProtobufSerializer serializer=ProtobufSerializer.getInstance();

    public RpcEncoder(Class<?> entity) {
        this.entity = entity;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        System.out.println("转成字节");
        if (entity.equals(o.getClass())) {
            byte[] data=serializer.serialize(o);
            //写入消息长度,这里还可以写入版本号、魔数等协议信息
            byteBuf.writeInt(data.length);
            //写入消息主体
            byteBuf.writeBytes(data);
        }
    }
}

这样,传输对象及其序列化、相应的编解码器就写好了,把原来自带的对象编解码器替换掉即可

而以前那种{interfaceName:{url:具体实现类}的zk存储方式不适合具体的业务处理,以接口为粒度不方便管理,而且现有的设计是每请求一次,就会新建一个client,不能复用,造成资源浪费,然后client如果保持和以接口获取的服务进行链接,当请求的接口时,client就无法复用,比如下图,client可以与serverA保持链接后,访问interfaceC,interfaceA都是可以的,如果调用interfaceB,就不能保持请求

但如果以client和对应服务来写,则不会出现上述情况,也可以保持链接以复用

最后设计为/服务名/url的格式,这次更改的结果是,相应的获取可用地址、注册服务的格式也要进行更改。

接下来就是接入spring了,有了上面的基础,这时候我们可以仿feign的功能,把某一个包下带有某个注解的接口注入spring中,同时为这些接口生成代理,当执行这些接口的方法时,进行动态代理访问远端接口

定义作用在要代理的接口上,这里的name为服务名,zk则在注册服务改为——/服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class(下面server端就是这样处理)

/**
 * @author: lele
 * @date: 2019/11/20 下午2:44
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)

//用于接口上,name为服务名,zk则在注册服务改为 服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class
public @interface RpcStudyClient {

    String name();
}

流程为:自定义一个可以动态注册bean的类,然后通过获取所要扫描的包,添加扫描条件,符合扫描条件之后,再复写判断是否为接口,是的话对其进行注册,注册时通过对象工厂来定制化生成bean,这里是加入了动态代理拦截其方法进行远程调用


/**
 * @author: lele
 * @date: 2019/11/20 下午3:06
 * 第一个接口获取注册bean能力,第二个接口获取类加载器,仿feignregister写法
 */

public class RpcStudyClientRegisty implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {


    private ClassLoader classLoader;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {


        if (DemoApplication.mode == 1) {
            //获取指定路径中注解bean定义扫描器
            ClassPathScanningCandidateComponentProvider scanner = getScanner();
            //获取扫描的包,通过enable那个注解的属性
            Set<String> basePackages = getBasePackages(importingClassMetadata);
            //添加过滤规则,属于rpcstudyclient的加入,excludeFilter则是排除
            scanner.addIncludeFilter(new AnnotationTypeFilter(RpcStudyClient.class));

            Set<BeanDefinition> candidateBeans = new HashSet<>();
            //获取符合条件的bean
            for (String basePackage : basePackages) {
                Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
                candidateBeans.addAll(candidateComponents);
            }
            //spring中用BeanDefintion来表示bean,这里判断bean类型是否合适,合适就注册
            for (BeanDefinition candidateBean : candidateBeans) {
                //如果bean还没有注册
                if (!registry.containsBeanDefinition(candidateBean.getBeanClassName())) {
                    //判读是否含有注解
                    if (candidateBean instanceof AnnotatedBeanDefinition) {
                        //存储该类信息的bean,methodMetadata(方法),AnnotationMetadata(里面也包括methodMetadata,可以获取注解,类信息等等)
                        AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) candidateBean;
                        //获取bean的类信息
                        AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
                        //判断其否为接口
                        Assert.isTrue(annotationMetadata.isInterface(), "@RpcStudeyClient注解只能用在接口上");
                        Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(RpcStudyClient.class.getCanonicalName());

                        this.registerRpcClient(registry, annotationMetadata, attributes);
                    }
                }
            }
        }

    }

    //注册bean
    private void registerRpcClient(BeanDefinitionRegistry registry,
                                   AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {

        //获取bean类名
        String className = annotationMetadata.getClassName();
        //使用自定义的对象工厂定制化生成bean
        BeanDefinitionBuilder definition = BeanDefinitionBuilder
                .genericBeanDefinition(RpcStudyClientFactoryBean.class);
        //设置根据类型的注入方式
        definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

        definition.addPropertyValue("type", className);
        String name = attributes.get("name") == null ? "" : (String) (attributes.get("name"));

        String alias = name + "RpcStudyClient";
        //获取bean基类
        AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
        //防止其他有实现,设置此实现为首要
        beanDefinition.setPrimary(true);
        BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
                new String[]{alias});
        //注册bean
        BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
    }


    //复写bean扫描的判断
    protected ClassPathScanningCandidateComponentProvider getScanner() {
        return new ClassPathScanningCandidateComponentProvider(false
        ) {
            @Override
            protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
                //存放注解相关信息,具备了class、注解的信息
                AnnotationMetadata metadata = beanDefinition.getMetadata();
                //是否是独立能创建对象的,比如class、内部类、静态内部类
                if (metadata.isIndependent()) {
                    //用于过滤注解为@RpcClient的注解
                    if (metadata.isInterface() &&
                            metadata.getInterfaceNames().length == 1 &&
                            Annotation.class.getName().equals(metadata.getInterfaceNames()[0])) {
                        try {
                            Class<?> target = ClassUtils.forName(metadata.getClassName(),
                                    RpcStudyClientRegisty.this.classLoader);
                            return !target.isAnnotation();
                        } catch (Exception ex) {
                            this.logger.error(
                                    "Could not load target class: " + beanDefinition.getMetadata().getClassName(), ex);
                        }
                    }
                    return true;
                }
                return false;

            }
        };
    }

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    //获取需要扫描的包位置
    protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
        Map<String, Object> attributes = importingClassMetadata
                .getAnnotationAttributes(EnableRpcStudyClient.class.getCanonicalName());
        String[] scanPackages = (String[]) attributes.get("basePackages");
        Set<String> basePackages = new HashSet<>();

        if (scanPackages.length > 0) {
            //扫描指定包
            for (String pkg : scanPackages) {
                if (StringUtils.hasText(pkg)) {
                    basePackages.add(pkg);
                }
            }
        } else {
            //扫描主入口所在的包
            basePackages.add(
                    ClassUtils.getPackageName(importingClassMetadata.getClassName()));
        }
        return basePackages;
    }

}

处理接口的FactoryBean,为每个接口生成代理

/**
 * @author: lele
 * @date: 2019/11/20 下午3:08
 * bean工厂类,这里为接口代理其方法
 */

@Data
@EqualsAndHashCode(callSuper = false)
public class RpcStudyClientFactoryBean implements FactoryBean<Object> {
    private Class<?> type;
    @Override
    public Object getObject() throws Exception {
        return ProxyFactory.getProxy(this.type);
    }

    @Override
    public Class<?> getObjectType() {
        return this.type;
    }
}

把上面的注册配置类注入到该注解

/**
 * @author: lele
 * @date: 2019/11/20 下午2:42
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(RpcStudyClientRegisty.class)
public @interface EnableRpcStudyClient {
    //扫描的包,如果为空,根据启动类所在的包名扫描
    String[] basePackages() default {};
}

需要扫描的接口上添加RpcStudyClient注解

@RpcStudyClient(name="user")
public interface HelloService {
    String sayHello(String userName);
    String qq();
}

启动类上添加@EnableRpcStudyClient注解指定要扫描的包,这样就完成了client端的处理

然后到server端注册具体的服务实现类,定义一个spring会扫描有该注解的类

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcStudyService {
    // 用来指定实现的接口
    Class<?> value();
}

服务注册类,核心是通过getBeanWithAnnonation来获取相应的bean,然后存储在map里面,并且在初始化后注册本实例的ip地址,netty的handler则通过这个map直接获取实例来执行相应的方法

@Component
@Data
public class RpcStudyRegister implements InitializingBean,ApplicationContextAware {
    public static Map<String,Object> serviceMap;
    @Value("${spring.application.name}")
    private String name;

    @Value("${rpcstudy.port}")
    private Integer port;



    @Override
    public void afterPropertiesSet() throws Exception {
        if(DemoApplication.mode==0){
            String hostAddress = InetAddress.getLocalHost().getHostName();
            URL url=new URL(hostAddress,port);
            Protocol server= ProtocolFactory.netty();
            //注册服务
            ZkRegister.register(name,url);
            server.start(url);
        }

    }

     @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //把有RpcStudyService注解的bean添加到map里面,key为该注解的接口
        if(DemoApplication.mode==0){
            Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcStudyService.class);
            if (beans != null && beans.size() > 0) {
                serviceMap = new HashMap<>(beans.size());
                for (Object o : beans.values()) {
                    RpcStudyService rpcService = o.getClass().getAnnotation(RpcStudyService.class);
                    String interfaceName = rpcService.value().getName();
                    serviceMap.put(interfaceName, o);
                }
                serviceMap = Collections.unmodifiableMap(serviceMap);

            }
        }

    }
}

这样当本地的netty获取class时,可以通过该map从传来的request的interface为参数来获取相应的实现类

这里由于没有做模块划分,而server和client也需要分开,所以在启动类上面定义一个mode变量,mode为0时为client端,只启动tomcat服务,mode为1时是server端只启动netty,可以看到上面的client/server的register类在相应模式下才做处理,启动client(手动改mode=1)和server(手动改mode=0),访问localhost:8080/t2,得到下面的结果

client:

server:

项目地址:https://github.com/97lele/rpcstudy/tree/withspring

最后

以上就是清爽路人为你收集整理的基于netty、zookeeper手写RPC框架之三——接入Spring的全部内容,希望文章能够帮你解决基于netty、zookeeper手写RPC框架之三——接入Spring所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部