Overview
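I. What is RPC
RPC stands for remote procedure call. In a distributed system, when one module invokes a method on a remote machine as if it were a local method, that process is called RPC.
The familiar web services and RESTful interface calls can both be viewed as RPC in this broad sense; they differ only in how the messages are organized and in the message protocol.
To deepen my understanding of RPC, I wrote a simple RPC framework by hand. The complete code is available at https://github.com/RingWu2012/ym-rpc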
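II. The RPC flow
The RPC flow is roughly as shown in the figure above:
- The client calls the client stub (the client stub lives locally, so this is just like calling a local method) and passes the arguments;
- The client stub marshals the arguments into a message, then sends the message to the server through a system call;
- The client's local operating system sends the message from the client machine to the server machine;
- The server's operating system passes the received packets to the server stub;
- The server stub unmarshals the message into arguments;
- The server stub then calls the procedure on the server, and the result of the procedure is returned to the client in the same way.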
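The six steps above can be sketched in a single process, without any network. This is only an illustration under stated assumptions: `Calculator`, `Call`, `clientStub` and `serverStub` are hypothetical names that do not come from ym-rpc, the "transport" is a plain function call standing in for the operating system and the network, and Java serialization is used for marshalling purely to keep the example free of dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Function;

class RpcFlowSketch {
    /** Hypothetical service interface known to both client and server. */
    interface Calculator {
        int add(int a, int b);
    }

    /** Hypothetical wire message: which method to call and with what arguments. */
    static class Call implements Serializable {
        final String method;
        final Class<?>[] types;
        final Object[] args;

        Call(String method, Class<?>[] types, Object[] args) {
            this.method = method;
            this.types = types;
            this.args = args;
        }
    }

    /** Marshals any serializable value into bytes. */
    static byte[] marshal(Object value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    /** Unmarshals bytes produced by marshal(). */
    static Object unmarshal(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    /** Server stub: unmarshal the call, invoke the real procedure, marshal the result. */
    static Function<byte[], byte[]> serverStub(Class<?> interf, Object impl) {
        return requestBytes -> {
            try {
                Call call = (Call) unmarshal(requestBytes);
                Method m = interf.getMethod(call.method, call.types);
                return marshal(m.invoke(impl, call.args));
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };
    }

    /** Client stub: a dynamic proxy that marshals each call and hands it to the transport. */
    static <T> T clientStub(Class<T> interf, Function<byte[], byte[]> transport) {
        Object proxy = Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] {interf},
                (p, method, args) -> unmarshal(transport.apply(
                        marshal(new Call(method.getName(), method.getParameterTypes(), args)))));
        return interf.cast(proxy);
    }

    static int demo() {
        Calculator impl = (a, b) -> a + b;                        // the server-side procedure
        Function<byte[], byte[]> transport = serverStub(Calculator.class, impl);
        Calculator remote = clientStub(Calculator.class, transport);
        return remote.add(2, 3);                                  // looks like a local call
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a real framework the `transport` function is where the socket, the registry lookup and the wire protocol would sit; the two stubs stay essentially the same.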
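III. RPC protocols
Communication naturally requires a protocol, and candidates exist at several layers, for example TCP, UDP, HTTP, SSH, Hessian, Thrift, gRPC, web services and so on. Many readers will also immediately think of Netty, and in fact many RPC frameworks use Netty.
Commonly used RPC protocols include SOAP, XML-RPC, JSON-RPC and JSON-WSP. Traditional web service frameworks such as Apache CXF and Apache Axis2 are mostly based on the standard SOAP protocol; many newer frameworks, such as Dubbo, support multiple protocols.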
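To make "protocol" a little more concrete, here is a small sketch that builds a JSON-RPC 2.0 request message by hand. The class and method names are hypothetical, and the sketch assumes the method name needs no JSON escaping and that all parameters are integers. Note that the framework written later in this article does not use JSON-RPC 2.0; it simply serializes its own request object as JSON.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class JsonRpcMessageSketch {
    /** Builds a JSON-RPC 2.0 request with positional integer parameters. */
    static String request(String method, List<Integer> params, int id) {
        String joined = params.stream().map(String::valueOf).collect(Collectors.joining(","));
        return "{\"jsonrpc\":\"2.0\",\"method\":\"" + method + "\","
                + "\"params\":[" + joined + "],\"id\":" + id + "}";
    }

    public static void main(String[] args) {
        System.out.println(request("subtract", Arrays.asList(42, 23), 1));
    }
}
```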
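IV. Writing an RPC framework by hand
With the basic principles of RPC in place, let's try to write an RPC framework. On the one hand this deepens the understanding of RPC; on the other hand it exposes some of the detailed problems in the RPC process, which helps a lot when reading the source code of open-source RPC frameworks.
Before starting, note that when using an RPC framework, defining the procedure interface, calling the interface and implementing the interface are all done by the user.
Client-side development
Following the RPC flow above, the RPC client needs to:
- Generate a proxy object for the interface
- Discover the service from the registry
- Marshal the request into a message and send it to the server
1. Generate a proxy object for the interface
JDK dynamic proxies are used here to generate the proxy object for the interface. The factory class that creates the proxy objects is as follows: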
package cn.wym.rpc.client;

import cn.wym.rpc.discovery.ServiceInfoDiscoverer;
import cn.wym.rpc.discovery.ZookeeperServiceInfoDiscoverer;
import lombok.Getter;
import lombok.Setter;

import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

@Getter
@Setter
public class ClientStubProxyFactory {
    // Service discovery
    private ServiceInfoDiscoverer serviceInfoDiscoverer = new ZookeeperServiceInfoDiscoverer();
    // Cache of proxy objects, so a new one is not created on every call
    // (HashMap is not thread-safe; good enough for this demo)
    private Map<Class<?>, Object> objectCache = new HashMap<Class<?>, Object>();
    // Network client used to send requests
    private NetClient netClient = new NettyClient();

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> interf) {
        T obj = (T) this.objectCache.get(interf);
        if (obj == null) {
            obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
                    new ClientStubInvocationHandler(interf, serviceInfoDiscoverer, netClient));
            this.objectCache.put(interf, obj);
        }
        return obj;
    }
}
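package cn.wym.rpc.client;

import cn.wym.rpc.common.Request;
import cn.wym.rpc.common.Response;
import cn.wym.rpc.discovery.ServiceInfo;
import cn.wym.rpc.discovery.ServiceInfoDiscoverer;
import cn.wym.rpc.protocol.JSONRpcPRotocol;
import cn.wym.rpc.protocol.RpcProtocol;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Random;

public class ClientStubInvocationHandler implements InvocationHandler {
    private Class<?> interf;
    private ServiceInfoDiscoverer serviceInfoDiscoverer;
    private NetClient netClient;
    private Random random = new Random();

    public <T> ClientStubInvocationHandler(Class<T> interf, ServiceInfoDiscoverer serviceInfoDiscoverer, NetClient netClient) {
        this.interf = interf;
        this.serviceInfoDiscoverer = serviceInfoDiscoverer;
        this.netClient = netClient;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getName().equals("toString")) {
            return proxy.getClass().toString();
        }
        if (method.getName().equals("hashCode")) {
            return 0;
        }
        // Look up the service in the registry. The server registers services under the
        // interface name (see ZookeeperExportServiceRegister), so the lookup uses it too.
        String serviceName = this.interf.getName();
        List<ServiceInfo> serviceInfos = serviceInfoDiscoverer.getServiceInfo(serviceName);
        if (serviceInfos == null || serviceInfos.isEmpty()) {
            throw new IllegalStateException("No provider found for " + serviceName);
        }
        // Random load balancing
        ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));
        // TODO: marshal the request into a message, send it to the server and read the result.
        // What follows is only a sketch of that step. ASSUMPTION: Request and Response are not
        // shown in this article; the setters and getters used here mirror the accessors that the
        // server-side RequestHandler calls and may differ from the real classes.
        Request req = new Request();
        req.setServiceName(serviceName);
        req.setMethod(method.getName());
        req.setParameterTypes(method.getParameterTypes());
        req.setParameters(args);
        RpcProtocol protocol = new JSONRpcPRotocol();
        byte[] rspData = netClient.sendRequest(protocol.marshallingRequest(req), serviceInfo);
        Response rsp = protocol.unmarshallingResponse(rspData);
        if (rsp.getException() != null) {
            throw rsp.getException();
        }
        return rsp.getReturnValue();
    }
}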
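Two details of the proxy code above are worth a closer look: the cache is a plain `HashMap`, and the methods inherited from `Object` are matched by name only (`equals` is not handled, so it would be treated as a remote call). The following is a minimal alternative sketch, not the author's implementation. `ProxyCacheSketch`, `Greeter` and the `remoteCall` parameter are hypothetical; the sketch uses `ConcurrentHashMap.computeIfAbsent` for the cache and answers `equals`, `hashCode` and `toString` locally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ProxyCacheSketch {
    /** Hypothetical service interface. */
    interface Greeter {
        String greet(String name);
    }

    // One proxy per interface, created at most once even under concurrent access.
    private final Map<Class<?>, Object> cache = new ConcurrentHashMap<>();

    /** Returns the cached proxy for interf; remoteCall stands in for the real RPC call. */
    <T> T getProxy(Class<T> interf, InvocationHandler remoteCall) {
        Object proxy = cache.computeIfAbsent(interf, k -> Proxy.newProxyInstance(
                k.getClassLoader(), new Class<?>[] {k}, (p, method, args) -> {
                    // equals/hashCode/toString are declared by Object: answer them locally.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "equals":
                                return p == args[0];
                            case "hashCode":
                                return System.identityHashCode(p);
                            default:
                                return k.getName() + "@proxy";
                        }
                    }
                    return remoteCall.invoke(p, method, args);
                }));
        return interf.cast(proxy);
    }

    public static void main(String[] args) {
        ProxyCacheSketch factory = new ProxyCacheSketch();
        Greeter greeter = factory.getProxy(Greeter.class, (p, m, a) -> "hello " + a[0]);
        System.out.println(greeter.greet("rpc"));
    }
}
```

Because the proxy is cached per interface, the handler passed on the first call wins; later calls for the same interface get the same proxy back.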
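2. Discover the service from the registry
ZooKeeper is used as the registry here. The client reads the server information from ZooKeeper, including the service name, the protocol and the server address. In the factory class above, ServiceInfoDiscoverer is the service discovery class; the related code is as follows: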
package cn.wym.rpc.discovery;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class ServiceInfo {
    private String name;
    private String protocol;
    private String address;
}
package cn.wym.rpc.discovery;

import java.util.List;

public interface ServiceInfoDiscoverer {
    List<ServiceInfo> getServiceInfo(String name);
}
package cn.wym.rpc.discovery;

import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.ZkClient;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
    private ZkClient client;
    private String centerRootPath = "/ym-rpc";

    public ZookeeperServiceInfoDiscoverer() {
        // Sample code: the address is hard-coded here; in practice it belongs in a configuration file
        String addr = "10.18.51.105:2181";
        client = new ZkClient(addr);
        client.setZkSerializer(new DefaultZkSerializer());
    }

    public List<ServiceInfo> getServiceInfo(String name) {
        String servicePath = centerRootPath + "/" + name + "/service";
        // Each child node name is the URL-encoded JSON form of one ServiceInfo
        List<String> children = client.getChildren(servicePath);
        List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
        for (String ch : children) {
            try {
                String deCh = URLDecoder.decode(ch, "UTF-8");
                ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
                resources.add(r);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return resources;
    }
}
package cn.wym.rpc.discovery;

import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import java.nio.charset.StandardCharsets;

public class DefaultZkSerializer implements ZkSerializer {
    // Use an explicit charset so that both sides agree regardless of the platform default
    public byte[] serialize(Object o) throws ZkMarshallingError {
        return String.valueOf(o).getBytes(StandardCharsets.UTF_8);
    }

    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
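The discovery code relies on a naming convention: each provider appears as a child node under `/ym-rpc/<service name>/service`, and the child's name is the URL-encoded JSON of its `ServiceInfo`. The sketch below reproduces only that convention with JDK classes, with no ZooKeeper involved. `RegistryPathSketch` and its method names are hypothetical, and the JSON string in `main` is a hand-written example rather than real fastjson output.

```java
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class RegistryPathSketch {
    static final String ROOT = "/ym-rpc";

    /** Parent node under which all providers of one service register themselves. */
    static String servicePath(String serviceName) {
        return ROOT + "/" + serviceName + "/service";
    }

    /** Encodes the JSON so that it is usable as a single node name (no '/' inside). */
    static String childName(String serviceInfoJson) {
        try {
            return URLEncoder.encode(serviceInfoJson, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reverses childName(); this is what the discoverer does for each child. */
    static String decodeChild(String child) {
        try {
            return URLDecoder.decode(child, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"address\":\"192.168.0.10:8080\",\"name\":\"demo.DemoService\",\"protocol\":\"json\"}";
        String child = childName(json);
        System.out.println(servicePath("demo.DemoService") + "/" + child);
        System.out.println(decodeChild(child));
    }
}
```

Encoding matters because the JSON contains characters such as `{`, `"` and `:`; once encoded, the whole record fits into one path segment.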
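3. Marshal the request into a message and send it to the server
For communication between servers, Netty is the obvious choice here; as for the protocol, plain JSON keeps things simple: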
package cn.wym.rpc.protocol;

import cn.wym.rpc.common.Request;
import cn.wym.rpc.common.Response;

public interface RpcProtocol {
    // Encode a request
    byte[] marshallingRequest(Request req) throws Exception;

    // Decode a request
    Request unmarshallingRequest(byte[] data) throws Exception;

    // Encode a response
    byte[] marshallingResponse(Response rsp) throws Exception;

    // Decode a response
    Response unmarshallingResponse(byte[] data) throws Exception;
}
package cn.wym.rpc.protocol;

import cn.wym.rpc.common.Request;
import cn.wym.rpc.common.Response;
import com.alibaba.fastjson.JSONObject;

public class JSONRpcPRotocol implements RpcProtocol {
    public byte[] marshallingRequest(Request req) throws Exception {
        return JSONObject.toJSONBytes(req);
    }

    public Request unmarshallingRequest(byte[] data) throws Exception {
        return JSONObject.parseObject(data, Request.class);
    }

    public byte[] marshallingResponse(Response rsp) throws Exception {
        return JSONObject.toJSONBytes(rsp);
    }

    public Response unmarshallingResponse(byte[] data) throws Exception {
        return JSONObject.parseObject(data, Response.class);
    }
}
The client code that sends the request is as follows:
package cn.wym.rpc.client;

import cn.wym.rpc.discovery.ServiceInfo;

public interface NetClient {
    byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable;
}
package cn.wym.rpc.client;

import cn.wym.rpc.discovery.ServiceInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient implements NetClient {
    public byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable {
        // The address has the form host:port
        String[] addInfoArray = sinfo.getAddress().split(":");
        final NettySendHandler sendHandler = new NettySendHandler(data);
        byte[] respData = null;
        // Configure the client (a new event loop group per request keeps the demo simple)
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(sendHandler);
                        }
                    });
            // Connect to the server
            b.connect(addInfoArray[0], Integer.parseInt(addInfoArray[1])).sync();
            // Block until the handler has received the response
            respData = (byte[]) sendHandler.rspData();
        } finally {
            // Release the event loop group's resources
            group.shutdownGracefully();
        }
        return respData;
    }
}
package cn.wym.rpc.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;

@Slf4j
public class NettySendHandler extends ChannelInboundHandlerAdapter {
    private CountDownLatch cdl = null;
    private Object readMsg = null;
    private byte[] data;

    public NettySendHandler(byte[] data) {
        cdl = new CountDownLatch(1);
        this.data = data;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Connected to server: " + ctx);
        ByteBuf reqBuf = Unpooled.buffer(data.length);
        reqBuf.writeBytes(data);
        log.info("Client sending message: " + reqBuf);
        ctx.writeAndFlush(reqBuf);
    }

    // Blocks until a response has been read or an error has occurred (null in that case)
    public Object rspData() {
        try {
            cdl.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption
            Thread.currentThread().interrupt();
        }
        return readMsg;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Client read data: " + msg);
        ByteBuf msgBuf = (ByteBuf) msg;
        try {
            // Note: this assumes the whole response arrives in a single read
            byte[] resp = new byte[msgBuf.readableBytes()];
            msgBuf.readBytes(resp);
            readMsg = resp;
        } finally {
            // ChannelInboundHandlerAdapter does not release the buffer automatically
            ReferenceCountUtil.release(msg);
        }
        cdl.countDown();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        log.error("Exception caught: " + cause.getMessage(), cause);
        ctx.close();
        // Wake up the caller so that rspData() does not block forever
        cdl.countDown();
    }
}
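One limitation of the handler above is that it treats whatever arrives in a single `channelRead` as the complete response. TCP is a byte stream, so a large message may arrive in several pieces, or two messages may arrive together. A common remedy is length-prefixed framing; Netty ships `LengthFieldBasedFrameDecoder` and `LengthFieldPrepender` for this purpose. The sketch below shows the idea with JDK classes only. `LengthPrefixFramingSketch` and `Decoder` are hypothetical names, and the code does not validate the length field, which a real implementation should.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixFramingSketch {
    /** Prepends a 4-byte big-endian length to the payload. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Accumulates arbitrary chunks and emits only complete frames. */
    static class Decoder {
        // Unread leftover bytes from earlier chunks (between position and limit).
        private ByteBuffer pending = ByteBuffer.allocate(0);

        List<byte[]> feed(byte[] chunk) {
            // Merge the leftover bytes with the new chunk, then switch to read mode.
            ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
            merged.put(pending).put(chunk).flip();

            List<byte[]> frames = new ArrayList<>();
            while (merged.remaining() >= 4) {
                int len = merged.getInt(merged.position()); // peek at the length field
                if (merged.remaining() < 4 + len) {
                    break;                                   // frame not complete yet
                }
                merged.getInt();                             // consume the length field
                byte[] frame = new byte[len];
                merged.get(frame);
                frames.add(frame);
            }
            pending = merged;                                // keep what is left for next time
            return frames;
        }
    }
}
```

Usage is symmetric: the sender calls `encode` before writing, and the receiver passes every chunk it reads to `feed` and handles each returned frame as one message.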
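Server-side development
The server needs to:
- Register the service with the registry
- Listen on a port and receive requests from clients
- Invoke the user's implementation class to handle the request and return the result to the client
1. Register the service with the registry
Write the service information (address, port, interface name, supported protocol) to ZooKeeper: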
package cn.wym.rpc.server.registry;

public interface ServiceRegister {
    void register(ServiceObject so, String protocol, int port) throws Exception;

    ServiceObject getServiceObject(String name) throws Exception;
}
package cn.wym.rpc.server.registry;

import cn.wym.rpc.discovery.DefaultZkSerializer;
import cn.wym.rpc.discovery.ServiceInfo;
import cn.wym.rpc.util.PropertiesUtils;
import com.alibaba.fastjson.JSON;
import org.I0Itec.zkclient.ZkClient;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;

/**
 * Registers exported services in ZooKeeper.
 *
 * The client-side counterpart is ZookeeperServiceInfoDiscoverer.
 */
public class ZookeeperExportServiceRegister implements ServiceRegister {
    private ZkClient client;
    private String centerRootPath = "/ym-rpc";
    // Local lookup table: service name -> service object
    private Map<String, ServiceObject> serviceMap = new HashMap<String, ServiceObject>();

    public ServiceObject getServiceObject(String name) {
        return this.serviceMap.get(name);
    }

    public ZookeeperExportServiceRegister() {
        String addr = PropertiesUtils.getProperties("zk.address");
        client = new ZkClient(addr);
        client.setZkSerializer(new DefaultZkSerializer());
    }

    public void register(ServiceObject so, String protocolName, int port) throws Exception {
        if (so == null) {
            throw new IllegalArgumentException("ServiceObject must not be null");
        }
        this.serviceMap.put(so.getName(), so);
        ServiceInfo soInf = new ServiceInfo();
        String host = InetAddress.getLocalHost().getHostAddress();
        String address = host + ":" + port;
        soInf.setAddress(address);
        // The service is published under the interface name
        soInf.setName(so.getInterf().getName());
        soInf.setProtocol(protocolName);
        this.exportService(soInf);
    }

    private void exportService(ServiceInfo serviceResource) {
        String serviceName = serviceResource.getName();
        String uri = JSON.toJSONString(serviceResource);
        try {
            uri = URLEncoder.encode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        String servicePath = centerRootPath + "/" + serviceName + "/service";
        if (!client.exists(servicePath)) {
            client.createPersistent(servicePath, true);
        }
        String uriPath = servicePath + "/" + uri;
        if (client.exists(uriPath)) {
            client.delete(uriPath);
        }
        // Ephemeral node: it disappears when this provider's ZooKeeper session ends
        client.createEphemeral(uriPath);
    }
}
package cn.wym.rpc.server.registry;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
public class ServiceObject {
    private String name;
    private Class<?> interf;
    private Object obj;
}
2. Listen on a port and receive requests from clients
Netty listens on a fixed port to receive client requests, and a ChannelHandler processes them:
package cn.wym.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@Getter
@Setter
@Slf4j
public class NettyRpcServer {
    protected int port;
    protected String protocol;
    protected RequestHandler handler;
    private Channel channel;

    public NettyRpcServer(int port, String protocol, RequestHandler handler) {
        this.port = port;
        this.protocol = protocol;
        this.handler = handler;
    }

    public void start() {
        // Configure the server
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ChannelRequestHandler(handler));
                        }
                    });
            // Start the server
            ChannelFuture f = b.bind(port).sync();
            log.info("Server port bound and server started");
            channel = f.channel();
            // Wait until the server channel is closed
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the event loop groups' resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public void stop() {
        // The channel is only set once start() has bound the port
        if (this.channel != null) {
            this.channel.close();
        }
    }
}
package cn.wym.rpc.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;

@Slf4j
public class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
    private RequestHandler handler;

    public ChannelRequestHandler(RequestHandler handler) {
        this.handler = handler;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("Server received message: " + msg);
        ByteBuf msgBuf = (ByteBuf) msg;
        byte[] req;
        try {
            req = new byte[msgBuf.readableBytes()];
            msgBuf.readBytes(req);
        } finally {
            // ChannelInboundHandlerAdapter does not release the buffer automatically
            ReferenceCountUtil.release(msg);
        }
        byte[] res = handler.handleRequest(req);
        log.info("Sending response: " + res.length + " bytes");
        ByteBuf respBuf = Unpooled.buffer(res.length);
        respBuf.writeBytes(res);
        ctx.write(respBuf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(ExceptionUtils.getStackTrace(cause));
        ctx.close();
    }
}
3. Invoke the user's implementation class to handle the request and return the result to the client
package cn.wym.rpc.server;

import cn.wym.rpc.common.Request;
import cn.wym.rpc.common.Response;
import cn.wym.rpc.common.Status;
import cn.wym.rpc.protocol.RpcProtocol;
import cn.wym.rpc.server.registry.ServiceObject;
import cn.wym.rpc.server.registry.ServiceRegister;
import lombok.Getter;
import lombok.Setter;

import java.lang.reflect.Method;

@Getter
@Setter
public class RequestHandler {
    private RpcProtocol protocol;
    private ServiceRegister serviceRegister;

    public RequestHandler(RpcProtocol protocol, ServiceRegister serviceRegister) {
        this.protocol = protocol;
        this.serviceRegister = serviceRegister;
    }

    public byte[] handleRequest(byte[] data) throws Exception {
        // 1. Unmarshal the message
        Request req = this.protocol.unmarshallingRequest(data);
        // 2. Look up the service object
        ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());
        Response rsp = null;
        if (so == null) {
            rsp = new Response(Status.NOT_FOUND);
        } else {
            // 3. Invoke the requested procedure through reflection
            try {
                Method m = so.getInterf().getMethod(req.getMethod(), req.getParameterTypes());
                Object returnValue = m.invoke(so.getObj(), req.getParameters());
                rsp = new Response(Status.SUCCESS);
                rsp.setReturnValue(returnValue);
            } catch (Exception e) {
                rsp = new Response(Status.ERROR);
                rsp.setException(e);
            }
        }
        // 4. Marshal the response message
        return this.protocol.marshallingResponse(rsp);
    }
}
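The core of the request handler is the lookup-then-reflect step. The sketch below isolates it with JDK classes only, as an illustration rather than the framework's code. `DispatchSketch`, `Echo`, `Result` and `Registered` are hypothetical stand-ins for the handler, a user interface, `Response` and `ServiceObject`. It also shows one refinement: when the invoked method itself throws, reflection wraps the error in `InvocationTargetException`, and unwrapping it gives the caller the original exception.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class DispatchSketch {
    /** Hypothetical user-defined service interface. */
    interface Echo {
        String echo(String s);
        String fail();
    }

    /** Hypothetical stand-in for Response: a status plus a value or an error. */
    static final class Result {
        final String status;
        final Object value;
        final Throwable error;

        Result(String status, Object value, Throwable error) {
            this.status = status;
            this.value = value;
            this.error = error;
        }
    }

    /** Hypothetical stand-in for ServiceObject: interface plus implementation. */
    static final class Registered {
        final Class<?> interf;
        final Object impl;

        Registered(Class<?> interf, Object impl) {
            this.interf = interf;
            this.impl = impl;
        }
    }

    private final Map<String, Registered> services = new HashMap<>();

    void register(String name, Class<?> interf, Object impl) {
        services.put(name, new Registered(interf, impl));
    }

    /** Looks up the service, resolves the method on the interface and invokes it. */
    Result handle(String service, String method, Class<?>[] types, Object[] args) {
        Registered r = services.get(service);
        if (r == null) {
            return new Result("NOT_FOUND", null, null);
        }
        try {
            Method m = r.interf.getMethod(method, types);
            return new Result("SUCCESS", m.invoke(r.impl, args), null);
        } catch (InvocationTargetException e) {
            // The target method threw: report its own exception, not the wrapper.
            return new Result("ERROR", null, e.getCause());
        } catch (ReflectiveOperationException e) {
            // No such method, or the method is not accessible.
            return new Result("ERROR", null, e);
        }
    }
}
```

Resolving the method on the interface rather than on the implementation class mirrors what `RequestHandler` does with `so.getInterf()`, and it restricts remote callers to the methods the interface exposes.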
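V. Verification
Write a Provider, a Consumer and a DemoService interface. Start two provider instances listening on ports 8080 and 8079 respectively, then run the consumer and check how the service calls are handled.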
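With two providers registered, the random selection in ClientStubInvocationHandler should spread the consumer's calls over both of them. The sketch below isolates that selection step and adds round-robin as a common alternative. `LoadBalanceSketch` and the addresses are hypothetical, and no real providers are contacted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

class LoadBalanceSketch {
    /** Random selection, as in the invocation handler of this article. */
    static String pickRandom(List<String> addresses, Random random) {
        return addresses.get(random.nextInt(addresses.size()));
    }

    /** Round-robin selection: each call takes the next address in turn. */
    static String pickRoundRobin(List<String> addresses, AtomicInteger counter) {
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("127.0.0.1:8080", "127.0.0.1:8079");
        Random random = new Random();
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            System.out.println(pickRandom(providers, random) + " | " + pickRoundRobin(providers, counter));
        }
    }
}
```

Random selection needs no shared state, while round-robin gives a more even distribution over a small number of calls.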