我是靠谱客的博主 秀丽小懒虫,最近开发中收集的这篇文章主要介绍【resin】 resin3 线程池与IO模型(2),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

【resin】 resin3 线程池与IO模型(2)

上篇文章主要介绍了resin3的线程池逻辑,下面具体分析下resin3的网络IO模型

resin3开源版使用的IO模型是BIO模型
相关的核心类是Port和TcpConnection

下面贴出源码:为说明核心逻辑,只展示核心的代码
本文所分析为个人理解,欢迎指出不足之处或分享你的观点

下面看看 Port 类:

  /**
   * The port thread is responsible for creating new connections.
   *
   * 负责创建 tcp connection task的主线程
   */
    public   void   run()
  {
      while   (!   _lifecycle .isDestroyed()) {
        boolean   isStart;
        try   {       
          // Thread.sleep(10);
          synchronized   (   this ) {
          // need delay to avoid spawning too many threads over a short time,
          // when the load doesn't justify it
          if   ( _idleThreadCount   > 0) {
          wait(1);
        }
       
         /**
         * port 线程创建 tcp connection task的条件有三个:
         * 1.lifecycle.isActive() 为true   这个应该是只要resin服务没有停止就肯定是true了
         * 2.isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin   第二个条件是这个 isStart 为true
         *  _acceptThreadMin 是用户设置的最小创建来accept socket的线程数
         *  _idleThreadCount 这个可以理解为当前block在 accept的线程数,相当于等待,就是idle了
         *  _startThreadCount 这个值的作用,我的理解是主要用来控制创建 tcp accept的task数量, 防止无限制的创建
         *
         * 3._connectionMax <= _connectionCount 这个条件基本上可以忽略了,resin默认的最大链接数是 1024*1024 这个够用了
         *
         */
          isStart =   _startThreadCount   +   _idleThreadCount   <   _acceptThreadMin ;

            if   (   _connectionMax   <=   _connectionCount )
            isStart =   false ;
          if   (!   _lifecycle .isActive())
          isStart =   false ;

          /**
         * 如果不需要创建新的 connection 则port线程就等待 1 分钟
         */
            if   (! isStart) {
            Thread. interrupted();
            wait(60000);
          }

            if   (isStart) {
              _connectionCount ++;
              _startThreadCount ++;
          }
        }

          if   (isStart &&   _lifecycle .isActive()) {

          TcpConnection conn =   _freeConn .allocate();
            if   (conn ==   null ) {
            conn =   new   TcpConnection(   this ,   _serverSocket .createSocket());
            conn.setRequest(   _protocol .createRequest(conn));
          }

        conn.start();

          /**
         * 创建完 tcp connection task 是 Runnable 的
         * 丢进resin的线程池里执行
         */
          ThreadPool. getThreadPool().schedule (conn);
        }
      }   catch   (Throwable e) {
        e.printStackTrace();
      }
    }
  }

// port 线程在resin IO处理逻辑中起着关键作用,而且是单线程执行
// port 线程的逻辑很简单,就是不断轮询,只要满足条件就创建 TcpConnection task 提交线程池处理
// 不满足条件就 wait 60秒
//

下面看看 TcpConnection 
tcpconnection task的run方法很长,我直接省略了很多代码,保留核心逻辑
   /**
   * Runs as a task.
   */
    public   void   run ()
  {
    Port port = getPort();

      boolean   isKeepalive =   _isKeepalive ;
      _isKeepalive   =   false ;

      boolean   isResume =   _isResume ;
      _isResume   =   false ;
      _isWake   =   false ;

      boolean   isFirst = ! isKeepalive;

    ServerRequest request = getRequest();
      boolean   isWaitForRead = request.isWaitForRead();

      if   (isKeepalive)
      port.keepaliveEnd(   this );

        /**
       * tcp connection task 的主要任务就是 accept 等待新的的链接
       */
        while   (!   _isDead ) {
          if   (isKeepalive) {
        }
          /**
         * accept 在等待
         */
          else   if   (port.accept(   this , isFirst))
            _connectionStartTime   = Alarm. getCurrentTime();
          else
            return ;

        isFirst =   false ;
        ConnectionController controller =   null ;

          try   {
            do   {
            controller =   null ;

            isKeepalive =   false ;

              if   (! port.isClosed()
                && (! isWaitForRead || getReadStream().waitForRead())) {

                     /**
                   * accept 之后,这里处理 http request了
                   */
                synchronized   (   _requestLock ) {
                isKeepalive = request.handleRequest();
              }

              controller = getController();
                if   (controller !=   null   && controller.isActive()) {
                isKeepalive =   true ;
                  return ;
              }
            }
          }   while   (isKeepalive && waitForKeepalive() && ! port.isClosed());

            if   (isKeepalive) {
              return ;
          }
            else   {
            getRequest().protocolCloseEvent();
          }
        }
          catch   (ClientDisconnectException e) {
        }
          catch   (IOException e) {
        }
          finally   {
            if   (! isKeepalive)
            closeImpl();
        }
      }
    }   catch   (Throwable e) {
    }   finally   {
        /**
       * 任务执行完了,这个 tcp task就结束了
       */
        if   (isKeepalive) {
        keepalive();
      }
        else
        free();
    }
  }
// 可以看到 tcp connection task如果是keepalive的,会一直轮询,等待处理http请求
// 整个task 执行流程是:
// 首先执行Port的accept方法,等待tcp链接,port.accept(   this , isFirst)
// 创建完链接后,就会去处理这个http请求  request.handleRequest();
// 整个处理流程包含了 tcp链接创建和http请求处理

去看下Port的accept方法
   /**
   * Accepts a new connection.
   */
    public   boolean   accept (TcpConnection conn,   boolean   isFirst)
  {
      try   {
        synchronized   (   this ) {
          _idleThreadCount ++;

          /**
         * _startThreadCount 在这里进行了   --   自减操作
         */
          if   (isFirst) {
          _startThreadCount--;
        }

          if   ( _acceptThreadMax   <   _idleThreadCount ) {
            return   false ;
        }
      }

        while   ( _lifecycle .isActive()) {
        QSocket socket = conn.startSocket();
       
          /**
         * 这里将会调用底层 jdk的accept 方法
         */
          if   ( _serverSocket .accept(socket)) {
          conn.initSocket();

          if   ( _throttle .accept(socket))
            return   true ;
          else
          socket.close();
        }
          else   {
            if   (   _acceptThreadMax   <   _idleThreadCount ) {
              return   false ;
          }
        }
      }
    }   catch   (Throwable e) {
    }   finally   {
        synchronized   (   this ) {
          _idleThreadCount --;

          if   ( _idleThreadCount   +   _startThreadCount   <   _acceptThreadMin ) {
          notify();
        }
      }
    }
      return   false ;
  }
// 可以看到 Port类的 accept方法是同步方法块
// 整个方法的逻辑也简单
//

我们的一个线上服务用的是resin3的开源版,一直都运行良好
但是每当流量暴涨时,resin就是出现假死的情况,进行多番排查,基本定位问题了
下面给出分析:

主要是 Port 线程的逻辑 和 resin参数配置的问题
在 Port 线程逻辑里,创建 tcp connection task有3个条件 :起关键影响的是这个
        /**
         * 着重讲讲这个条件 isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;
         *
         *  _acceptThreadMin resin默认值为5
         *  _acceptThreadMax resin默认值为10
         * 
         *  当resin服务的外部流量稳定时,即平滑增长状态,则 port线程运行的很良好
         *  但是当出现流量暴涨时,而且resin的线程池已经耗尽,已经达到 threadMax,此时会出现问题
         *  _idleThreadCount 一直为0,threadCount 也没有呈现增长的状态,外部链接出现了响应超时情况,感觉是resin服务“挂了”
         * 
         *  推测这个时候的状态是这样的:
         *  1.虽然 _idleThreadCount 为0,但是 _startThreadCount 增长到 _acceptThreadMin了
         *        即 isStart = _acceptThreadMin + 0 < _acceptThreadMin ; isStart 为 false
         *        _startThreadCount 要减少的情况就是 新创建的 tcp connection task开始执行accept了
         * 
         *  2.此时已经有 最多threadMax 的线程在执行,时间片分配得花点时间了
         *        考虑极端情况,新创建的 _startThreadCount(值已经等于_acceptThreadMin)个线程都在等待时间片分配
         *        另外, tcp connection task要开始执行accept,还得要先获得 port 线程对象的锁
         *        synchronized (this) {
         *              省略代码。。。
         *               if (isFirst) {
         *                    _startThreadCount   -- ;
         *              }
         *              省略代码。。。
         *        }
         *
         *  3.既然 isStart 为 false 则 port 线程就会进行等待 wait(60000) 足足等待一分钟,而且 port线程只有一个
         *        如果后续 isStart 还不为 true ,则就还会继续等待 60 秒,这将会是灾难了
         * 
         *  4.结合了上述的情况,外部的链接就没有办法得到响应了,只能等当前的请求都处理完,才能进行处理后续的请求了
         * 
         *  PS:我曾经试着去掉 _startThreadCount 这个条件进行了测试,程序没跑多久就出现 OOM等的问题了
         *  明显,这个_startThreadCount 是限制创建 tcp task的数量,因为 resin的线程池里的task队列是无界的,用的是 ArrayList   <Runnable>
         * 
         * 
         *  要避免这样的情况:就把 _acceptThreadMin _acceptThreadMax 分别设置的大些
         *  防止 port线程因为 isStart条件的问题导致无法分配 tcp accept的线程,从而导致无法响应更多的外部请求
         * 
         *  开源版resin是BIO线程池模型,是阻塞的,即创建一定量的 accept thread,等待请求并处理
         *  resin的并发能力取决于 accept thread的线程数 与 最大 threadMax
         * 
         *  专业版resin貌似是使用 nio,通过select机制,效率吞吐量都会更好
         * 
         */


最后

以上就是秀丽小懒虫为你收集整理的【resin】 resin3 线程池与IO模型(2)的全部内容,希望文章能够帮你解决【resin】 resin3 线程池与IO模型(2)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部