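Overview
A simplified RPC
What is RPC?
Short answer: Remote Procedure Call, i.e. invoking a method on a remote service as if it were a local one.
Where is it used?
Dubbo is one example.
Why study it?
To better understand the Dubbo framework and similar tools.
Hand-writing an RPC and walking through the related concepts (this is the main focus)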
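Basic concepts
Architecture (from the perspective of roles)
Architecture diagram from the official site:
Link: link.
Simplified structure diagram
https://www.processon.com/diagraming/601fd9fee0b34d208a6a2924
Implementation
Code, explanations, and tests follow.
Project structure
rpc-common : shared dependency (defines the common interfaces and conventions)
rpc-consumer1: consumer-side implementation (talks to the server over the custom protocol)
rpc-provider1: server-side (service provider) implementation
Basic dependencies
<!-- netty -->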
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<!--<version>1.2.47</version>-->
<version>1.2.71</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-expression</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!--<version>5.1.46</version>-->
<version>5.1.20</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.21</version>
</dependency>
<!-- zkclient client -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
<!-- zookeeper example -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
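Many of the dependencies above are not needed by the RPC itself. They are listed because the project also contains other features that are not covered here.
Implementation details
rpc-common project:
IUserService : interface definition
public interface IUserService {
    /**
     * @param msg message sent by the caller
     * @return the server's reply
     */
    String sayHello(String msg);

    // Declared here because UserServiceImpl overrides it.
    String sayHelloWorld(String msg);
}
JsonSerializer : JSON implementation of the Serializer interface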
public class JsonSerializer implements Serializer {
@Override
public byte[] serialize(Object o) throws IOException {
return JSON.toJSONBytes(o);
}
@Override
public <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException {
// Deserialize the bytes into an instance of clazz
return JSON.parseObject(bytes, clazz);
}
}
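RpcDecoder : RPC decoder
public class RpcDecoder extends ByteToMessageDecoder {
    private final Class<?> clazz;
    private final Serializer serializer;

    public RpcDecoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    // ByteToMessageDecoder accumulates bytes across reads, so a frame that arrives in pieces
    // (half packet) or several frames that arrive together (sticky packets) are both handled.
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf,
                          List<Object> out) throws Exception {
        // Wait until the 4-byte length prefix is available.
        if (byteBuf.readableBytes() < 4) {
            return;
        }
        byteBuf.markReaderIndex();
        int length = byteBuf.readInt();
        // Body not fully received yet: rewind and wait for more bytes.
        if (byteBuf.readableBytes() < length) {
            byteBuf.resetReaderIndex();
            return;
        }
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);
        // Deserialize
        Object deserialize = serializer.deserialize(clazz, bytes);
        out.add(deserialize);
    }
}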
RpcEncoder : RPC encoder
public class RpcEncoder extends MessageToByteEncoder<Object> {
    private final Class<?> clazz;
    private final Serializer serializer;

    public RpcEncoder(Class<?> clazz, Serializer serializer) {
        this.clazz = clazz;
        this.serializer = serializer;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg,
                          ByteBuf out) throws Exception {
        /*
            Write the object into the buffer:
            a 4-byte int length followed by the serialized bytes
         */
        if (clazz != null && clazz.isInstance(msg)) {
            byte[] bytes = serializer.serialize(msg);
            out.writeInt(bytes.length);
            out.writeBytes(bytes);
        }
    }
}
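The length-prefixed framing that RpcEncoder and RpcDecoder rely on can be tried without Netty. Below is a minimal sketch using java.nio.ByteBuffer. The class name LengthFieldFraming and its two methods are hypothetical and not part of the project; the frame layout (4-byte length, then payload) is the one the encoder above writes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: frames are [4-byte big-endian length][payload]. */
final class LengthFieldFraming {

    /** Wraps one payload in a frame. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Extracts every complete frame from buf (which must be in read mode).
     * An incomplete trailing frame is left unread so later bytes can complete it.
     */
    static List<byte[]> decode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (buf.remaining() < length) {
                buf.reset(); // half packet: rewind to the start of the length prefix
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = encode("world!".getBytes(StandardCharsets.UTF_8));

        // Simulate one TCP read that delivers frame a plus the first 3 bytes of frame b.
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(a).put(b, 0, 3);
        buf.flip();
        for (byte[] frame : decode(buf)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8)); // prints "hello"
        }

        // Keep the 3 leftover bytes, append the rest of frame b, and decode again.
        buf.compact();
        buf.put(b, 3, b.length - 3);
        buf.flip();
        for (byte[] frame : decode(buf)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8)); // prints "world!"
        }
    }
}
```

The rewind on an incomplete frame is the same idea as markReaderIndex() and resetReaderIndex() on a Netty ByteBuf.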
RpcRequest : RPC request protocol object
- the protocol object sent over the wire
- request service ID
- class name
- method name
- parameter types
- arguments
@Data
public class RpcRequest implements Serializable {
    private static final long serialVersionUID = -4172554571135234360L;
    private String requestId;           // request service ID
    private String className;           // class name
    private String methodName;          // method name
    private Class<?>[] parameterTypes;  // parameter types
    private Object[] parameters;        // arguments
}
RpcResult : RPC result (server -> client)
@Data
public class RpcResult implements Serializable {
private static final long serialVersionUID = 1561074744495374547L;
private boolean success;
private String message;
private Object data;
}
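Serializer : serialization interface
public interface Serializer {
    /**
     * Serializes an object.
     * @param o object to serialize
     * @return serialized bytes
     * @throws IOException
     */
    byte[] serialize(Object o) throws IOException;

    /**
     * Deserializes bytes into an object.
     * @param clazz target type
     * @param bytes serialized bytes
     * @param <T> type of the returned object
     * @return deserialized object
     * @throws IOException
     */
    <T> T deserialize(Class<?> clazz, byte[] bytes) throws IOException;
}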
ZookeeperUtils:
public class ZookeeperUtils {
    // volatile is required for double-checked locking to publish the client safely.
    private static volatile CuratorFramework client;

    /**
     * @return the shared, lazily created client
     */
    public static CuratorFramework getClient() {
        if (client == null) {
            synchronized (ZookeeperUtils.class) {
                if (null == client) {
                    client = createClient();
                }
            }
        }
        return client;
    }

    /**
     * @return a started client
     */
    private static CuratorFramework createClient() {
        RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        // Fluent builder style. A local variable keeps a client that has not been
        // started yet out of the shared static field.
        CuratorFramework created = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(3000)
                .retryPolicy(exponentialBackoffRetry)
                .namespace("lagou-servers") // isolated namespace
                .build();
        created.start();
        System.out.println("ZooKeeper session created.");
        return created;
    }

    /**
     * Tests session creation.
     * @param args
     */
    public static void main(String[] args) {
        createClient();
    }
}
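ZookeeperUtils creates its client lazily with double-checked locking. As an aside, the same "create once, on first use" behavior can be sketched with the initialization-on-demand holder idiom, which relies on JVM class initialization instead of explicit locking. This is an alternative technique, not what the project uses; FakeClient is a hypothetical stand-in for an expensive client such as CuratorFramework.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Lazy, thread-safe singleton via the initialization-on-demand holder idiom. */
final class LazyClientHolder {

    /** Hypothetical stand-in for an expensive client; counts how often it is constructed. */
    static final class FakeClient {
        static final AtomicInteger CREATED = new AtomicInteger();

        FakeClient() {
            CREATED.incrementAndGet();
        }
    }

    private LazyClientHolder() {
    }

    /** The JVM initializes Holder, and therefore INSTANCE, once, on first access. */
    private static final class Holder {
        static final FakeClient INSTANCE = new FakeClient();
    }

    static FakeClient getClient() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazyClientHolder::getClient);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(FakeClient.CREATED.get()); // prints 1
    }
}
```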
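Concepts involved above:
1: Netty codecs, half packets, sticky packets, and custom protocols.
2: The inbound and outbound order of Netty handlers and how to configure them.
3: Netty's ByteBuf.
4: Java reflection.
5: Netty's event trigger points.
6: Serialization and deserialization with Alibaba's fastjson.
7: ZooKeeper basics and the characteristics of ZooKeeper nodes.
ZooKeeper node data structure:
ZooKeeper node types:
rpc-provider1 project:
ServerBoot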
@Service
public class ServerBoot implements ApplicationRunner {
private static String IP = "127.0.0.1";
private static Integer PORT = 9010;
/**
* @param applicationArguments
* @throws Exception
*/
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
startNettyServer(IP, PORT);
}
    // ************* Start Netty
    /**
     * @param ip
     * @param port
     * @throws Exception
     */
    private static void startNettyServer(String ip, int port) throws Exception {
        /*
            1: bootstrap
            2: thread pools
            3: handlers
            4: listening port
         */
        // 1. Create the two event loop groups (thread pools)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // 2. Create the server bootstrap
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 3. Configure the bootstrap
        serverBootstrap.group(bossGroup, workGroup)
                // Use NIO channels
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Mind the types here: RpcResult vs. RpcRequest.
                        // Encoders are outbound handlers and decoders are inbound handlers.
                        // On the server, RpcResult is written out, so it gets the encoder;
                        // RpcRequest is read in, so it gets the decoder.
                        pipeline.addLast(new RpcEncoder(RpcResult.class, new JsonSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JsonSerializer()));
                        // Business handler
                        pipeline.addLast(new UserServiceHandler(SpringContextHolder.getApplicationContext()));
                    }
                });
        // sync() blocks until the bind completes
        serverBootstrap.bind(port).sync();
        // Connect to ZooKeeper and create an ephemeral sequential node
        CuratorFramework client = ZookeeperUtils.getClient();
        String path = "/serverNodes/node";
        // The node is created under the lagou-servers namespace
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(path, (ip + ":" + port).getBytes());
        // The node data is this server's ip:port, which remote clients use to connect
    }
}
UserServiceHandler
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
private ApplicationContext applicationContext;
public UserServiceHandler(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null == msg) {
            returnFailRst(ctx, "netty server: the protocol object must not be null!");
            return;
        }
        RpcRequest rpcRequest = (RpcRequest) msg;
        System.out.println("netty server: received protocol object: " + rpcRequest);
        if (StringUtils.isEmpty(rpcRequest.getRequestId())) {
            returnFailRst(ctx, "netty server: the request ID must not be empty!");
            return; // stop here instead of falling through to the bean lookup
        }
        ApplicationContext context = SpringContextHolder.getApplicationContext();
        // getBean throws when the bean is missing rather than returning null, so check first.
        if (!context.containsBean(rpcRequest.getRequestId())) {
            returnFailRst(ctx, "netty server: service ID " + rpcRequest.getRequestId() + " does not exist!");
            return;
        }
        Object bean = context.getBean(rpcRequest.getRequestId());
        // Look up the target method by reflection
        Method method = ReflectionUtils.findMethod(bean.getClass(), rpcRequest.getMethodName(),
                rpcRequest.getParameterTypes());
        if (method == null) {
            returnFailRst(ctx, "netty server: method " + rpcRequest.getMethodName() + " does not exist!");
            return;
        }
        Object result = ReflectionUtils.invokeMethod(method, bean, rpcRequest.getParameters());
        returnSuccRst(ctx, result);
        System.out.println("netty server: returned result " + result);
    }
    /**
     * Builds and sends a success result
     * @param ctx
     * @param result
     */
    private void returnSuccRst(ChannelHandlerContext ctx, Object result) {
        RpcResult rpcResult = new RpcResult();
        rpcResult.setSuccess(true);
        rpcResult.setData(result);
        rpcResult.setMessage("netty server: request handled successfully! " + result);
        ctx.writeAndFlush(rpcResult);
    }
    /**
     * Builds and sends a failure result
     * @param ctx
     * @param s
     */
    private void returnFailRst(ChannelHandlerContext ctx, String s) {
        RpcResult rpcResult = new RpcResult();
        rpcResult.setSuccess(false);
        rpcResult.setMessage("netty server: request failed! " + s);
        ctx.writeAndFlush(rpcResult);
    }
}
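The core of UserServiceHandler is "find the service by ID, find the method by name and parameter types, invoke it". Below is a minimal sketch of that dispatch using only JDK reflection, with a plain map in place of the Spring context. ReflectiveDispatcher, GreetingService, and the register/dispatch methods are hypothetical names introduced for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical server-side dispatcher: service ID + method name + parameter types -> call. */
final class ReflectiveDispatcher {

    /** Hypothetical service interface, standing in for IUserService. */
    interface GreetingService {
        String sayHello(String msg);
    }

    /** One registered service: the interface used for lookup plus its implementation. */
    private static final class Registration {
        final Class<?> iface;
        final Object impl;

        Registration(Class<?> iface, Object impl) {
            this.iface = iface;
            this.impl = impl;
        }
    }

    private final Map<String, Registration> services = new HashMap<>();

    <T> void register(String serviceId, Class<T> iface, T impl) {
        services.put(serviceId, new Registration(iface, impl));
    }

    /** Resolves the method on the service interface and invokes it on the implementation. */
    Object dispatch(String serviceId, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Registration reg = services.get(serviceId);
        if (reg == null) {
            throw new IllegalArgumentException("unknown service: " + serviceId);
        }
        try {
            Method method = reg.iface.getMethod(methodName, parameterTypes);
            return method.invoke(reg.impl, args);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("unknown method: " + methodName, e);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("invocation failed: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveDispatcher dispatcher = new ReflectiveDispatcher();
        dispatcher.register("IUserService", GreetingService.class, msg -> "hello, " + msg);
        Object result = dispatcher.dispatch("IUserService", "sayHello",
                new Class<?>[]{String.class}, new Object[]{"rpc"});
        System.out.println(result); // prints "hello, rpc"
    }
}
```

Unknown services and methods are reported as exceptions here; in the handler above the equivalent step is sending a failure RpcResult and returning.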
SpringContextHolder
@Component
public class SpringContextHolder implements ApplicationContextAware,DisposableBean {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
SpringContextHolder.applicationContext = context;
}
    /**
     * Exposes the application context to external callers
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        if (applicationContext == null) {
            throw new IllegalStateException("applicationContext has not been injected; register SpringContextHolder in the Spring Boot application class.");
        } else {
            return applicationContext;
        }
    }
    @Override
    public void destroy() throws Exception {
        applicationContext = null;
    }
    // *********** Helper methods for getting beans
    /**
     * Gets a bean by name
     * @param name bean name
     * @return bean instance
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }
    /**
     * Gets a bean by class
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }
    /**
     * Gets the bean with the given name and class
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}
UserServiceImpl
@Service(value = "IUserService")
public class UserServiceImpl implements IUserService {
private static Integer PORT = 9010;
@Override
public String sayHello(String msg) {
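System.out.println("First method on the server at port (" + PORT + "): sayHello ---> " + msg);
// Sleep 0 to 2 seconds to simulate varying response times
int i = RandomUtil.randomInt(3);
ThreadUtil.sleep(i * 1000);
return "Data returned by the server at port (" + PORT + "): " + msg;
}
@Override
public String sayHelloWorld(String msg) {
System.out.println("Second method on the server at port (" + PORT + "): sayHelloWorld ---> " + msg);
return "Data returned by the second method on the server at port (" + PORT + "): " + msg;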
}
}
LaGouV1Application
Concepts involved above:
1: Spring Boot
2: Spring Boot auto-configuration
3: Others
rpc-consumer1 project:
RpcConsumer
public class RpcConsumer {
    /**
     * Thread pool
     */
    private static ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors()
    );
    /**
     * Handler for each server, keyed by "ip:port"
     */
    private volatile static Map<String, UserClientHandler> userClientHandlerMap;
    /**
     * Duration of the most recent call to each server, in milliseconds
     */
    private volatile static Map<String, Long> userClientHandlerTime = new ConcurrentHashMap<>();
    /**
     *
     */
    private static volatile long count = 0;
    /*
        1: start all clients
        2: register the listener
     */
    private static void init() throws Exception {
        CuratorFramework client = ZookeeperUtils.getClient();
        String path = "/serverNodes";
        // Initialize the connections to all registered servers
        initClients(client, path);
        // Register a listener on the parent path in ZooKeeper
        registerZkPathListener(client, path);
    }
/**
* Registers a listener on the children of the given path
* @param client
* @param path
*/
private static void registerZkPathListener(CuratorFramework client, String path) throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
ChildData childData = event.getData();
PathChildrenCacheEvent.Type eventType = event.getType();
System.out.println("zk node monitor: childData: " + childData);
System.out.println("zk node monitor: eventType: " + eventType);
switch (eventType) {
case CONNECTION_RECONNECTED:
pathChildrenCache.rebuild();
break;
case CONNECTION_SUSPENDED:
break;
case CONNECTION_LOST:
System.out.println("Connection lost.");
break;
case CHILD_ADDED:
addClient(client, event.getData());
break;
case CHILD_UPDATED:
System.out.println("Child updated");
break;
case CHILD_REMOVED:
System.out.println("Node removed!");
delClient(client, event.getData());
break;
default:
break;
}
}
});
}
    /**
     * Removes the connection to a server
     * @param client
     * @param data
     */
    private static void delClient(CuratorFramework client, ChildData data) {
        System.out.println("zk node monitor: removing childData: " + data);
        // The node data is the "ip:port" key used in the handler map.
        String serverKey = new String(data.getData());
        UserClientHandler userClientHandler = userClientHandlerMap.get(serverKey);
        if (userClientHandler == null) {
            System.out.println("zk node monitor: server node " + serverKey + " does not exist!");
            return;
        }
        if (userClientHandler.isState()) {
            System.out.println("zk node monitor: server node " + serverKey + " is still active and cannot be removed!");
        } else {
            userClientHandlerMap.remove(serverKey);
            System.out.println("zk node monitor: delete server: " + serverKey);
        }
    }
    /**
     * Adds a connection to a server
     * @param client
     * @param data
     */
    private static void addClient(CuratorFramework client, ChildData data) throws InterruptedException {
        System.out.println("zk node monitor: new ChildData: " + data);
        byte[] dataData = data.getData();
        String serverKey = new String(dataData);
        System.out.println("zk node monitor: add server: " + serverKey);
        String[] split = serverKey.split(":");
        initClient(split[0], split[1]);
    }
    /**
     * Initializes the Netty connections to all servers
     * @param client
     * @param path
     */
    private synchronized static void initClients(CuratorFramework client, String path) {
        if (userClientHandlerMap == null) {
            // The map is also modified from the ZooKeeper listener thread, so use a concurrent map.
            userClientHandlerMap = new ConcurrentHashMap<>(64);
        }
        try {
            List<String> list = client.getChildren().forPath(path);
            for (String link : list) {
                String allPath = path + "/" + link;
                System.out.println("zk node monitor: registered server node path: " + allPath);
                byte[] bytes = client.getData().forPath(allPath);
                String s = new String(bytes);
                System.out.println("zk node monitor: registered server node value: " + s);
                String[] split = s.split(":");
                // Initialize the Netty client connection
                initClient(split[0], split[1]);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
/**
*
* @param ip
* @param port
*/
private static void initClient(String ip, String port) throws InterruptedException {
// This UserClientHandler is stored in the handler map below and later retrieved to send client requests
UserClientHandler userClientHandler = new UserClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JsonSerializer()));
pipeline.addLast(new RpcDecoder(RpcResult.class, new JsonSerializer()));
pipeline.addLast(userClientHandler);
}
});
bootstrap.connect(ip, Integer.parseInt(port)).sync();
// Store the handler and log the entry
userClientHandlerMap.put(ip + ":" + port, userClientHandler);
System.out.println("init: stored " + (ip + ":" + port) + " = " + userClientHandler);
}
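    /**
     * Removes handlers whose connection is no longer active.
     * @return number of servers still available
     */
    private static Integer clearClients() {
        // Use an explicit iterator (java.util.Iterator) so entries can be removed while looping.
        Iterator<Map.Entry<String, UserClientHandler>> it = userClientHandlerMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, UserClientHandler> entry = it.next();
            UserClientHandler userClientHandler = entry.getValue();
            if (userClientHandler != null && !userClientHandler.isState()) {
                System.out.println("zk node monitor: removing server node from the service list: " + entry.getKey());
                it.remove();
                userClientHandlerTime.remove(entry.getKey());
            }
        }
        return userClientHandlerMap.size();
    }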
    // Now send a request and get the remote service's response (netty + zookeeper).
    // The consumer only has the interface; the call is completed through a proxy plus the protocol.
    /**
     * @param proxyClass interface to proxy
     * @return proxy instance
     */
    public Object createProxy(Class<?> proxyClass) {
        Object proxyInstance = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{proxyClass},
                new RpcInvokeInvocationHandler());
        return proxyInstance;
    }
    /**
     * Invocation handler that forwards calls to the remote RPC service
     */
    private class RpcInvokeInvocationHandler implements InvocationHandler {
        /**
         * Once the proxy has been created, every call to an interface method runs this logic. todo
         * @param proxy
         * @param method
         * @param args
         * @return
         * @throws Throwable
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            /*
                1: initialize all connections
                2: initialize the handler container
             */
            if (userClientHandlerMap == null) {
                init();
            }
            // Check whether any server node is available
            Integer availableCount = clearClients();
            if (availableCount == 0) {
                throw new RuntimeException("proxy: no server node available!");
            }
            // Build the protocol object
            RpcRequest rpcRequest = new RpcRequest();
            // The service class name can be derived from the method
            String className = method.getDeclaringClass().getName();
            String packageName = method.getDeclaringClass().getPackage().getName();
            // Strip the package to get the simple service name
            String serviceName = StrUtil.removeAll(className, packageName + '.');
            System.out.println("proxy: className: " + className + "\tpackageName: " + packageName + "\tserviceName: " + serviceName);
            rpcRequest.setRequestId(serviceName);
            rpcRequest.setClassName(className);
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameters(args);
            rpcRequest.setParameterTypes(method.getParameterTypes());
            System.out.println("proxy: rpcRequest: " + rpcRequest);
            LoadBalanceService loadBalanceService;
            loadBalanceService = new ServiceLBRandom();
            // Get the server key
            String serverKey = loadBalanceService.getService();
            System.out.println("proxy: serverKey: " + serverKey);
            // The key is picked at random from the handler container
            UserClientHandler userClientHandler = userClientHandlerMap.get(serverKey);
            // Set the message to send
            userClientHandler.setToSendParam(rpcRequest);
            // Send on the thread pool and block until the result arrives
            long begin = DateUtil.current();
            Object result = executorService.submit(userClientHandler).get();
            long spendMs = DateUtil.spendMs(begin);
            // Record the elapsed time
            getUserClientHandlerTime().put(serverKey, spendMs);
            System.out.println("proxy: call server: " + serverKey + ", elapsed: " + spendMs + "ms");
            // Return the result
            return result;
        }
    }
/**
*
* @return
*/
public static Map<String, Long> getUserClientHandlerTime() {
return userClientHandlerTime;
}
public static Map<String, UserClientHandler> getUserClientHandlerMap() {
return userClientHandlerMap;
}
}
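The proxy step in RpcConsumer can be isolated from Netty and ZooKeeper. The sketch below shows how a JDK dynamic proxy turns an interface call into a request object and hands it to a transport. ProxySketch, EchoService, Request, and the Function-based transport are hypothetical; in the article the transport role is played by UserClientHandler.

```java
import java.lang.reflect.Proxy;
import java.util.function.Function;

/** Hypothetical sketch: an interface call becomes a request passed to a pluggable transport. */
final class ProxySketch {

    /** Hypothetical service interface, standing in for IUserService. */
    interface EchoService {
        String sayHello(String msg);
    }

    /** Minimal request, loosely mirroring the requestId/methodName/parameters fields of RpcRequest. */
    static final class Request {
        final String serviceName;
        final String methodName;
        final Object[] args;

        Request(String serviceName, String methodName, Object[] args) {
            this.serviceName = serviceName;
            this.methodName = methodName;
            this.args = args;
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, Function<Request, Object> transport) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
                (proxy, method, args) -> {
                    // Methods inherited from Object are answered locally, not sent over the wire.
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "toString":
                                return iface.getSimpleName() + "Proxy";
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default: // equals
                                return proxy == args[0];
                        }
                    }
                    return transport.apply(new Request(iface.getSimpleName(), method.getName(), args));
                });
    }

    public static void main(String[] args) {
        // A fake transport that just describes the call it received.
        EchoService service = createProxy(EchoService.class,
                req -> req.serviceName + "." + req.methodName + "(" + req.args[0] + ")");
        System.out.println(service.sayHello("hi")); // prints "EchoService.sayHello(hi)"
    }
}
```

Handling toString, hashCode, and equals locally avoids sending a remote request when the proxy is merely logged or stored in a collection.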
UserClientHandler
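public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    /*
        1: asynchronous send
        2: the thread pool receives the result
        3: send data
        4: record the received result
        5: wait/notify mechanism: wait() notify()
        6: context object: ChannelHandlerContext ctx
     */
    private volatile ChannelHandlerContext ctx;
    /**
     * Request to send
     */
    private Object toSendParam;
    /**
     * Received result
     */
    private Object toRecvResult;
    /**
     * Failure reported by the server for the current call, if any
     */
    private RuntimeException error;
    /**
     * Set once the response for the current call has arrived
     */
    private boolean responded;
    /**
     * Whether the connection to the server is active
     */
    private volatile boolean state = false;
    /**
     * Sets the request to send
     * @param toSendParam
     */
    public void setToSendParam(Object toSendParam) {
        this.toSendParam = toSendParam;
    }
    /**
     * Returns the server state
     * @return
     */
    public boolean isState() {
        return state;
    }
    /**
     * Sends the request and blocks until the result arrives
     * @return
     * @throws Exception
     */
    @Override
    public synchronized Object call() throws Exception {
        this.toRecvResult = null;
        this.error = null;
        this.responded = false;
        System.out.println("sending object: " + this.toSendParam);
        this.ctx.writeAndFlush(this.toSendParam);
        System.out.println("netty client: waiting thread (thread name): " + Thread.currentThread().getName());
        // Loop to guard against spurious wakeups.
        while (!responded) {
            wait();
        }
        if (error != null) {
            throw error;
        }
        return toRecvResult;
    }
    /**
     * Fired once the connection to the server is active
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        this.state = true;
    }
    /**
     * Fired when the connection to the server is closed
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("netty client: disconnected.");
        this.state = false;
    }
    /**
     * Reads the data sent by the server
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg) {
            RpcResult rpcResult = (RpcResult) msg;
            // Success or failure
            if (rpcResult.isSuccess()) {
                this.toRecvResult = rpcResult.getData();
            } else {
                // Hand the failure to the waiting caller; call() rethrows it.
                this.error = new RuntimeException(rpcResult.getMessage());
            }
        }
        this.responded = true;
        System.out.println("netty client: waking thread (name of the waking thread): " + Thread.currentThread().getName());
        notify();
    }
}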
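With wait() and notify() on the handler itself, each connection can serve only one in-flight call at a time. A common alternative is to correlate responses with requests through a per-call ID and a CompletableFuture. The sketch below assumes the response carries back a unique ID for each call, which the article's RpcResult does not do (and its requestId holds the service name), so PendingCalls and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry that matches responses to waiting callers by a per-call ID. */
final class PendingCalls {

    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Called by the sender before the request is written to the channel. */
    CompletableFuture<Object> register(String callId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(callId, future);
        return future;
    }

    /**
     * Called by the I/O thread when a response arrives.
     * Returns false when no caller is waiting for this ID.
     */
    boolean complete(String callId, boolean success, Object data, String message) {
        CompletableFuture<Object> future = pending.remove(callId);
        if (future == null) {
            return false;
        }
        if (success) {
            future.complete(data);
        } else {
            future.completeExceptionally(new RuntimeException(message));
        }
        return true;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<Object> future = calls.register("call-1");

        // Simulate the I/O thread delivering the response a little later.
        Thread ioThread = new Thread(() -> calls.complete("call-1", true, "pong", null));
        ioThread.start();

        // The caller blocks with a timeout instead of waiting forever.
        System.out.println(future.get(1, TimeUnit.SECONDS)); // prints "pong"
        ioThread.join();
    }
}
```

Blocking with a timeout also covers the case where the server never answers, which a bare wait() does not.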
LoadBalanceService
public interface LoadBalanceService {
/**
* Selects a server and returns its key
* @return
*/
String getService();
}
ServiceLBLeastTime
public class ServiceLBLeastTime implements LoadBalanceService {
@Override
public String getService() {
Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
Map<String, Long> userClientHandlerTime = RpcConsumer.getUserClientHandlerTime();
Set<String> services = userClientHandlerMap.keySet();
long dr=-1;
String serverKey="";
// Loop to find the server node with the shortest last response time
for (String service : services) {
boolean b = userClientHandlerTime.containsKey(service);
if(b) {
Long aLong = userClientHandlerTime.get(service);
if(aLong < dr || dr < 0) {
dr = aLong;
serverKey = service;
}
} else {
// Never called so far, which means it has carried no load yet
serverKey = service;
break;
}
}
return serverKey;
}
}
ServiceLBRandom
public class ServiceLBRandom implements LoadBalanceService {
@Override
public String getService() {
Map<String, UserClientHandler> userClientHandlerMap = RpcConsumer.getUserClientHandlerMap();
String[] keys = userClientHandlerMap.keySet().toArray(new String[0]);
// Printing the array directly would only show its identity, so format it first.
System.out.println("keys: " + java.util.Arrays.toString(keys));
int i = RandomUtil.randomInt(keys.length);
String key = keys[i];
System.out.println("randomly selected server key: " + key);
return key;
}
public static void main(String[] args) {
HashMap<String, String> map = new HashMap<>();
map.put("1", "1");
map.put("3", "3");
map.put("2", "2");
map.put("5", "5");
String[] strings = map.keySet().toArray(new String[0]);
System.out.println("strings: " + java.util.Arrays.toString(strings));
for (String x : strings) {
System.out.println("keys : " + x);
}
/*
keys : 1
keys : 2
keys : 3
keys : 5
*/
}
}
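Thoughts on extension points
1: How would we define an order service?
First define an order interface IOrderService in the rpc-common project, then write an OrderServiceImpl class in rpc-provider that implements it.
Finally, create a proxy object on the consumer side and send the corresponding protocol object to call the order service.
2: To be extended.
3: Load balancing could be refactored with the strategy pattern, which makes it easy to add other algorithms or to factor in values such as weights. To be optimized.
4: IP and port settings could be moved to dynamic configuration.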
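Point 3 can be sketched as follows: a strategy interface with interchangeable implementations, one of which takes weights into account. Unlike the article's LoadBalanceService, which reads the static maps in RpcConsumer, this sketch passes the candidates in explicitly so it can run on its own. LoadBalancers, Strategy, WeightedRandom, and RoundRobin are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical strategy-pattern sketch for choosing a server key. */
final class LoadBalancers {

    /** Strategy: picks one server key from the candidates and their weights. */
    interface Strategy {
        String select(Map<String, Integer> weightsByServer);
    }

    /** Picks a server with probability proportional to its weight. */
    static final class WeightedRandom implements Strategy {
        private final Random random;

        WeightedRandom(Random random) {
            this.random = random;
        }

        @Override
        public String select(Map<String, Integer> weightsByServer) {
            int total = 0;
            for (int weight : weightsByServer.values()) {
                if (weight < 0) {
                    throw new IllegalArgumentException("negative weight: " + weight);
                }
                total += weight;
            }
            if (total <= 0) {
                throw new IllegalStateException("no server with a positive weight");
            }
            // Draw a ticket in [0, total) and walk the servers until it is used up.
            int ticket = random.nextInt(total);
            for (Map.Entry<String, Integer> entry : weightsByServer.entrySet()) {
                ticket -= entry.getValue();
                if (ticket < 0) {
                    return entry.getKey();
                }
            }
            throw new AssertionError("unreachable: ticket is always smaller than the total weight");
        }
    }

    /** Ignores weights and cycles through the servers in iteration order. */
    static final class RoundRobin implements Strategy {
        private final AtomicInteger next = new AtomicInteger();

        @Override
        public String select(Map<String, Integer> weightsByServer) {
            if (weightsByServer.isEmpty()) {
                throw new IllegalStateException("no server available");
            }
            String[] keys = weightsByServer.keySet().toArray(new String[0]);
            return keys[Math.floorMod(next.getAndIncrement(), keys.length)];
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> servers = new LinkedHashMap<>();
        servers.put("127.0.0.1:9010", 1);
        servers.put("127.0.0.1:9011", 3);

        // The caller depends only on the Strategy interface, so algorithms can be swapped.
        Strategy strategy = new RoundRobin();
        System.out.println(strategy.select(servers)); // prints "127.0.0.1:9010"
        strategy = new WeightedRandom(new Random());
        System.out.println(strategy.select(servers)); // one of the two keys; 9011 is three times as likely
    }
}
```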
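Problems encountered
1: After connecting to the server from Netty, the following code led to requests being sent without any response coming back.
With the client written this way, sending a request produced no response, no error, and no result.
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture();
2: notify() and wait() must be called while holding the object's monitor, i.e. inside a synchronized block or method. Otherwise the thread is never woken up, or an IllegalMonitorStateException is thrown.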
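The monitor rule from point 2 is easy to reproduce in isolation. The following is a minimal sketch; MonitorDemo and its two methods are hypothetical names.

```java
/** Demonstrates that notify() requires the caller to hold the object's monitor. */
final class MonitorDemo {

    /** Returns true if calling notify() without holding the lock throws IllegalMonitorStateException. */
    static boolean notifyWithoutLockFails(Object lock) {
        try {
            lock.notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Returns true once notify() has been called while holding the lock. */
    static boolean notifyWithLockWorks(Object lock) {
        synchronized (lock) {
            lock.notify(); // legal: the current thread owns the monitor of lock
            return true;
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        System.out.println(notifyWithoutLockFails(lock)); // prints true
        System.out.println(notifyWithLockWorks(lock));    // prints true
    }
}
```

The same rule applies to wait(), which additionally releases the monitor while the thread is waiting.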