我是靠谱客的博主 清秀自行车,这篇文章主要介绍netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法),现在分享给大家,希望可以做个参考。

这个分享阅读的是4.1.52.Final-SNAPSHOT这个版本的源码 

unsafe是每个channel都会对应的一个对象,把它定义为Unasfe是因为用户是无法直接操作这个接口的,它是由netty内部进行调用的,这个接口主要是用来进行具体的IO的处理的。并且除了register,deregister等,其他方法都需要在eventLoop中完成操作的。

下面表中列出了其主要的方法,并列出了由于这些方法可能会fire出的事件,其中红色的方法表示的是这些方法是outbound方法,即这些方法是从channel调用,然后经过ChannelPipeline中的ChannelHandler调用链最终走到的方法。

需要注意的是registerread方法不是outbound方法,而beginRead是outbound方法。beginReadchannelread方法最终会走到的方法,而read方法是NioUnsafe中定义的方法,是NioEventLoop调用的方法

下面来详细介绍一下这些方法

register

下面图展示了rgister做的主要的两件事情。

  • 其主要是将本channel所绑定的javaChannel对应的socketChannel注册到NioEventLoop的Selector中。
  • 将EventLoop绑定到Channel

其中第一件将SocketChannel注册到Selector中保证了NioEventLoop中对应Selector能处理这个channel对应的IO事件,而将EventLoop绑定到channel中则让这个channel能利用ChannelPipeline进行inbound事件和outbound事件调用,因为对于ChannelPipeline的对应的事件的调用都是需要在channel的EventLoop中进行的。

下面是register方法的具体的代码的实现。

另外需要注意的是这个方法是用户线程调用的,而不是由eventLoop线程进行调用的,其最终调用的register0方法则是会在EventLoop线程进行调用。

可以看到这个方法首先进行的操作则是将本channel的eventLoop赋值为传入的EventLoop,并且将具体的register操作register0在eventLoop中进行调用。register0则将SocketChannel绑定到Selector的操作留给AbstractNioChannel来进行完成,并在注册成后fire了相应的事件。

这个方法在注册成功后会调用invokeHandlerAddedIfNeeded来通知用户已经可以进行处理事件。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
//这个方法是从eventLoop的register调用到此的 public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) {//这里如果是同一个eventLoop register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { closeForcibly();//这里直接关闭channel不会有新的event被fire出来 closeFuture.setClosed();//设置closeFuture的状态 safeSetFailure(promise, t);//通知传入的promise已经结束 } } } private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister();//这里对于AbstractNioChannel是将其包装的channel注册到EventLoop中 neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded();//在给后续传递 safeSetSuccess(promise);//通知客户端已经执行成功 pipeline.fireChannelRegistered();//通知已经注册成功 // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { //autoRead的对应的beginRead()方法的调用是在fireChannelActive后进行调用的 //但是由于fireChannelActive只fire一次,对于先register然后deregister,在register等情况下,可能已经active过一次 //再次register则不会调用对应的beginRead(),则需要重新beginRead beginRead();//向SelectionKey注册InterestOps等 } } } catch (Throwable t) { closeForcibly();//直接关闭对应的channel,不进行fireEvent等操作。 closeFuture.setClosed();//通知closeFuture已经关闭 safeSetFailure(promise, t);//通知用户线程register结果 } } //AbstractNioChannel的方法 protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }

deregister

deregister是register的相反的操作,它主要进行的操作是将SocketChannel从Selector解除注册,不过没有将NioEventLoopChannel中解除绑定,故此channel在deregister后对应的inbound,outbound事件还是能进行继续操作。

下面的代码展示了具体的逻辑,其解除注册的操作委托给doDeregister来完成。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) { if (!promise.setUncancellable()) { return; } if (!registered) { safeSetSuccess(promise); return; } //这里直接将deregister在register之后进行,因为调用deregister的线程一定是eventLoop,但是可能原先线程还没有将事情做完 //而又调用了register操作将当前的eventLoop切换为新的eventLoop,则原先的eventLoop在deregister后则有可能用的是新的eventLoop invokeLater(new Runnable() { @Override public void run() { try { doDeregister();//对于AbstractNioChannel则是调用eventLoop的cancel操作 } catch (Throwable t) { logger.warn("Unexpected exception occurred while deregistering a channel.", t); } finally { if (fireChannelInactive) { pipeline.fireChannelInactive();//这里的eventLoop没有改变,则这个channel还是能执行fire操作让pipeline执行 } if (registered) {//对于没有调用registered则调用fireChannelUnregistered registered = false; pipeline.fireChannelUnregistered(); } safeSetSuccess(promise);//通知外部线程deregister结束 } } }); } //AbstractNioChannel中的方法 protected void doDeregister() throws Exception { eventLoop().cancel(selectionKey()); }

connect

connect的主要是连接到对应的服务器,其连接操作如果没有直接连接成功会注册一个OP_CONNECT到Selector中等待连接成功Selector对应事件的通知。

 下面的代码就是connect的具体的代码逻辑,它的具体的连接逻辑在doConnect方法中,这个doConnect方法具体是在NioSocketChannel中实现的,可以看到它是直接调用socketChannel来进行连接操作的,如果没有连接成功则注册一个OP_CONNECT到selector中,并且注册一个超时等待的定时器。等待NioEventLoop收到OP_CONNECT连接成功的事件会调用finishConnect来通知用户连接完成。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new ConnectionPendingException(); } boolean wasActive = isActive(); if (doConnect(remoteAddress, localAddress)) {//当前连接成功,promise完成 fulfillConnectPromise(promise, wasActive); } else {//当前还没有连接成功,设置一个超时定时器,等待 connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " +remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } //外部调用了cancel清理等待的promise promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } } protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress);//本地绑定 } boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);//直接连接 if (!connected) {//没有连接成功,则将连接注册到OP_CONNECT中 selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } } public final void finishConnect() { try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { if (connectTimeoutFuture != null) {//取消超时定时器 connectTimeoutFuture.cancel(false); } connectPromise = null;//释放连接promise,表示不在连接中 } }

disconnect

这个方法主要是将SocketChannel关闭了,并fire相应的事件。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final void disconnect(final ChannelPromise promise) { if (!promise.setUncancellable()) { return; } boolean wasActive = isActive(); try { doDisconnect();//javaChannel().close() remoteAddress = null; localAddress = null; } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelInactive(); } }); } safeSetSuccess(promise); closeIfClosed(); // 将javaChannel关闭了后则调用close方法关闭这个channel }

bind

bind事件主要是直接用javaChannel进行bind操作,并fire相应的事件

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean wasActive = isActive(); try { doBind(localAddress);//javaChannel().bind(localAddress, config.getBacklog()); } catch (Throwable t) { safeSetFailure(promise, t);//通知用户bind失败 closeIfClosed();//channel关闭了就直接进行close逻辑 return; } if (!wasActive && isActive()) {//这边没有进行eventLoop的判断 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise);//通知用户绑定成功 }

write

write主要进行的操作是将外部写入的数据最终写到ChannelOutboundBuffer的缓存链表的尾部,下面图展示了这个流程,其中虚线表示的这次写入的msg,而红色表示的是可以刷入,但是还没有输入到javaChannel中的消息。

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) {//对于outboundBuffer为空时则需要将msg释放以防止内存泄漏 safeSetFailure(promise, newClosedChannelException(initialCloseCause)); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg);//计算msg的大小,默认对于ByteBuf就是看readableBytes if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise);//将msg加到outboundBuffer中 }

flush

flush主要分为两步,一步是将ChannelOutBoundBuffer中的msg都标记为可以为flush的消息,另一步则在flush0操作,flush0会将ChannelOutBoundBuffer中的标记为可以flush的消息一点一点地写到SocketChannel中。

下面是flush和flush0的具体的操作,可以看到flush0的具体操作其实是委托给doWrite来 进行具体的写操作。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush();//将当前写到ChannelOutboundBuffer的消息都标记为可以被flush的状态 flush0(); } protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { //写失败后如果配了自动关闭则调用close将当前的channel进行close掉 initialCloseCause = t; close(voidPromise(), t, newClosedChannelException(t), false); } else { try { shutdownOutput(voidPromise(), t);//关闭outboundBuffer } catch (Throwable t2) { initialCloseCause = t; close(voidPromise(), t2, newClosedChannelException(t), false); } } } finally { inFlush0 = false; } }

下面来具体分析一下NioSocketChannel的doWrite操作。可以看到每次doWrite最多会进行writeSpinCount写操作,每次写最多会写入maxBytesPerGatheringWrite 的数据,并且这个maxBytesPerGatheringWrite 是动态变化的,netty会根据每次想要写入maxBytesPerGatheringWrite 以及最终写入到SocketChannel的字节localWrittenBytes来重新计算maxBytesPerGatheringWrite 的大小,如果一个字节都没有写入到SocketChannel,则会通过incompleteWrite方法注册一个OP_WRITE到NioEventLoop,NioEventLoop会在这个SocketChannel能再次写入数据时再次调用flush0继续写数据到SocketChannel。如果经历writeSpinCount次循环还是没有写完所有数据,则利用incompleteWrite方法再次加入一个flush0的事件继续写入数据

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { clearOpWrite(); return; } int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: //不能转换为ByteBuffer,采用一般的写逻辑 writeSpinCount -= doWrite0(in); break; case 1: { ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) {//ch的写buffer已经满了,则注册OP_WRITE到NioEventLoop中等待可以再次写入是再写 incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);//自适应maxBytesPerGatheringWrite in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); //没写完则清理掉OP_WRITE,不让NioEventLoop触发flush,并加入flush任务等下次再写 //写完了则加入OP_WRITE等NioEventLoop来写入 incompleteWrite(writeSpinCount < 0); }

beginRead

beginRead的是channel的read方法走过DefaultChannelPipeline最终调用的方法,它主要做的一件事情就是将对应的readOps注册到NioEventLoop的Selector中,不过对于不同的Channel这个readOps是不同的,对于NioSocketChannel,其对应的readOps是OP_READ,而对于NioServerSocketChannel,其对应的readOps则是OP_ACCEPT。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead();//将readInterestOp注册到selector中 } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }

 read

read操作不是ChannelOutBound方法,它也不是从ChannelPipeline走过socketC来的方法,它是有NioEventLoop在收到OP_READ的事件时会调用的方法,主要是SocketChannel中有可读的数据时,或者ServerSocketChannel受到了客户端的连接时会调用这个read方法,对于NioSocketChannel会读取SocketChannel中的数据最终会转换为ByteBuf通过fireChannelRead给用户去处理,而对于NioServerSocketChannel会将ServerSocketChanel接收到的SocketChannel包装成NioSocketChannel给用户去处理,一般情况下,利用ServerBootstrap来启动服务器时,ServerBootstrap会将这个NioSocketChannel直接注册到子NioEventLoopGroup中进行相关的处理,从而形成了Reactor的线程模型。

下面的代码是NIoSocketChannel 的对应的读操作,其主要是通过Handle这个分析器来分析每次需要读多少字节的数据,并读了一个ByteBuf再调用fireChannelRead给客户端去对这个ByteBuf进行操作。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator();//配置的ByteBufAllocator final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator);//分配一个ByteBuf,这个ByteBuf的初始大小是通过历史的大小计算的 allocHandle.lastBytesRead(doReadBytes(byteBuf));//从javaChannel读取数据到ByteBuf中 if (allocHandle.lastBytesRead() <= 0) {//没有数据可读 byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0;//收到了EOF if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//评估能否继续读 allocHandle.readComplete(); pipeline.fireChannelReadComplete();//读完成 if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }

下面是AbstractNioMessageUnsafe中的read方法,和上面的方法逻辑差不多,主要的区别是此处是doReadMessages,其获取的msg是NioSocketChannel。 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf);//NioServerSocketChannel if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }

close

close方法很多操作失败都可能会调用的方法,它主要是清理outboundBuffer,关闭对应的SocketChannel,并且会进行deregister操作解除绑定。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
//close的方法来源一个是channel的close最终会调用到,一个是flush0的close最终会调用到 private void close(final ChannelPromise promise, final Throwable cause, final ClosedChannelException closeCause, final boolean notify) { if (!promise.setUncancellable()) { return; } if (closeInitiated) { //可能因为当前的close事件而触发了其他的事件,而另外的事件而再次触发了这个close事件 //最终导致了递归调用,这里对这个递归调用进行处理 if (closeFuture.isDone()) {//当前closeFuture已经结束过 safeSetSuccess(promise); } else if (!(promise instanceof VoidChannelPromise)) { closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { promise.setSuccess(); } }); } return; } closeInitiated = true; final boolean wasActive = isActive(); final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //这里直接将outboundBuffer设置为空,不让再有消息写入buffer或者buffer写出消息 this.outboundBuffer = null; Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { closeExecutor.execute(new Runnable() {//这里的closeExecutor可能不是eventLoop @Override public void run() { try { doClose0(promise);//这里表示close操作是在另外一个 } finally { // Call invokeLater so closeAndDeregister is executed in the EventLoop again! invokeLater(new Runnable() { @Override public void run() { if (outboundBuffer != null) { outboundBuffer.failFlushed(cause, notify); outboundBuffer.close(closeCause); } fireChannelInactiveAndDeregister(wasActive); } }); } } }); } else { try { doClose0(promise);//关闭channel } finally { if (outboundBuffer != null) { outboundBuffer.failFlushed(cause, notify);//清理掉flushedEntry到unflushedEntry之间的消息 outboundBuffer.close(closeCause);//清理从unflushedEntry之后的消息 } } if (inFlush0) {//正在flush中表示这个close是因为flush0失败而导致的close,所以让其先走完原先的flush0的操作 invokeLater(new Runnable() { @Override public void run() { fireChannelInactiveAndDeregister(wasActive); } }); } else { fireChannelInactiveAndDeregister(wasActive);//调用deregister从eventLoop中解绑 } } } //这个方法可能会是多线程调用 private void doClose0(ChannelPromise promise) { try { doClose();//关闭对应的channel closeFuture.setClosed(); safeSetSuccess(promise); } catch (Throwable t) { closeFuture.setClosed(); safeSetFailure(promise, t); } }

 

最后

以上就是清秀自行车最近收集整理的关于netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法)的全部内容,更多相关netty源码阅读之十一(unsafe中内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(85)

评论列表共有 0 条评论

立即
投稿
返回
顶部