概述
一、 RPC基础知识
1.1、RPC是什么
RPC 【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,是一种技术思想,而不是规范。它允许程序调用另一个地址空间(网络的另一台机器上)的过程或函数,而不用开发人员显式编码这个调用的细节。调用本地方法和调用远程方法一样。
1.2、 RPC基本原理
- 服务调用方 client(客户端)以本地调用方式调用服务;
- client stub(客户端存根:存放服务端地址信息,将客户端的请求参数编组成网络消息,再通过网络发送给服务方)接收到调用后负责将方法、参数等编组成能够进行网络传输的消息体;在Java中就是序列化的过程
- client stub找到服务地址,并将消息通过网络发送到服务端(server);
- server stub(服务端存根:接受客户端发送过来的消息并解组,再调用本地服务,再将本地服务执行结果发送给客户端)收到消息后进行解组;
- server stub根据解组结果调用本地的服务;
- 本地服务执行处理逻辑;
- 本地服务将结果返回给server stub;
- server stub将返回结果编组成网络消息;
- server stub将编组后的消息通过网络并发送客户端
- client stub接收到消息,并进行解组;
- 服务调用方client得到最终结果。
1.3、 RPC协议是什么
RPC调用过程中需要将参数编组为消息进行发送,接受方需要解组消息为参数,过程处理结果同样需要经编组、解组。消息由哪些部分构成及消息的表示形式就构成了消息协议。
RPC调用过程中采用的消息协议称为RPC协议
RPC协议规定请求、响应消息的格式
在TCP(网络传输控制协议)上可选用或自定义消息协议来完成RPC消息交互
我们可以选用通用的标准协议(如:http、https),也也可根据自身的需要定义自己的消息协议。
1.4、 RPC框架是什么
封装好参数编组、消息解组、底层网络通信的RPC程序,可直接在其基础上只需专注与过程业务代码编码,无需再关注其调用细节
目前常见的RPC框架
Dubbo、gRPC、gRPC、Apache Thrift、RMI…等
二、手写RPC框架
下面将一步步来写一个精简版的RPC框架,使项目引入该框架后,通过简单的配置让项目拥有提供远程服务与调用的能力
2.1、 服务端编写
2.1.1、 服务端都需要完成哪些?
首先服务端要停工远程服务,就必须具备服务注册及暴露的能力;在这之后还需要开启网络服务,供客户端连接。有些项目可能即使服务提供者同时又是服务消费者,那么什么时候注册暴露服务,什么时候注入消费服务呢?在这我就引入了一个RPC监听处理器的概念,就有这个处理器来完成服务的注册暴露,以及服务消费注入
2.1.2、具体实现
2.1.2.1、 服务暴露注解
哪些服务需要注册暴露这里使用自定义注解的方式来标注:@Service
/**
* @Author Lijl
* @AnnotationTypeName Service
* @Description 被该注解标记的服务可提供远程访问的能力
* @Date 2022/2/14 14:32
* @Version 1.0
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Component
@Inherited
public @interface Service {
String value() default "";
String version() default "";
long timeout() default 0L;
}
2.1.2.2、服务注册(暴露)
/**
* @Author Lijl
* @InterfaceName ServiceRegister
* @Description 定义服务注册
* @Date 2022/2/15 15:14
* @Version 1.0
*/
public interface ServiceRegister {
void register(List<ServiceObject> so) throws Exception;
ServiceObject getServiceObject(String name) throws Exception;
}
/**
* @Author Lijl
* @ClassName DefaultServiceRegister
* @Description 默认服务注册
* @Date 2022/2/15 15:19
* @Version 1.0
*/
public abstract class DefaultServiceRegister implements ServiceRegister{
private Map<String, ServiceObject> serviceMap = new HashMap<>();
protected String protocol;
protected int port;
/**
* @Author lijl
* @MethodName register
* @Description 缓存服务持有对象
* @Date 16:10 2022/3/11
* @Version 1.0
* @param soList
* @return: void
**/
@Override
public void register(List<ServiceObject> soList) throws Exception {
if (soList==null&&soList.size()>0){
throw new IllegalAccessException("Service object information cannot be empty");
}
soList.forEach(so -> this.serviceMap.put(so.getName(), so));
}
/**
* @Author lijl
* @MethodName getServiceObject
* @Description 获取服务持有对象
* @Date 16:11 2022/3/11
* @Version 1.0
* @param name
* @return: com.huawei.rpc.server.register.ServiceObject
**/
@Override
public ServiceObject getServiceObject(String name) {
return this.serviceMap.get(name);
}
}
/**
* @Author Lijl
* @ClassName ZookeeperExportServiceRegister
* @Description Zookeeper服务注册,提供服务注册、服务暴露
* @Date 2022/2/15 15:26
* @Version 1.0
*/
public class ZookeeperExportServiceRegister extends DefaultServiceRegister {
/**
* zk客户端
*/
private ZkClient client;
public ZookeeperExportServiceRegister(String zkAddress, int port, String protocol){
this.client = new ZkClient(zkAddress);
this.client.setZkSerializer(new ZookeeperSerializer());
this.port = port;
this.protocol = protocol;
}
/**
* @Author lijl
* @MethodName register
* @Description 缓存服务持有对象,并注册服务
* @Date 15:38 2022/2/15
* @Version 1.0
* @param soList 服务持有者集合
* @return: void
**/
@Override
public void register(List<ServiceObject> soList) throws Exception {
super.register(soList);
for (ServiceObject so : soList) {
ServiceInfo serviceInfo = new ServiceInfo();
String host = InetAddress.getLocalHost().getHostAddress();
String address = host + ":" + port;
serviceInfo.setAddress(address);
serviceInfo.setName(so.getName());
serviceInfo.setProtocol(protocol);
this.exportService(serviceInfo);
}
}
/**
* @Author lijl
* @MethodName exportService
* @Description 暴露服务
* @Date 15:38 2022/2/15
* @Version 1.0
* @param serviceInfo 需要暴露的服务信息
* @return: void
**/
private void exportService(ServiceInfo serviceInfo) {
String serviceName = serviceInfo.getName();
String uri = JSON.toJSONString(serviceInfo);
try {
uri = URLEncoder.encode(uri, CommonConstant.UTF_8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = CommonConstant.ZK_SERVICE_PATH + CommonConstant.PATH_DELIMITER + serviceName + CommonConstant.PATH_DELIMITER + "service";
if (!client.exists(servicePath)){
client.createPersistent(servicePath,true);
}
String uriPath = servicePath + CommonConstant.PATH_DELIMITER + uri;
if (client.exists(uriPath)){
client.delete(uriPath);
}
client.createEphemeral(uriPath);
}
}
这个过程其实没有详说的必要,就是将指定ServiceObject对象序列化后保存到ZK上,供客户端发现。同时会将服务对象缓存起来,在客户端调用服务时,通过缓存的ServiceObject对象反射指定服务,调用方法;其实看方法上打的注释也能看明白每步都做了什么
2.1.2.3、开启网络服务、处理接收到的客户端请求
/**
* @Author Lijl
* @ClassName RpcService
* @Description RPC抽象服务端
* @Date 2022/2/15 16:09
* @Version 1.0
*/
public abstract class RpcServer {
/**
* 服务端口
*/
protected int port;
/**
* 服务协议
*/
protected String protocol;
/**
* 请求处理者
*/
protected RequestHandler handler;
public RpcServer(int port, String protocol, RequestHandler handler){
super();
this.port = port;
this.protocol = protocol;
this.handler = handler;
}
/**
* 开启服务
*/
public abstract void start();
/**
* 停止服务
*/
public abstract void stop();
}
/**
* @Author Lijl
* @ClassName NettyRpcService
* @Description Netty RPC服务端,提供Netty网络服务开启、关闭,接收客户端请求及消息后的处理
* @Date 2022/2/15 16:28
* @Version 1.0
*/
@Slf4j
public class NettyRpcServer extends RpcServer{
private Channel channel;
public NettyRpcServer(int port, String protocol, RequestHandler handler){
super(port,protocol,handler);
}
@Override
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelRequestHandler());
}
});
//启动服务
ChannelFuture future = sb.bind(port).sync();
log.info("Server starteed successfully.");
channel = future.channel();
//等待服务通道关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void stop() {
if (this.channel!=null){
this.channel.close();
}
}
private class ChannelRequestHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active: {}",ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("The server receives a message: {}",msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] req = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(req);
byte[] res = handler.handleRequest(req);
log.info("Send response: {}",msg);
ByteBuf respBuf = Unpooled.buffer(res.length);
respBuf.writeBytes(res);
ctx.write(respBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
log.error("Exception occurred: {}",cause.getMessage());
ctx.close();
}
}
}
/**
* @Author Lijl
* @ClassName RequestHandler
* @Description 请求处理器,提供解组请求,编组响应操作
* @Date 2022/2/15 16:10
* @Version 1.0
*/
public class RequestHandler {
private MessageProtocol protocol;
private ServiceRegister serviceRegister;
public RequestHandler(MessageProtocol protocol,ServiceRegister serviceRegister){
super();
this.protocol = protocol;
this.serviceRegister = serviceRegister;
}
/**
* @Author lijl
* @MethodName handleRequest
* @Description 处理客户端请求参数,调用本地服务
* @Date 16:26 2022/2/15
* @Version 1.0
* @param data
* @return: byte[]
**/
public byte[] handleRequest(byte[] data) throws Exception{
//1.解组消息
Request request = this.protocol.unmarshallingRequest(data);
//2. 查找服务对象
ServiceObject so = this.serviceRegister.getServiceObject(request.getServiceName());
Response response = null;
if (so==null){
response = Response.builder().status(Status.NOT_FOUND).build();
}else{
try {
//3.反射调用对应的过程方法
Method method = so.getClazz().getMethod(request.getMethod(), request.getParameterTypes());
Object returnVal = method.invoke(so.getObj(), request.getParameters());
response = Response.builder()
.status(Status.SUCCESS)
.returnValue(returnVal)
.build();
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException |
InvocationTargetException e) {
response = Response.builder()
.status(Status.ERROR)
.exception(e)
.build();
}
}
return this.protocol.marshallingResponse(response);
}
}
这段算是服务端的核心部分,控制服务段Netty网络服务的开启关闭;接收客户端发起的请求,将客户端发送的请求参数解组并查询客户端远程调用的过程业务过程接口,并通过反射调用返回调用结果
2.1.2.3、RPC监听处理器
开始有提到RPC监听处理器的概念,用于服务的注册暴露与服务的消费注入,这里先说下服务开启服务注册,后面说的客户端时在补充服务注入
/**
* @Author Lijl
* @ClassName DefaultRpcProcessor
* @Description rcp监听处理器 负责暴露服务、自动注入
* @Date 2022/2/26 20:43
* @Version 1.0
*/
@Slf4j
public class DefaultRpcProcessor implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
@Autowired
private ClientProxyFactory clientProxyFactory;
@Autowired
private ServiceRegister serviceRegister;
@Autowired
private RpcServer rpcService;
@SneakyThrows
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
ApplicationContext applicationContext = event.getApplicationContext();
if (Objects.isNull(applicationContext.getParent())){
//开启服务
startServer(applicationContext);
//注入Service
injectService(applicationContext);
}
}
/**
* @Author lijl
* @MethodName startServer
* @Description 扫描服务注册注解,调用服务注册将服务注册到zookeeper中
* @Date 18:44 2022/2/26
* @Version 1.0
* @param applicationContext
* @return: void
**/
private void startServer(ApplicationContext applicationContext) throws Exception {
//过滤出带有服务注册注解的实例
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Service.class);
if (beans.size()!=0){
//遍历服务组装服务注册信息
for (Object o : beans.values()) {
List<ServiceObject> soList = new ArrayList<>();
Class<?> clazz = o.getClass();
Service service = clazz.getAnnotation(Service.class);
String version = service.version();
Class<?>[] interfaces = clazz.getInterfaces();
if (interfaces.length>1){
//相同接口存在不同版本号,则分别注册
for (Class<?> aClass : interfaces) {
String aClassName = aClass.getName();
if (StringUtils.hasLength(version)){
aClassName +=":"+version;
}
soList.add(new ServiceObject(aClassName,aClass,o));
}
}else{
Class<?> superClass = interfaces[0];
String aClassName = superClass.getName();
if (StringUtils.hasLength(version)){
aClassName +=":"+version;
}
soList.add(new ServiceObject(aClassName, superClass, o));
}
//调用服务注册
this.serviceRegister.register(soList);
}
rpcService.start();
}
}
/**
* @Author lijl
* @MethodName injectService
* @Description 注入远程调用服务
* @Date 19:20 2022/2/26
* @Version 1.0
* @param applicationContext
* @return: void
**/
private void injectService(ApplicationContext applicationContext) {
//...
}
/**
* @Author lijl
* @MethodName destroy
* @Description 关闭服务
* @Date 19:35 2022/2/26
* @Version 1.0
* @param
* @return: void
**/
@Override
public void destroy() {
log.info("The application will stop and the zookeeper connection will be closed");
rpcService.stop();
}
}
DefaultRpcProcessor实现了ApplicationListener,并监听了ContextRefreshedEvent事件,在Spring启动完毕过后会收到一个事件通知,基于这个机制,就可以在这里开启服务,以及注入服务。
2.2、客户端编写
2.2.1、客户端需要完成哪些
客户端想要调用远程服务,必须具备服务发现的能力;在知道有哪些服务后,还必须有服务代理来执行服务调用;客户端想要与服务端通信,必须要有相同的消息协议与网络请求的能力,即网络层功能。
2.2.2、具体实现
2.2.2.1、服务发现
/**
* @Author Lijl
* @InterfaceName IDiscovererService
* @Description 服务发现,定义服务发现规范
* @Date 2022/2/14 14:29
* @Version 1.0
*/
public interface IDiscovererService {
List<ServiceInfo> getService(String name);
}
/**
* @Author Lijl
* @ClassName ZookeeperServiceDiscoverer
* @Description 服务发现,定义以zookeeper为注册中心的服务发现
* @Date 2022/2/14 16:00
* @Version 1.0
*/
public class ZookeeperServiceDiscoverer implements IDiscovererService{
private ZkClient zkClient;
public ZookeeperServiceDiscoverer(String zkAddress){
zkClient = new ZkClient(zkAddress);
zkClient.setZkSerializer(new ZookeeperSerializer());
}
/**
* @Author lijl
* @MethodName getService
* @Description 使用Zookeeper客户端,通过服务名获取服务列表
* @Date 17:07 2022/3/11
* @Version 1.0
* @param name 服务名->接口全路径
* @return: java.util.List<com.huawei.rpc.common.info.ServiceInfo> 服务列表
**/
@Override
public List<ServiceInfo> getService(String name) {
// /simple-rpc/接口全路径/service
//从zk中获取已注册的服务
String servicePath = CommonConstant.ZK_SERVICE_PATH+CommonConstant.PATH_DELIMITER+name+CommonConstant.PATH_DELIMITER+"service";
List<String> children = zkClient.getChildren(servicePath);
return Optional.ofNullable(children).orElse(new ArrayList<>()).stream().map(str -> {
String deCh = null;
try {
deCh = URLDecoder.decode(str,CommonConstant.UTF_8);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return JSON.parseObject(deCh, ServiceInfo.class);
}).collect(Collectors.toList());
}
}
服务端是将服务注册进了Zookeeper中,所以服务发现者也使用Zookeeper来实现,通过ZkClient我们很容易发现已经注册在ZK上的服务。当然我们也可以使用其他组件作为注册中心,例如Redis
2.2.2.2、网络客户端,连接服务端,编组请求参数,解组响应参数
/**
* @Author Lijl
* @InterfaceName NetWorkClient
* @Description 网络请求客户端,定义网络请求规范
* @Date 2022/2/14 16:34
* @Version 1.0
*/
public interface NetWorkClient {
byte[] sendRequest(byte[] data, ServiceInfo serviceInfo) throws InterruptedException;
}
/**
* @Author Lijl
* @ClassName NettyNetWorkClient
* @Description Netty网络请求客户端,定义通过Netty实现网络请求
* @Date 2022/2/14 16:35
* @Version 1.0
*/
@Slf4j
public class NettyNetWorkClient implements NetWorkClient{
/**
* @Author lijl
* @MethodName sendRequest
* @Description 发送请求
* @Date 16:36 2022/2/14
* @Version 1.0
* @param data 请求数据
* @param serviceInfo 服务信息
* @return: byte[] 响应数据
**/
@Override
public byte[] sendRequest(byte[] data, ServiceInfo serviceInfo) throws InterruptedException {
String[] addrInfoArray = serviceInfo.getAddress().split(":");
String serverAddress = addrInfoArray[0];
String serverPort = addrInfoArray[1];
SendHandler sendHandler = new SendHandler(data);
byte[] respData;
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(sendHandler);
}
});
bootstrap.connect(serverAddress,Integer.parseInt(serverPort)).sync();
respData = (byte[]) sendHandler.rspData();
log.info("SendRequest get reply: {}",respData);
} finally {
//释放线程组资源
group.shutdownGracefully();
}
return respData;
}
}
/**
* @Author Lijl
* @ClassName SendHandler
* @Description 发送处理类,定义Netty入站处理
* @Date 2022/2/14 16:53
* @Version 1.0
*/
@Slf4j
public class SendHandler extends ChannelInboundHandlerAdapter {
private CountDownLatch cdl;
private Object readMsg = null;
private byte[] data;
public SendHandler(byte[] data){
this.cdl = new CountDownLatch(1);
this.data = data;
}
/**
* @Author lijl
* @MethodName channelActive
* @Description 当连接服务段成功后,发送请求数据
* @Date 16:56 2022/2/14
* @Version 1.0
* @param ctx 通道上下文
* @return: void
**/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Successful connection to server: {}",ctx);
ByteBuf buffer = Unpooled.buffer(data.length);
buffer.writeBytes(data);
log.info("Client sends message: {}",buffer);
ctx.writeAndFlush(buffer);
}
/**
* @Author lijl
* @MethodName channelRead
* @Description 读取数据,数据读取完毕释放CD锁
* @Date 16:59 2022/2/14
* @Version 1.0
* @param ctx 上下文
* @param msg 字节流
* @return: void
**/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("Client reads message: {}",msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] resp = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(resp);
readMsg = resp;
cdl.countDown();
}
/**
* @Author lijl
* @MethodName rspData
* @Description 等待读取数据完成
* @Date 17:02 2022/2/14
* @Version 1.0
* @param
* @return: java.lang.Object
**/
public Object rspData() throws InterruptedException {
cdl.await();
return readMsg;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
log.error("Exception occurred: {}",cause.getMessage());
ctx.close();
}
}
这部分前面讲的服务端请求处理类似,连接服务端(Netty),将请求参数编组后发送到服务端,待服务端接收处理后返回,再这里将服务端返回的结果进行解组
2.2.2.3、服务动态代理
/**
* @Author Lijl
* @ClassName ClientProxyFactory
* @Description 客户端代理工厂:用于创建远程服务代理类;封装编组请求、请求发送、编组响应等操作
* @Date 2022/2/14 17:13
* @Version 1.0
*/
public class ClientProxyFactory {
@Getter
@Setter
private IDiscovererService iDiscovererService;
@Getter
@Setter
private Map<String, MessageProtocol> supportMessageProtocols;
@Getter
@Setter
private NetWorkClient netWorkClient;
/**
* @Author lijl
* @MethodName getProxy
* @Description 通过Java动态代理获取服务代理类
* @Date 14:14 2022/2/15
* @Version 1.0
* @param clazz 被代理类Class
* @return: T 服务代理类
**/
public <T> T getProxy(Class<T> clazz,String version){
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new ClientInvocationHandler(clazz,version));
}
/**
* @Author Lijl
* @ClassName ClientProxyFactory
* @Description 客户端服务代理类invoke函数细节实现
* @Date 2022/2/15 14:31
* @Version 1.0
*/
private class ClientInvocationHandler implements InvocationHandler {
private Class<?> aClass;
private String version;
public ClientInvocationHandler(Class<?> aClass,String version){
super();
this.aClass = aClass;
this.version = version;
}
private Random random = new Random();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if ("toString".equals(methodName)){
return proxy.getClass().toString();
}
if (("hashCode").equals(methodName)){
return 0;
}
//1、获取服务信息
String serviceName = this.aClass.getName();
if (StringUtils.hasLength(this.version)){
serviceName += ":"+version;
}
List<ServiceInfo> serviceInfoList = iDiscovererService.getService(serviceName);
if (serviceInfoList==null||serviceInfoList.isEmpty()){
throw new ClassCastException("No provider available");
}
//多个服务的话随机获取一个
ServiceInfo serviceInfo = serviceInfoList.get(random.nextInt(serviceInfoList.size()));
//2、构造request对象
Request request = new Request();
request.setServiceName(serviceInfo.getName());
request.setMethod(methodName);
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
//3、请求协议编组
//获取该方法对应的协议
MessageProtocol messageProtocol = supportMessageProtocols.get(serviceInfo.getProtocol());
//编组请求
byte[] data = messageProtocol.marshallingRequest(request);
//4、条用网络层发送请求
byte[] requestData = netWorkClient.sendRequest(data, serviceInfo);
//5、解组响应信息
Response response = messageProtocol.unmarshallingResponse(requestData);
// 6、处理结果
if (response.getException()!=null){
throw response.getException();
}
return response.getReturnValue();
}
}
}
服务代理类由客户端代理工厂类产生,代理方式是基于Java的动态代理。在处理类ClientInvocationHandler的invoke函数中,定义了一系列的操作,包括获取服务、选择服务提供者、构造请求对象、编组请求对象、网络请求客户端发送请求、解组响应消息、异常处理等
2.2.2.3、注入远程服务注解
/**
* @Author Lijl
* @AnnotationTypeName Reference
* @Description 用于注入远程服务
* @Date 2022/2/14 14:37
* @Version 1.0
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface Reference {
String value() default "";
String version() default "";
long timeout() default 0L;
}
2.2.2.4、注入远程服务
/**
* @Author lijl
* @MethodName injectService
* @Description 注入远程调用服务
* @Date 16:52 2022/3/11
* @Version 1.0
* @param applicationContext
* @return: void
**/
private void injectService(ApplicationContext applicationContext) {
String[] beanNames = applicationContext.getBeanDefinitionNames();
for (String beanName : beanNames) {
Class<?> clazz = applicationContext.getType(beanName);
if (Objects.isNull(clazz)){
continue;
}
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
Reference reference = field.getAnnotation(Reference.class);
if (Objects.isNull(reference)){
continue;
}
Class<?> fieldClass = field.getType();
Object bean = applicationContext.getBean(beanName);
field.setAccessible(true);
String version = reference.version();
try {
field.set(bean,clientProxyFactory.getProxy(fieldClass,version));
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
2.2.2.5、消息协议
/**
* @Author Lijl
* @ClassName MessageProtocol
* @Description 消息协议,定义编组请求、解组请求、编组响应、解组响应规范
* @Date 2022/2/14 17:26
* @Version 1.0
*/
public interface MessageProtocol {
/**
* @Author lijl
* @MethodName marshallingRequest
* @Description 编组请求
* @Date 13:44 2022/2/15
* @Version 1.0
* @param request 请求信息
* @return: byte[] 请求字节
**/
byte[] marshallingRequest(Request request) throws Exception;
/**
* @Author lijl
* @MethodName unmarshallingRequest
* @Description 解组请求
* @Date 13:45 2022/2/15
* @Version 1.0
* @param data 请求字节
* @return: com.huawei.rpc.common.prorocol.Request 请求信息
**/
Request unmarshallingRequest(byte[] data) throws Exception;
/**
* @Author lijl
* @MethodName marshallingResponse
* @Description 编组响应
* @Date 13:46 2022/2/15
* @Version 1.0
* @param response 响应信息
* @return: byte[] 响应字节
**/
byte[] marshallingResponse(Response response) throws Exception;
/**
* @Author lijl
* @MethodName unmarshallingResponse
* @Description 解组响应
* @Date 13:47 2022/2/15
* @Version 1.0
* @param data 响应字节
* @return: com.huawei.rpc.common.prorocol.Response 响应信息
**/
Response unmarshallingResponse(byte[] data) throws Exception;
}
/**
* @Author Lijl
* @ClassName HessianSerializeMessageProrocol
* @Description Hessian 序列化信息
* @Date 2022/2/17 15:16
* @Version 1.0
*/
public class HessianSerializeMessageProrocol implements MessageProtocol{
/**
* @Author lijl
* @MethodName serialize
* @Description Hessian实现序列化
* @Date 15:23 2022/2/17
* @Version 1.0
* @param object
* @return: byte[]
**/
public byte[] serialize(Object object) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = null;
HessianOutput hessianOutput = null;
try {
byteArrayOutputStream = new ByteArrayOutputStream();
// Hessian的序列化输出
hessianOutput = new HessianOutput(byteArrayOutputStream);
hessianOutput.writeObject(object);
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
throw e;
} finally {
try {
if (byteArrayOutputStream!=null){
byteArrayOutputStream.close();
}
} catch (IOException e) {
throw e;
}
try {
if (hessianOutput!=null){
hessianOutput.close();
}
} catch (IOException e) {
throw e;
}
}
}
/**
* @Author lijl
* @MethodName deserialize
* @Description Hessian实现反序列化
* @Date 15:29 2022/2/17
* @Version 1.0
* @param employeeArray
* @return: java.lang.Object
**/
public Object deserialize(byte[] employeeArray) throws Exception {
ByteArrayInputStream byteArrayInputStream = null;
HessianInput hessianInput = null;
try {
byteArrayInputStream = new ByteArrayInputStream(employeeArray);
// Hessian的反序列化读取对象
hessianInput = new HessianInput(byteArrayInputStream);
return hessianInput.readObject();
} catch (IOException e) {
throw e;
} finally {
try {
if (byteArrayInputStream!=null){
byteArrayInputStream.close();
}
} catch (IOException e) {
throw e;
}
try {
if (hessianInput!=null){
hessianInput.close();
}
} catch (Exception e) {
throw e;
}
}
}
@Override
public byte[] marshallingRequest(Request request) throws Exception {
return serialize(request);
}
@Override
public Request unmarshallingRequest(byte[] data) throws Exception {
return (Request) deserialize(data);
}
@Override
public byte[] marshallingResponse(Response response) throws Exception {
return serialize(response);
}
@Override
public Response unmarshallingResponse(byte[] data) throws Exception {
return (Response) deserialize(data);
}
}
/**
* @Author Lijl
* @ClassName JavaSerializeMessageProtocol
* @Description java序列化信息
* @Date 2022/2/15 13:52
* @Version 1.0
*/
public class JavaSerializeMessageProtocol implements MessageProtocol{
private byte[] serialize(Object obj) throws Exception{
try(ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);) {
oos.writeObject(obj);
return baos.toByteArray();
}
}
private Object deserialize(byte[] data)throws Exception{
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
return ois.readObject();
}
}
@Override
public byte[] marshallingRequest(Request request) throws Exception {
return this.serialize(request);
}
@Override
public Request unmarshallingRequest(byte[] data) throws Exception {
return (Request) this.deserialize(data);
}
@Override
public byte[] marshallingResponse(Response response) throws Exception {
return this.serialize(response);
}
@Override
public Response unmarshallingResponse(byte[] data) throws Exception {
return (Response) this.deserialize(data);
}
}
消息协议主要是定义了客户端如何编组请求、解组响应,服务端如何解组请求、编组响应这四个操作规范。本文提供了Hessian、Java序列化与反序列化的实现,感兴趣的可以加入其他序列化技术,例如:kryo、protostuff等。
2.3、自动装配
这里是spring自动装配的一些配置很简单,就不做赘述了,不了解的可看这篇手写spring-boot-starter
/**
* @Author Lijl
* @ClassName AutoConfiguration
* @Description 自动装配
* @Date 2022/2/16 9:25
* @Version 1.0
*/
@Configuration
@EnableConfigurationProperties({ProtocolProperties.class,RegistryProperties.class})
public class AutoConfiguration {
private ProtocolProperties protocolProperties;
private RegistryProperties registryProperties;
public AutoConfiguration(ProtocolProperties protocolProperties,RegistryProperties registryProperties){
this.protocolProperties = protocolProperties;
this.registryProperties = registryProperties;
}
@Bean
public DefaultRpcProcessor defaultRpcProcessor(){
return new DefaultRpcProcessor();
}
/**
* @Author lijl
* @MethodName clientProxyFactory
* @Description 注册客户端动态代理
* @Date 20:50 2022/2/26
* @Version 1.0
* @param
* @return: com.huawei.rpc.client.ClientProxyFactory
**/
@Bean
public ClientProxyFactory clientProxyFactory(){
ClientProxyFactory clientProxyFactory = new ClientProxyFactory();
//设置服务发现者
String address = registryProperties.getAddress();
int port = registryProperties.getPort();
clientProxyFactory.setIDiscovererService(new ZookeeperServiceDiscoverer(address+":"+port));
//设置支持的协议
Map<String, MessageProtocol> supportMessageProtocols = new HashMap<>(16);
supportMessageProtocols.put(protocolProperties.getName(), protocolProperties.getMessageProtocol());
clientProxyFactory.setSupportMessageProtocols(supportMessageProtocols);
//设置网络层实现
clientProxyFactory.setNetWorkClient(new NettyNetWorkClient());
return clientProxyFactory;
}
/**
* @Author lijiale
* @MethodName serviceRegister
* @Description 服务注册
* @Date 20:52 2022/2/26
* @Version 1.0
* @param
* @return: com.huawei.rpc.server.register.ServiceRegister
**/
@Bean
public ServiceRegister serviceRegister(){
String address = registryProperties.getAddress();
int port = registryProperties.getPort();
return new ZookeeperExportServiceRegister(address+":"+port, protocolProperties.getPort(), protocolProperties.getName());
}
@Bean
public RequestHandler requestHandler(ServiceRegister serviceRegister){
return new RequestHandler(protocolProperties.getMessageProtocol(), serviceRegister);
}
@Bean
public RpcServer rpcServer(RequestHandler requestHandler){
return new NettyRpcServer(protocolProperties.getPort(), protocolProperties.getName(), requestHandler);
}
}
三、RPC框架使用
因为我这是直接封装成starter了,所以使用起来非常简单,只需要准备一个zookeeper与简单的五步配置编写,当然创建服务端与客户端程序除外
3.1 步骤一
分别再创建好的服务端工程与客户端工程中引入依赖
<dependency>
<groupId>com.huawei</groupId>
<artifactId>simple-rpc-spring-boot-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
3.2 步骤二
服务端配置zookeeper地址与协议端口
spring:
rpc:
registry:
address: 127.0.0.1
port: 2181
protocol:
port: 12138
3.3 步骤三
客户端配置zookeeper地址与协议端口
spring:
rpc:
registry:
address: 127.0.0.1
port: 2181
protocol:
port: 12138
3.4 步骤四
将你的远程服务使用@Service注解,例如:
在需要暴露的服务上使用@Service注解申明,注意该注解是自定义注解
/**
* @Author Lijl
* @ClassName TestServiceImpl
* @Description 测试
* @Date 2022/2/16 13:52
* @Version 1.0
*/
@Service(version = "1.0")
public class TestServiceImpl implements TestService, ITestService {
@Override
public TestDto getTestData() {
System.out.println("远程调用.");
TestDto testDto = new TestDto();
testDto.setId(1);
testDto.setData("测试远程调用");
return testDto;
}
@Override
public TestDto getTestDtoData() {
System.out.println("远程调用.....");
TestDto testDto = new TestDto();
testDto.setId(2);
testDto.setData("测试远程调用");
return testDto;
}
}
3.5 步骤五
使用@Reference注解注入远程服务
/**
* @Author Lijl
* @ClassName TestController
* @Description 测试
* @Date 2022/2/16 14:07
* @Version 1.0
*/
@RestController
public class TestController {
@Reference(version = "1.0")
private TestService testService;
@Reference(version = "1.0")
private ITestService iTestService;
@PostMapping(value = "/test")
public void test(){
TestDto testData = this.testService.getTestData();
TestDto testDtoData = this.iTestService.getTestDtoData();
System.out.println(testData.toString()+"------"+testDtoData.toString());
}
}
完整代码
主要用于理解RPC调用原理,功能写的比较简单,待完善的地方还有很多,例如调用超时、负载均衡,容错等…,拉的有feature分支,感兴趣的小伙伴可在上补充修改自己所想到的内容
最后
以上就是俊秀白开水为你收集整理的手写简易版rpc框架,理解远程过程调用原理的全部内容,希望文章能够帮你解决手写简易版rpc框架,理解远程过程调用原理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复