我是靠谱客的博主 清秀自行车,最近开发中收集的这篇文章主要介绍netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

这个分享阅读的是4.1.52.Final-SNAPSHOT这个版本的源码 

unsafe是每个channel都会对应的一个对象,把它定义为Unasfe是因为用户是无法直接操作这个接口的,它是由netty内部进行调用的,这个接口主要是用来进行具体的IO的处理的。并且除了register,deregister等,其他方法都需要在eventLoop中完成操作的。

下面表中列出了其主要的方法,并列出了由于这些方法可能会fire出的事件,其中红色的方法表示的是这些方法是outbound方法,即这些方法是从channel调用,然后经过ChannelPipeline中的ChannelHandler调用链最终走到的方法。

需要注意的是registerread方法不是outbound方法,而beginRead是outbound方法。beginReadchannelread方法最终会走到的方法,而read方法是NioUnsafe中定义的方法,是NioEventLoop调用的方法

下面来详细介绍一下这些方法

register

下面图展示了rgister做的主要的两件事情。

  • 其主要是将本channel所绑定的javaChannel对应的socketChannel注册到NioEventLoop的Selector中。
  • 将EventLoop绑定到Channel

其中第一件将SocketChannel注册到Selector中保证了NioEventLoop中对应Selector能处理这个channel对应的IO事件,而将EventLoop绑定到channel中则让这个channel能利用ChannelPipeline进行inbound事件和outbound事件调用,因为对于ChannelPipeline的对应的事件的调用都是需要在channel的EventLoop中进行的。

下面是register方法的具体的代码的实现。

另外需要注意的是这个方法是用户线程调用的,而不是由eventLoop线程进行调用的,其最终调用的register0方法则是会在EventLoop线程进行调用。

可以看到这个方法首先进行的操作则是将本channel的eventLoop赋值为传入的EventLoop,并且将具体的register操作register0在eventLoop中进行调用。register0则将SocketChannel绑定到Selector的操作留给AbstractNioChannel来进行完成,并在注册成后fire了相应的事件。

这个方法在注册成功后会调用invokeHandlerAddedIfNeeded来通知用户已经可以进行处理事件。

//这个方法是从eventLoop的register调用到此的
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {//这里如果是同一个eventLoop
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            closeForcibly();//这里直接关闭channel不会有新的event被fire出来
            closeFuture.setClosed();//设置closeFuture的状态
            safeSetFailure(promise, t);//通知传入的promise已经结束
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();//这里对于AbstractNioChannel是将其包装的channel注册到EventLoop中
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded();//在给后续传递
        safeSetSuccess(promise);//通知客户端已经执行成功
        pipeline.fireChannelRegistered();//通知已经注册成功
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
            	//autoRead的对应的beginRead()方法的调用是在fireChannelActive后进行调用的
            	//但是由于fireChannelActive只fire一次,对于先register然后deregister,在register等情况下,可能已经active过一次
            	//再次register则不会调用对应的beginRead(),则需要重新beginRead
                beginRead();//向SelectionKey注册InterestOps等
            }
        }
    } catch (Throwable t) {
        closeForcibly();//直接关闭对应的channel,不进行fireEvent等操作。
        closeFuture.setClosed();//通知closeFuture已经关闭
        safeSetFailure(promise, t);//通知用户线程register结果
    }
}
//AbstractNioChannel的方法
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

deregister

deregister是register的相反的操作,它主要进行的操作是将SocketChannel从Selector解除注册,不过没有将NioEventLoopChannel中解除绑定,故此channel在deregister后对应的inbound,outbound事件还是能进行继续操作。

下面的代码展示了具体的逻辑,其解除注册的操作委托给doDeregister来完成。

private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
    if (!promise.setUncancellable()) {
        return;
    }
    if (!registered) {
        safeSetSuccess(promise);
        return;
    }
    //这里直接将deregister在register之后进行,因为调用deregister的线程一定是eventLoop,但是可能原先线程还没有将事情做完
    //而又调用了register操作将当前的eventLoop切换为新的eventLoop,则原先的eventLoop在deregister后则有可能用的是新的eventLoop
    invokeLater(new Runnable() {
        @Override
        public void run() {
            try {
                doDeregister();//对于AbstractNioChannel则是调用eventLoop的cancel操作
            } catch (Throwable t) {
                logger.warn("Unexpected exception occurred while deregistering a channel.", t);
            } finally {
                if (fireChannelInactive) {
                    pipeline.fireChannelInactive();//这里的eventLoop没有改变,则这个channel还是能执行fire操作让pipeline执行
                }
                if (registered) {//对于没有调用registered则调用fireChannelUnregistered
                    registered = false;
                    pipeline.fireChannelUnregistered();
                }
                safeSetSuccess(promise);//通知外部线程deregister结束
            }
        }
    });
}

//AbstractNioChannel中的方法
protected void doDeregister() throws Exception {
    eventLoop().cancel(selectionKey());
} 

connect

connect的主要是连接到对应的服务器,其连接操作如果没有直接连接成功会注册一个OP_CONNECT到Selector中等待连接成功Selector对应事件的通知。

 下面的代码就是connect的具体的代码逻辑,它的具体的连接逻辑在doConnect方法中,这个doConnect方法具体是在NioSocketChannel中实现的,可以看到它是直接调用socketChannel来进行连接操作的,如果没有连接成功则注册一个OP_CONNECT到selector中,并且注册一个超时等待的定时器。等待NioEventLoop收到OP_CONNECT连接成功的事件会调用finishConnect来通知用户连接完成。

public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    try {
        if (connectPromise != null) {
            throw new ConnectionPendingException();
        }
        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {//当前连接成功,promise完成
            fulfillConnectPromise(promise, wasActive);
        } else {//当前还没有连接成功,设置一个超时定时器,等待
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;
            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " +remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            //外部调用了cancel清理等待的promise
            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);//本地绑定
    }
    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);//直接连接
        if (!connected) {//没有连接成功,则将连接注册到OP_CONNECT中
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}
public final void finishConnect() {
    try {
        boolean wasActive = isActive();
        doFinishConnect();
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
    } finally {
        if (connectTimeoutFuture != null) {//取消超时定时器
            connectTimeoutFuture.cancel(false);
        }
        connectPromise = null;//释放连接promise,表示不在连接中
    }
}

disconnect

这个方法主要是将SocketChannel关闭了,并fire相应的事件。

public final void disconnect(final ChannelPromise promise) {
    if (!promise.setUncancellable()) {
        return;
    }
    boolean wasActive = isActive();
    try {
        doDisconnect();//javaChannel().close()
        remoteAddress = null;
        localAddress = null;
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    if (wasActive && !isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelInactive();
            }
        });
    }
    safeSetSuccess(promise);
    closeIfClosed(); // 将javaChannel关闭了后则调用close方法关闭这个channel
}

bind

bind事件主要是直接用javaChannel进行bind操作,并fire相应的事件

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    boolean wasActive = isActive();
    try {
        doBind(localAddress);//javaChannel().bind(localAddress, config.getBacklog());
    } catch (Throwable t) {
        safeSetFailure(promise, t);//通知用户bind失败
        closeIfClosed();//channel关闭了就直接进行close逻辑
        return;
    }
    if (!wasActive && isActive()) {//这边没有进行eventLoop的判断
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);//通知用户绑定成功
}

write

write主要进行的操作是将外部写入的数据最终写到ChannelOutboundBuffer的缓存链表的尾部,下面图展示了这个流程,其中虚线表示的这次写入的msg,而红色表示的是可以刷入,但是还没有输入到javaChannel中的消息。

 

@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {//对于outboundBuffer为空时则需要将msg释放以防止内存泄漏
        safeSetFailure(promise, newClosedChannelException(initialCloseCause));
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);//计算msg的大小,默认对于ByteBuf就是看readableBytes
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);//将msg加到outboundBuffer中
}

flush

flush主要分为两步,一步是将ChannelOutBoundBuffer中的msg都标记为可以为flush的消息,另一步则在flush0操作,flush0会将ChannelOutBoundBuffer中的标记为可以flush的消息一点一点地写到SocketChannel中。

下面是flush和flush0的具体的操作,可以看到flush0的具体操作其实是委托给doWrite来 进行具体的写操作。

public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();//将当前写到ChannelOutboundBuffer的消息都标记为可以被flush的状态
    flush0();
}
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }
    inFlush0 = true;
    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(new NotYetConnectedException(), true);
            } else {
                outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }
    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            //写失败后如果配了自动关闭则调用close将当前的channel进行close掉
            initialCloseCause = t;
            close(voidPromise(), t, newClosedChannelException(t), false);
        } else {
            try {
                shutdownOutput(voidPromise(), t);//关闭outboundBuffer
            } catch (Throwable t2) {
                initialCloseCause = t;
                close(voidPromise(), t2, newClosedChannelException(t), false);
            }
        }
    } finally {
        inFlush0 = false;
    }
}

下面来具体分析一下NioSocketChannel的doWrite操作。可以看到每次doWrite最多会进行writeSpinCount写操作,每次写最多会写入maxBytesPerGatheringWrite 的数据,并且这个maxBytesPerGatheringWrite 是动态变化的,netty会根据每次想要写入maxBytesPerGatheringWrite 以及最终写入到SocketChannel的字节localWrittenBytes来重新计算maxBytesPerGatheringWrite 的大小,如果一个字节都没有写入到SocketChannel,则会通过incompleteWrite方法注册一个OP_WRITE到NioEventLoop,NioEventLoop会在这个SocketChannel能再次写入数据时再次调用flush0继续写数据到SocketChannel。如果经历writeSpinCount次循环还是没有写完所有数据,则利用incompleteWrite方法再次加入一个flush0的事件继续写入数据

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
    int writeSpinCount = config().getWriteSpinCount();
    do {
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
        int nioBufferCnt = in.nioBufferCount();
        switch (nioBufferCnt) {
            case 0:
                //不能转换为ByteBuffer,采用一般的写逻辑
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {//ch的写buffer已经满了,则注册OP_WRITE到NioEventLoop中等待可以再次写入是再写
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);//自适应maxBytesPerGatheringWrite
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);
    //没写完则清理掉OP_WRITE,不让NioEventLoop触发flush,并加入flush任务等下次再写
    //写完了则加入OP_WRITE等NioEventLoop来写入
    incompleteWrite(writeSpinCount < 0);
}

beginRead

beginRead的是channel的read方法走过DefaultChannelPipeline最终调用的方法,它主要做的一件事情就是将对应的readOps注册到NioEventLoop的Selector中,不过对于不同的Channel这个readOps是不同的,对于NioSocketChannel,其对应的readOps是OP_READ,而对于NioServerSocketChannel,其对应的readOps则是OP_ACCEPT。

public final void beginRead() {
    assertEventLoop();
    if (!isActive()) {
        return;
    }
    try {
        doBeginRead();//将readInterestOp注册到selector中
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

 read

read操作不是ChannelOutBound方法,它也不是从ChannelPipeline走过socketC来的方法,它是有NioEventLoop在收到OP_READ的事件时会调用的方法,主要是SocketChannel中有可读的数据时,或者ServerSocketChannel受到了客户端的连接时会调用这个read方法,对于NioSocketChannel会读取SocketChannel中的数据最终会转换为ByteBuf通过fireChannelRead给用户去处理,而对于NioServerSocketChannel会将ServerSocketChanel接收到的SocketChannel包装成NioSocketChannel给用户去处理,一般情况下,利用ServerBootstrap来启动服务器时,ServerBootstrap会将这个NioSocketChannel直接注册到子NioEventLoopGroup中进行相关的处理,从而形成了Reactor的线程模型。

下面的代码是NIoSocketChannel 的对应的读操作,其主要是通过Handle这个分析器来分析每次需要读多少字节的数据,并读了一个ByteBuf再调用fireChannelRead给客户端去对这个ByteBuf进行操作。

public final void read() {
      final ChannelConfig config = config();
      if (shouldBreakReadReady(config)) {
          clearReadPending();
          return;
      }
      final ChannelPipeline pipeline = pipeline();
      final ByteBufAllocator allocator = config.getAllocator();//配置的ByteBufAllocator 
      final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
      allocHandle.reset(config);
      ByteBuf byteBuf = null;
      boolean close = false;
      try {
          do {
              byteBuf = allocHandle.allocate(allocator);//分配一个ByteBuf,这个ByteBuf的初始大小是通过历史的大小计算的
              allocHandle.lastBytesRead(doReadBytes(byteBuf));//从javaChannel读取数据到ByteBuf中
              if (allocHandle.lastBytesRead() <= 0) {//没有数据可读
                  byteBuf.release();
                  byteBuf = null;
                  close = allocHandle.lastBytesRead() < 0;//收到了EOF
                  if (close) {
                      readPending = false;
                  }
                  break;
              }
              allocHandle.incMessagesRead(1);
              readPending = false;
              pipeline.fireChannelRead(byteBuf);
              byteBuf = null;
          } while (allocHandle.continueReading());//评估能否继续读
          allocHandle.readComplete();
          pipeline.fireChannelReadComplete();//读完成
          if (close) {
              closeOnRead(pipeline);
          }
      } catch (Throwable t) {
          handleReadException(pipeline, byteBuf, t, close, allocHandle);
      } finally {
          if (!readPending && !config.isAutoRead()) {
              removeReadOp();
          }
      }
  }
}

下面是AbstractNioMessageUnsafe中的read方法,和上面的方法逻辑差不多,主要的区别是此处是doReadMessages,其获取的msg是NioSocketChannel。 

public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    int localRead = doReadMessages(readBuf);//NioServerSocketChannel
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            if (exception != null) {
                closed = closeOnReadError(exception);
                pipeline.fireExceptionCaught(exception);
            }
            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

close

close方法很多操作失败都可能会调用的方法,它主要是清理outboundBuffer,关闭对应的SocketChannel,并且会进行deregister操作解除绑定。

//close的方法来源一个是channel的close最终会调用到,一个是flush0的close最终会调用到
private void close(final ChannelPromise promise, final Throwable cause,
                   final ClosedChannelException closeCause, final boolean notify) {
    if (!promise.setUncancellable()) {
        return;
    }
    if (closeInitiated) {
    	//可能因为当前的close事件而触发了其他的事件,而另外的事件而再次触发了这个close事件
    	//最终导致了递归调用,这里对这个递归调用进行处理
        if (closeFuture.isDone()) {//当前closeFuture已经结束过
            safeSetSuccess(promise);
        } else if (!(promise instanceof VoidChannelPromise)) { 
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    promise.setSuccess();
                }
            });
        }
        return;
    }
    closeInitiated = true;
    final boolean wasActive = isActive();
    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    //这里直接将outboundBuffer设置为空,不让再有消息写入buffer或者buffer写出消息
    this.outboundBuffer = null; 
    Executor closeExecutor = prepareToClose();
    if (closeExecutor != null) {
        closeExecutor.execute(new Runnable() {//这里的closeExecutor可能不是eventLoop
            @Override
            public void run() {
                try {
                    doClose0(promise);//这里表示close操作是在另外一个
                } finally {
                    // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            if (outboundBuffer != null) {
                                outboundBuffer.failFlushed(cause, notify);
                                outboundBuffer.close(closeCause);
                            }
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                }
            }
        });
    } else {
        try {
            doClose0(promise);//关闭channel
        } finally {
            if (outboundBuffer != null) {
                outboundBuffer.failFlushed(cause, notify);//清理掉flushedEntry到unflushedEntry之间的消息
                outboundBuffer.close(closeCause);//清理从unflushedEntry之后的消息
            }
        }
        if (inFlush0) {//正在flush中表示这个close是因为flush0失败而导致的close,所以让其先走完原先的flush0的操作
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    fireChannelInactiveAndDeregister(wasActive);
                }
            });
        } else {
            fireChannelInactiveAndDeregister(wasActive);//调用deregister从eventLoop中解绑
        }
    }
}
//这个方法可能会是多线程调用	
private void doClose0(ChannelPromise promise) {
    try {
        doClose();//关闭对应的channel
        closeFuture.setClosed();
        safeSetSuccess(promise);
    } catch (Throwable t) {
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

 

最后

以上就是清秀自行车为你收集整理的netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法)的全部内容,希望文章能够帮你解决netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(74)

评论列表共有 0 条评论

立即
投稿
返回
顶部