概述
增加注册中心的主要工作内容是服务注册、服务发现、服务下线和负载均衡,并且将服务端提供的服务用 ZkServiceProviderImpl 保存。
注册中心使用 ZooKeeper 实现,使用 Curator 框架对 ZooKeeper 进行调用。
服务注册包括注册服务的方法,将服务写到 ZooKeeper 中。
服务发现包括发现服务的方法,通过负载均衡选择服务地址。
服务下线包括删除当前服务端已注册的所有服务。
负载均衡这里只实现了最简单的随机分配。
ZkServiceProviderImpl 需要使用单例模式,增加了 SingletonFactory 类。
为了方便区分不同的服务端,增加了 RpcServiceConfig 类,主要用 version 和 group 来区分。同时对之前的 Service 和 Client 类进行了优化,提取了启动的关键代码,将其他过程放到 Client 和 Server 文件夹内。
以下是部分重要代码:
package common.dto;
import lombok.*;
/**
* @Author:Summer
* @Data:2022/2/21 20:07
*/
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class RpcServiceConfig {
/**
* service version
*/
private String version = "";
/**
* when the interface has multiple implementation classes, distinguish by group
*/
private String group = "";
/**
* target service
*/
private Object service;
public String getRpcServiceName() {
return this.getServiceName() + this.getGroup() + this.getVersion();
}
public String getServiceName() {
return this.service.getClass().getInterfaces()[0].getCanonicalName();
}
}
package common.provider.impl;
import common.Exceptions.RpcException;
import common.dto.RpcServiceConfig;
import common.provider.ServiceProvider;
import common.register.ServiceRegistry;
import common.register.zookeeper.ZkServiceRegistryImpl;
import demo4.Server.SocketRpcServer;
import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Keeps the service objects offered by this server in memory and publishes
 * them to the registry through {@link ZkServiceRegistryImpl}.
 *
 * <p>Author: Summer, created 2022/2/21 20:08.
 */
@Slf4j
public class ZkServiceProviderImpl implements ServiceProvider {
    /**
     * key: rpc service name (interface name + group + version, see
     * {@code RpcServiceConfig#getRpcServiceName()})
     * value: service object
     */
    private final Map<String, Object> serviceMap;
    /** Names of the services that have already been added. */
    private final Set<String> registeredService;
    private final ServiceRegistry serviceRegistry;

    public ZkServiceProviderImpl() {
        serviceMap = new ConcurrentHashMap<>();
        registeredService = ConcurrentHashMap.newKeySet();
        serviceRegistry = new ZkServiceRegistryImpl();
    }

    /**
     * Stores the service object under its rpc service name. A name that was
     * added before is left untouched.
     *
     * @param rpcServiceConfig service object together with its group and version
     */
    @Override
    public void addService(RpcServiceConfig rpcServiceConfig) {
        String rpcServiceName = rpcServiceConfig.getRpcServiceName();
        // Set.add reports whether the name was new, so the membership test and
        // the insertion happen in one atomic step instead of contains() + add().
        if (!registeredService.add(rpcServiceName)) {
            return;
        }
        serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
        log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
    }

    /**
     * Looks up a previously added service object.
     *
     * @param rpcServiceName rpc service name used when the service was added
     * @return the service object
     * @throws RpcException if no service is stored under that name
     */
    @Override
    public Object getService(String rpcServiceName) {
        Object service = serviceMap.get(rpcServiceName);
        if (null == service) {
            throw new RpcException("SERVICE_CAN_NOT_BE_FOUND");
        }
        return service;
    }

    /**
     * Adds the service locally and registers it in the registry under the
     * local host address and {@code SocketRpcServer.port}. If the local host
     * address cannot be resolved the error is logged and nothing is published.
     *
     * @param rpcServiceConfig service object together with its group and version
     */
    @Override
    public void publishService(RpcServiceConfig rpcServiceConfig) {
        try {
            String host = InetAddress.getLocalHost().getHostAddress();
            this.addService(rpcServiceConfig);
            serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, SocketRpcServer.port));
        } catch (UnknownHostException e) {
            log.error("occur exception when getHostAddress", e);
        }
    }
}
package common.register.zookeeper;
import common.register.ServiceRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import java.net.InetSocketAddress;
/**
 * {@link ServiceRegistry} implementation that writes service addresses to
 * ZooKeeper through {@link CuratorUtils}.
 *
 * <p>Author: Summer, created 2022/2/21 00:29.
 */
@Slf4j
public class ZkServiceRegistryImpl implements ServiceRegistry {

    /**
     * Creates a persistent node for the given service and address.
     *
     * <p>The node path is the registry root, a slash, the rpc service name and
     * then the address's {@code toString()} form appended directly.
     *
     * @param rpcServiceName    rpc service name
     * @param inetSocketAddress address the service is reachable at
     */
    @Override
    public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
        StringBuilder nodePath = new StringBuilder(CuratorUtils.ZK_REGISTER_ROOT_PATH)
                .append("/")
                .append(rpcServiceName)
                .append(inetSocketAddress.toString());
        CuratorUtils.createPersistentNode(CuratorUtils.getZkClient(), nodePath.toString());
    }
}
package common.register.zookeeper;
import common.Exceptions.RpcException;
import common.dto.RpcRequest;
import common.loadbalance.LoadBalance;
import common.loadbalance.loadbalancer.RandomLoadBalance;
import common.register.ServiceDiscovery;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * {@link ServiceDiscovery} implementation that reads the addresses of a
 * service from ZooKeeper and picks one of them with a {@link LoadBalance}.
 *
 * <p>Author: Summer, created 2022/2/21 00:18.
 */
@Slf4j
public class ZkServiceDiscoveryImpl implements ServiceDiscovery {
    private final LoadBalance loadBalance;

    public ZkServiceDiscoveryImpl() {
        //this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
        this.loadBalance = new RandomLoadBalance();
    }

    /**
     * Resolves the address of a server that offers the requested service.
     *
     * @param rpcRequest request whose rpc service name is looked up
     * @return host and port of the selected server
     * @throws RpcException if no address is registered for the service
     */
    @Override
    public InetSocketAddress lookupService(RpcRequest rpcRequest) {
        String rpcServiceName = rpcRequest.getRpcServiceName();
        CuratorFramework zkClient = CuratorUtils.getZkClient();
        List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
        if (serviceUrlList == null || serviceUrlList.isEmpty()) {
            // Same error code as ZkServiceProviderImpl#getService (was misspelled "..._FOUN").
            throw new RpcException("SERVICE_CAN_NOT_BE_FOUND", rpcServiceName);
        }
        // load balancing
        String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
        log.info("Successfully found the service address:[{}]", targetServiceUrl);
        // The selected entry is split on ':' into host and port.
        String[] socketAddressArray = targetServiceUrl.split(":");
        String host = socketAddressArray[0];
        int port = Integer.parseInt(socketAddressArray[1]);
        return new InetSocketAddress(host, port);
    }
}
package common.register.zookeeper;
import demo4.Server.SocketRpcServer;
import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
/**
* @Author:Summer
* @Data:2022/2/21 20:13
*/
@Slf4j
public class CustomShutdownHook {
private static final CustomShutdownHook CUSTOM_SHUTDOWN_HOOK = new CustomShutdownHook();
public static CustomShutdownHook getCustomShutdownHook() {
return CUSTOM_SHUTDOWN_HOOK;
}
public void clearAll() {
log.info("addShutdownHook for clearAll");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), SocketRpcServer.port);
CuratorUtils.clearRegistry(CuratorUtils.getZkClient(), inetSocketAddress);
} catch (UnknownHostException ignored) {
}
//ThreadPoolFactoryUtils.shutDownAllThreadPool();
}));
}
}
package common.register.zookeeper;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @Author:Summer
* @Data:2022/2/20 21:44
*/
@Slf4j
public final class CuratorUtils {
private static final int BASE_SLEEP_TIME = 1000;
private static final int MAX_RETRIES = 3;
public static final String ZK_REGISTER_ROOT_PATH = "/rpc-demo";
private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet();
private static CuratorFramework zkClient;
private static final String DEFAULT_ZOOKEEPER_ADDRESS = "127.0.0.1:2181";
private CuratorUtils() {
}
/**
* Create persistent nodes. Unlike temporary nodes, persistent nodes are not removed when the client disconnects
*
* @param path node path
*/
public static void createPersistentNode(CuratorFramework zkClient, String path) {
try {
if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) {
log.info("The node already exists. The node is:[{}]", path);
} else {
//eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
log.info("The node was created successfully. The node is:[{}]", path);
}
REGISTERED_PATH_SET.add(path);
} catch (Exception e) {
log.error("create persistent node for path [{}] fail", path);
}
}
/**
* Gets the children under a node
*
* @param rpcServiceName rpc service name eg:github.javaguide.HelloServicetest2version1
* @return All child nodes under the specified node
*/
public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) {
if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) {
return SERVICE_ADDRESS_MAP.get(rpcServiceName);
}
List<String> result = null;
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
try {
result = zkClient.getChildren().forPath(servicePath);
SERVICE_ADDRESS_MAP.put(rpcServiceName, result);
registerWatcher(rpcServiceName, zkClient);
} catch (Exception e) {
log.error("get children nodes for path [{}] fail", servicePath);
}
return result;
}
/**
* Empty the registry of data
*/
public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) {
REGISTERED_PATH_SET.stream().parallel().forEach(p -> {
try {
if (p.endsWith(inetSocketAddress.toString())) {
zkClient.delete().forPath(p);
}
} catch (Exception e) {
log.error("clear registry for path [{}] fail", p);
}
});
log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString());
}
public static CuratorFramework getZkClient() {
// check if user has set zk address
//Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
//String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
String zookeeperAddress = DEFAULT_ZOOKEEPER_ADDRESS;
// if zkClient has been started, return directly
if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
return zkClient;
}
// Retry strategy. Retry 3 times, and will increase the sleep time between retries.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.builder()
// the server to connect to (can be a server list)
.connectString(zookeeperAddress)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
try {
// wait 30s until connect to the zookeeper
if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Time out waiting to connect to ZK!");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return zkClient;
}
/**
* Registers to listen for changes to the specified node
*
* @param rpcServiceName rpc service name eg:github.javaguide.HelloServicetest2version
*/
private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception {
String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName;
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses);
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start();
}
}
package common.singletonfactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Creates and caches one instance per class, keyed by {@code Class#toString()}.
 *
 * <p>Author: Summer, created 2022/2/21 23:38.
 */
public final class SingletonFactory {
    private static final Map<String, Object> OBJECT_MAP = new ConcurrentHashMap<>();

    private SingletonFactory() {
    }

    /**
     * Returns the cached instance of the given class, creating it through the
     * declared no-argument constructor on first request.
     *
     * @param c   class to obtain the instance of
     * @param <T> type of the instance
     * @return the shared instance
     * @throws IllegalArgumentException if {@code c} is null
     * @throws RuntimeException         if the instance cannot be created reflectively
     */
    public static <T> T getInstance(Class<T> c) {
        if (c == null) {
            throw new IllegalArgumentException();
        }
        final String cacheKey = c.toString();
        Object instance = OBJECT_MAP.get(cacheKey);
        if (instance == null) {
            // Not cached yet: let the map create it at most once per key.
            instance = OBJECT_MAP.computeIfAbsent(cacheKey, unused -> instantiate(c));
        }
        return c.cast(instance);
    }

    /** Invokes the declared no-argument constructor, wrapping reflective failures. */
    private static Object instantiate(Class<?> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
package demo4.Client;
import common.dto.RpcRequest;
import common.dto.RpcResponse;
import common.dto.RpcServiceConfig;
import common.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
 * Dynamic-proxy invocation handler that turns interface method calls into
 * {@link RpcRequest}s and sends them through an {@link RpcRequestTransport}.
 *
 * <p>Author: Summer, created 2022/2/18 11:28.
 */
@Slf4j
public class RpcClientProxy implements InvocationHandler {
    /**
     * Used to send requests to the server.And there are two implementations: socket and netty
     */
    private final RpcRequestTransport rpcRequestTransport;
    /** Supplies the group and version put into every request. */
    private final RpcServiceConfig rpcServiceConfig;

    /**
     * @param rpcRequestTransport transport used to send requests
     * @param rpcServiceConfig    group and version of the target service
     */
    public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = rpcServiceConfig;
    }

    /**
     * Creates a proxy factory that uses a default {@link RpcServiceConfig}.
     *
     * @param rpcRequestTransport transport used to send requests
     */
    public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = new RpcServiceConfig();
    }

    /**
     * get the proxy object
     *
     * @param clazz interface to proxy
     * @param <T>   interface type
     * @return proxy whose calls are routed to {@link #invoke}
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    /**
     * This method is actually called when you use a proxy object to call a method.
     * The proxy object is the object you get through the getProxy method.
     *
     * @return the data carried by the response
     * @throws IllegalStateException if the transport returns no response
     */
    @SneakyThrows
    @SuppressWarnings("unchecked")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        log.info("invoked method: [{}]", method.getName());
        RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
                .parameters(args)
                .interfaceName(method.getDeclaringClass().getName())
                .paramTypes(method.getParameterTypes())
                .requestId(UUID.randomUUID().toString())
                .group(rpcServiceConfig.getGroup())
                .version(rpcServiceConfig.getVersion())
                .build();
        log.info(rpcRequest.toString());
        RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
        if (rpcResponse == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalStateException("no response received for request " + rpcRequest.getRequestId());
        }
        return rpcResponse.getData();
    }
}
package demo4.Client;
import common.dto.RpcRequest;
import common.dto.RpcResponse;
import common.dto.RpcServiceConfig;
import common.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
 * Dynamic-proxy invocation handler that turns interface method calls into
 * {@link RpcRequest}s and sends them through an {@link RpcRequestTransport}.
 *
 * <p>Author: Summer, created 2022/2/18 11:28.
 */
@Slf4j
public class RpcClientProxy implements InvocationHandler {
    /**
     * Used to send requests to the server.And there are two implementations: socket and netty
     */
    private final RpcRequestTransport rpcRequestTransport;
    /** Supplies the group and version put into every request. */
    private final RpcServiceConfig rpcServiceConfig;

    /**
     * @param rpcRequestTransport transport used to send requests
     * @param rpcServiceConfig    group and version of the target service
     */
    public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = rpcServiceConfig;
    }

    /**
     * Creates a proxy factory that uses a default {@link RpcServiceConfig}.
     *
     * @param rpcRequestTransport transport used to send requests
     */
    public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = new RpcServiceConfig();
    }

    /**
     * get the proxy object
     *
     * @param clazz interface to proxy
     * @param <T>   interface type
     * @return proxy whose calls are routed to {@link #invoke}
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    /**
     * This method is actually called when you use a proxy object to call a method.
     * The proxy object is the object you get through the getProxy method.
     *
     * @return the data carried by the response
     * @throws IllegalStateException if the transport returns no response
     */
    @SneakyThrows
    @SuppressWarnings("unchecked")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        log.info("invoked method: [{}]", method.getName());
        RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
                .parameters(args)
                .interfaceName(method.getDeclaringClass().getName())
                .paramTypes(method.getParameterTypes())
                .requestId(UUID.randomUUID().toString())
                .group(rpcServiceConfig.getGroup())
                .version(rpcServiceConfig.getVersion())
                .build();
        log.info(rpcRequest.toString());
        RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
        if (rpcResponse == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalStateException("no response received for request " + rpcRequest.getRequestId());
        }
        return rpcResponse.getData();
    }
}
package demo4.Client;
import common.dto.RpcRequest;
import common.dto.RpcResponse;
import common.dto.RpcServiceConfig;
import common.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
/**
 * Dynamic-proxy invocation handler that turns interface method calls into
 * {@link RpcRequest}s and sends them through an {@link RpcRequestTransport}.
 *
 * <p>Author: Summer, created 2022/2/18 11:28.
 */
@Slf4j
public class RpcClientProxy implements InvocationHandler {
    /**
     * Used to send requests to the server.And there are two implementations: socket and netty
     */
    private final RpcRequestTransport rpcRequestTransport;
    /** Supplies the group and version put into every request. */
    private final RpcServiceConfig rpcServiceConfig;

    /**
     * @param rpcRequestTransport transport used to send requests
     * @param rpcServiceConfig    group and version of the target service
     */
    public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = rpcServiceConfig;
    }

    /**
     * Creates a proxy factory that uses a default {@link RpcServiceConfig}.
     *
     * @param rpcRequestTransport transport used to send requests
     */
    public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
        this.rpcRequestTransport = rpcRequestTransport;
        this.rpcServiceConfig = new RpcServiceConfig();
    }

    /**
     * get the proxy object
     *
     * @param clazz interface to proxy
     * @param <T>   interface type
     * @return proxy whose calls are routed to {@link #invoke}
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Class<T> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
    }

    /**
     * This method is actually called when you use a proxy object to call a method.
     * The proxy object is the object you get through the getProxy method.
     *
     * @return the data carried by the response
     * @throws IllegalStateException if the transport returns no response
     */
    @SneakyThrows
    @SuppressWarnings("unchecked")
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        log.info("invoked method: [{}]", method.getName());
        RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
                .parameters(args)
                .interfaceName(method.getDeclaringClass().getName())
                .paramTypes(method.getParameterTypes())
                .requestId(UUID.randomUUID().toString())
                .group(rpcServiceConfig.getGroup())
                .version(rpcServiceConfig.getVersion())
                .build();
        log.info(rpcRequest.toString());
        RpcResponse<Object> rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
        if (rpcResponse == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalStateException("no response received for request " + rpcRequest.getRequestId());
        }
        return rpcResponse.getData();
    }
}
package demo4.Server;
import common.dto.RpcRequest;
import common.dto.RpcResponse;
import common.dto.RpcServiceConfig;
import common.provider.ServiceProvider;
import common.provider.impl.ZkServiceProviderImpl;
import common.register.zookeeper.CustomShutdownHook;
import common.singletonfactory.SingletonFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Author:Summer
* @Data:2022/2/21 20:04
*/
@Slf4j
public class SocketRpcServer {
public static final int port = 9999;
//private final ExecutorService threadPool;
private final ServiceProvider serviceProvider;
private final RpcServerHandler rpcRequestHandler;
public SocketRpcServer() {
//threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent("socket-server-rpc-pool");
serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
rpcRequestHandler = new RpcServerHandler();
}
public void registerService(RpcServiceConfig rpcServiceConfig) {
serviceProvider.publishService(rpcServiceConfig);
}
public void start() {
try (ServerSocket server = new ServerSocket()) {
String host = InetAddress.getLocalHost().getHostAddress();
server.bind(new InetSocketAddress(host, port));
CustomShutdownHook.getCustomShutdownHook().clearAll();
Socket socket;
while ((socket = server.accept()) != null) {
log.info("client connected [{}]", socket.getInetAddress());
process(socket);
//threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));
}
//threadPool.shutdown();
} catch (IOException e) {
log.error("occur IOException:", e);
} catch (Exception e) {
log.error("occur Exception:", e);
}
}
public void process(Socket socket) throws Exception {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = rpcRequestHandler.handle(rpcRequest);
objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
log.error("occur exception:", e);
}
}
}
package demo4;
import common.Hello;
import common.HelloService;
import common.dto.RpcServiceConfig;
import common.transport.Socket.SocketRpcRequestTransport;
import demo4.Client.RpcClientProxy;
/**
 * Demo client: calls {@code HelloService.hello} through an RPC proxy and
 * prints the result.
 *
 * <p>Author: Summer, created 2022/2/21 21:53.
 */
public class SocketClientMain {
    /**
     * Builds the proxy over a socket transport with a default service config,
     * performs one call and writes the answer to standard output.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SocketRpcRequestTransport transport = new SocketRpcRequestTransport();
        RpcServiceConfig serviceConfig = new RpcServiceConfig();
        RpcClientProxy proxyFactory = new RpcClientProxy(transport, serviceConfig);
        HelloService remoteHello = proxyFactory.getProxy(HelloService.class);
        Hello message = new Hello("111", "222");
        System.out.println(remoteHello.hello(message));
    }
}
/**
 * Demo server: registers a {@code HelloServiceImpl} and starts the socket
 * RPC server.
 */
public class SocketServerMain {
    /**
     * Creates the server, publishes the hello service with a default config
     * and then starts the server.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SocketRpcServer server = new SocketRpcServer();
        RpcServiceConfig helloConfig = new RpcServiceConfig();
        helloConfig.setService(new HelloServiceImpl());
        server.registerService(helloConfig);
        server.start();
    }
}
最后
以上就是谦让毛巾为你收集整理的【RPC项目】3.增加注册中心的全部内容,希望文章能够帮你解决【RPC项目】3.增加注册中心所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复