在上一篇中,服务的地址是写死在代码中的,这很不好,对于客户端,只能去寻找这么一个服务提供者,假设这个提供者宕机了,或者换地址了,那就无法正常调用。
nacos是分布式框架里面的一个重要组件,所有的服务端在nacos中注册自己拥有的服务,客户端在进行rpc调用的时候,nacos负责返回一个可用的服务地址供客户端调用。nacos还起到负载均衡的作用,因为在部署的时候一般是分布式部署,假设某服务部署在两台服务器上,在服务调用的时候,nacos可以使用轮询的算法处理请求,不至于让某一台服务器的处理过多的远程调用。为了简单,使用单机版的nacos,当然了单机版的nacos就用不着raft算法做冗余了,nacos集群至少要三台服务器。下载好nacos并且启动,界面非常整洁。

在第二节中,把本地服务保存到本地的类叫ServiceRegistry,这里用ServiceProvider代替,而ServiceRegistry作为远程注册表使用.
定义一个服务中心通用接口:
public interface ServiceRegistry {
/**
* 将一个服务注册进注册表
*
* @param serviceName 服务名称
* @param inetSocketAddress 提供服务的地址
*/
void register(String serviceName, InetSocketAddress inetSocketAddress);
/**
* 根据服务名称查找服务实体
*
* @param serviceName 服务名称
* @return 服务实体
*/
InetSocketAddress lookupService(String serviceName);
}
借来下是nacos作为注册中心的实现,当然了,注册中心可以用zookeeper,etcd。。。nacos提供了注册的接口namingService.registerInstance和获取服务ip地址和端口号的接口namingService.getAllInstances.在获取的服务实例中,选择instances.get(0),设计到nacos负载均衡策略之后会陈述。
public class NacosServiceRegistry implements ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(NacosServiceRegistry.class);
private static final String SERVER_ADDR = "127.0.0.1:8848";
private static final NamingService namingService;
static {
try {
namingService = NamingFactory.createNamingService(SERVER_ADDR);
} catch (NacosException e) {
logger.error("连接到Nacos时有错误发生: ", e);
throw new RpcException(RpcError.FAILED_TO_CONNECT_TO_SERVICE_REGISTRY);
}
}
@Override
public void register(String serviceName, InetSocketAddress inetSocketAddress) {
try {
namingService.registerInstance(serviceName, inetSocketAddress.getHostName(), inetSocketAddress.getPort());
} catch (NacosException e) {
logger.error("注册服务时有错误发生:", e);
throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
}
}
@Override
public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = namingService.getAllInstances(serviceName);
Instance instance = instances.get(0);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}
}
RpcServer有向nacos注册服务的功能,所以对Rpcserver代码做出修改,新增一个方法publishService,用于向nacos注册服务, serviceProvider会把服务信息加入本地map中,而
serviceRegistry.register负责把服务注册到nacos中。
public NettyServer(String host, int port) {
this.host = host;
this.port = port;
serviceRegistry = new NacosServiceRegistry();
serviceProvider = new ServiceProviderImpl();
}
@Override
public <T> void publishService(Object service, Class<T> serviceClass) {
if(serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
serviceProvider.addServiceProvider(service);
serviceRegistry.register(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
start();
}
服务发现则在RpcClient实现,之前RpcClient调用sendRequest的时候,ip地址和端口是写死的,现在ip地址和端口号从nacos中获取:
@Override
public Object sendRequest(RpcRequest rpcRequest) {
if(serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
AtomicReference<Object> result = new AtomicReference<>(null);
try {
InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress, serializer);
if(channel.isActive()) {
channel.writeAndFlush(rpcRequest).addListener(future1 -> {
if (future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
} else {
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
channel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + rpcRequest.getRequestId());
RpcResponse rpcResponse = channel.attr(key).get();
RpcMessageChecker.check(rpcRequest, rpcResponse);
result.set(rpcResponse.getData());
} else {
System.exit(0);
}
} catch (InterruptedException e) {
logger.error("发送消息时有错误发生: ", e);
}
return result.get();
}
一些其他变更:RpcServer本地调用异步化,交给线程池去调用,产生结果之后再把结果发送给客户端,对应RpcServerHandler中的改变。
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
threadPool.execute(() -> {
try {
logger.info("服务器接收到请求: {}", msg);
Object result = requestHandler.handle(msg);
ChannelFuture future = ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
future.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
});
}
最后
以上就是英俊发带最近收集整理的关于实现简易rpc框架-(4)使用nacos作注册中心的全部内容,更多相关实现简易rpc框架-(4)使用nacos作注册中心内容请搜索靠谱客的其他文章。
发表评论 取消回复