概述
在上一篇文章,我们通过 BIO 实现了一个简易的 RPC 框架,使用 BIO 的优点是编码简单,但是问题也很明显,因为是同步阻塞式 IO,所以为了实现并发处理,需要给每个连接都分配一个线程,这样势必很浪费资源,导致业务体量很容易因为硬件出现瓶颈。
本篇我们就将这个 RPC 框架的底层通信方式升级为 Netty,Netty 是对 NIO 的封装,而 NIO 是基于 IO 多路复用的设计模式,通过轮询处理各个事件,大大节约了系统资源。下面我们就来看看具体该怎么做吧
可以看到,框架的整体结构并没有改变,还是那3个包,还是那几个类。但在此之上,我还添加了一个 @RpcService 注解,去优化手动发布服务为扫描注解自动发布。
注:如果相对于 version1 没有改变的类,我会在标题后标注上“未变”,看过 version1 的同学可以直接看变化的地方。
1.RpcRequest(未变)
RpcRequest 封装了消费者要调用方法的具体信息,是我们的自定义协议,或者说是消息格式。
涉及两个过程:
- Consumer 编码:RpcRequest -> 数据流(二进制) => 告诉 Provider 要执行哪个方法
- Provider 解码:数据流(二进制)-> RpcRequest => 获取 Consumer 要调用哪个方法
// 注:只有实现了序列化接口,才能实现远程传输
public class RpcRequest implements Serializable {
private String className; // 类(接口/服务)
private String methodName; // 方法
private Object[] parameters; // 参数
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
}
2.RpcService(新增)
用来标识要发布的服务
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcService {
}
3.RpcServer(修改)*
这里不再是像 version1 的 BIO 那样通过 new ServerSocket() 创建服务端,然后再用 accept() 接受连接了,而是通过 Netty 去进行通信:
public class RpcServer{
// 初始化主线程池,Selector
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 初始化子线程池,对应具体客户端处理逻辑
NioEventLoopGroup workerGrop = new NioEventLoopGroup();
/**
* 发布服务
* 注:这里因为改造成了@RpcService自动扫描,所以不再需要传入服务实例了,只用传一个端口就行
*/
public void publisher(int port) throws InterruptedException {
// 创建服务端
ServerBootstrap server = new ServerBootstrap();
// 无锁式串行化编程
server.group(bossGroup, workerGrop)
.channel(NioServerSocketChannel.class)
// 当有请求来的时候会将 Pipeline 中的所有 ChannelHandler 的走一遍
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/**
* LengthFieldBasedFrameDecoder入参有5个,分别解释如下
* maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
* lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
* lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
* lengthAdjustment:要添加到长度字段值的补偿值
* initialBytesToStrip:从解码帧中去除的第一个字节数
*/
// 1.自定义协议(InvokerProtocol)的解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 2.对象参数类型的编解码器
pipeline.addLast("encoder",new ObjectEncoder());
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 3.处理请求的自定义 Handler
pipeline.addLast(new ProcessorHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // 最大SelectionKey数量
.childOption(ChannelOption.SO_KEEPALIVE, true); // 保证所有子线程是长连接,且可以重复利用
// 绑定端口,并阻塞
ChannelFuture future = server.bind(port).sync();
// 正式启动服务,相当于用一个死循环开始轮训
future.channel().closeFuture().sync();
}
}
4.ProcessorHandler(修改)*
Provider 线程的具体调用逻辑:
- 扫描指定包下的所有所有 Java 文件,获取它们的全类名
- 将标有 @RpcService 的服务实现创建实例对象,然后放入map中保存(单例)
- 等有请求来到后获取到相应 Method,然后执行方法,写出结果
注:由于在上面已经配置好了编解码器,所以可以直接获取到 RpcRequest 对象,然后在写出时编码器会对内容编码成二进制流;但 BIO 这里就需要 ObjectInputStream 和 ObjectOutputStream 手动编码。
public class ProcessorHandler extends SimpleChannelInboundHandler<RpcRequest> {
// 扫描目标包后所有 Java 文件的全类名
private List<String> classNames = new ArrayList<>();
// 有 @RpcService 注解的服务bean的(serviceName,instance)
private Map<String, Object> registryMap = new ConcurrentHashMap<>();
public ProcessorHandler() {
try {
scannerClass("com.xupt.yzh.provider");
doRegistry();
} catch (Exception e) {
e.printStackTrace();
}
}
private void scannerClass(String packageName) {
// 根据包名获取绝对路径
URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\.", "/"));
File classpath = new File(url.getFile());
for (File file : classpath.listFiles()) {
if (file.isDirectory()) {
scannerClass(packageName + "." + file.getName());
} else {
// 将扫描到的类的全类名放入 classNames
classNames.add(packageName + "." + file.getName().replace(".class", ""));
}
}
}
private void doRegistry() throws Exception {
if (classNames.isEmpty()) {
return;
}
for (String className : classNames) {
Class<?> clazz = Class.forName(className);
RpcService annotation = clazz.getAnnotation(RpcService.class);
if (annotation != null) {
// 注:如果实现了多个接口,只取第一个
Class<?> i = clazz.getInterfaces()[0];
String serviceName = i.getName();
// 注册
registryMap.put(serviceName, clazz.newInstance());
}
}
}
// Consumer 发送请求过来时触发回调
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
if (registryMap.containsKey(request.getClassName())) {
Object service = registryMap.get(request.getClassName());
// 根据实参获取方法的形参
// 注:获取形参列表后才能确定一个方法
Object[] args = request.getParameters();
Class<?>[] types = new Class[args.length];
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
// 获取 Method
Method method = service.getClass().getMethod(request.getMethodName(), types);
// 执行方法
Object res = method.invoke(service, args);
ctx.writeAndFlush(res);
ctx.close();
}
}
}
5.RpcProxyClient(未变)
代理对象,通过 JDK 动态代理生成一个代理对象
PS:这里创建一个代理对象是因为,服务的实现实例在 Provider,但 Consumer 调用服务的具体方法时也需要一个实例,而 Consumer 并没有这个实例。
public class RpcProxyClient {
public <T>T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
new Class[]{interfaceCls},
new RemoteInvocationHandler(host, port));
}
}
6.RemoteInvocationHandler(未变)
代理对象的具体逻辑,核心是 invoke 方法,当 Consumer 调用了服务的方法时,就会走到 invoke():
- 构建请求信息 RpcRequest
- 发送(编码):将 RpcRequest 转换成二进制流,发送
- 等待 Provider 处理结果
- 接收(解码):接收 Provider 的处理结果,将二进制流转换成基本类型/Java对象,并返回给上层函数
注:234步的逻辑都是网络 IO 相关,所以后面单独封装了一个 RpcNetTransport 类去实现
public class RemoteInvocationHandler implements InvocationHandler {
String host;
int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 构建调用Provider的请求参数
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName()); //
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
// 进行远程调用,并返回执行结果
RpcNetTransport netTransport = new RpcNetTransport(host, port);
Object res = netTransport.send(rpcRequest);
return res;
}
}
7.RpcNetTransport(修改)*
将原来的 BIO 网络通信修改成 Netty 代码,总体与 RpcServer 的代码有点类似
public class RpcNetTransport {
String host;
int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Object send(RpcRequest rpcRequest) throws InterruptedException {
NioEventLoopGroup wokergroup = new NioEventLoopGroup();
TransportHandler transportHandler = new TransportHandler();
// 创建客户端
Bootstrap client = new Bootstrap();
client.group(wokergroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 1.自定义协议(InvokerProtocol)的解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 2.对象参数类型的编解码器
pipeline.addLast("encoder",new ObjectEncoder());
pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE,
// 3.接收 Provider 处理结果的自定义 Channel Handler
pipeline.addLast(transportHandler);
}
});
// 建立连接
ChannelFuture future = client.connect(host, port).sync();
// 发送请求信息
future.channel().writeAndFlush(rpcRequest);
future.channel().closeFuture().sync();
// 返回请求信息
return transportHandler.getResponse();
}
// 内部类
static class TransportHandler extends ChannelInboundHandlerAdapter {
private Object response;
// 接收到消息时触发该回调
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
}
public Object getResponse() {
return response;
}
}
}
好了,到此我们对 BIO 到 Netty 的改造就完成了,由于我们只是做了底层实现的优化,所以测试代码并不用改变,这也是实际开发中项目迭代的一个基本要求,要兼容老版本。
结果测试(未变)
api
public interface TestService {
String test(String name);
}
Provider
服务实现类:
@RpcService
public class TestServiceImpl implements TestService {
@Override
public String test(String name) {
System.out.println("new requst coming..." + name);
Random random = new Random();
String json = "{"name":" + """ + name + """ + ", "age":" + random.nextInt(40) + "}";
return json;
}
}
发布服务:
public class Provider {
public static void main(String[] args) throws InterruptedException {
RpcServer proxyServer = new RpcServer();
// 注:这里不用再传入服务实例
proxyServer.publisher(8080);
}
}
Consumer
远程调用服务:
public class Consumer {
public static void main(String[] args) {
// 创建代理对象
RpcProxyClient rpcProxyClient = new RpcProxyClient();
// 创建一个代理对象
TestService service = rpcProxyClient.clientProxy(TestService.class, "localhost", 8080);
// test() 会进行远程调用
String json = service.test("老五");
System.out.println(json);
}
}
结果如下:
完整代码我放到 GitHub 上了,有需要参考的同学点击这里跳转…
最后
以上就是虚心项链为你收集整理的【RPC】手写简易 RPC 框架 --重构,实现基于 Netty 通信的全部内容,希望文章能够帮你解决【RPC】手写简易 RPC 框架 --重构,实现基于 Netty 通信所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复