概述
最近在学习Netty框架,为了学以致用,决定自己实现一个简单的RPC框架,该项目采用Netty进行通信,使用Spring进行搭建,目前已经实现了RPC框架的基本功能,其余的改进尚在进一步完善中,github地址:https://github.com/13350747533/HahaRpc
在这里记录一下简单的思路:
服务注册与发现:
首先是服务的注册与发现,本项目使用的是zookeeper来作为注册中心,使用ZKClient来对zookeeper的节点进行管理。
服务发现的代码:
public String discovery(String serviceName) {
//创建zookeeper节点
ZkClient zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT, Constant.ZK_CONNECTION_TIMEOUT);
LOGGER.debug("connect zookeeper");
try{
//获取service节点
String servicePath = Constant.ZK_REGISTRY_PATH + "/" + serviceName;
if(!zkClient.exists(servicePath)){
throw new RuntimeException(String.format("can not find any service node on path: %s", servicePath));
}
List<String> addressList = zkClient.getChildren(servicePath);
if(CollectionUtil.isEmpty(addressList)){
throw new RuntimeException(String.format("can not find any address node on path: %s", servicePath));
}
//获取 address 节点
String address;
int size = addressList.size();
if(size == 1) {
//若只有一个地址
address = addressList.get(0);
LOGGER.debug("get only address node: {}", address);
}else{
address = addressList.get(ThreadLocalRandom.current().nextInt(size));
LOGGER.debug("get random address node: {}", address);
}
//获取address节点的值
String addressPath = servicePath + "/" + address;
return zkClient.readData(addressPath);
}finally {
zkClient.close();
}
}
服务注册的代码:
private final ZkClient zkClient;
public ZooKeeperServiceRegistry(String zkAddress) {
zkClient = new ZkClient(zkAddress, Constant.ZK_SESSION_TIMEOUT,Constant.ZK_CONNECTION_TIMEOUT);
LOGGER.debug("connect zookeeper");
}
@Override
public void registry(String serviceName, String serviceAddress) {
//创建registry节点 (持久)
String registryPath = Constant.ZK_REGISTRY_PATH;
if(!zkClient.exists(registryPath)){
zkClient.createPersistent(registryPath);
LOGGER.debug("create registry node :{}", registryPath);
}
//创建Service节点 (持久)
String servicePath = registryPath + "/" + serviceName;
if(!zkClient.exists(servicePath)){
zkClient.createPersistent(servicePath);
LOGGER.debug("create service node : {}", servicePath);
}
//创建Address节点 (临时)
String addressPath = servicePath + "/address-";
String addressNode = zkClient.createEphemeralSequential(addressPath,serviceAddress);
LOGGER.debug("create address node : {}", addressNode);
}
处理完了服务的注册与发现以后,我们先来看一下服务端的代码。
首先我们要确定哪些方法需要被远程调用,为此我们自定义一个RPCService注解。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
/**
* 服务接口类
* @return
*/
Class<?> value();
/**
* 服务版本号
* @return
*/
String version() default "";
/**
* 序列化方式
* @return
*/
// String serlize() default "protostuff";
}
该注解继承了spring的@component,因此在spring容器被初始化的时候所有带有@RPCService注解的方法都会被扫描到。
我们定义一个NettyServer类,它要实现ApplicationContextAware, InitializingBean这两个接口。然后实现setApplicationContext方法,这个方法会完成上面提到的扫描。
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//扫描带有RpcService注解的类并初始化 handleMapduixiang
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if(MapUtils.isNotEmpty(serviceBeanMap)){
for(Object serviceBean : serviceBeanMap.values()) {
RpcService rpcService = serviceBean.getClass().getAnnotation(RpcService.class);
String serviceName = rpcService.value().getName();
String serviceVersion = rpcService.version();
// String serialize = rpcService.serlize();
if (StringUtil.isNotEmpty(serviceVersion)){
serviceName += "-" + serviceVersion;
}
handlerMap.put(serviceName, serviceBean);
// LOGGER.debug("handlerMap is {}", handlerMap);
}
}
}
同时,我们还要实现 afterPropertiesSet()方法,该方法在spring的bean的生命周期中会在属性填充之后调用,我们利用这个方法来开启netty服务端:
public void afterPropertiesSet() throws Exception {
super.start();
}
在父类中,有我们的netty服务器的相关代码和zookeeper注册的相关代码:
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
//创建并初始化Netty服务器 BootStrap 对象
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler((new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//解码,编码,并处理请求
pipeline.addLast(new RpcDecoder(RpcRequest.class, serlizer));
pipeline.addLast(new RpcEncoder(RpcResponse.class, serlizer));
pipeline.addLast(new RpcServerHandler(handlerMap, threadPoolExecutor));
}
}));
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
//获取RPC服务器的IP和端口号
String[] addressArray = StringUtil.split(serviceAddress,":");
String ip = addressArray[0];
int port = Integer.parseInt(addressArray[1]);
//启动RPC服务器
ChannelFuture future = bootstrap.bind(ip, port).sync();
//注册RPC服务器
// LOGGER.debug("serviceRegistry is {}", serviceRegistry);
if (serviceRegistry != null) {
LOGGER.debug("1 handleMap is {}", handlerMap);
for(String interfaceName : handlerMap.keySet()){
serviceRegistry.registry(interfaceName, serviceAddress);
LOGGER.debug("registry service: {} => {}", interfaceName, serviceAddress);
}
}
LOGGER.debug("server started on port {}", port);
//关闭 RPC服务器
future.channel().closeFuture().sync();
}finally{
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
});
thread.start();
这里我们可以看到自定义的解码和编码的类RpcEncoder和RpcDecoder,以及Rpc的响应和请求的自定义类RpcRequest,RpcResponse,现在让我们展时先放一放,稍后再来看看这几个类。
现在服务端收到了来自客户端的请求,我们需要对其进行处理,定义RpcServerHandler类继承自SimpleChannelInboundHandler,在其中重写channelRead0方法:
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
threadPoolExecutor.execute(new Runnable() {
public void run() {
//创建并初始化RPC响应对象
RpcResponse response = new RpcResponse();
LOGGER.debug("cannelRead0start, request is {}", request);
response.setRequestId(request.getRequestId());
try{
Object result = handle(request);
response.setResult(result);
} catch (Exception e) {
LOGGER.error("handler result failuer", e);
response.setException(e);
}
LOGGER.debug("channelread0 voer, response is {}", response);
//写入RPC响应对象并自动关闭连接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
});
}
在handle方法中,使用反射的方法进行调用:
private Object handle(RpcRequest request) throws Exception {
//获取服务对象
// LOGGER.debug("Request is {}", request);
String serviceName = request.getInterfaceName();
String serviceVersion = request.getServiceVersion();
if(StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
Object serviceBean = handlerMap.get(serviceName);
if (serviceBean == null) {
throw new RuntimeException(String.format("can not find service bean by key : %s", serviceName));
}
// 获取反射调用所需的参数
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParametersTypes();
Object[] parameters = request.getParameters();
//反射调用方法
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, parameters);
//使用cglib
// FastClass serviceFastClass = FastClass.create(serviceClass);
// FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// return serviceFastMethod.invoke(serviceBean, parameters);
}
服务端的代码到此为止,接下来让我们看看client端的代码。
Client目前只有两个类,首先是比较简单的RpcClient:
public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建并初始化netty客户端Bootstrap对象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class, serializer)); //编码RPC请求
pipeline.addLast(new RpcDecoder(RpcResponse.class, serializer)); //解码rpc响应
pipeline.addLast(RpcClient.this); //处理RPC响应
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//连接RPC服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
//写入RPC请求数据并关闭连接
Channel channel = future.channel();
channel.writeAndFlush(request).sync();
channel.closeFuture().sync();
//返回RPC响应对象
return response;
} finally {
group.shutdownGracefully();
}
}
就是一个标准的netty客户端的写法,接下来的是RpcProxy:
public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
//创建动态代理对象
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 创建RPC请求对象并设置请求属性
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(method.getDeclaringClass().getName());
request.setServiceVersion(serviceVersion);
request.setMethodName(method.getName());
request.setParametersTypes(method.getParameterTypes());
request.setParameters(args);
//获取 RPC服务地址
if (serviceDiscovery != null) {
String serviceName = interfaceClass.getName();
if (StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
serviceAddress = serviceDiscovery.discovery(serviceName);
LOGGER.debug("discovery service: {} => {}", serviceName, serviceAddress);
}
if (StringUtil.isEmpty(serviceAddress)) {
throw new RuntimeException("server address is empty");
}
//从RPC服务地址中解析主机名与端口号
String[] array = StringUtil.split(serviceAddress,":");
String host = array[0];
int port = Integer.parseInt(array[1]);
//创建RPC客户端对象并发送RPC请求
RpcClient client = new RpcClient(host, port);
long time = System.currentTimeMillis();
RpcResponse response = client.send(request);
LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
if(response == null){
throw new RuntimeException("response is null");
}
//返回RPC响应结果
if(response.hasException()){
return response.getException();
}else{
return response.getResult();
}
}
}
);
}
在create方法里面,封装请求体,调用send方法,请求服务提供者。
最后在让我们来看看之前的几个类:
public class RpcRequest {
private String requestId;
private String interfaceName;
private String serviceVersion;
private String methodName;
private Class<?>[] parametersTypes;
private Object[] parameters;
}
public class RpcResponse {
private String requestId;
private Exception exception;
private Object result;
public boolean hasException(){
return exception != null;
}
}
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
Serializer serializer;
public RpcDecoder(Class<?> genericClass, Serializer serializer){
this.serializer = serializer;
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes() < 4){
return;
}
byteBuf.markReaderIndex();
int dataLength = byteBuf.readInt();
if(byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
byteBuf.readBytes(data);
list.add(SerializationUtil.deserialize(data, genericClass));
}
}
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
private Serializer serializer;
public RpcEncoder(Class<?> genericClass, Serializer serializer) {
this.serializer = serializer;
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
if(genericClass.isInstance(o)) {
byte[] data = SerializationUtil.serialize(o);
byteBuf.writeInt(data.length);
byteBuf.writeBytes(data);
}
}
}
RpcEncoder和RpcDecoder都继承自netty自带的解码编码器,我们可以为其指定自己的序列化方式。
最后
以上就是欢喜紫菜为你收集整理的手写一个简单的RPC框架的全部内容,希望文章能够帮你解决手写一个简单的RPC框架所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复