概述
在接入前,先自定义一组传输对象,而不是原来传输一个Invocation,返回则是一个String,这里需要用到netty的编解码器,当然这里可以用netty已经实现好的对象编解码、第三方的Probuff编解码器,而如果想要实现自己的传输协议,就要继承netty的编解码,他可以用来自定义解析对象以及封装对象,这里对对象处理采用本来是采用gson,但是有很多bug,便用了另一种进行序列化,这样自定义协议的话,可以做很多其他的处理,比如协议的魔数、格式校验等等,自定义协议的处理
定一个序列化接口
/**
* @author: lele
* @date: 2019/11/19 下午5:15
* 对象-字节转换接口
*/
public interface RpcSerializer {
/**
* 序列化
* @param target
* @return
*/
byte[] serialize(Object target);
/**
* 反序列化
* @param target
* @param clazz
* @param <T>
* @return
* @throws Exception
*/
<T> T deserialize(byte[] target,Class<T> clazz) throws Exception;
}
具体的序列化实现类,需要引入protostuff-stuff,protostuff-runtime依赖
public class ProtobufSerializer {
private static Map<Class, Schema> schemaMap = new HashMap<Class, Schema>();
// objenesis是一个小型Java类库用来实例化一个特定class的对象。
private static Objenesis objenesis = new ObjenesisStd(true);
// 存储模式对象映射
private static Schema getSchema(Class cls) {
Schema schema = schemaMap.get(cls);
if (null == schema) {
schema = RuntimeSchema.createFrom(cls);
if (null != schema) {
schemaMap.put(cls, schema);
}
}
return schema;
}
public byte[] serialize(Object target) {
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
Class cls = target.getClass();
try {
Schema schema = getSchema(cls);
byte[] bytes = ProtobufIOUtil.toByteArray(target, schema, buffer);
return bytes;
} catch (Exception e) {
throw new IllegalStateException(e);
} finally {
buffer.clear();
}
}
public <T> T deserialize(byte[] target, Class<T> clazz) throws Exception {
try {
T instance = objenesis.newInstance(clazz);
Schema schema = getSchema(clazz);
ProtobufIOUtil.mergeFrom(target, instance, schema);
return instance;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
//单例
private static class Holder {
private static final ProtobufSerializer j = new ProtobufSerializer();
}
public static ProtobufSerializer getInstance() {
return ProtobufSerializer.Holder.j;
}
}
然后定义请求/返回的对象,这里定义的requestID用于标记该次请求,作用往后再谈
/**
* @author: lele
* @date: 2019/11/15 下午7:01
* 封装调用方所想调用的远程方法信息
*/
@Data
@AllArgsConstructor
public class RpcRequest {
private String requestId;
private String interfaceName;
private String methodName;
private Object[] params;
//防止重载
private Class[] paramsTypes;
}
/**
* @author: lele
* @date: 2019/11/19 下午5:12
*/
@Data
public class RpcResponse {
private String requestId;
private Object result;
private String error;
}
定义处理对象的编解码器
/**
* @author: lele
* @date: 2019/11/19 下午5:16
* 把字节转为实体类,如byte->request供后续处理
*/
public class RpcDecoder extends ByteToMessageDecoder {
//处理的对象
private Class<?> target;
private ProtobufSerializer serializer=ProtobufSerializer.getInstance();
public RpcDecoder(Class<?> target) {
this.target = target;
}
/**
* byte转实体
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
System.out.println("收到字节");
//如果小于一个int的长度,不作处理
if (byteBuf.readableBytes() < 4) {
return;
}
//获取数据长度
int dataLength = byteBuf.readInt();
//写入byte数组
byte[] data = new byte[dataLength];
byteBuf.readBytes(data);
//解码转成对象
Object res = serializer.deserialize(data, target);
//给后面的handler处理
list.add(res);
}
}
/**
* @author: lele
* @date: 2019/11/19 下午5:16
* 把实体变为字节,可用于req->byte、response->byte
*/
public class RpcEncoder extends MessageToByteEncoder {
//处理的对象
private Class<?> entity;
ProtobufSerializer serializer=ProtobufSerializer.getInstance();
public RpcEncoder(Class<?> entity) {
this.entity = entity;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
System.out.println("转成字节");
if (entity.equals(o.getClass())) {
byte[] data=serializer.serialize(o);
//写入消息长度,这里还可以写入版本号、魔数等协议信息
byteBuf.writeInt(data.length);
//写入消息主体
byteBuf.writeBytes(data);
}
}
}
这样,传输对象及其序列化、相应的编解码器就写好了,把原来自带的对象编解码器替换掉即可
而以前那种{interfaceName:{url:具体实现类}的zk存储方式不适合具体的业务处理,以接口为粒度不方便管理,而且现有的设计是每请求一次,就会新建一个client,不能复用,造成资源浪费,然后client如果保持和以接口获取的服务进行链接,当请求的接口时,client就无法复用,比如下图,client可以与serverA保持链接后,访问interfaceC,interfaceA都是可以的,如果调用interfaceB,就不能保持请求
但如果以client和对应服务来写,则不会出现上述情况,也可以保持链接以复用
最后设计为/服务名/url的格式,这次更改的结果是,相应的获取可用地址、注册服务的格式也要进行更改。
接下来就是接入spring了,有了上面的基础,这时候我们可以仿feign的功能,把某一个包下带有某个注解的接口注入spring中,同时为这些接口生成代理,当执行这些接口的方法时,进行动态代理访问远端接口
定义作用在要代理的接口上,这里的name为服务名,zk则在注册服务改为——/服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class(下面server端就是这样处理)
/**
* @author: lele
* @date: 2019/11/20 下午2:44
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
//用于接口上,name为服务名,zk则在注册服务改为 服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class
public @interface RpcStudyClient {
String name();
}
流程为:自定义一个可以动态注册bean的类,然后通过获取所要扫描的包,添加扫描条件,符合扫描条件之后,再复写判断是否为接口,是的话对其进行注册,注册时通过对象工厂来定制化生成bean,这里是加入了动态代理拦截其方法进行远程调用
/**
* @author: lele
* @date: 2019/11/20 下午3:06
* 第一个接口获取注册bean能力,第二个接口获取类加载器,仿feignregister写法
*/
public class RpcStudyClientRegisty implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {
private ClassLoader classLoader;
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (DemoApplication.mode == 1) {
//获取指定路径中注解bean定义扫描器
ClassPathScanningCandidateComponentProvider scanner = getScanner();
//获取扫描的包,通过enable那个注解的属性
Set<String> basePackages = getBasePackages(importingClassMetadata);
//添加过滤规则,属于rpcstudyclient的加入,excludeFilter则是排除
scanner.addIncludeFilter(new AnnotationTypeFilter(RpcStudyClient.class));
Set<BeanDefinition> candidateBeans = new HashSet<>();
//获取符合条件的bean
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner.findCandidateComponents(basePackage);
candidateBeans.addAll(candidateComponents);
}
//spring中用BeanDefintion来表示bean,这里判断bean类型是否合适,合适就注册
for (BeanDefinition candidateBean : candidateBeans) {
//如果bean还没有注册
if (!registry.containsBeanDefinition(candidateBean.getBeanClassName())) {
//判读是否含有注解
if (candidateBean instanceof AnnotatedBeanDefinition) {
//存储该类信息的bean,methodMetadata(方法),AnnotationMetadata(里面也包括methodMetadata,可以获取注解,类信息等等)
AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) candidateBean;
//获取bean的类信息
AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();
//判断其否为接口
Assert.isTrue(annotationMetadata.isInterface(), "@RpcStudeyClient注解只能用在接口上");
Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(RpcStudyClient.class.getCanonicalName());
this.registerRpcClient(registry, annotationMetadata, attributes);
}
}
}
}
}
//注册bean
private void registerRpcClient(BeanDefinitionRegistry registry,
AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
//获取bean类名
String className = annotationMetadata.getClassName();
//使用自定义的对象工厂定制化生成bean
BeanDefinitionBuilder definition = BeanDefinitionBuilder
.genericBeanDefinition(RpcStudyClientFactoryBean.class);
//设置根据类型的注入方式
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
definition.addPropertyValue("type", className);
String name = attributes.get("name") == null ? "" : (String) (attributes.get("name"));
String alias = name + "RpcStudyClient";
//获取bean基类
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
//防止其他有实现,设置此实现为首要
beanDefinition.setPrimary(true);
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
new String[]{alias});
//注册bean
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
//复写bean扫描的判断
protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false
) {
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
//存放注解相关信息,具备了class、注解的信息
AnnotationMetadata metadata = beanDefinition.getMetadata();
//是否是独立能创建对象的,比如class、内部类、静态内部类
if (metadata.isIndependent()) {
//用于过滤注解为@RpcClient的注解
if (metadata.isInterface() &&
metadata.getInterfaceNames().length == 1 &&
Annotation.class.getName().equals(metadata.getInterfaceNames()[0])) {
try {
Class<?> target = ClassUtils.forName(metadata.getClassName(),
RpcStudyClientRegisty.this.classLoader);
return !target.isAnnotation();
} catch (Exception ex) {
this.logger.error(
"Could not load target class: " + beanDefinition.getMetadata().getClassName(), ex);
}
}
return true;
}
return false;
}
};
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
//获取需要扫描的包位置
protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
Map<String, Object> attributes = importingClassMetadata
.getAnnotationAttributes(EnableRpcStudyClient.class.getCanonicalName());
String[] scanPackages = (String[]) attributes.get("basePackages");
Set<String> basePackages = new HashSet<>();
if (scanPackages.length > 0) {
//扫描指定包
for (String pkg : scanPackages) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
} else {
//扫描主入口所在的包
basePackages.add(
ClassUtils.getPackageName(importingClassMetadata.getClassName()));
}
return basePackages;
}
}
处理接口的FactoryBean,为每个接口生成代理
/**
* @author: lele
* @date: 2019/11/20 下午3:08
* bean工厂类,这里为接口代理其方法
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class RpcStudyClientFactoryBean implements FactoryBean<Object> {
private Class<?> type;
@Override
public Object getObject() throws Exception {
return ProxyFactory.getProxy(this.type);
}
@Override
public Class<?> getObjectType() {
return this.type;
}
}
把上面的注册配置类注入到该注解
/**
* @author: lele
* @date: 2019/11/20 下午2:42
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(RpcStudyClientRegisty.class)
public @interface EnableRpcStudyClient {
//扫描的包,如果为空,根据启动类所在的包名扫描
String[] basePackages() default {};
}
需要扫描的接口上添加RpcStudyClient注解
@RpcStudyClient(name="user")
public interface HelloService {
String sayHello(String userName);
String qq();
}
启动类上添加@EnableRpcStudyClient注解指定要扫描的包,这样就完成了client端的处理
然后到server端注册具体的服务实现类,定义一个spring会扫描有该注解的类
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcStudyService {
// 用来指定实现的接口
Class<?> value();
}
服务注册类,核心是通过getBeanWithAnnonation来获取相应的bean,然后存储在map里面,并且在初始化后注册本实例的ip地址,netty的handler则通过这个map直接获取实例来执行相应的方法
@Component
@Data
public class RpcStudyRegister implements InitializingBean,ApplicationContextAware {
public static Map<String,Object> serviceMap;
@Value("${spring.application.name}")
private String name;
@Value("${rpcstudy.port}")
private Integer port;
@Override
public void afterPropertiesSet() throws Exception {
if(DemoApplication.mode==0){
String hostAddress = InetAddress.getLocalHost().getHostName();
URL url=new URL(hostAddress,port);
Protocol server= ProtocolFactory.netty();
//注册服务
ZkRegister.register(name,url);
server.start(url);
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//把有RpcStudyService注解的bean添加到map里面,key为该注解的接口
if(DemoApplication.mode==0){
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcStudyService.class);
if (beans != null && beans.size() > 0) {
serviceMap = new HashMap<>(beans.size());
for (Object o : beans.values()) {
RpcStudyService rpcService = o.getClass().getAnnotation(RpcStudyService.class);
String interfaceName = rpcService.value().getName();
serviceMap.put(interfaceName, o);
}
serviceMap = Collections.unmodifiableMap(serviceMap);
}
}
}
}
这样当本地的netty获取class时,可以通过该map从传来的request的interface为参数来获取相应的实现类
这里由于没有做模块划分,而server和client也需要分开,所以在启动类上面定义一个mode变量,mode为0时为client端,只启动tomcat服务,mode为1时是server端只启动netty,可以看到上面的client/server的register类在相应模式下才做处理,启动client(手动改mode=1)和server(手动改mode=0),访问localhost:8080/t2,得到下面的结果
client:
server:
项目地址:https://github.com/97lele/rpcstudy/tree/withspring
最后
以上就是清爽路人为你收集整理的基于netty、zookeeper手写RPC框架之三——接入Spring的全部内容,希望文章能够帮你解决基于netty、zookeeper手写RPC框架之三——接入Spring所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复