我是靠谱客的博主 超帅飞鸟,这篇文章主要介绍从Mina源码看心跳超时机制,现在分享给大家,希望可以做个参考。

首先明确一点,在Mina或者其他相似的框架中,心跳机制肯定都是和计时挂钩的,只要找到计时相关的代码,其实就离真正的心跳逻辑不远了。

Mina中的心跳超时其实都是指会话空闲,即在一定时间段内未接收或未发送消息的状态,此时会触发sessionIdle,也就是IoFilter中的方法。

如何触发sessionIdle

找到计时的地方,就很好理解了:

//AbstractPollingIoProcessor.java
    private static final long SELECT_TIMEOUT = 1000L;
    
    private class Processor implements Runnable {
        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            assert processorRef.get() == this;

            lastIdleCheckTime = System.currentTimeMillis();
            int nbTries = 10;

            for (;;) {
                try {
					//略过部分代码
                    
                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    
                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);
                    
                    // And manage removed sessions
                    removeSessions();                    
                   
                   //略过部分代码
                } catch (ClosedSelectorException cse) {
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

           //略过部分代码
        }
        
        private void notifyIdleSessions(long currentTime) throws Exception {
            // process idle sessions
            if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
                lastIdleCheckTime = currentTime;
                AbstractIoSession.notifyIdleness(allSessions(), currentTime);
            }
        }        
        
        //其他大量代码略
    }

可以看到,这个Processor(AbstractPollingIoProcessor 内部类)自己作为Runable,在线程启动后会不停地将当前时间传进notifyIdleSessions方法,这也就是时间的来源。

然后会将当前时间与上一次记录的时间进行比较,如果时间已经过去1000ms即1s,则更新记录时间,并调用notifyIdleness方法,这就意味着这里的计时单位,默认是1000ms,也就是以秒为单位。

接下来看这个notifyIdleness方法在干什么:

//AbstractIoSession.java
    public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
        while (sessions.hasNext()) {
            IoSession session = sessions.next();
            
            if (!session.getCloseFuture().isClosed()) {
                notifyIdleSession(session, currentTime);
            }
        }
    }

    public static void notifyIdleSession(IoSession session, long currentTime) {
        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));

        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));

        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));

        notifyWriteTimeout(session, currentTime);
    }

    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime) {
        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
            session.getFilterChain().fireSessionIdle(status);
        }
    }

最终看到fireSessionIdle心里就有数了。
不同的状态有不同的触发参数,都是以IdleStatus为基准的,有三种状态,读空闲、写空闲、读写皆空闲:

public class IdleStatus {
    public static final IdleStatus READER_IDLE = new IdleStatus("reader idle");

    public static final IdleStatus WRITER_IDLE = new IdleStatus("writer idle");

    public static final IdleStatus BOTH_IDLE = new IdleStatus("both idle");

    private final String strValue;

    private IdleStatus(String strValue) {
        this.strValue = strValue;
    }

    @Override
    public String toString() {
        return strValue;
    }
}

至于最后的fireSessionIdle,顺着IoFilter就调用了sessionIdle:

//DefaultIoFilterChain.java
    @Override
    public void fireSessionIdle(IdleStatus status) {
        session.increaseIdleCount(status, System.currentTimeMillis());
        callNextSessionIdle(head, session, status);
    }

    private void callNextSessionIdle(Entry entry, IoSession session, IdleStatus status) {
        try {
            IoFilter filter = entry.getFilter();
            NextFilter nextFilter = entry.getNextFilter();
            filter.sessionIdle(nextFilter, session, status);
        } catch (Exception e) {
            fireExceptionCaught(e);
        } catch (Error e) {
            fireExceptionCaught(e);
            throw e;
        }
    }

这样自然就可以在各种实现IoFilter接口的类中触发sessionIdle,比如各种IoFilterAdapter,也就是通过下列这种方式添加的过滤器:

mConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(protocolCodecFactory));

然后…

Handler呢?
Mina中Filter和Handler是分属两个不同的接口(IoFilter与IoHandler)的,上面的filter.sessionIdle能让IoFilter触发sessionIdle,那Handler呢?

不要慌,其实在DefaultIoFilterChain中,建立session时默认都是使用的DefaultIoFilterChain,而Handler中几乎所有的方法都是由DefaultIoFilterChain经过一定的处理传递到Handler的:

//DefaultIoFilterChain.java
    public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new IllegalArgumentException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head", new HeadFilter());
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }

    private static class TailFilter extends IoFilterAdapter {
        @Override
        public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
            try {
                session.getHandler().sessionCreated(session);
            } finally {
                // Notify the related future.
                ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);

                if (future != null) {
                    future.setSession(session);
                }
            }
        }

        @Override
        public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
            session.getHandler().sessionOpened(session);
        }

        @Override
        public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            try {
                s.getHandler().sessionClosed(session);
            } finally {
                try {
                    s.getWriteRequestQueue().dispose(session);
                } finally {
                    try {
                        s.getAttributeMap().dispose(session);
                    } finally {
                        try {
                            // Remove all filters.
                            session.getFilterChain().clear();
                        } finally {
                            if (s.getConfig().isUseReadOperation()) {
                                s.offerClosedReadFuture();
                            }
                        }
                    }
                }
            }
        }

        @Override
        public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
            session.getHandler().sessionIdle(session, status);
        }

        @Override
        public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            try {
                s.getHandler().exceptionCaught(s, cause);
            } finally {
                if (s.getConfig().isUseReadOperation()) {
                    s.offerFailedReadFuture(cause);
                }
            }
        }

        @Override
        public void inputClosed(NextFilter nextFilter, IoSession session) throws Exception {
            session.getHandler().inputClosed(session);
        }

        @Override
        public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            if (!(message instanceof IoBuffer) || !((IoBuffer) message).hasRemaining()) {
                s.increaseReadMessages(System.currentTimeMillis());
            }

            // Update the statistics
            if (session.getService() instanceof AbstractIoService) {
                ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
            }

            // Propagate the message
            try {
                session.getHandler().messageReceived(s, message);
            } finally {
                if (s.getConfig().isUseReadOperation()) {
                    s.offerReadFuture(message);
                }
            }
        }

        @Override
        public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
            ((AbstractIoSession) session).increaseWrittenMessages(writeRequest, System.currentTimeMillis());

            // Update the statistics
            if (session.getService() instanceof AbstractIoService) {
                ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
            }

            // Propagate the message
            session.getHandler().messageSent(session, writeRequest.getMessage());
        }

        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
            nextFilter.filterWrite(session, writeRequest);
        }

        @Override
        public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
            nextFilter.filterClose(session);
        }
    }

可以看到,messageReceived、sessionIdle之类的方法都是在TailFilter中使用session.getHandler().xxx方式传递给Handler的。

至此,已经知道计时在Processor中开始的,那么Processor是被谁启动起来的呢?

要弄清楚这个问题,其实就要把整个连接建立的流程摸清。

Processor的建立

比如一个SocketConnector,开始连接时是这样的:

    SocketConnector mConnector = new NioSocketConnector();
    ConnectFuture future = connector.connect(socketAddress);    

在开始看源码之前,可以先看一下NioSocketConnector这个类的继承关系,后面肯定会接连遇到图中的一些类的:
在这里插入图片描述
那么现在就从这个connect开始,这里调用的其实是AbstractIoConnector的方法:

//AbstractIoConnector.java
    @Override
    public final ConnectFuture connect(SocketAddress remoteAddress) {
        return connect(remoteAddress, null, null);
    }

    @Override
    public final ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
        //各种判断,略过

        if (getHandler() == null) {
            if (getSessionConfig().isUseReadOperation()) {
                setHandler(new IoHandler() {
                    //各种实现的方法,这里省略掉
                });
            } else {
                throw new IllegalStateException("handler is not set.");
            }
        }

        return connect0(remoteAddress, localAddress, sessionInitializer);
    }

而connect0却又回到了AbstractPollingIoConnector:

//AbstractPollingIoConnector.java
    @Override
    @SuppressWarnings("unchecked")
    protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
        H handle = null;
        boolean success = false;
        try {
            handle = newHandle(localAddress);
            if (connect(handle, remoteAddress)) {
                ConnectFuture future = new DefaultConnectFuture();
                S session = newSession(processor, handle);
                initSession(session, future, sessionInitializer);
                // Forward the remaining process to the IoProcessor.
                session.getProcessor().add(session);
                success = true;
                return future;
            }

            success = true;
        } catch (Exception e) {
            return DefaultConnectFuture.newFailedFuture(e);
        } finally {
            if (!success && handle != null) {
                try {
                    close(handle);
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        }

        ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
        connectQueue.add(request);
        startupWorker();
        wakeup();

        return request;
    }

上面有一个connect的判断,是在NioSocketConnector中实现的:

//NioSocketConnector.java
    @Override
    protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
        return handle.connect(remoteAddress);
    }

在AbstractPollingIoConnector中的connect0方法中,已经可以看到大量的逻辑细节了,如果连接成功,则会创建session、初始化session,以及使用到processor:

    S session = newSession(processor, handle);
    initSession(session, future, sessionInitializer);
    // Forward the remaining process to the IoProcessor.
    session.getProcessor().add(session);

重点看一下这几个方法的实现。

先看这个newSession,是在NioSocketConnector中实现的,当然少不了其父类的配合:

//NioSocketConnector.java
    @Override
    protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
        return new NioSocketSession(this, processor, handle);
    }
//NioSocketSession.java
    public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) {
        super(processor, service, channel);
        config = new SessionConfigImpl();
        config.setAll(service.getSessionConfig());
    }
//NioSession.java
    protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
        super(service);
        this.channel = channel;
        this.processor = processor;
        filterChain = new DefaultIoFilterChain(this);
    }
//AbstractIoSession.java
    protected AbstractIoSession(IoService service) {
        this.service = service;
        this.handler = service.getHandler();

        // Initialize all the Session counters to the current time
        long currentTime = System.currentTimeMillis();
        creationTime = currentTime;
        lastThroughputCalculationTime = currentTime;
        lastReadTime = currentTime;
        lastWriteTime = currentTime;
        lastIdleTimeForBoth = currentTime;
        lastIdleTimeForRead = currentTime;
        lastIdleTimeForWrite = currentTime;

        // TODO add documentation
        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);

        // Set a new ID for this session
        sessionId = idGenerator.incrementAndGet();
    }

几个关于session的类关系都是继承:
session的继承关系
这几个类把session的各种属性和配置安排得明明白白的,没什么好说。
其中可以看到在NioSession的构造方法中,为session创建了默认的filterChain,这也与之前的TailFilter逻辑相呼应了。
另外可以看到AbstractIoSession的构造方法中,对各种时间的记录进行了初始化。

接下来看initSession,这个方法是在AbstractIoService中实现的:

//AbstractIoService.java
    protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
        // Update lastIoTime if needed.
        if (stats.getLastReadTime() == 0) {
            stats.setLastReadTime(getActivationTime());
        }

        if (stats.getLastWriteTime() == 0) {
            stats.setLastWriteTime(getActivationTime());
        }

        try {
            ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
                    .getAttributeMap(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
        }

        try {
            ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));
        } catch (IoSessionInitializationException e) {
            throw e;
        } catch (Exception e) {
            throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
        }

        if ((future != null) && (future instanceof ConnectFuture)) {
            // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
            session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
        }

        if (sessionInitializer != null) {
            sessionInitializer.initializeSession(session, future);
        }

        finishSessionInitialization0(session, future);
    }


好吧,其实这个方法没做什么事,只是把session存储的一些Attribute进行了初始化。

下一个是session.getProcessor().add(session),首先我们要注意到,在创建session时,已经把processor传进来了,那么sessoin调用的getProcessor会和传进来的是同一个么?

    S session = newSession(processor, handle);
    initSession(session, future, sessionInitializer);
    // Forward the remaining process to the IoProcessor.
    session.getProcessor().add(session);

getProcessor()是在NioSession.java中实现的:

//NioSession.java
    protected final IoProcessor<NioSession> processor;
    
    protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
        super(service);
        this.channel = channel;
        this.processor = processor;
        filterChain = new DefaultIoFilterChain(this);
    }
    
    @Override
    public IoProcessor<NioSession> getProcessor() {
        return processor;
    }

所以答案是肯定是同一个。

那么这个processor在哪里创建的?或者说在AbstractPollingIoConnector出现的processor是在哪里创建的?

其实在NioSocketConnector中就能发现苗头:

//NioSocketConnector.java
    public NioSocketConnector() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

super方法:

//AbstractPollingIoConnector.java
    protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
    }

就是这个了—SimpleIoProcessorPool。

    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
        this(processorType, null, DEFAULT_SIZE, null);
    }

    @SuppressWarnings("unchecked")
    public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size, 
            SelectorProvider selectorProvider) {
        if (processorType == null) {
            throw new IllegalArgumentException("processorType");
        }

        if (size <= 0) {
            throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
        }

        // Create the executor if none is provided
        createdExecutor = executor == null;

        if (createdExecutor) {
            this.executor = Executors.newCachedThreadPool();
            // Set a default reject handler
            ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        } else {
            this.executor = executor;
        }

        pool = new IoProcessor[size];

        boolean success = false;
        Constructor<? extends IoProcessor<S>> processorConstructor = null;
        boolean usesExecutorArg = true;

        try {
            // We create at least one processor
            try {
                try {
                    processorConstructor = processorType.getConstructor(ExecutorService.class);
                    pool[0] = processorConstructor.newInstance(this.executor);
                } catch (NoSuchMethodException e1) {
                    // To the next step...
                    try {
                        if(selectorProvider==null) {
                            processorConstructor = processorType.getConstructor(Executor.class);
                            pool[0] = processorConstructor.newInstance(this.executor);
                        } else {
                            processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
                            pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
                        }
                    } catch (NoSuchMethodException e2) {
                        // To the next step...
                        try {
                            processorConstructor = processorType.getConstructor();
                            usesExecutorArg = false;
                            pool[0] = processorConstructor.newInstance();
                        } catch (NoSuchMethodException e3) {
                            // To the next step...
                        }
                    }
                }
            } catch (RuntimeException re) {
                LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
                throw re;
            } catch (Exception e) {
                String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
                LOGGER.error(msg, e);
                throw new RuntimeIoException(msg, e);
            }

            if (processorConstructor == null) {
                // Raise an exception if no proper constructor is found.
                String msg = String.valueOf(processorType) + " must have a public constructor with one "
                        + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
                        + Executor.class.getSimpleName() + " parameter or a public default constructor.";
                LOGGER.error(msg);
                throw new IllegalArgumentException(msg);
            }

            // Constructor found now use it for all subsequent instantiations
            for (int i = 1; i < pool.length; i++) {
                try {
                    if (usesExecutorArg) {
                        if(selectorProvider==null) {
                            pool[i] = processorConstructor.newInstance(this.executor);
                        } else {
                            pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
                        }
                    } else {
                        pool[i] = processorConstructor.newInstance();
                    }
                } catch (Exception e) {
                    // Won't happen because it has been done previously
                }
            }

            success = true;
        } finally {
            if (!success) {
                dispose();
            }
        }
    }

中间这一大段是在利用反射,创建一个NioProcessor,因为从子类传进来的就是NioProcessor.class。

所以这个processor真实身份是一个NioProcessor。

    session.getProcessor().add(session);

而NioProcessor并没有实现add方法,是由父类AbstractPollingIoProcessor来实现的:

    @Override
    public final void add(S session) {
        if (disposed || disposing) {
            throw new IllegalStateException("Already disposed.");
        }

        // Adds the session to the newSession queue and starts the worker
        newSessions.add(session);
        startupProcessor();
    }

    private void startupProcessor() {
        Processor processor = processorRef.get();

        if (processor == null) {
            processor = new Processor();

            if (processorRef.compareAndSet(null, processor)) {
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }

        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
        wakeup();
    }

最后由startupProcessor将其启动起来了。
用一句话概括就是:在创建SocketConnector的同时创建了Processor,并在Connector连接成功后,启动了Processor。

这个NioProcessor的继承关系是这样的:
NioProcessor继承

来一张整体时序图,凑合看:
Connector触发

以上都是作为客户端看到的,服务端与其大同小异。

//AbstractPollingIoAcceptor.java
    private class Acceptor implements Runnable {
        /**
         * {@inheritDoc}
         */
        @Override
        public void run() {
            assert acceptorRef.get() == this;

            int nHandles = 0;

            // Release the lock
            lock.release();

            while (selectable) {
                try {
                    nHandles += registerHandles();

                    int selected = select();

                    if (nHandles == 0) {
                        acceptorRef.set(null);

                        if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                            assert acceptorRef.get() != this;
                            break;
                        }

                        if (!acceptorRef.compareAndSet(null, this)) {
                            assert acceptorRef.get() != this;
                            break;
                        }

                        assert acceptorRef.get() == this;
                    }

                    if (selected > 0) {
                        // We have some connection request, let's process
                        // them here.
                        processHandles(selectedHandles());
                    }

                    // check to see if any cancellation request has been made.
                    nHandles -= unregisterHandles();
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            // Cleanup all the processors, and shutdown the acceptor.
            if (selectable && isDisposing()) {
                selectable = false;
                try {
                    if (createdProcessor) {
                        processor.dispose();
                    }
                } finally {
                    try {
                        synchronized (disposalLock) {
                            if (isDisposing()) {
                                destroy();
                            }
                        }
                    } catch (Exception e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    } finally {
                        disposalFuture.setDone();
                    }
                }
            }
        }
		
        @SuppressWarnings("unchecked")
        private void processHandles(Iterator<H> handles) throws Exception {
            while (handles.hasNext()) {
                H handle = handles.next();
                handles.remove();

                // Associates a new created connection to a processor,
                // and get back a session
                S session = accept(processor, handle);

                if (session == null) {
                    continue;
                }

                initSession(session, null, null);

                // add the session to the SocketIoProcessor
                session.getProcessor().add(session);
            }
        }		
		//其他大量方法
    }

可以看到,这里的Acceptor是类似于客户端Processor的存在,不同的是,作为服务端,Acceptor负责接收连接并建立session;最终会为每条连接的session调用到各自的processor,也就是进入之前客户端的逻辑。

//NioSocketAcceptor.java
    public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

服务端SocketAcceptor构造方法几乎与客户端一样,因为也需要NioProcessor维护session。
看时序图:
Acceptor触发

从图中也可以看出,服务端的不同主要在于Acceptor这一接收连接的角色,其他后续和客户端基本相同。

以上。

最后

以上就是超帅飞鸟最近收集整理的关于从Mina源码看心跳超时机制的全部内容,更多相关从Mina源码看心跳超时机制内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部