概述
本系列的第二篇,加入netty心跳机制,只是简单地描述了如何监听不活跃的服务端,即自定义一个标记服务是否活跃的规则及空闲链接的监听器,在标记为不活跃后,客户端应该怎么处理,服务端再次活跃了,客户端又如何处理,先看下这个图
monitor负责监控server们的是否活跃,如果不活跃,修改zk上对应节点的值,client则监听zk上的事件,当发生变动的节点的值为不活跃,则加进不活跃的优先队列,这里设置为优先队列是用来定时去除不活跃的节的,在执行任务时,如果不活跃时间最长的节点都没有达到不活跃最长的时长,则不进行删除,如果重新活跃了,就把他加进活跃节点的list里面,现在来看看如何实现。
首先看server和monitor之间的交互,新建一个类用来表示serverchannel的状态
/**
* 用于表示serverchannel的状态类
*/
@Data
public class ChannelStatus {
//重新活跃的时间
private volatile long reActive;
//是否活跃
private volatile boolean active = true;
//持续重新活跃的次数
private AtomicInteger reActiveCount = new AtomicInteger(0);
//持续不活跃的次数
private AtomicInteger inActiveCount = new AtomicInteger(0);
//对应的channelId
private String channelId;
//不活跃的时间
private volatile long InActive;
public ChannelStatus() {
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChannelStatus that = (ChannelStatus) o;
return reActive == that.reActive &&
active == that.active &&
InActive == that.InActive &&
Objects.equals(reActiveCount, that.reActiveCount) &&
Objects.equals(inActiveCount, that.inActiveCount);
}
@Override
public int hashCode() {
return Objects.hash(reActive, active, reActiveCount, inActiveCount, InActive);
}
}
监控server的类,当不活跃时候,通知zk
/**
* @author lulu
* @Date 2019/11/18 22:29
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private final static int MAX_IN_ACTIVE_COUNT = 3;
private final static int COUNT_MINUTE = 2;
private final static int MIN_RE_ACTIVE_COUNT = 3;
//维护channelId和具体地址的map,当发生变化时对其进行删除
private ConcurrentHashMap<String, ChannelStatus> channelUrlMap;
public HeartbeatHandler(ConcurrentHashMap<String, ChannelStatus> map) {
channelUrlMap = map;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String url = msg.toString();
String id = ctx.channel().id().asShortText();
System.out.println("收到channelId:" + id + "发来信息:" + url);
ChannelStatus status;
if ((status = channelUrlMap.get(url)) == null) {
status = new ChannelStatus();
status.setChannelId(id);
channelUrlMap.put(url, status);
} else {
//如果收到不活跃的节点重连发来的信息,
if (!status.isActive()) {
//记录重连的心跳次数
System.out.println(url + "尝试重连");
int i = status.getReActiveCount().incrementAndGet();
//第一次重连的话,记录重连时间
if (i == 1) {
String s = ctx.channel().id().asShortText();
status.setChannelId(s);
status.setReActive(System.currentTimeMillis());
//如果大于最小重连心跳次数
} else if (i >= MIN_RE_ACTIVE_COUNT) {
//计算重连阶段的时间
long minute = (System.currentTimeMillis() - status.getReActive()) / (1000 * 60) + 1;
//如果大于要求的时间,则是认为活跃
if (minute >= COUNT_MINUTE) {
status.setActive(true);
status.setInActiveCount(new AtomicInteger(0));
// 通知连接池重新加入该节点
updateOrRemove(url, ctx, true, ZKConsts.REACTIVE);
}
}
}
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent state = (IdleStateEvent) evt;
//在一定时间内读写空闲才会关闭链接
if (state.state().equals(IdleState.ALL_IDLE)) {
String s = ctx.channel().id().asShortText();
Integer inActiveCount = 0;
ChannelStatus channelStatus = null;
String url = null;
Object[] objects = getStatusValuesByChannelId(s);
Assert.isTrue(objects != null && objects.length > 0, "该channelId没有东西");
inActiveCount = (Integer) objects[0];
channelStatus = (ChannelStatus) objects[1];
url = (String) objects[2];
if (inActiveCount == 1) {
channelStatus.setInActive(System.currentTimeMillis());
}
//1分钟内出现2次以上不活跃现象,有的话就把它去掉
long minute = (System.currentTimeMillis() - channelStatus.getInActive()) / (1000 * 60) + 1;
System.out.printf("第%s次不活跃,当前分钟%d%n", channelStatus.getInActiveCount().get(), minute);
if (inActiveCount >= MAX_IN_ACTIVE_COUNT && minute <= COUNT_MINUTE) {
System.out.println("移除不活跃的ip" + channelStatus.toString());
//设置不活跃
channelStatus.setActive(false);
updateOrRemove(url, ctx, true, ZKConsts.INACTIVE);
} else {
//重新计算,是活跃的状态
if (minute > COUNT_MINUTE) {
// System.out.println("新周期开始");
channelStatus.setActive(true);
channelStatus.setInActive(0);
channelStatus.setInActiveCount(new AtomicInteger(0));
}
}
}
}
}
/**
* 通过channelId获取server的状态信息
*
* @param channelId
* @return
*/
public Object[] getStatusValuesByChannelId(String channelId) {
Iterator<Map.Entry<String, ChannelStatus>> iterator = channelUrlMap.entrySet().iterator();
Integer inActiveCount = 0;
ChannelStatus channelStatus = null;
String url = null;
System.out.println();
while (iterator.hasNext()) {
Map.Entry<String, ChannelStatus> next = iterator.next();
ChannelStatus status = next.getValue();
if (status.getChannelId().equals(channelId)) {
channelStatus = status;
url = next.getKey();
inActiveCount = channelStatus.getInActiveCount().incrementAndGet();
return new Object[]{inActiveCount, channelStatus, url};
}
}
return null;
}
/**
* 通过ID获取地址,并删除zk上相关的,用于心跳监听的类
*
* @param ctx
*/
private void updateOrRemove(String url, ChannelHandlerContext ctx, Boolean update, String data) {
//移除不活跃的节点
RegisterForClient.getInstance().removeOrUpdate(url, update, data);
//如果不为重新唤醒,则断开连接并且做相应的通知
if (!data.equals(ZKConsts.REACTIVE)) {
channelUrlMap.get(url).setChannelId(null);
ctx.channel().close();
}
}
//当出现异常时关闭链接
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Object[] values = getStatusValuesByChannelId(ctx.channel().id().asShortText());
updateOrRemove((String) values[2], ctx, false, null);
}
}
心跳发送类
/**
* @author lulu
* @Date 2019/11/18 23:30
* 服务端的发送心跳包类
*/
@Getter
public class BeatDataSender {
//状态
private String activeStatus;
//负责定期发送心跳包的线程池
private ScheduledExecutorService service;
//失败后重连的线程池
private ScheduledExecutorService retryConnect;
private boolean reconnect = false;
public BeatDataSender(String localAddress, String remoteIp, Integer remotePort, String serviceName) {
service = Executors.newSingleThreadScheduledExecutor();
retryConnect = Executors.newSingleThreadScheduledExecutor();
this.send(localAddress, remoteIp, remotePort, serviceName);
//如果重连了尝试重新发送心跳包
retryConnect.scheduleAtFixedRate(() -> {
if (activeStatus == ZKConsts.INACTIVE) {
System.out.println("server尝试重连监控器");
send(localAddress, remoteIp, remotePort, serviceName);
activeStatus = ZKConsts.REACTIVE;
reconnect = true;
}
}, 3, 3, TimeUnit.MINUTES);
}
public void close() {
this.service.shutdown();
this.retryConnect.shutdown();
}
public void send(String localAddress, String remoteIp, Integer remotePort, String serviceName) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder())
.addLast(new StringEncoder())
.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
activeStatus = ZKConsts.INACTIVE;
System.out.println("由于不活跃次数在2分钟内超过3次,链接被关闭");
ctx.channel().close();
}
});
}
})
.connect(remoteIp, remotePort).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("心跳客户端绑定" + "hostname:" + remoteIp + "remotePort:" + remotePort);
future.channel().writeAndFlush(serviceName + "@" + localAddress);
//这里只是演示心跳机制不活跃的情况下重连,普通的做法只需要定时发送本机地址即可
//进入重连状态后,就稳定发送心跳包
service.scheduleAtFixedRate(() -> {
if (future.channel().isActive()) {
if (reconnect) {
future.channel().writeAndFlush(serviceName + "@" + localAddress);
}
}
}, 30, 30, TimeUnit.SECONDS);
} else {
System.out.println("3s后重连");
TimeUnit.SECONDS.sleep(3);
//重新发送
send(localAddress, remoteIp, remotePort, serviceName);
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
这里的心跳发送类主要作用是发送心跳,当处于不活跃状态时候,就尝试重新发送
server和monitor的交互大概就到这里
接下来就是zk和client的交互了,client要为每个服务下的节点设置监听并作出相应的处理,这里采用观察者模式对zk和client的连接池进行解耦,这里做了一个代码的拆分,把client的功能(RegisterForClient)和server(RegisterForServer)的功能分开了
首先看事件发布者的接口
/**
* @author: lele
* @date: 2019/11/22 下午3:29
*/
public interface NodeChangePublisher {
/**
* 事件标识
*/
int inactive=0;
int remove=1;
int add=2;
int reactive=3;
void addListener(NodeChangeListener listener) ;
void removeListener(NodeChangeListener listener) ;
void notifyListener(int state, String path) ;
}
注册中心给client方的实现类
自定义线程工厂类,建造者模式
/**
* @author lulu
* @Date 2019/11/22 23:43
*/
public final class RpcThreadFactoryBuilder {
private String namePrefix="default";
private int priority=5;
private boolean daemon=false;
private String groupName="rpc";
public RpcThreadFactoryBuilder setNamePrefix(String namePrefix){
this.namePrefix=namePrefix;
return this;
}
public RpcThreadFactoryBuilder setPriority(int priority){
if(priority>10||priority<0){
throw new UnsupportedOperationException("线程优先级设置不正确");
}
this.priority=priority;
return this;
}
public RpcThreadFactoryBuilder setDaemon(boolean daemon){
this.daemon=daemon;
return this;
}
public RpcThreadFactoryBuilder setGroupName(String groupName){
this.groupName=groupName;
return this;
}
public ThreadFactory build(){
return new BaseThreadFactory(this);
}
/**
* 启动客户端链接的自定义线程工厂
*/
static class BaseThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
BaseThreadFactory(RpcThreadFactoryBuilder builder) {
group = new ThreadGroup(builder.groupName);
group.setDaemon(builder.daemon);
group.setMaxPriority(builder.priority);
namePrefix = builder.namePrefix+"-"+
poolNumber.getAndIncrement() +
"-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
return t;
}
}
}
,这里主要是结合线程池来提交通知任务
/**
* @author lulu
* @Date 2019/11/23 22:49
*
*/
public class RegisterForClient implements NodeChangePublisher {
private CuratorFramework client = null;
private List<PathChildrenCache> nodeListenList = new ArrayList<>();
private List<NodeChangeListener> nodeChangeListeners = new ArrayList<>();
private ThreadPoolExecutor notifyPool = new ThreadPoolExecutor(
16, 16, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024)
, new RpcThreadFactoryBuilder().setNamePrefix("notifyPool").build()
);
private static class Holder {
private static final RegisterForClient j = new RegisterForClient();
}
public static RegisterForClient getInstance() {
return Holder.j;
}
//添加监听者
private RegisterForClient() {
client = ZkUtils.getClient();
this.addListener(new NodeChangeListener.AddServer());
this.addListener(new NodeChangeListener.RemoveServer());
this.addListener(new NodeChangeListener.InactiveServer());
this.addListener(new NodeChangeListener.ReActiveServer());
}
/**
* 获取所有的url
*
* @return
*/
public Map<String, List<URL>> getAllURL() {
Map<String, List<URL>> mapList = null;
try {
List<String> servcieList = client.getChildren().forPath("/");
mapList = new HashMap<>(servcieList.size());
for (String s : servcieList) {
//返回对应的service及其可用的url
mapList.put(s, getService(s));
//为每个服务添加监听
addListenerForService(s);
}
} catch (Exception e) {
e.printStackTrace();
}
return mapList;
}
private void addListenerForService(String serviceName) throws Exception {
//设置监听,监听所有服务下的节点变化,连接管理收到通知后移除相应的节点
final PathChildrenCache childrenCache = new PathChildrenCache(client, ZkUtils.getPath(serviceName), true);
nodeListenList.add(childrenCache);
//同步初始监听节点
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
//建立完监听
return;
}
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String path = event.getData().getPath();
notifyPool.submit(() -> {
System.out.println("删除远程服务端节点:" + path);
notifyListener(NodeChangePublisher.remove, path);
});
}
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
String path = event.getData().getPath();
notifyPool.submit(() -> {
byte[] status = event.getData().getData();
String serverStatus= new String(status);
if (serverStatus.equals(ACTIVE)) {
notifyPool.submit(() -> {
System.out.println("远程服务端上线事件:" + NodeChangePublisher.add + path);
notifyListener(NodeChangePublisher.add, path);
});
} else if (serverStatus.equals(ZKConsts.INACTIVE)) {
//失效事件
notifyPool.submit(() -> {
System.out.println("远程服务端下线事件:" + NodeChangePublisher.inactive + path);
notifyListener(NodeChangePublisher.inactive, path);
});
} else if (serverStatus.equals(ZKConsts.REACTIVE)) {
notifyPool.submit(() -> {
System.out.println("远程服务端重新上线事件:" + path);
notifyListener(NodeChangePublisher.reactive, path);
});
}
});
}
}
});
}
public List<URL> getService(String serviceName) {
List<URL> urls = null;
try {
List<String> urlList = client.getChildren().forPath(ZkUtils.getPath(serviceName));
if (urlList != null) {
urls = new ArrayList<>(urlList.size());
}
for (String s : urlList) {
String[] url = s.split(":");
urls.add(new URL(url[0], Integer.valueOf(url[1])));
}
} catch (Exception e) {
e.printStackTrace();
}
return urls;
}
//hostname:port,遍历所有interface节点,把对应的url节点去掉,或者标记为不活跃的状态
public void removeOrUpdate(String sl, Boolean update, String data) {
String[] serviceUrl = sl.split("@");
try {
String url = serviceUrl[1];
String anInterface = serviceUrl[0];
List<String> urlList = client.getChildren().forPath(ZkUtils.getPath(anInterface));
for (String s : urlList) {
if (s.equals(url)) {
if (update) {
client.setData().forPath(ZkUtils.getPath(anInterface, url), data.getBytes());
} else {
client.delete().forPath(ZkUtils.getPath(anInterface, url));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//同步模式下使用,可以当作废弃
public URL random(String serviceName) {
//通过服务名获取具体的url
try {
List<String> urlList = client.getChildren().forPath(ZkUtils.getPath(serviceName));
String[] url = urlList.get(0).split(":");
return new URL(url[0], Integer.valueOf(url[1]));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public void close() {
ZkUtils.closeZKClient(client);
nodeListenList.forEach(e -> {
try {
e.close();
} catch (IOException e1) {
e1.printStackTrace();
}
});
}
@Override
public void addListener(NodeChangeListener listener) {
nodeChangeListeners.add(listener);
}
@Override
public void removeListener(NodeChangeListener listener) {
nodeChangeListeners.remove(listener);
}
@Override
public void notifyListener(int state, String path) {
int i = path.lastIndexOf("/");
String serviceName = path.substring(1, i);
String[] split = path.substring(i + 1).split(":");
URL url = new URL(split[0], Integer.valueOf(split[1]));
for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
nodeChangeListener.change(state, url, serviceName);
}
}
}
然后到监听者类,对应服务上线,服务不活跃移除,服务再次活跃重新加入,下线这四个事件
/**
* @author: lele
* @date: 2019/11/22 下午3:26
*/
public interface NodeChangeListener {
ConnectManager connect=ConnectManager.getInstance();
//相应的处理
void change(int state,URL url,String serviceName);
class AddServer implements NodeChangeListener{
@Override
public void change(int state, URL url,String serviceName) {
if(state==NodeChangePublisher.add){
System.out.println(Thread.currentThread().getName()+"addNode的listern事件被触发");
connect.addServerAfter(url,serviceName);
}
}
}
class ReActiveServer implements NodeChangeListener{
@Override
public void change(int state, URL url, String serviceName) {
if(state==NodeChangePublisher.reactive){
System.out.println("reActive的listern事件被触发");
connect.reAddActiveURL(url,serviceName);
}
}
}
class InactiveServer implements NodeChangeListener{
@Override
public void change(int state, URL url, String serviceName) {
if(state==NodeChangePublisher.inactive){
System.out.println("InActive的listern事件被触发");
connect.addInactiveURL(url,serviceName);
}
}
}
class RemoveServer implements NodeChangeListener{
@Override
public void change(int state, URL url, String serviceName) {
if(state==NodeChangePublisher.remove){
System.out.println("RemovServer的listern事件被触发");
connect.removeURL(url,serviceName,true);
}
}
}
}
client的连接池类
/**
* @author: lele
* @date: 2019/11/21 上午11:58
* 管理连接池
* 服务端在注册后,不一定可以获得,因为还没提供服务,需要zk设置节点 状态为Active
* //todo 定时更新链接
*/
public class ConnectManager {
private Boolean isShutDown = false;
private ScheduledExecutorService removeInactiveTask;
private final Random random = new Random();
/**
* 客户端链接服务端超时时间
*/
private long connectTimeoutMillis = 6000;
/**
* 不活跃的链接存活时间,单位ms,这里表示5分钟不活跃就去掉
*/
private long maxInActiveTime = 1000 * 60 * 5;
/**
* 自定义6个线程组用于客户端服务
*/
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(6);
/**
* 标示非init状态下,addServerAfter才能起作用
*/
private CountDownLatch serverInitCountDownLatch;
/**
* 存放服务对应的访问数,用于轮询
*/
private Map<String, AtomicInteger> pollingMap = new ConcurrentHashMap<>();
/**
* 对于每个服务都有一个锁,每个锁都有一个条件队列,用于控制链接获取以及添加链接
*/
private Map<String, Object[]> serviceCondition = new ConcurrentHashMap<>();
/**
* 新增/删除链接时的锁
*/
private Map<String, ReentrantLock[]> addOrRemoveConnectionLock = new ConcurrentHashMap<>();
/**
* 新增/删除不活跃链接时的锁
*/
private Map<String, ReentrantLock[]> addOrRemoveInactiveLock = new ConcurrentHashMap<>();
/**
* 存放服务端地址和handler的关系
*/
private Map<String, List<NettyAsynHandler>> serverClientMap = new ConcurrentHashMap<>();
/**
* 存放不活跃的服务端地址和handler的关系,当活跃时添加回正式的handler
*/
private Map<String, PriorityQueue<NettyAsynHandler>> inactiveClientMap = new ConcurrentHashMap<>();
/**
* 用来初始化客户端
*/
private ThreadPoolExecutor clientBooter = new ThreadPoolExecutor(
16, 16, 600, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024)
, new RpcThreadFactoryBuilder().setNamePrefix("clientBooter").build(), new ThreadPoolExecutor.AbortPolicy());
private static class Holder {
private static final ConnectManager j = new ConnectManager();
}
private ConnectManager() {
//初始化时把所有的url加进去,这里可能没有可用链接,所以需要添加对节点的监听
Map<String, List<URL>> allURL = RegisterForClient.getInstance().getAllURL();
for (String s : allURL.keySet()) {
//为每个服务添加锁和条件队列,通过条件队列控制客户端链接获取
addLockToService(s);
}
addServerInit(allURL);
//定时清理不用的链接
removeInactiveTask = Executors.newSingleThreadScheduledExecutor();
removeInactiveTask.scheduleAtFixedRate(() -> removeInactiveURL(), 10, 10, TimeUnit.MINUTES);
}
//为每个服务添加对应的锁
private void addLockToService(String serviceName) {
ReentrantLock lock = new ReentrantLock();
Condition getConnection = lock.newCondition();
//获取可用客户端的链接及条件队列
serviceCondition.put(serviceName, new Object[]{lock, getConnection});
//为创建客户端链接添加锁
ReentrantLock addConnection = new ReentrantLock();
ReentrantLock removeConnection = new ReentrantLock();
addOrRemoveConnectionLock.put(serviceName, new ReentrantLock[]{addConnection, removeConnection});
ReentrantLock addInactive = new ReentrantLock();
ReentrantLock removeInactive = new ReentrantLock();
addOrRemoveInactiveLock.put(serviceName, new ReentrantLock[]{addInactive, removeInactive});
}
public static ConnectManager getInstance() {
return Holder.j;
}
/**
* 添加该服务对应的链接和handler
*
* @param serviceName
* @param handler 由于创建客户端链接的线程都会访问这段代码,这里也会存在并发情况,不然会导致多个server上线后,获取异常
*/
public void addConnection(String serviceName, NettyAsynHandler handler) {
ReentrantLock lock = addOrRemoveConnectionLock.get(serviceName)[0];
lock.lock();
List<NettyAsynHandler> nettyAsynHandlers;
if (!serverClientMap.containsKey(serviceName)) {
nettyAsynHandlers = new ArrayList<>();
} else {
nettyAsynHandlers = serverClientMap.get(serviceName);
}
nettyAsynHandlers.add(handler);
//添加服务名和对应的url:客户端链接
serverClientMap.put(serviceName, nettyAsynHandlers);
//如果处于初始化状态,则countdown防止新增节点事件再次新增客户端
if (serverInitCountDownLatch.getCount() != 0) {
System.out.println("连接池初始化新建客户端链接:" + handler.getUrl());
serverInitCountDownLatch.countDown();
} else {
System.out.println("连接池初始化后新建客户端链接:" + handler.getUrl());
}
//唤醒等待客户端链接的线程
signalAvailableHandler(serviceName);
lock.unlock();
}
//通过对应的负载均衡策略挑选可用客户端连接
public NettyAsynHandler chooseHandler(String serviceName,Integer mode){
List<NettyAsynHandler> handlers = mayWaitBeforeGetConnection(serviceName);
NettyAsynHandler choose = FetchPolicy.getPolicyMap().get(mode).choose(serviceName, handlers);
return choose;
}
//等待可用的客户端连接
private List<NettyAsynHandler> mayWaitBeforeGetConnection(String serviceName) {
List<NettyAsynHandler> nettyAsynHandlers = serverClientMap.get(serviceName);
int size = 0;
//先尝试获取
if (nettyAsynHandlers != null) {
size = nettyAsynHandlers.size();
}
//不行就自选等待
while (!isShutDown && size <= 0) {
try {
//自旋等待可用服务出现,因为客户端与服务链接需要一定的时间,如果直接返回会出现空指针异常
boolean available = waitingForHandler(serviceName);
if (available) {
nettyAsynHandlers = serverClientMap.get(serviceName);
}
} catch (InterruptedException e) {
throw new RuntimeException("出错", e);
}
}
return nettyAsynHandlers;
}
/**
* 等待一定时间,等handler和相应的server建立建立链接,用条件队列控制
*
* @param serviceName
* @return
* @throws InterruptedException
*/
private boolean waitingForHandler(String serviceName) throws InterruptedException {
Object[] objects = serviceCondition.get(serviceName);
ReentrantLock lock = (ReentrantLock) objects[0];
lock.lock();
Condition condition = (Condition) objects[1];
try {
return condition.await(this.connectTimeoutMillis, TimeUnit.MILLISECONDS);
} finally {
lock.unlock();
}
}
/**
* 去掉所有与该url链接的客户端,并且关闭客户端链接
*
* @param url
*/
public void removeURL(URL url, String serviceName, boolean close) {
ReentrantLock lock = addOrRemoveConnectionLock.get(serviceName)[1];
lock.lock();
NettyAsynHandler target = null;
//倒序遍历删除对应的handler
List<NettyAsynHandler> nettyAsynHandlers = serverClientMap.get(serviceName);
for (int i = nettyAsynHandlers.size() - 1; i >= 0; i--) {
if ((target = nettyAsynHandlers.get(i)).getUrl().equals(url)) {
nettyAsynHandlers.remove(i);
if (close) {
target.close();
}
}
}
System.out.println("active:" + serverClientMap.get(serviceName).toString());
lock.unlock();
}
/**
* 定时清除不活跃的链接
*/
public void removeInactiveURL() {
/**
* 移除不活跃列表
*/
Collection<PriorityQueue<NettyAsynHandler>> values = inactiveClientMap.values();
Iterator<PriorityQueue<NettyAsynHandler>> iterator = values.iterator();
while (iterator.hasNext()) {
PriorityQueue<NettyAsynHandler> list = iterator.next();
//遍历所有客户端并根据超时时间删除
NettyAsynHandler target;
long current = System.currentTimeMillis();
while ((current - (target = list.peek()).getInActiveTime()) > maxInActiveTime) {
list.poll();
target.close();
}
}
}
/**
* 去掉可用的服务,把他加入到不活跃的列表
* 由于是通过线程异步操作,可能存在并发问题
*
* @param url
*/
public void addInactiveURL(URL url, String serviceName) {
ReentrantLock lock = addOrRemoveInactiveLock.get(serviceName)[0];
lock.lock();
System.out.println("不活跃链接加入_" + url.toString());
List<NettyAsynHandler> nettyAsynHandlers = serverClientMap.get(serviceName);
NettyAsynHandler inActive = null;
for (NettyAsynHandler nettyAsynHandler : nettyAsynHandlers) {
if (nettyAsynHandler.getUrl().equals(url)) {
nettyAsynHandler.setInActiveTime(System.currentTimeMillis());
inActive = nettyAsynHandler;
break;
}
}
PriorityQueue<NettyAsynHandler> inActiveHandlers = null;
if ((inActiveHandlers = inactiveClientMap.get(serviceName)) == null) {
inActiveHandlers = new PriorityQueue<>();
}
inActiveHandlers.offer(inActive);
inactiveClientMap.put(serviceName, inActiveHandlers);
System.out.println("inactive:" + inactiveClientMap.get(serviceName).toString());
lock.unlock();
//删除url
removeURL(url, serviceName, false);
}
/**
* 重新添加进活跃队列
*
* @param url
* @param serviceName
*/
public void reAddActiveURL(URL url, String serviceName) {
ReentrantLock lock = addOrRemoveInactiveLock.get(serviceName)[1];
lock.lock();
PriorityQueue<NettyAsynHandler> list;
if ((list = inactiveClientMap.get(serviceName)) != null) {
Iterator<NettyAsynHandler> iterator = list.iterator();
NettyAsynHandler nettyAsynHandler;
while (iterator.hasNext()) {
nettyAsynHandler = iterator.next();
if (nettyAsynHandler.getUrl().equals(url)) {
nettyAsynHandler.setInActiveTime(0);
addConnection(serviceName, nettyAsynHandler);
list.remove(nettyAsynHandler);
System.out.printf("%s服务下的%s重新添加进活跃队列%n", serviceName, nettyAsynHandler.toString());
break;
}
}
}
lock.unlock();
}
/**
* 释放对应服务的条件队列,代表有客户端链接可用了
*
* @param serviceName
*/
private void signalAvailableHandler(String serviceName) {
Object[] objects = serviceCondition.get(serviceName);
ReentrantLock lock = (ReentrantLock) objects[0];
lock.lock();
Condition condition = (Condition) objects[1];
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
/**
* 添加server,并启动对应的服务器
*
* @param allURL
*/
public void addServerInit(Map<String, List<URL>> allURL) {
Collection<List<URL>> values = allURL.values();
Iterator<List<URL>> iterator = values.iterator();
int res = 0;
while (iterator.hasNext()) {
List<URL> next = iterator.next();
res += next.size();
}
serverInitCountDownLatch = new CountDownLatch(res);
for (String s : allURL.keySet()) {
pollingMap.put(s, new AtomicInteger(0));
List<URL> urls = allURL.get(s);
for (URL url : urls) {
//提交创建任务
clientBooter.submit(new Runnable() {
@Override
public void run() {
createClient(s, eventLoopGroup, url);
}
});
}
}
}
/**
* 当新节点出现后添加,但这里有个隐患,就是当client尚未监听完所有节点时
* addServerAfter是不允许操作的
*
* @param url
* @param serviceName
*/
public void addServerAfter(URL url, String serviceName) {
if (serverInitCountDownLatch.getCount() == 0) {
//如果还没监听完,就不可以加链接
List<NettyAsynHandler> list = null;
if ((list = serverClientMap.get(serviceName)) == null) {
list = new ArrayList<>();
serverClientMap.put(serviceName, list);
addLockToService(serviceName);
} else {
boolean exists = list.stream().filter(e -> e.getUrl().equals(url)).findFirst().isPresent();
if (exists) {
return;
}
}
clientBooter.submit(new Runnable() {
@Override
public void run() {
createClient(serviceName, eventLoopGroup, url);
}
});
}
}
/**
* 创建客户端,持久化链接
*
* @param serviceName
* @param eventLoopGroup
* @param url
*/
public void createClient(String serviceName, EventLoopGroup eventLoopGroup, URL url) {
System.out.println(Thread.currentThread().getName() + "准备新建客户端");
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler((new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//把request实体变为字节
.addLast(new RpcEncoder(RpcRequest.class))
//把返回的response字节变为对象
.addLast(new RpcDecoder(RpcResponse.class))
.addLast(new NettyAsynHandler(url));
}
}));
ChannelFuture channelFuture = b.connect(url.getHostname(), url.getPort());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
//链接成功后的操作,把相应的url地址和客户端链接存入
if (channelFuture.isSuccess()) {
NettyAsynHandler handler = channelFuture.channel().pipeline().get(NettyAsynHandler.class);
addConnection(serviceName, handler);
}
}
});
}
/**
* 关闭方法,关闭每个客户端链接,释放所有锁,关掉创建链接的线程池,和客户端的处理器
*/
public void stop() {
isShutDown = true;
serverClientMap.values().forEach(e -> e.forEach(k -> k.close()));
inactiveClientMap.values().forEach(e -> e.forEach(k -> k.close()));
for (String s : serviceCondition.keySet()) {
signalAvailableHandler(s);
}
clientBooter.shutdown();
eventLoopGroup.shutdownGracefully();
}
}
连接池基本都有注释,所以这里不做过多讲解,下面到负载均衡策略
这里定义了四个:随机、轮询、权重随机、最少请求,具体实现
/**
* @author: lele
* @date: 2019/11/22 下午3:24
* 获取链接机制,轮询、随机、权重
*/
public interface FetchPolicy {
Random RANDOM = new Random();
int random = 1;
int polling = 2;
int weight = 3;
int bestRequest = 4;
//策略类
Map<Integer, FetchPolicy> policyMap = new HashMap<>();
static Map<Integer, FetchPolicy> getPolicyMap() {
policyMap.put(random, new RandomFetch());
policyMap.put(polling, new PollingFetch());
policyMap.put(weight, new WeightFetch());
policyMap.put(bestRequest, new BestRequestFetch());
return policyMap;
}
NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers);
class WeightFetch implements FetchPolicy {
@Override
public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
int length = handlers.size();
//总权重
int totalWeight = 0;
//是否权重一致
boolean sameWeight = true;
//先把所有权重加起来,并且判断权重是否一致
for (int i = 0; i < length; i++) {
int weight = handlers.get(i).getWeight();
totalWeight += weight;
if (sameWeight && i > 0
&& weight != handlers.get(i - 1).getWeight()) {
sameWeight = false;
}
}
//不断减去对应权重所在的区间
if (totalWeight > 0 && !sameWeight) {
int offset = RANDOM.nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= handlers.get(i).getWeight();
if (offset < 0) {
return handlers.get(i);
}
}
}
// 如果权重都一样,则轮询返回
return FetchPolicy.getPolicyMap().get(polling).choose(serviceName,handlers);
}
}
/**
* 主要通过NettyAsynHandler的requestCount属性挑取最小请求的handler进行返回
*/
class BestRequestFetch implements FetchPolicy {
@Override
public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
int minRequest = Integer.MAX_VALUE;
NettyAsynHandler res = null;
for (NettyAsynHandler handler : handlers) {
if (handler.getRequestCount().get() < minRequest) {
res = handler;
}
}
if(res==null){
// 如果找不到,则轮询返回
return FetchPolicy.getPolicyMap().get(polling).choose(serviceName,handlers);
}
return res;
}
}
/**
* 记录每个服务对应的请求次数,并返回对应的handler
*/
class PollingFetch implements FetchPolicy {
private static Map<String, AtomicInteger> pollingMap = new ConcurrentHashMap<>();
@Override
public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
if (pollingMap.get(serviceName) == null) {
pollingMap.put(serviceName, new AtomicInteger(0));
}
int next = pollingMap.get(serviceName).incrementAndGet();
int index = RANDOM.nextInt(next);
return handlers.get(index);
}
}
/**
* 随机
*/
class RandomFetch implements FetchPolicy {
@Override
public NettyAsynHandler choose(String serviceName, List<NettyAsynHandler> handlers) {
int index = RANDOM.nextInt(handlers.size());
//取出相应的handler
NettyAsynHandler nettyAsynHandler = handlers.get(index-1);
return nettyAsynHandler;
}
}
}
然后spring扫描相关的注解和代理工厂要做相关的改动
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
//用于接口上,name为服务名,zk则在注册服务改为 服务名/ip,服务端通过传来的接口名通过反射获取类,或者通过给spring托管获取其class
public @interface RpcStudyClient {
String name();
//结果返回是异步还是同步模式
int mode() default sync;
int fetch() default FetchPolicy.polling;
int sync=0;
int asyn=1;
}
其他的相关改动
//代理工厂的改动
RpcRequest rpcRequest = new RpcRequest(requestId, interfaceClass.getName(), method.getName(), args, method.getParameterTypes(), annotation.mode());
//发送请求
//这里的管理连接池通过服务名去访问zk,获取可用的url
RpcFuture res = protocol.sendFuture(annotation.fetch(),annotation.name(), rpcRequest);
//发送请求的改动
public RpcFuture sendFuture(int fetch,String serviceName, RpcRequest request) {
NettyAsynHandler handler = ConnectManager.getInstance().chooseHandler(serviceName,fetch);
RpcFuture future = handler.sendRequest(request);
return future;
}
//连接池改动
public NettyAsynHandler chooseHandler(String serviceName,Integer mode){
List<NettyAsynHandler> handlers = mayWaitBeforeGetConnection(serviceName);
NettyAsynHandler choose = FetchPolicy.getPolicyMap().get(mode).choose(serviceName, handlers);
return choose;
}
心跳机制演示
这里客户端启动、监控器启动、服务端启动
然后服务端与监控器建立连接后,监控器对不活跃的节点进行记录,然后发送到zk端,client通过zk对相关的客户端链接加入到不活跃节点,而server则尝试重连,当监控器觉得该节点活跃了,就发到zk端让他通知client把他加入到活跃队列
监控器:
后来活跃了
client:
服务端
然后演示不活跃节点移除,这里我直接在zk上修改该server为不活跃,看一段时间后有无移除该服务,这里把检查任务的间隙调小一点,
执行 set /register/user/DESKTOP-QU1B3IU:8081 0 后,客户端把他加入不活跃链接,8082也是同样的操作,因为8081比8082下线时间早,所以先移除8081
负载均衡的就不演示了,有兴趣可以自己尝试,手写RPC系列大概就到这里,后续如果有更新就在github上面提交了,经过这段时间的代码编写,从一开始模仿别人的,到根据实际的情况把自己的想法写进去,还是学到挺多东西的,并发,负载均衡实现,spring相关的注解用法,zk、netty一些简单应用,项目地址:https://github.com/97lele/rpcstudy
本系列参考资料:
关于netty相关的可以查看相关书籍或博客,书籍比较全一点
整体结构:https://www.bilibili.com/video/av75673208
连接池和异步:https://www.cnblogs.com/luxiaoxun/p/5272384.html
spring仿feign:https://github.com/admin801122/RPCServer
心跳简介:https://www.cnblogs.com/demingblog/p/9957143.html
负载均衡:https://blog.csdn.net/wudiyong22/article/details/80829808
最后
以上就是虚拟飞鸟为你收集整理的基于netty、zookeeper手写RPC框架之五——心跳机制优化及添加负载均衡的全部内容,希望文章能够帮你解决基于netty、zookeeper手写RPC框架之五——心跳机制优化及添加负载均衡所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复