我是靠谱客的博主 虚幻画笔,最近开发中收集的这篇文章主要介绍netty客户端引发的线程血案(三)netty客户端引发的线程血案(三),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

netty客户端引发的线程血案(三)

前言

前文讲述了netty 3.10.5客户端的实现,我们知道了为什么每次递增一定数量的线程,这个跟客户端的线程池的实现机制有很大关系,本文,我们梳理一下3.2.4的客户端流程,寻找一下3.2.4版本不存在线程雪崩的原因。

netty客户端

我们开始先从引导类ClientBootstrap开始我们的旅程,在实例化ClientBootstrap时,我们需要构建channelFactory,代码见下:


// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

可以看到构建了2个newCachedThreadPool线程池,分别是boss和work线程池,work线程池的大小为CPU核数*2,初始的时候,线程池中没有线程,这里跟3.10.5有很大区别,3.10.5在初始化的时候,就把需要的boss线程和work线程都启动起来了。
bootstrap.connect(new InetSocketAddress(host, port))中创建channel和3.10.5基本一致,不在详述,重点看一下ChannelState.CONNECTED触发的连接操作,代码:

 private void connect(
final NioClientSocketChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
try {
if (channel.socket.connect(remoteAddress)) {
channel.worker.register(channel, cf);
} else {
//走此分支
channel.getCloseFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f)
throws Exception {
if (!cf.isDone()) {
cf.setFailure(new ClosedChannelException());
}
}
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
//调用boss.register
boss.register(channel);
}
} catch (Throwable t) {
cf.setFailure(t);
fireExceptionCaught(channel, t);
channel.worker.close(channel, succeededFuture(channel));
}
}

boss.register(channel)用来创建select,将select注册到channel上,然后select监听key,监听到事件后,按照下面调用流程处理:
processSelectedKeys(selector.selectedKeys())-> connect(k)->ch.worker.register(ch, ch.connectFuture)
在register中会创建select,创建一个work线程放到线程池中,然后启动线程,执行 SelectorUtil.select(selector)操作,等待异步事件,代码见下:
register

 void register(NioSocketChannel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
Runnable registerTask = new RegisterTask(channel, future, server);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
//创建selector
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
boolean success = false;
try {
//创建线程
DeadLockProofWorker.start(
executor, new ThreadRenamingRunnable(this, threadName));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}

work thread主要负责监听异步channel事件,监听到进行处理

public void run() {
thread = Thread.currentThread();
boolean shutdown = false;
Selector selector = this.selector;
for (;;) {
wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
try {
//监听事件
SelectorUtil.select(selector);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
//
'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
//
'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
//处理
cancelledKeys = 0;
processRegisterTaskQueue();
processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
//当channel被关闭后,会在 SelectorUtil.select(selector)将置为空,进过条件判断,将shutdown置为true,下次进入时,会进行线程清理工作,退出线程
if (selector.keys().isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}

通过源码可以看到work thread的退出跟selector.keys().isEmpty()息息相关,keys()很关键,我们看一下Selector对象维护的几个集合:

  1. keys() 当前所有向Selector注册的SelectionKey的集合,不能直接更改,只能通过cancelled和channel deregistered操作
  2. selectedKeys() 被select捕获的SelectionKey集合
  3. cancelledKeys() 已经被取消的SelectionKey集合

keys()是如何改变的呢?首先es-client在操作完成后,在应用层面跟服务器做了正常的四次挥手,关闭了socket连接,这就导致后面channel read的时候,read失败,关闭了channel。
后面在SelectorUtil.select(selector)中会做deregistered操作流程见下:
select->lockAndDoselect->doSelect->procesDeregisterQueue->implDereg
在implDereg函数中,可以看到做了如下动作:

this.fdMap.remove(var1)
this.keys.remove(var1) //移除SelectionKey,keys()为空
this.selectedKeys.remove(var1)
this.deregister(var1)

当keys()为空时,会将shutdown置为true,后面通过判断,work thread线程退出。
boss thread的线程退出机制跟work thread一致。

结语

至此,本文对3.10.5和3.2.4的分析结束,在3.10.5中需要用户显示释放资源,才能结束相关的线程池,而在3.2.4中,如果socket被关闭,可以自动关闭对应的线程,结束任务。

最后

以上就是虚幻画笔为你收集整理的netty客户端引发的线程血案(三)netty客户端引发的线程血案(三)的全部内容,希望文章能够帮你解决netty客户端引发的线程血案(三)netty客户端引发的线程血案(三)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部