我是靠谱客的博主 有魅力柚子,这篇文章主要介绍深入理解RPC之手写RPC框架,现在分享给大家,希望可以做个参考。

TOC

一. RPC是什么

RPC全称remote procedure call,翻译过来就是远程过程调用。在分布式系统中,一个模块像调用本地方法一样调用远程方法的过程,就叫RPC。
我们耳熟能详的webservice、restful接口调用都是RPC,只是消息的组织方式和消息协议不同。
为了加深对RPC的理解,我手写了一个简单的RPC框架,完整的代码已上传至https://github.com/RingWu2012/ym-rpc

二.RPC流程

在这里插入图片描述
RPC的流程大致如上图所示:

  1. 客户端调用client stub(client stub位于本地,就和调用本地方法一样),传递参数
  2. clientstub将参数编组为消息,然后通过系统调用向服务的发送消息;
  3. 客户端本地操作系统将消息从客户端机器发送到服务端机器;
  4. 服务的操作系统将接收到的数据包传递给Server stub;
  5. Server stub解组消息为参数;
  6. Server stub再调用服务端的过程,过程执行结果以相同的方式回传给客户端。

三. RPC协议

涉及到通信,自然需要协议,比如tcp、udp、http、ssh、hession、thrift、grpc、webservice等等;相信有很多小伙伴也马上想到了netty,事实上,很多RPC框架都用到了netty。

常用的RPC协议有SOAP、XML-RPC、JSON-RPC、JSON-WSP,传统的webservice框架apache cxf、apache axis2等大多基于标准的SOAP协议;很多新兴的框架,譬如dubbo支持多种协议。

四. 手写RPC框架

在了解了RPC的基本原理后,尝试来写一个RPC框架。一方面可以加深对RPC的理解,另外一方面,也可以发现RPC过程中一些细节上的问题,对于阅读开源RPC框架的源码很有帮助。
在开发前,需要说明的是,用户在使用RPC框架的时候,过程接口的定义,接口的调用,接口的实现都是用户去完成的。

客户端代码开发

结合前面的RPC流程,rpc客户端需要完成的功能的有:

  1. 生成接口的代理对象
  2. 从注册中心发现服务
  3. 将请求编组为消息发送到服务端

1.生成接口的代理对象

这里使用jdk的动态代理生成接口的代理对象,生成代理对象的工厂类,代码如下

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package cn.wym.rpc.client; import cn.wym.rpc.discovery.ServiceInfoDiscoverer; import cn.wym.rpc.discovery.ZookeeperServiceInfoDiscoverer; import lombok.Getter; import lombok.Setter; import java.lang.reflect.Proxy; import java.util.HashMap; import java.util.Map; @Getter@Setter public class ClientStubProxyFactory { //服务发现 private ServiceInfoDiscoverer serviceInfoDiscoverer= new ZookeeperServiceInfoDiscoverer(); //代理对象缓存,避免每次都新建 private Map<Class<?>, Object> objectCache = new HashMap<Class<?>, Object>(); //通信客户端,用于发送请求 private NetClient netClient = new NettyClient(); public <T> T getProxy(Class<T> interf) { T obj = (T) this.objectCache.get(interf); if (obj == null) { obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf }, new ClientStubInvocationHandler(interf, serviceInfoDiscoverer, netClient)); this.objectCache.put(interf, obj); } return obj; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package cn.wym.rpc.client; import cn.wym.rpc.common.Request; import cn.wym.rpc.common.Response; import cn.wym.rpc.discovery.ServiceInfo; import cn.wym.rpc.discovery.ServiceInfoDiscoverer; import cn.wym.rpc.protocol.JSONRpcPRotocol; import cn.wym.rpc.protocol.RpcProtocol; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.util.List; import java.util.Random; public class ClientStubInvocationHandler implements InvocationHandler { private Class<?> interf; private ServiceInfoDiscoverer serviceInfoDiscoverer; private NetClient netClient; private Random random = new Random(); public <T> ClientStubInvocationHandler(Class<T> interf, ServiceInfoDiscoverer serviceInfoDiscoverer, NetClient netClient) { this.interf = interf; this.serviceInfoDiscoverer = serviceInfoDiscoverer; this.netClient = netClient; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (method.getName().equals("toString")) { return proxy.getClass().toString(); } if (method.getName().equals("hashCode")) { return 0; } //根据名称去注册中心找到对应的服务 String serviceName = method.getName(); List<ServiceInfo> serviceInfos = serviceInfoDiscoverer.getServiceInfo(serviceName); //缘分负载均衡 ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size())); //TODO: 将请求编组为消息发送到服务端,并读取服务端返回的结果 } }

2.从注册中心发现服务

这里使用zookeeper作为注册中心,从zookeeper上读取服务端的信息,包括 服务名称、协议、服务端地址;在上面的工厂类中,ServiceInfoDiscoverer就是服务发现类,相关代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
@Getter@Setter public class ServiceInfo { private String name; private String protocol; private String address; }
复制代码
1
2
3
4
5
6
7
8
9
package cn.wym.rpc.discovery; import java.util.List; public interface ServiceInfoDiscoverer { List<ServiceInfo> getServiceInfo(String name); }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.wym.rpc.discovery; import com.alibaba.fastjson.JSON; import org.I0Itec.zkclient.ZkClient; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.util.ArrayList; import java.util.List; public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer{ ZkClient client; private String centerRootPath = "/ym-rpc"; public ZookeeperServiceInfoDiscoverer() { //示例代码,配置信息直接写代码里了,实际应该写在配置文件里 String addr = "10.18.51.105:2181"; client = new ZkClient(addr); client.setZkSerializer(new DefaultZkSerializer()); } public List<ServiceInfo> getServiceInfo(String name) { String servicePath = centerRootPath + "/" + name + "/service"; List<String> children = client.getChildren(servicePath); List<ServiceInfo> resources = new ArrayList<ServiceInfo>(); for (String ch : children) { try { String deCh = URLDecoder.decode(ch, "UTF-8"); ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class); resources.add(r); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return resources; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package cn.wym.rpc.discovery; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; public class DefaultZkSerializer implements ZkSerializer { public byte[] serialize(Object o) throws ZkMarshallingError { return String.valueOf(o).getBytes(); } public Object deserialize(byte[] bytes) throws ZkMarshallingError { return new String(bytes); } }

3.将请求编组为消息发送到服务端

涉及到服务器之间通信,毫无疑问,直接上netty;协议的话,简单地来个json;

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.wym.rpc.protocol; import cn.wym.rpc.common.Request; import cn.wym.rpc.common.Response; public interface RpcProtocol { //编码请求 byte[] marshallingRequest(Request req) throws Exception; //解码请求 Request unmarshallingRequest(byte[] data) throws Exception; //编码响应 byte[] marshallingResponse(Response rsp) throws Exception; //解码响应 Response unmarshallingResponse(byte[] data) throws Exception; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.wym.rpc.protocol; import cn.wym.rpc.common.Request; import cn.wym.rpc.common.Response; import com.alibaba.fastjson.JSONObject; public class JSONRpcPRotocol implements RpcProtocol { public byte[] marshallingRequest(Request req) throws Exception { return JSONObject.toJSONBytes(req); } public Request unmarshallingRequest(byte[] data) throws Exception { return JSONObject.parseObject(data, Request.class); } public byte[] marshallingResponse(Response rsp) throws Exception { return JSONObject.toJSONBytes(rsp); } public Response unmarshallingResponse(byte[] data) throws Exception { return JSONObject.parseObject(data, Response.class); } }

发送请求的客户端的代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
package cn.wym.rpc.client; import cn.wym.rpc.discovery.ServiceInfo; public interface NetClient { byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.wym.rpc.client; import cn.wym.rpc.discovery.ServiceInfo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient implements NetClient { public byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable { String[] addInfoArray = sinfo.getAddress().split(":"); final NettySendHandler sendHandler = new NettySendHandler(data); byte[] respData = null; // 配置客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(sendHandler); } }); // 启动客户端连接 b.connect(addInfoArray[0], Integer.valueOf(addInfoArray[1])).sync(); respData = (byte[]) sendHandler.rspData(); } finally { // 释放线程组资源 group.shutdownGracefully(); } return respData; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package cn.wym.rpc.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; @Slf4j public class NettySendHandler extends ChannelInboundHandlerAdapter { private CountDownLatch cdl = null; private Object readMsg = null; private byte[] data; public NettySendHandler(byte[] data) { cdl = new CountDownLatch(1); this.data = data; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("连接服务端成功:" + ctx); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); log.info("客户端发送消息:" + reqBuf); ctx.writeAndFlush(reqBuf); } public Object rspData() { try { cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } return readMsg; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("客户端读取到的数据: " + msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(resp); readMsg = resp; cdl.countDown(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); log.error("发生异常:" + cause.getMessage()); ctx.close(); } }

服务端代码开发

服务端需要完成的功能有:

  1. 向注册中心注册服务
  2. 监听端口,接收来自客户端的请求
  3. 调用用户的实现类处理请求,结果回传给客户端

1. 向注册中心注册服务

将服务的信息(地址、端口、接口名称、支持的协议)写入到zookeeper

复制代码
1
2
3
4
5
6
7
8
9
10
package cn.wym.rpc.server.registry; public interface ServiceRegister { void register(ServiceObject so, String protocol, int port) throws Exception; ServiceObject getServiceObject(String name) throws Exception; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package cn.wym.rpc.server.registry; import cn.wym.rpc.discovery.DefaultZkSerializer; import cn.wym.rpc.discovery.ServiceInfo; import cn.wym.rpc.util.PropertiesUtils; import com.alibaba.fastjson.JSON; import org.I0Itec.zkclient.ZkClient; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.URLEncoder; import java.util.HashMap; import java.util.Map; /** * Zookeeper方式获取远程服务信息类。 * * ZookeeperServiceInfoDiscoverer */ public class ZookeeperExportServiceRegister implements ServiceRegister { private ZkClient client; private String centerRootPath = "/ym-rpc"; private Map<String, ServiceObject> serviceMap = new HashMap<String, ServiceObject>(); public ServiceObject getServiceObject(String name) { return this.serviceMap.get(name); } public ZookeeperExportServiceRegister() { String addr = PropertiesUtils.getProperties("zk.address"); client = new ZkClient(addr); client.setZkSerializer(new DefaultZkSerializer()); } public void register(ServiceObject so, String protocolName, int port) throws Exception { if (so == null) { throw new IllegalArgumentException("参数不能为空"); } this.serviceMap.put(so.getName(), so); ServiceInfo soInf = new ServiceInfo(); String host = InetAddress.getLocalHost().getHostAddress(); String address = host + ":" + port; soInf.setAddress(address); soInf.setName(so.getInterf().getName()); soInf.setProtocol(protocolName); this.exportService(soInf); } private void exportService(ServiceInfo serviceResource) { String serviceName = serviceResource.getName(); String uri = JSON.toJSONString(serviceResource); try { uri = URLEncoder.encode(uri, "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } String servicePath = centerRootPath + "/" + serviceName + "/service"; if (!client.exists(servicePath)) { client.createPersistent(servicePath, true); } String uriPath = servicePath + "/" + uri; if (client.exists(uriPath)) { client.delete(uriPath); } client.createEphemeral(uriPath); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package cn.wym.rpc.server.registry; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @Getter@Setter@AllArgsConstructor public class ServiceObject { private String name; private Class<?> interf; private Object obj; }

2.监听端口,接收来自客户端的请求

通过netty监听固定端口来接收客户端的请求,并使用ChannelHandler来处理请求

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package cn.wym.rpc.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @Getter@Setter@Slf4j public class NettyRpcServer { protected int port; protected String protocol; protected RequestHandler handler; private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler) { this.port = port; this.protocol = protocol; this.handler = handler; } public void start() { // 配置服务器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new ChannelRequestHandler(handler)); } }); // 启动服务 ChannelFuture f = b.bind(port).sync(); log.info("完成服务端端口绑定与启动"); channel = f.channel(); // 等待服务通道关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 释放线程组资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public void stop() { this.channel.close(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.wym.rpc.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; @Slf4j public class ChannelRequestHandler extends ChannelInboundHandlerAdapter { private RequestHandler handler; public ChannelRequestHandler(RequestHandler handler) { this.handler = handler; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("激活"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("服务端收到消息:" + msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); log.info("发送响应:" + msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(ExceptionUtils.getStackTrace(cause)); ctx.close(); } }

3.调用用户的实现类处理请求,结果回传给客户端

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cn.wym.rpc.server; import cn.wym.rpc.common.Request; import cn.wym.rpc.common.Response; import cn.wym.rpc.common.Status; import cn.wym.rpc.protocol.RpcProtocol; import cn.wym.rpc.server.registry.ServiceObject; import cn.wym.rpc.server.registry.ServiceRegister; import lombok.Getter; import lombok.Setter; import java.lang.reflect.Method; @Getter@Setter public class RequestHandler { private RpcProtocol protocol; private ServiceRegister serviceRegister; public RequestHandler(RpcProtocol protocol, ServiceRegister serviceRegister) { super(); this.protocol = protocol; this.serviceRegister = serviceRegister; } public byte[] handleRequest(byte[] data) throws Exception { // 1、解组消息 Request req = this.protocol.unmarshallingRequest(data); // 2、查找服务对象 ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName()); Response rsp = null; if (so == null) { rsp = new Response(Status.NOT_FOUND); } else { // 3、反射调用对应的过程方法 try { Method m = so.getInterf().getMethod(req.getMethod(), req.getParameterTypes()); Object returnValue = m.invoke(so.getObj(), req.getParameters()); rsp = new Response(Status.SUCCESS); rsp.setReturnValue(returnValue); } catch (Exception e) { rsp = new Response(Status.ERROR); rsp.setException(e); } } // 4、编组响应消息 return this.protocol.marshallingResponse(rsp); } }

五. 验证

写一个Provider,Consumer,还有一个DemoService的接口,分别启动两个provider实例,监听端口8080,8079,然后运行consumer,查看服务调用情况在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

最后

以上就是有魅力柚子最近收集整理的关于深入理解RPC之手写RPC框架的全部内容,更多相关深入理解RPC之手写RPC框架内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(83)

评论列表共有 0 条评论

立即
投稿
返回
顶部