首先明确一点,在Mina或者其他相似的框架中,心跳机制肯定都是和计时挂钩的,只要找到计时相关的代码,其实就离真正的心跳逻辑不远了。
Mina中的心跳超时其实都是指会话空闲,即在一定时间段内未接收或未发送消息的状态,此时会触发sessionIdle,也就是IoFilter中的方法。
如何触发sessionIdle
找到计时的地方,就很好理解了:
//AbstractPollingIoProcessor.java
private static final long SELECT_TIMEOUT = 1000L;
private class Processor implements Runnable {
/**
* {@inheritDoc}
*/
@Override
public void run() {
assert processorRef.get() == this;
lastIdleCheckTime = System.currentTimeMillis();
int nbTries = 10;
for (;;) {
try {
//略过部分代码
// Write the pending requests
long currentTime = System.currentTimeMillis();
flush(currentTime);
// Last, not least, send Idle events to the idle sessions
notifyIdleSessions(currentTime);
// And manage removed sessions
removeSessions();
//略过部分代码
} catch (ClosedSelectorException cse) {
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
//略过部分代码
}
private void notifyIdleSessions(long currentTime) throws Exception {
// process idle sessions
if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
lastIdleCheckTime = currentTime;
AbstractIoSession.notifyIdleness(allSessions(), currentTime);
}
}
//其他大量代码略
}
可以看到,这个Processor(AbstractPollingIoProcessor 内部类)自己作为Runable,在线程启动后会不停地将当前时间传进notifyIdleSessions方法,这也就是时间的来源。
然后会将当前时间与上一次记录的时间进行比较,如果时间已经过去1000ms即1s,则更新记录时间,并调用notifyIdleness方法,这就意味着这里的计时单位,默认是1000ms,也就是以秒为单位。
接下来看这个notifyIdleness方法在干什么:
//AbstractIoSession.java
public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
while (sessions.hasNext()) {
IoSession session = sessions.next();
if (!session.getCloseFuture().isClosed()) {
notifyIdleSession(session, currentTime);
}
}
}
public static void notifyIdleSession(IoSession session, long currentTime) {
notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
notifyWriteTimeout(session, currentTime);
}
private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, long lastIoTime) {
if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
session.getFilterChain().fireSessionIdle(status);
}
}
最终看到fireSessionIdle心里就有数了。
不同的状态有不同的触发参数,都是以IdleStatus为基准的,有三种状态,读空闲、写空闲、读写皆空闲:
public class IdleStatus {
public static final IdleStatus READER_IDLE = new IdleStatus("reader idle");
public static final IdleStatus WRITER_IDLE = new IdleStatus("writer idle");
public static final IdleStatus BOTH_IDLE = new IdleStatus("both idle");
private final String strValue;
private IdleStatus(String strValue) {
this.strValue = strValue;
}
@Override
public String toString() {
return strValue;
}
}
至于最后的fireSessionIdle,顺着IoFilter就调用了sessionIdle:
//DefaultIoFilterChain.java
@Override
public void fireSessionIdle(IdleStatus status) {
session.increaseIdleCount(status, System.currentTimeMillis());
callNextSessionIdle(head, session, status);
}
private void callNextSessionIdle(Entry entry, IoSession session, IdleStatus status) {
try {
IoFilter filter = entry.getFilter();
NextFilter nextFilter = entry.getNextFilter();
filter.sessionIdle(nextFilter, session, status);
} catch (Exception e) {
fireExceptionCaught(e);
} catch (Error e) {
fireExceptionCaught(e);
throw e;
}
}
这样自然就可以在各种实现IoFilter接口的类中触发sessionIdle,比如各种IoFilterAdapter,也就是通过下列这种方式添加的过滤器:
mConnector.getFilterChain().addLast("codec", new ProtocolCodecFilter(protocolCodecFactory));
然后…
Handler呢?
Mina中Filter和Handler是分属两个不同的接口(IoFilter与IoHandler)的,上面的filter.sessionIdle能让IoFilter触发sessionIdle,那Handler呢?
不要慌,其实在DefaultIoFilterChain中,建立session时默认都是使用的DefaultIoFilterChain,而Handler中几乎所有的方法都是由DefaultIoFilterChain经过一定的处理传递到Handler的:
//DefaultIoFilterChain.java
public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new IllegalArgumentException("session");
}
this.session = session;
head = new EntryImpl(null, null, "head", new HeadFilter());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}
private static class TailFilter extends IoFilterAdapter {
@Override
public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
try {
session.getHandler().sessionCreated(session);
} finally {
// Notify the related future.
ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
if (future != null) {
future.setSession(session);
}
}
}
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
session.getHandler().sessionOpened(session);
}
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
try {
s.getHandler().sessionClosed(session);
} finally {
try {
s.getWriteRequestQueue().dispose(session);
} finally {
try {
s.getAttributeMap().dispose(session);
} finally {
try {
// Remove all filters.
session.getFilterChain().clear();
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerClosedReadFuture();
}
}
}
}
}
}
@Override
public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception {
session.getHandler().sessionIdle(session, status);
}
@Override
public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
try {
s.getHandler().exceptionCaught(s, cause);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerFailedReadFuture(cause);
}
}
}
@Override
public void inputClosed(NextFilter nextFilter, IoSession session) throws Exception {
session.getHandler().inputClosed(session);
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
if (!(message instanceof IoBuffer) || !((IoBuffer) message).hasRemaining()) {
s.increaseReadMessages(System.currentTimeMillis());
}
// Update the statistics
if (session.getService() instanceof AbstractIoService) {
((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
}
// Propagate the message
try {
session.getHandler().messageReceived(s, message);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerReadFuture(message);
}
}
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
((AbstractIoSession) session).increaseWrittenMessages(writeRequest, System.currentTimeMillis());
// Update the statistics
if (session.getService() instanceof AbstractIoService) {
((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
}
// Propagate the message
session.getHandler().messageSent(session, writeRequest.getMessage());
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
nextFilter.filterWrite(session, writeRequest);
}
@Override
public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
nextFilter.filterClose(session);
}
}
可以看到,messageReceived、sessionIdle之类的方法都是在TailFilter中使用session.getHandler().xxx方式传递给Handler的。
至此,已经知道计时在Processor中开始的,那么Processor是被谁启动起来的呢?
要弄清楚这个问题,其实就要把整个连接建立的流程摸清。
Processor的建立
比如一个SocketConnector,开始连接时是这样的:
SocketConnector mConnector = new NioSocketConnector();
ConnectFuture future = connector.connect(socketAddress);
在开始看源码之前,可以先看一下NioSocketConnector这个类的继承关系,后面肯定会接连遇到图中的一些类的:
那么现在就从这个connect开始,这里调用的其实是AbstractIoConnector的方法:
//AbstractIoConnector.java
@Override
public final ConnectFuture connect(SocketAddress remoteAddress) {
return connect(remoteAddress, null, null);
}
@Override
public final ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
//各种判断,略过
if (getHandler() == null) {
if (getSessionConfig().isUseReadOperation()) {
setHandler(new IoHandler() {
//各种实现的方法,这里省略掉
});
} else {
throw new IllegalStateException("handler is not set.");
}
}
return connect0(remoteAddress, localAddress, sessionInitializer);
}
而connect0却又回到了AbstractPollingIoConnector:
//AbstractPollingIoConnector.java
@Override
@SuppressWarnings("unchecked")
protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
H handle = null;
boolean success = false;
try {
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture();
S session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
success = true;
return future;
}
success = true;
} catch (Exception e) {
return DefaultConnectFuture.newFailedFuture(e);
} finally {
if (!success && handle != null) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
connectQueue.add(request);
startupWorker();
wakeup();
return request;
}
上面有一个connect的判断,是在NioSocketConnector中实现的:
//NioSocketConnector.java
@Override
protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
return handle.connect(remoteAddress);
}
在AbstractPollingIoConnector中的connect0方法中,已经可以看到大量的逻辑细节了,如果连接成功,则会创建session、初始化session,以及使用到processor:
S session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
重点看一下这几个方法的实现。
先看这个newSession,是在NioSocketConnector中实现的,当然少不了其父类的配合:
//NioSocketConnector.java
@Override
protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
return new NioSocketSession(this, processor, handle);
}
//NioSocketSession.java
public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) {
super(processor, service, channel);
config = new SessionConfigImpl();
config.setAll(service.getSessionConfig());
}
//NioSession.java
protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
super(service);
this.channel = channel;
this.processor = processor;
filterChain = new DefaultIoFilterChain(this);
}
//AbstractIoSession.java
protected AbstractIoSession(IoService service) {
this.service = service;
this.handler = service.getHandler();
// Initialize all the Session counters to the current time
long currentTime = System.currentTimeMillis();
creationTime = currentTime;
lastThroughputCalculationTime = currentTime;
lastReadTime = currentTime;
lastWriteTime = currentTime;
lastIdleTimeForBoth = currentTime;
lastIdleTimeForRead = currentTime;
lastIdleTimeForWrite = currentTime;
// TODO add documentation
closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
// Set a new ID for this session
sessionId = idGenerator.incrementAndGet();
}
几个关于session的类关系都是继承:
这几个类把session的各种属性和配置安排得明明白白的,没什么好说。
其中可以看到在NioSession的构造方法中,为session创建了默认的filterChain,这也与之前的TailFilter逻辑相呼应了。
另外可以看到AbstractIoSession的构造方法中,对各种时间的记录进行了初始化。
接下来看initSession,这个方法是在AbstractIoService中实现的:
//AbstractIoService.java
protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
// Update lastIoTime if needed.
if (stats.getLastReadTime() == 0) {
stats.setLastReadTime(getActivationTime());
}
if (stats.getLastWriteTime() == 0) {
stats.setLastWriteTime(getActivationTime());
}
try {
((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
.getAttributeMap(session));
} catch (IoSessionInitializationException e) {
throw e;
} catch (Exception e) {
throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
}
try {
((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
.getWriteRequestQueue(session));
} catch (IoSessionInitializationException e) {
throw e;
} catch (Exception e) {
throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
}
if ((future != null) && (future instanceof ConnectFuture)) {
// DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
}
if (sessionInitializer != null) {
sessionInitializer.initializeSession(session, future);
}
finishSessionInitialization0(session, future);
}
…
好吧,其实这个方法没做什么事,只是把session存储的一些Attribute进行了初始化。
下一个是session.getProcessor().add(session),首先我们要注意到,在创建session时,已经把processor传进来了,那么sessoin调用的getProcessor会和传进来的是同一个么?
S session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
getProcessor()是在NioSession.java中实现的:
//NioSession.java
protected final IoProcessor<NioSession> processor;
protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
super(service);
this.channel = channel;
this.processor = processor;
filterChain = new DefaultIoFilterChain(this);
}
@Override
public IoProcessor<NioSession> getProcessor() {
return processor;
}
所以答案是肯定是同一个。
那么这个processor在哪里创建的?或者说在AbstractPollingIoConnector出现的processor是在哪里创建的?
其实在NioSocketConnector中就能发现苗头:
//NioSocketConnector.java
public NioSocketConnector() {
super(new DefaultSocketSessionConfig(), NioProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
super方法:
//AbstractPollingIoConnector.java
protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
}
就是这个了—SimpleIoProcessorPool。
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType) {
this(processorType, null, DEFAULT_SIZE, null);
}
@SuppressWarnings("unchecked")
public SimpleIoProcessorPool(Class<? extends IoProcessor<S>> processorType, Executor executor, int size,
SelectorProvider selectorProvider) {
if (processorType == null) {
throw new IllegalArgumentException("processorType");
}
if (size <= 0) {
throw new IllegalArgumentException("size: " + size + " (expected: positive integer)");
}
// Create the executor if none is provided
createdExecutor = executor == null;
if (createdExecutor) {
this.executor = Executors.newCachedThreadPool();
// Set a default reject handler
((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
} else {
this.executor = executor;
}
pool = new IoProcessor[size];
boolean success = false;
Constructor<? extends IoProcessor<S>> processorConstructor = null;
boolean usesExecutorArg = true;
try {
// We create at least one processor
try {
try {
processorConstructor = processorType.getConstructor(ExecutorService.class);
pool[0] = processorConstructor.newInstance(this.executor);
} catch (NoSuchMethodException e1) {
// To the next step...
try {
if(selectorProvider==null) {
processorConstructor = processorType.getConstructor(Executor.class);
pool[0] = processorConstructor.newInstance(this.executor);
} else {
processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class);
pool[0] = processorConstructor.newInstance(this.executor,selectorProvider);
}
} catch (NoSuchMethodException e2) {
// To the next step...
try {
processorConstructor = processorType.getConstructor();
usesExecutorArg = false;
pool[0] = processorConstructor.newInstance();
} catch (NoSuchMethodException e3) {
// To the next step...
}
}
}
} catch (RuntimeException re) {
LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage());
throw re;
} catch (Exception e) {
String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage();
LOGGER.error(msg, e);
throw new RuntimeIoException(msg, e);
}
if (processorConstructor == null) {
// Raise an exception if no proper constructor is found.
String msg = String.valueOf(processorType) + " must have a public constructor with one "
+ ExecutorService.class.getSimpleName() + " parameter, a public constructor with one "
+ Executor.class.getSimpleName() + " parameter or a public default constructor.";
LOGGER.error(msg);
throw new IllegalArgumentException(msg);
}
// Constructor found now use it for all subsequent instantiations
for (int i = 1; i < pool.length; i++) {
try {
if (usesExecutorArg) {
if(selectorProvider==null) {
pool[i] = processorConstructor.newInstance(this.executor);
} else {
pool[i] = processorConstructor.newInstance(this.executor, selectorProvider);
}
} else {
pool[i] = processorConstructor.newInstance();
}
} catch (Exception e) {
// Won't happen because it has been done previously
}
}
success = true;
} finally {
if (!success) {
dispose();
}
}
}
中间这一大段是在利用反射,创建一个NioProcessor,因为从子类传进来的就是NioProcessor.class。
所以这个processor真实身份是一个NioProcessor。
session.getProcessor().add(session);
而NioProcessor并没有实现add方法,是由父类AbstractPollingIoProcessor来实现的:
@Override
public final void add(S session) {
if (disposed || disposing) {
throw new IllegalStateException("Already disposed.");
}
// Adds the session to the newSession queue and starts the worker
newSessions.add(session);
startupProcessor();
}
private void startupProcessor() {
Processor processor = processorRef.get();
if (processor == null) {
processor = new Processor();
if (processorRef.compareAndSet(null, processor)) {
executor.execute(new NamePreservingRunnable(processor, threadName));
}
}
// Just stop the select() and start it again, so that the processor
// can be activated immediately.
wakeup();
}
最后由startupProcessor将其启动起来了。
用一句话概括就是:在创建SocketConnector的同时创建了Processor,并在Connector连接成功后,启动了Processor。
这个NioProcessor的继承关系是这样的:
来一张整体时序图,凑合看:
以上都是作为客户端看到的,服务端与其大同小异。
//AbstractPollingIoAcceptor.java
private class Acceptor implements Runnable {
/**
* {@inheritDoc}
*/
@Override
public void run() {
assert acceptorRef.get() == this;
int nHandles = 0;
// Release the lock
lock.release();
while (selectable) {
try {
nHandles += registerHandles();
int selected = select();
if (nHandles == 0) {
acceptorRef.set(null);
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
assert acceptorRef.get() != this;
break;
}
if (!acceptorRef.compareAndSet(null, this)) {
assert acceptorRef.get() != this;
break;
}
assert acceptorRef.get() == this;
}
if (selected > 0) {
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}
// check to see if any cancellation request has been made.
nHandles -= unregisterHandles();
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
// Cleanup all the processors, and shutdown the acceptor.
if (selectable && isDisposing()) {
selectable = false;
try {
if (createdProcessor) {
processor.dispose();
}
} finally {
try {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
}
}
}
}
@SuppressWarnings("unchecked")
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
S session = accept(processor, handle);
if (session == null) {
continue;
}
initSession(session, null, null);
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
//其他大量方法
}
可以看到,这里的Acceptor是类似于客户端Processor的存在,不同的是,作为服务端,Acceptor负责接收连接并建立session;最终会为每条连接的session调用到各自的processor,也就是进入之前客户端的逻辑。
//NioSocketAcceptor.java
public NioSocketAcceptor() {
super(new DefaultSocketSessionConfig(), NioProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
服务端SocketAcceptor构造方法几乎与客户端一样,因为也需要NioProcessor维护session。
看时序图:
从图中也可以看出,服务端的不同主要在于Acceptor这一接收连接的角色,其他后续和客户端基本相同。
以上。
最后
以上就是超帅飞鸟最近收集整理的关于从Mina源码看心跳超时机制的全部内容,更多相关从Mina源码看心跳超时机制内容请搜索靠谱客的其他文章。
发表评论 取消回复