概述
【resin】 resin3 线程池与IO模型(2)
上篇文章主要介绍了resin3的线程池逻辑,下面具体分析下resin3的网络IO模型
resin3开源版使用的IO模型是BIO模型
相关的核心类是Port和TcpConnection
下面贴出源码:为说明核心逻辑,只展示核心的代码
本文所分析为个人理解,欢迎指出不足之处或分享你的观点
下面看看 Port 类:
/**
* The port thread is responsible for creating new connections.
*
* 负责创建 tcp connection task的主线程
*/
public
void
run()
{
while
(!
_lifecycle
.isDestroyed()) {
boolean
isStart;
try
{
// Thread.sleep(10);
synchronized
(
this
) {
// need delay to avoid spawning too many threads over a short time,
// when the load doesn't justify it
if
(
_idleThreadCount
> 0) {
wait(1);
}
/**
* port 线程创建 tcp connection task的条件有三个:
* 1.lifecycle.isActive() 为true 这个应该是只要resin服务没有停止就肯定是true了
* 2.isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin 第二个条件是这个 isStart 为true
* _acceptThreadMin 是用户设置的最小创建来accept socket的线程数
* _idleThreadCount 这个可以理解为当前block在 accept的线程数,相当于等待,就是idle了
* _startThreadCount 这个值的作用,我的理解是主要用来控制创建 tcp accept的task数量, 防止无限制的创建
*
* 3._connectionMax <= _connectionCount 这个条件基本上可以忽略了,resin默认的最大链接数是 1024*1024 这个够用了
*
*/
isStart =
_startThreadCount
+
_idleThreadCount
<
_acceptThreadMin
;
if
(
_connectionMax
<=
_connectionCount
)
isStart =
false
;
if
(!
_lifecycle
.isActive())
isStart =
false
;
/**
* 如果不需要创建新的 connection 则port线程就等待 1 分钟
*/
if
(! isStart) {
Thread. interrupted();
wait(60000);
}
if
(isStart) {
_connectionCount
++;
_startThreadCount
++;
}
}
if
(isStart &&
_lifecycle
.isActive()) {
TcpConnection conn =
_freeConn
.allocate();
if
(conn ==
null
) {
conn =
new
TcpConnection(
this
,
_serverSocket
.createSocket());
conn.setRequest(
_protocol
.createRequest(conn));
}
conn.start();
/**
* 创建完 tcp connection task 是 Runnable 的
* 丢进resin的线程池里执行
*/
ThreadPool. getThreadPool().schedule (conn);
}
}
catch
(Throwable e) {
e.printStackTrace();
}
}
}
// port 线程在resin IO处理逻辑中起着关键作用,而且是单线程执行
// port 线程的逻辑很简单,就是不断轮询,只要满足条件就创建 TcpConnection task 提交线程池处理
// 不满足条件就 wait 60秒
//
下面看看 TcpConnection
tcpconnection task的run方法很长,我直接省略了很多代码,保留核心逻辑
/**
* Runs as a task.
*/
public
void
run ()
{
Port port = getPort();
boolean
isKeepalive =
_isKeepalive
;
_isKeepalive
=
false
;
boolean
isResume =
_isResume
;
_isResume
=
false
;
_isWake
=
false
;
boolean
isFirst = ! isKeepalive;
ServerRequest request = getRequest();
boolean
isWaitForRead = request.isWaitForRead();
if
(isKeepalive)
port.keepaliveEnd(
this
);
/**
* tcp connection task 的主要任务就是 accept 等待新的的链接
*/
while
(!
_isDead
) {
if
(isKeepalive) {
}
/**
* accept 在等待
*/
else
if
(port.accept(
this
, isFirst))
_connectionStartTime
= Alarm. getCurrentTime();
else
return
;
isFirst =
false
;
ConnectionController controller =
null
;
try
{
do
{
controller =
null
;
isKeepalive =
false
;
if
(! port.isClosed()
&& (! isWaitForRead || getReadStream().waitForRead())) {
/**
* accept 之后,这里处理 http request了
*/
synchronized
(
_requestLock
) {
isKeepalive = request.handleRequest();
}
controller = getController();
if
(controller !=
null
&& controller.isActive()) {
isKeepalive =
true
;
return
;
}
}
}
while
(isKeepalive && waitForKeepalive() && ! port.isClosed());
if
(isKeepalive) {
return
;
}
else
{
getRequest().protocolCloseEvent();
}
}
catch
(ClientDisconnectException e) {
}
catch
(IOException e) {
}
finally
{
if
(! isKeepalive)
closeImpl();
}
}
}
catch
(Throwable e) {
}
finally
{
/**
* 任务执行完了,这个 tcp task就结束了
*/
if
(isKeepalive) {
keepalive();
}
else
free();
}
}
// 可以看到 tcp connection task如果是keepalive的,会一直轮询,等待处理http请求
// 整个task 执行流程是:
// 首先执行Port的accept方法,等待tcp链接,port.accept(
this
, isFirst)
// 创建完链接后,就会去处理这个http请求
request.handleRequest();
// 整个处理流程包含了 tcp链接创建和http请求处理
去看下Port的accept方法
/**
* Accepts a new connection.
*/
public
boolean
accept (TcpConnection conn,
boolean
isFirst)
{
try
{
synchronized
(
this
) {
_idleThreadCount
++;
/**
* _startThreadCount 在这里进行了
--
自减操作
*/
if
(isFirst) {
_startThreadCount--;
}
if
(
_acceptThreadMax
<
_idleThreadCount
) {
return
false
;
}
}
while
(
_lifecycle
.isActive()) {
QSocket socket = conn.startSocket();
/**
* 这里将会调用底层 jdk的accept 方法
*/
if
(
_serverSocket
.accept(socket)) {
conn.initSocket();
if
(
_throttle
.accept(socket))
return
true
;
else
socket.close();
}
else
{
if
(
_acceptThreadMax
<
_idleThreadCount
) {
return
false
;
}
}
}
}
catch
(Throwable e) {
}
finally
{
synchronized
(
this
) {
_idleThreadCount
--;
if
(
_idleThreadCount
+
_startThreadCount
<
_acceptThreadMin
) {
notify();
}
}
}
return
false
;
}
// 可以看到 Port类的 accept方法是同步方法块
// 整个方法的逻辑也简单
//
我们的一个线上服务用的是resin3的开源版,一直都运行良好
但是每当流量暴涨时,resin就是出现假死的情况,进行多番排查,基本定位问题了
下面给出分析:
主要是 Port 线程的逻辑 和 resin参数配置的问题
在 Port 线程逻辑里,创建 tcp connection task有3个条件 :起关键影响的是这个
/**
* 着重讲讲这个条件 isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
*
* _acceptThreadMin resin默认值为5
* _acceptThreadMax resin默认值为10
*
* 当resin服务的外部流量稳定时,即平滑增长状态,则 port线程运行的很良好
* 但是当出现流量暴涨时,而且resin的线程池已经耗尽,已经达到 threadMax,此时会出现问题
* _idleThreadCount 一直为0,threadCount 也没有呈现增长的状态,外部链接出现了响应超时情况,感觉是resin服务“挂了”
*
* 推测这个时候的状态是这样的:
* 1.虽然 _idleThreadCount 为0,但是 _startThreadCount 增长到 _acceptThreadMin了
* 即 isStart = _acceptThreadMin + 0 < _acceptThreadMin ; isStart 为 false
* _startThreadCount 要减少的情况就是 新创建的 tcp connection task开始执行accept了
*
* 2.此时已经有 最多threadMax 的线程在执行,时间片分配得花点时间了
* 考虑极端情况,新创建的 _startThreadCount(值已经等于_acceptThreadMin)个线程都在等待时间片分配
* 另外, tcp connection task要开始执行accept,还得要先获得 port 线程对象的锁
* synchronized (this) {
* 省略代码。。。
* if (isFirst) {
* _startThreadCount
--
;
* }
* 省略代码。。。
* }
*
* 3.既然 isStart 为 false 则 port 线程就会进行等待 wait(60000) 足足等待一分钟,而且 port线程只有一个
* 如果后续 isStart 还不为 true ,则就还会继续等待 60 秒,这将会是灾难了
*
* 4.结合了上述的情况,外部的链接就没有办法得到响应了,只能等当前的请求都处理完,才能进行处理后续的请求了
*
* PS:我曾经试着去掉 _startThreadCount 这个条件进行了测试,程序没跑多久就出现 OOM等的问题了
* 明显,这个_startThreadCount 是限制创建 tcp task的数量,因为 resin的线程池里的task队列是无界的,用的是 ArrayList
<Runnable>
*
*
* 要避免这样的情况:就把 _acceptThreadMin _acceptThreadMax 分别设置的大些
* 防止 port线程因为 isStart条件的问题导致无法分配 tcp accept的线程,从而导致无法响应更多的外部请求
*
* 开源版resin是BIO线程池模型,是阻塞的,即创建一定量的 accept thread,等待请求并处理
* resin的并发能力取决于 accept thread的线程数 与 最大 threadMax
*
* 专业版resin貌似是使用 nio,通过select机制,效率吞吐量都会更好
*
*/
最后
以上就是秀丽小懒虫为你收集整理的【resin】 resin3 线程池与IO模型(2)的全部内容,希望文章能够帮你解决【resin】 resin3 线程池与IO模型(2)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复