增加注册中心工作内容主要是服务注册、服务发现、服务下线,负载均衡。并且将服务端提供的服务用 ZkServiceProviderImpl 保存。
注册中心使用 ZooKeeper 实现,使用 Curator 框架对 ZooKeeper 进行调用。
服务注册包括注册服务的方法,将服务写到 ZooKeeper 中。
服务发现包括发现服务的方法,通过负载均衡选择服务地址。
服务下线包括删除所有服务端的服务。
负载均衡这里只实现了最简单的随机分配。
ZkServiceProviderImpl 需要使用单例模式,增加了 SingletonFactory 类。
为了方便不同的服务端之间进行区分,增加了 RpcServiceConfig 类,主要用 version 和 group 来区分。同时将之间的 Service 和 Client 类优化,提取了启动的关键代码,将其他过程放到 Client 和 Server 文件夹内。
以下是部分重要代码:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39package common.dto; import lombok.*; /** * @Author:Summer * @Data:2022/2/21 20:07 */ @AllArgsConstructor @NoArgsConstructor @Getter @Setter @Builder @ToString public class RpcServiceConfig { /** * service version */ private String version = ""; /** * when the interface has multiple implementation classes, distinguish by group */ private String group = ""; /** * target service */ private Object service; public String getRpcServiceName() { return this.getServiceName() + this.getGroup() + this.getVersion(); } public String getServiceName() { return this.service.getClass().getInterfaces()[0].getCanonicalName(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71package common.provider.impl; import common.Exceptions.RpcException; import common.dto.RpcServiceConfig; import common.provider.ServiceProvider; import common.register.ServiceRegistry; import common.register.zookeeper.ZkServiceRegistryImpl; import demo4.Server.SocketRpcServer; import lombok.extern.slf4j.Slf4j; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * @Author:Summer * @Data:2022/2/21 20:08 */ @Slf4j public class ZkServiceProviderImpl implements ServiceProvider { /** * key: rpc service name(interface name + version + group) * value: service object */ private final Map<String, Object> serviceMap; private final Set<String> registeredService; private final ServiceRegistry serviceRegistry; public ZkServiceProviderImpl() { serviceMap = new ConcurrentHashMap<>(); registeredService = ConcurrentHashMap.newKeySet(); serviceRegistry = new ZkServiceRegistryImpl(); } @Override public void addService(RpcServiceConfig rpcServiceConfig) { String rpcServiceName = rpcServiceConfig.getRpcServiceName(); if (registeredService.contains(rpcServiceName)) { return; } registeredService.add(rpcServiceName); serviceMap.put(rpcServiceName, rpcServiceConfig.getService()); log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces()); } @Override public Object getService(String rpcServiceName) { Object service = serviceMap.get(rpcServiceName); if (null == service) { throw new RpcException("SERVICE_CAN_NOT_BE_FOUND"); } return service; } @Override public void publishService(RpcServiceConfig rpcServiceConfig) { try { String host = InetAddress.getLocalHost().getHostAddress(); this.addService(rpcServiceConfig); serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, SocketRpcServer.port)); } catch (UnknownHostException e) { log.error("occur exception when getHostAddress", e); } } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23package common.register.zookeeper; import common.register.ServiceRegistry; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import java.net.InetSocketAddress; /** * @Author:Summer * @Data:2022/2/21 00:29 */ @Slf4j public class ZkServiceRegistryImpl implements ServiceRegistry { @Override public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) { String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName + inetSocketAddress.toString(); CuratorFramework zkClient = CuratorUtils.getZkClient(); CuratorUtils.createPersistentNode(zkClient, servicePath); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44package common.register.zookeeper; import common.Exceptions.RpcException; import common.dto.RpcRequest; import common.loadbalance.LoadBalance; import common.loadbalance.loadbalancer.RandomLoadBalance; import common.register.ServiceDiscovery; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import java.net.InetSocketAddress; import java.util.List; /** * @Author:Summer * @Data:2022/2/21 00:18 */ @Slf4j public class ZkServiceDiscoveryImpl implements ServiceDiscovery { private final LoadBalance loadBalance; public ZkServiceDiscoveryImpl() { //this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance"); this.loadBalance = new RandomLoadBalance(); } @Override public InetSocketAddress lookupService(RpcRequest rpcRequest) { String rpcServiceName = rpcRequest.getRpcServiceName(); CuratorFramework zkClient = CuratorUtils.getZkClient(); List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName); if (serviceUrlList == null || serviceUrlList.size() == 0) { throw new RpcException("SERVICE_CAN_NOT_BE_FOUN", rpcServiceName); } // load balancing String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest); log.info("Successfully found the service address:[{}]", targetServiceUrl); String[] socketAddressArray = targetServiceUrl.split(":"); String host = socketAddressArray[0]; int port = Integer.parseInt(socketAddressArray[1]); return new InetSocketAddress(host, port); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33package common.register.zookeeper; import demo4.Server.SocketRpcServer; import lombok.extern.slf4j.Slf4j; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; /** * @Author:Summer * @Data:2022/2/21 20:13 */ @Slf4j public class CustomShutdownHook { private static final CustomShutdownHook CUSTOM_SHUTDOWN_HOOK = new CustomShutdownHook(); public static CustomShutdownHook getCustomShutdownHook() { return CUSTOM_SHUTDOWN_HOOK; } public void clearAll() { log.info("addShutdownHook for clearAll"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), SocketRpcServer.port); CuratorUtils.clearRegistry(CuratorUtils.getZkClient(), inetSocketAddress); } catch (UnknownHostException ignored) { } //ThreadPoolFactoryUtils.shutDownAllThreadPool(); })); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142package common.register.zookeeper; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** * @Author:Summer * @Data:2022/2/20 21:44 */ @Slf4j public final class CuratorUtils { private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 3; public static final String ZK_REGISTER_ROOT_PATH = "/rpc-demo"; private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>(); private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet(); private static CuratorFramework zkClient; private static final String DEFAULT_ZOOKEEPER_ADDRESS = "127.0.0.1:2181"; private CuratorUtils() { } /** * Create persistent nodes. Unlike temporary nodes, persistent nodes are not removed when the client disconnects * * @param path node path */ public static void createPersistentNode(CuratorFramework zkClient, String path) { try { if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) { log.info("The node already exists. The node is:[{}]", path); } else { //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); log.info("The node was created successfully. The node is:[{}]", path); } REGISTERED_PATH_SET.add(path); } catch (Exception e) { log.error("create persistent node for path [{}] fail", path); } } /** * Gets the children under a node * * @param rpcServiceName rpc service name eg:github.javaguide.HelloServicetest2version1 * @return All child nodes under the specified node */ public static List<String> getChildrenNodes(CuratorFramework zkClient, String rpcServiceName) { if (SERVICE_ADDRESS_MAP.containsKey(rpcServiceName)) { return SERVICE_ADDRESS_MAP.get(rpcServiceName); } List<String> result = null; String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName; try { result = zkClient.getChildren().forPath(servicePath); SERVICE_ADDRESS_MAP.put(rpcServiceName, result); registerWatcher(rpcServiceName, zkClient); } catch (Exception e) { log.error("get children nodes for path [{}] fail", servicePath); } return result; } /** * Empty the registry of data */ public static void clearRegistry(CuratorFramework zkClient, InetSocketAddress inetSocketAddress) { REGISTERED_PATH_SET.stream().parallel().forEach(p -> { try { if (p.endsWith(inetSocketAddress.toString())) { zkClient.delete().forPath(p); } } catch (Exception e) { log.error("clear registry for path [{}] fail", p); } }); log.info("All registered services on the server are cleared:[{}]", REGISTERED_PATH_SET.toString()); } public static CuratorFramework getZkClient() { // check if user has set zk address //Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue()); //String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS; String zookeeperAddress = DEFAULT_ZOOKEEPER_ADDRESS; // if zkClient has been started, return directly if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) { return zkClient; } // Retry strategy. Retry 3 times, and will increase the sleep time between retries. RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorFrameworkFactory.builder() // the server to connect to (can be a server list) .connectString(zookeeperAddress) .retryPolicy(retryPolicy) .build(); zkClient.start(); try { // wait 30s until connect to the zookeeper if (!zkClient.blockUntilConnected(30, TimeUnit.SECONDS)) { throw new RuntimeException("Time out waiting to connect to ZK!"); } } catch (InterruptedException e) { e.printStackTrace(); } return zkClient; } /** * Registers to listen for changes to the specified node * * @param rpcServiceName rpc service name eg:github.javaguide.HelloServicetest2version */ private static void registerWatcher(String rpcServiceName, CuratorFramework zkClient) throws Exception { String servicePath = ZK_REGISTER_ROOT_PATH + "/" + rpcServiceName; PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> { List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath); SERVICE_ADDRESS_MAP.put(rpcServiceName, serviceAddresses); }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); pathChildrenCache.start(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35package common.singletonfactory; import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Author:Summer * @Data:2022/2/21 23:38 */ public final class SingletonFactory { private static final Map<String, Object> OBJECT_MAP = new ConcurrentHashMap<>(); private SingletonFactory() { } public static <T> T getInstance(Class<T> c) { if (c == null) { throw new IllegalArgumentException(); } String key = c.toString(); if (OBJECT_MAP.containsKey(key)) { return c.cast(OBJECT_MAP.get(key)); } else { return c.cast(OBJECT_MAP.computeIfAbsent(key, k -> { try { return c.getDeclaredConstructor().newInstance(); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { throw new RuntimeException(e.getMessage(), e); } })); } } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72package demo4.Client; import common.dto.RpcRequest; import common.dto.RpcResponse; import common.dto.RpcServiceConfig; import common.transport.RpcRequestTransport; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.UUID; /** * @Author:Summer * @Data:2022/2/18 11:28 */ @Slf4j public class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; /** * Used to send requests to the server.And there are two implementations: socket and netty */ private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } /** * get the proxy object */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } /** * This method is actually called when you use a proxy object to call a method. * The proxy object is the object you get through the getProxy method. */ @SneakyThrows @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoked method: [{}]", method.getName()); RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()) .parameters(args) .interfaceName(method.getDeclaringClass().getName()) .paramTypes(method.getParameterTypes()) .requestId(UUID.randomUUID().toString()) .group(rpcServiceConfig.getGroup()) .version(rpcServiceConfig.getVersion()) .build(); log.info(rpcRequest.toString()); RpcResponse<Object> rpcResponse = null; rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest); return rpcResponse.getData(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72package demo4.Client; import common.dto.RpcRequest; import common.dto.RpcResponse; import common.dto.RpcServiceConfig; import common.transport.RpcRequestTransport; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.UUID; /** * @Author:Summer * @Data:2022/2/18 11:28 */ @Slf4j public class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; /** * Used to send requests to the server.And there are two implementations: socket and netty */ private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } /** * get the proxy object */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } /** * This method is actually called when you use a proxy object to call a method. * The proxy object is the object you get through the getProxy method. */ @SneakyThrows @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoked method: [{}]", method.getName()); RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()) .parameters(args) .interfaceName(method.getDeclaringClass().getName()) .paramTypes(method.getParameterTypes()) .requestId(UUID.randomUUID().toString()) .group(rpcServiceConfig.getGroup()) .version(rpcServiceConfig.getVersion()) .build(); log.info(rpcRequest.toString()); RpcResponse<Object> rpcResponse = null; rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest); return rpcResponse.getData(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72package demo4.Client; import common.dto.RpcRequest; import common.dto.RpcResponse; import common.dto.RpcServiceConfig; import common.transport.RpcRequestTransport; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.UUID; /** * @Author:Summer * @Data:2022/2/18 11:28 */ @Slf4j public class RpcClientProxy implements InvocationHandler { private static final String INTERFACE_NAME = "interfaceName"; /** * Used to send requests to the server.And there are two implementations: socket and netty */ private final RpcRequestTransport rpcRequestTransport; private final RpcServiceConfig rpcServiceConfig; public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = rpcServiceConfig; } public RpcClientProxy(RpcRequestTransport rpcRequestTransport) { this.rpcRequestTransport = rpcRequestTransport; this.rpcServiceConfig = new RpcServiceConfig(); } /** * get the proxy object */ @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this); } /** * This method is actually called when you use a proxy object to call a method. * The proxy object is the object you get through the getProxy method. */ @SneakyThrows @SuppressWarnings("unchecked") @Override public Object invoke(Object proxy, Method method, Object[] args) { log.info("invoked method: [{}]", method.getName()); RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName()) .parameters(args) .interfaceName(method.getDeclaringClass().getName()) .paramTypes(method.getParameterTypes()) .requestId(UUID.randomUUID().toString()) .group(rpcServiceConfig.getGroup()) .version(rpcServiceConfig.getVersion()) .build(); log.info(rpcRequest.toString()); RpcResponse<Object> rpcResponse = null; rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest); return rpcResponse.getData(); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74package demo4.Server; import common.dto.RpcRequest; import common.dto.RpcResponse; import common.dto.RpcServiceConfig; import common.provider.ServiceProvider; import common.provider.impl.ZkServiceProviderImpl; import common.register.zookeeper.CustomShutdownHook; import common.singletonfactory.SingletonFactory; import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** * @Author:Summer * @Data:2022/2/21 20:04 */ @Slf4j public class SocketRpcServer { public static final int port = 9999; //private final ExecutorService threadPool; private final ServiceProvider serviceProvider; private final RpcServerHandler rpcRequestHandler; public SocketRpcServer() { //threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent("socket-server-rpc-pool"); serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class); rpcRequestHandler = new RpcServerHandler(); } public void registerService(RpcServiceConfig rpcServiceConfig) { serviceProvider.publishService(rpcServiceConfig); } public void start() { try (ServerSocket server = new ServerSocket()) { String host = InetAddress.getLocalHost().getHostAddress(); server.bind(new InetSocketAddress(host, port)); CustomShutdownHook.getCustomShutdownHook().clearAll(); Socket socket; while ((socket = server.accept()) != null) { log.info("client connected [{}]", socket.getInetAddress()); process(socket); //threadPool.execute(new SocketRpcRequestHandlerRunnable(socket)); } //threadPool.shutdown(); } catch (IOException e) { log.error("occur IOException:", e); } catch (Exception e) { log.error("occur Exception:", e); } } public void process(Socket socket) throws Exception { try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) { RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Object result = rpcRequestHandler.handle(rpcRequest); objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId())); objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { log.error("occur exception:", e); } } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package demo4; import common.Hello; import common.HelloService; import common.dto.RpcServiceConfig; import common.transport.Socket.SocketRpcRequestTransport; import demo4.Client.RpcClientProxy; /** * @Author:Summer * @Data:2022/2/21 21:53 */ public class SocketClientMain { public static void main(String[] args) { RpcClientProxy rpcClientProxy = new RpcClientProxy(new SocketRpcRequestTransport(),new RpcServiceConfig()); HelloService helloService = rpcClientProxy.getProxy(HelloService.class); String hello = helloService.hello(new Hello("111", "222")); System.out.println(hello); } }
复制代码
1
2
3
4
5
6
7
8
9
10public class SocketServerMain { public static void main(String[] args) { SocketRpcServer socketRpcServer = new SocketRpcServer(); RpcServiceConfig rpcServiceConfig = new RpcServiceConfig(); rpcServiceConfig.setService(new HelloServiceImpl()); socketRpcServer.registerService(rpcServiceConfig); socketRpcServer.start(); } }
最后
以上就是谦让毛巾最近收集整理的关于【RPC项目】3.增加注册中心的全部内容,更多相关【RPC项目】3内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复