我是靠谱客的博主 闪闪冬日,最近开发中收集的这篇文章主要介绍Netty源码学习-ReadTimeoutHandler,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

ReadTimeoutHandler的实现思路:
开启一个定时任务,如果在指定时间内没有接收到消息,则抛出ReadTimeoutException
这个异常的捕获,在开发中,交给跟在ReadTimeoutHandler后面的ChannelHandler,例如


private final ChannelHandler timeoutHandler =
new ReadTimeoutHandler(timer, READ_TIMEOUT);
private final ChannelHandler uptimeHandler =
new UptimeClientHandler(bootstrap, timer);

public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
timeoutHandler, uptimeHandler);
}

public class UptimeClientHandler ...{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
Throwable cause = e.getCause();
if (cause instanceof ReadTimeoutException) {
// The connection was OK but there was no traffic for last period.
println("Disconnecting due to no inbound traffic");
} else {
cause.printStackTrace();
}
ctx.getChannel().close();
}
}


ReadTimeoutHandler的关键源码:


//ChannelOpen时启动定时任务:
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
initialize(ctx);
ctx.sendUpstream(e);
}
private void initialize(ChannelHandlerContext ctx) {
State state = state(ctx);
state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
}


//每次接收到消息时更新lastReadTime
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
State state = (State) ctx.getAttachment();
state.lastReadTime = System.currentTimeMillis();
ctx.sendUpstream(e);
}

/*定时任务:判断指定时间内是否有消息到达
举例:
假设超时时间设为30秒,初始的lastReadTime=10:00:00
那么,定时任务在10:00:30时执行run方法,如果:
1.在10:00:18有消息到达(lastReadTime更新为10:00:18),则表示没有超时,
继续监听下一个30秒,也就是定时任务需要在10:00:48再跑一次
因此下一次定时任务的执行距离现在是:nextDelay=30-(30-18)=18(秒)
2.没有消息到达,超时,抛异常
*/
private final class ReadTimeoutTask implements TimerTask {
public void run(Timeout timeout) throws Exception {
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis();
long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
if (nextDelay <= 0) {
// Read timed out - set a new timeout and notify the callback.
state.timeout =
timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
try {
// FIXME This should be called from an I/O thread.
// To be fixed in Netty 4.
readTimedOut(ctx);
} catch (Throwable t) {
fireExceptionCaught(ctx, t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
state.timeout =
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
}
}
}

//为什么这里会调用initialize方法?分析在下面
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.getPipeline().isAttached()) {
// channelOpen event has been fired already, which means
// this.channelOpen() will not be invoked.
// We have to initialize here instead.
initialize(ctx);
} else {
// channelOpen event has not been fired yet.
// this.channelOpen() will be invoked and initialization will occur there.
}
}



上面的beforeAdd方法不太好理解
先看看ClientBootstrap的connect方法:

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

ChannelPipeline pipeline;
try {

//这里调用ChannelPipeline.addLast,在真正往链表里面插入之前,调用beforeAdd
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}

//创建一个代表Client的SocketChannel,SocketChannel的构造函数里会调用:
// pipeline.attach(this, sink);
//然后会fireChannelOpen
Channel ch = getFactory().newChannel(pipeline);

//...
}


从正常的流程来说,是先创建创建pipeline再创建channel,
也就是beforeAdd会在channel创建之前调用,那么beforeAdd里面的判断:
if (ctx.getPipeline().isAttached()) 就不会返回true(因为此时channel还未创建,更不可能与pipeline关联了)
这样看来,只需要在channelOpen中调用initialize就可以了?
不是的,
因为还有一种情况:动态添加ChannelHandler
有可能channel已经创建(与pipeline关联了),且channelOpen已经执行过了,
那就需要在添加ReadTimeoutHandler时,执行initialize

最后

以上就是闪闪冬日为你收集整理的Netty源码学习-ReadTimeoutHandler的全部内容,希望文章能够帮你解决Netty源码学习-ReadTimeoutHandler所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部