概述
需求
- 基于Netty实现两个服务端,一个客户端,客户端通过Channel通道调用服务端
- 服务端的上线与下线,客户端能动态感知,并能重新构成负载均衡。
- 客户端选择性能好的服务器处理(响应时间短的服务器即为性能好)。Zookeeper记录客户端响应有效时间为5s,超时判定该客户端失效。
一、思路
实现思路如下:
- 服务端启动时连接zookeeper,在
/netty-server
下创建临时节点,节点格式是/ip:port
,例如:127.0.0.1:8899
,记录服务端的IP和端口。节点数据代表节点对应服务的响应时间,默认为“0”,“0”表示尚无客户端请求或服务响应数据失效。 - 客户端启动时连接zookeeper,并添加监听,然后获取
/netty-server
下的子节点,及获取可用的服务端的IP和端口,与服务端建立连接。启动定时任务。 - 客户端接收到浏览器请求,先是zookeeper上的服务端响应时间数据,获取响应时间最短的服务。——负载均衡的实现。
- 客户端请求服务端时,记录请求时间和服务端响应时长。通过定时任务定时同步服务端响应数据到zookeeper中。这里的定时任务使用分布式调度
elastic-job
。
二、实现
2.1 服务端实现
引入zookeeper客户端curator依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
</dependency>
服务启动时连接zookeeper,创建临时节点
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.connectionTimeoutMs(3000)
.sessionTimeoutMs(5000)
.namespace("netty-server")
.retryPolicy(new RetryUntilElapsed(3000, 5))
.build();
curatorFramework.start();
// 在 /netty-server 节点下创建临时节点 /ip:port,节点数据默认0
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/" + serverIp +
":" + port, LongUtils.toByteArray(-1L));
2.2 客户端实现
引入依赖,基于zookeeper的分布式调度
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
RpcClient
的initClient()
初始化Netty客户端,启动zookeeper连接添加监听,获取可的服务端列表,创建连接。
public void initClient() throws Exception {
// 启动zookeeper连接,并添加监听
curatorClient = CuratorClient.getInstance();
curatorClient.startAndListener(new PathChildrenListener(this));
// 获取可用的服务端列表
serverList = curatorClient.getRpcServers();
//1.创建线程组
group = new NioEventLoopGroup();
for (RpcServerProperties.Server server : serverList) {
// 连接服务
connectServer(server);
}
}
Curator启动和添加监听
public static CuratorClient getInstance() {
return curatorClient;
}
private CuratorFramework curatorFramework;
public void startAndListener(PathChildrenCacheListener listener) throws Exception {
startCurator();
startPathChildrenListener(listener);
}
public void startCurator() {
curatorFramework = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.connectionTimeoutMs(30000)
.sessionTimeoutMs(50000)
.namespace("netty-server")
.retryPolicy(new RetryUntilElapsed(1000, 3))
.build();
curatorFramework.start();
}
/**
* 监听/netty-server的子节点路径变化
*/
public void startPathChildrenListener(PathChildrenCacheListener listener) throws Exception {
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, "/", true);
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
pathChildrenCache.getListenable().addListener(
listener, Executors.newSingleThreadExecutor());
}
监听器类,实现PathChildrenCacheListener
接口
public class PathChildrenListener implements PathChildrenCacheListener {
private RpcClient rpcClient;
public PathChildrenListener(RpcClient rpcClient) {
this.rpcClient = rpcClient;
}
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
ChildData childData = pathChildrenCacheEvent.getData();
if (childData == null) {
return;
}
switch (type) {
case CHILD_ADDED: {
// 添加节点
GetServer getServer = new GetServer(childData).invoke();
String serverIpPort = getServer.getServerIpPort();
RpcServerProperties.Server server = getServer.getServer();
System.out.println("新建连接:"+serverIpPort);
rpcClient.connectServer(server);
break;
}
case CHILD_REMOVED: { // 删除节点
GetServer getServer = new GetServer(childData).invoke();
String serverIpPort = getServer.getServerIpPort();
RpcServerProperties.Server server = getServer.getServer();
System.out.println("删除连接:" + serverIpPort);
rpcClient.removeServer(server);
break;
}
}
}
private class GetServer {
private ChildData childData;
private String serverIpPort;
private RpcServerProperties.Server server;
public GetServer(ChildData childData) {
this.childData = childData;
}
public String getServerIpPort() {
return serverIpPort;
}
public RpcServerProperties.Server getServer() {
return server;
}
public GetServer invoke() {
String path = childData.getPath();
serverIpPort = path.substring(path.lastIndexOf('/') + 1);
String[] split = serverIpPort.split(":");
server = new RpcServerProperties.Server();
server.setIp(split[0]);
server.setPort(Integer.valueOf(split[1]));
return this;
}
}
}
连接服务和移除服务,用一个Map保存服务连接,key为ip:port
,value为channel
/**
* 连接服务
*/
public void connectServer(RpcServerProperties.Server server) {
//2.创建启动助手
Bootstrap bootstrap = new Bootstrap();
//3.设置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//String类型编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
//添加客户端处理类
pipeline.addLast(rpcClientHandler);
}
});
try {
lock.lock();
System.out.println("建立连接:" + server.getKey());
Channel channel = bootstrap.connect(server.getIp(), server.getPort()).sync().channel();
channelMap.put(server.getKey(), channel);
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
lock.unlock();
throw new IllegalStateException("连接失败:" + server.getKey());
}
}
/**
* 移除server
*/
public void removeServer(RpcServerProperties.Server server) {
this.serverList = serverList.stream().filter(server1 ->
server.getPort() != server1.getPort() || !server.getIp().equals(server1.getIp())
).collect(Collectors.toList());
lock.lock();
Set<Map.Entry<String, Channel>> entries = channelMap.entrySet();
this.channelMap = entries.stream().filter(entry -> {
if (entry.getKey().equals(server.getKey())) {
entry.getValue().close();
return false;
}
return true;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
lock.unlock();
}
请求服务端,记录响应时长【这里想用Aspect切面,但是在切不进doSend()方法】
public Object doSend(RpcServerProperties.Server server, String msg) throws Exception {
String serverKey = server.getKey();
// 获取服务的channel
Channel channel = channelMap.get(serverKey);
rpcClientHandler.setRequestMsg(channel, msg);
// 开始时间
long startTime = System.currentTimeMillis();
Future submit = executorService.submit(rpcClientHandler);
Object o = submit.get();
long endTime = System.currentTimeMillis();
/* 记录服务响应信息 */
// 记录服务响应时间
curatorClient.setDataForPath("/" +server.getKey(), LongUtils.toByteArray(endTime - startTime));
return o;
}
public RpcResponse send(RpcRequest rpcRequest) throws Exception {
// 通过负载均衡获取服务
RpcServerProperties.Server server = curatorClient.balanceServer();
Object responseMsg = doSend(server, JSON.toJSONString(rpcRequest));
RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);
if (rpcResponse.getError() != null) {
throw new RuntimeException(rpcResponse.getError());
}
return rpcResponse;
}
分布式定时任务
public class ElasticService {
public static void startJob() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "data-elastic" +
"-job");
ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
zookeeperRegistryCenter.init();
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("elastic-job", "*/5 * * * * ?", 1).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, ReportJob.class.getName());
new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();
}
}
任务类
public class ReportJob implements SimpleJob {
@SneakyThrows
@Override
public void execute(ShardingContext shardingContext) {
CuratorClient curatorClient = CuratorClient.getInstance();
List<RpcServerProperties.Server> rpcServers = curatorClient.getRpcServers();
for (RpcServerProperties.Server rpcServer : rpcServers) {
// 获取节点的统计信息
Stat stat = curatorClient.getStatForPath("/" + rpcServer.getKey());
if ((System.currentTimeMillis() - stat.getMtime() > 5 * 1000)) {
curatorClient.setDataForPath("/" + rpcServer.getKey(), LongUtils.toByteArray(0L));
}
}
}
}
三、验证
-
启动服务端-1,端口8898
-
启动客户端
启动并获取了zookeeper上的服务信息,并建立了连接。
- 启动服务端-2,端口8899,看到客户端控制台输出如下:
客户端监听到zookeeper上有新服务,获取并连接服务。这说明服务发现是实现了的。
- 为了验证获取响应较快的服务,这里让8898端口服务执行了
Thread.sleep(random.nextInt(1000));
睡眠了不到1秒。让8899端口服务执行了Thread.sleep(random.nextInt(3000));
睡眠了不到3秒。那么8898服务端会响应更快些。
连续请求几次服务,从调用服务端口看出请求8898的次数更多些。实现的负载均衡
- 关闭8899端口服务,过一会客户端收到监听事件,将8899端口服务连接删除。实现了服务断连。
总结:
从验证结果看,
- 实现了客户端通过RPC调用服务端。
- 实现了服务上线和下线的自动连接和删除连接功能;
- 实现了负载均衡,响应块的服务接收的请求较多些。
最后
以上就是细腻胡萝卜为你收集整理的基于Netty+Zookeeper手写RPC和服务注册、感知和负载均衡需求的全部内容,希望文章能够帮你解决基于Netty+Zookeeper手写RPC和服务注册、感知和负载均衡需求所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复