概述
Netty为超时控制封装了两个类ReadTimeoutHandler和WriteTimeoutHandler,ReadTimeoutHandler,用于控制读取数据的时候的超时,如果在设置时间段内都没有数据读取了,那么就引发超时,然后关闭当前的channel;WriteTimeoutHandler,用于控制数据输出的时候的超时,如果在设置时间段内都没有数据写了,那么就超时。它们都是IdleStateHandler的子类。
开刀一下ReadTimeoutHandler:
public class ReadTimeoutHandler extends IdleStateHandler {
private boolean closed;
/**
* Creates a new instance.
*
* @param timeoutSeconds
* read timeout in seconds
*/
public ReadTimeoutHandler(int timeoutSeconds) {
this(timeoutSeconds, TimeUnit.SECONDS);
}
/**
* Creates a new instance.
*
* @param timeout
* read timeout
* @param unit
* the {@link TimeUnit} of {@code timeout}
*/
public ReadTimeoutHandler(long timeout, TimeUnit unit) {
super(timeout, 0, 0, unit);
}
@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
assert evt.state() == IdleState.READER_IDLE;
readTimedOut(ctx);
}
/**
* Is called when a read timeout was detected.
*/
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
}
ReadTimeoutHandler构造函数实际上调用的是父类IdleStateHandler的构造函数,当链路激活事件触发后,会初始化相关定时任务:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
schedule会获取与该channel绑定的EventLoop来执行定时任务,这里主要看下ReaderIdleTimeoutTask。
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); //这里相当于再设置一下timeout,确保channel关闭
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
当超时发生后,会调用到channelIdle方法,ReadTimeoutHandler重写(Overriding)了该方法,ReadTimeoutHandler会抛出一个ReadTimeoutException.INSTANCE异常,然后关闭掉对应的ChannelHandlerContext。
那个在自定义的Handler中需要关注exceptionCaught:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof ReadTimeoutException) {
// do something
} else {
super.exceptionCaught(ctx, cause);
}
}
最后
以上就是欢喜汽车为你收集整理的Netty超时控制handler的全部内容,希望文章能够帮你解决Netty超时控制handler所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复