概述
这个分享阅读的是4.1.52.Final-SNAPSHOT这个版本的源码
unsafe是每个channel都会对应的一个对象,把它定义为Unasfe是因为用户是无法直接操作这个接口的,它是由netty内部进行调用的,这个接口主要是用来进行具体的IO的处理的。并且除了register,deregister等,其他方法都需要在eventLoop中完成操作的。
下面表中列出了其主要的方法,并列出了由于这些方法可能会fire出的事件,其中红色的方法表示的是这些方法是outbound方法,即这些方法是从channel调用,然后经过ChannelPipeline中的ChannelHandler调用链最终走到的方法。
需要注意的是register和read方法不是outbound方法,而beginRead是outbound方法。beginRead是channel的read方法最终会走到的方法,而read方法是NioUnsafe中定义的方法,是NioEventLoop调用的方法
下面来详细介绍一下这些方法
register
下面图展示了rgister做的主要的两件事情。
- 其主要是将本channel所绑定的javaChannel对应的socketChannel注册到NioEventLoop的Selector中。
- 将EventLoop绑定到Channel
其中第一件将SocketChannel注册到Selector中保证了NioEventLoop中对应Selector能处理这个channel对应的IO事件,而将EventLoop绑定到channel中则让这个channel能利用ChannelPipeline进行inbound事件和outbound事件调用,因为对于ChannelPipeline的对应的事件的调用都是需要在channel的EventLoop中进行的。
下面是register方法的具体的代码的实现。
另外需要注意的是这个方法是用户线程调用的,而不是由eventLoop线程进行调用的,其最终调用的register0方法则是会在EventLoop线程进行调用。
可以看到这个方法首先进行的操作则是将本channel的eventLoop赋值为传入的EventLoop,并且将具体的register操作register0在eventLoop中进行调用。register0则将SocketChannel绑定到Selector的操作留给AbstractNioChannel来进行完成,并在注册成后fire了相应的事件。
这个方法在注册成功后会调用invokeHandlerAddedIfNeeded来通知用户已经可以进行处理事件。
//这个方法是从eventLoop的register调用到此的
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {//这里如果是同一个eventLoop
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
closeForcibly();//这里直接关闭channel不会有新的event被fire出来
closeFuture.setClosed();//设置closeFuture的状态
safeSetFailure(promise, t);//通知传入的promise已经结束
}
}
}
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();//这里对于AbstractNioChannel是将其包装的channel注册到EventLoop中
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();//在给后续传递
safeSetSuccess(promise);//通知客户端已经执行成功
pipeline.fireChannelRegistered();//通知已经注册成功
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
//autoRead的对应的beginRead()方法的调用是在fireChannelActive后进行调用的
//但是由于fireChannelActive只fire一次,对于先register然后deregister,在register等情况下,可能已经active过一次
//再次register则不会调用对应的beginRead(),则需要重新beginRead
beginRead();//向SelectionKey注册InterestOps等
}
}
} catch (Throwable t) {
closeForcibly();//直接关闭对应的channel,不进行fireEvent等操作。
closeFuture.setClosed();//通知closeFuture已经关闭
safeSetFailure(promise, t);//通知用户线程register结果
}
}
//AbstractNioChannel的方法
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
deregister
deregister是register的相反的操作,它主要进行的操作是将SocketChannel从Selector解除注册,不过没有将NioEventLoop从Channel中解除绑定,故此channel在deregister后对应的inbound,outbound事件还是能进行继续操作。
下面的代码展示了具体的逻辑,其解除注册的操作委托给doDeregister来完成。
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) {
safeSetSuccess(promise);
return;
}
//这里直接将deregister在register之后进行,因为调用deregister的线程一定是eventLoop,但是可能原先线程还没有将事情做完
//而又调用了register操作将当前的eventLoop切换为新的eventLoop,则原先的eventLoop在deregister后则有可能用的是新的eventLoop
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister();//对于AbstractNioChannel则是调用eventLoop的cancel操作
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive();//这里的eventLoop没有改变,则这个channel还是能执行fire操作让pipeline执行
}
if (registered) {//对于没有调用registered则调用fireChannelUnregistered
registered = false;
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);//通知外部线程deregister结束
}
}
});
}
//AbstractNioChannel中的方法
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
connect
connect的主要是连接到对应的服务器,其连接操作如果没有直接连接成功会注册一个OP_CONNECT到Selector中等待连接成功Selector对应事件的通知。
下面的代码就是connect的具体的代码逻辑,它的具体的连接逻辑在doConnect方法中,这个doConnect方法具体是在NioSocketChannel中实现的,可以看到它是直接调用socketChannel来进行连接操作的,如果没有连接成功则注册一个OP_CONNECT到selector中,并且注册一个超时等待的定时器。等待NioEventLoop收到OP_CONNECT连接成功的事件会调用finishConnect来通知用户连接完成。
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {//当前连接成功,promise完成
fulfillConnectPromise(promise, wasActive);
} else {//当前还没有连接成功,设置一个超时定时器,等待
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " +remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
//外部调用了cancel清理等待的promise
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);//本地绑定
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);//直接连接
if (!connected) {//没有连接成功,则将连接注册到OP_CONNECT中
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
public final void finishConnect() {
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
if (connectTimeoutFuture != null) {//取消超时定时器
connectTimeoutFuture.cancel(false);
}
connectPromise = null;//释放连接promise,表示不在连接中
}
}
disconnect
这个方法主要是将SocketChannel关闭了,并fire相应的事件。
public final void disconnect(final ChannelPromise promise) {
if (!promise.setUncancellable()) {
return;
}
boolean wasActive = isActive();
try {
doDisconnect();//javaChannel().close()
remoteAddress = null;
localAddress = null;
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
safeSetSuccess(promise);
closeIfClosed(); // 将javaChannel关闭了后则调用close方法关闭这个channel
}
bind
bind事件主要是直接用javaChannel进行bind操作,并fire相应的事件
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean wasActive = isActive();
try {
doBind(localAddress);//javaChannel().bind(localAddress, config.getBacklog());
} catch (Throwable t) {
safeSetFailure(promise, t);//通知用户bind失败
closeIfClosed();//channel关闭了就直接进行close逻辑
return;
}
if (!wasActive && isActive()) {//这边没有进行eventLoop的判断
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);//通知用户绑定成功
}
write
write主要进行的操作是将外部写入的数据最终写到ChannelOutboundBuffer的缓存链表的尾部,下面图展示了这个流程,其中虚线表示的这次写入的msg,而红色表示的是可以刷入,但是还没有输入到javaChannel中的消息。
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {//对于outboundBuffer为空时则需要将msg释放以防止内存泄漏
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);//计算msg的大小,默认对于ByteBuf就是看readableBytes
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);//将msg加到outboundBuffer中
}
flush
flush主要分为两步,一步是将ChannelOutBoundBuffer中的msg都标记为可以为flush的消息,另一步则在flush0操作,flush0会将ChannelOutBoundBuffer中的标记为可以flush的消息一点一点地写到SocketChannel中。
下面是flush和flush0的具体的操作,可以看到flush0的具体操作其实是委托给doWrite来 进行具体的写操作。
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();//将当前写到ChannelOutboundBuffer的消息都标记为可以被flush的状态
flush0();
}
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
//写失败后如果配了自动关闭则调用close将当前的channel进行close掉
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);//关闭outboundBuffer
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t), false);
}
}
} finally {
inFlush0 = false;
}
}
下面来具体分析一下NioSocketChannel的doWrite操作。可以看到每次doWrite最多会进行writeSpinCount写操作,每次写最多会写入maxBytesPerGatheringWrite 的数据,并且这个maxBytesPerGatheringWrite 是动态变化的,netty会根据每次想要写入maxBytesPerGatheringWrite 以及最终写入到SocketChannel的字节localWrittenBytes来重新计算maxBytesPerGatheringWrite 的大小,如果一个字节都没有写入到SocketChannel,则会通过incompleteWrite方法注册一个OP_WRITE到NioEventLoop,NioEventLoop会在这个SocketChannel能再次写入数据时再次调用flush0继续写数据到SocketChannel。如果经历writeSpinCount次循环还是没有写完所有数据,则利用incompleteWrite方法再次加入一个flush0的事件继续写入数据
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
clearOpWrite();
return;
}
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
//不能转换为ByteBuffer,采用一般的写逻辑
writeSpinCount -= doWrite0(in);
break;
case 1: {
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {//ch的写buffer已经满了,则注册OP_WRITE到NioEventLoop中等待可以再次写入是再写
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);//自适应maxBytesPerGatheringWrite
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
//没写完则清理掉OP_WRITE,不让NioEventLoop触发flush,并加入flush任务等下次再写
//写完了则加入OP_WRITE等NioEventLoop来写入
incompleteWrite(writeSpinCount < 0);
}
beginRead
beginRead的是channel的read方法走过DefaultChannelPipeline最终调用的方法,它主要做的一件事情就是将对应的readOps注册到NioEventLoop的Selector中,不过对于不同的Channel这个readOps是不同的,对于NioSocketChannel,其对应的readOps是OP_READ,而对于NioServerSocketChannel,其对应的readOps则是OP_ACCEPT。
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();//将readInterestOp注册到selector中
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
read
read操作不是ChannelOutBound方法,它也不是从ChannelPipeline走过socketC来的方法,它是有NioEventLoop在收到OP_READ的事件时会调用的方法,主要是SocketChannel中有可读的数据时,或者ServerSocketChannel受到了客户端的连接时会调用这个read方法,对于NioSocketChannel会读取SocketChannel中的数据最终会转换为ByteBuf通过fireChannelRead给用户去处理,而对于NioServerSocketChannel会将ServerSocketChanel接收到的SocketChannel包装成NioSocketChannel给用户去处理,一般情况下,利用ServerBootstrap来启动服务器时,ServerBootstrap会将这个NioSocketChannel直接注册到子NioEventLoopGroup中进行相关的处理,从而形成了Reactor的线程模型。
下面的代码是NIoSocketChannel 的对应的读操作,其主要是通过Handle这个分析器来分析每次需要读多少字节的数据,并读了一个ByteBuf再调用fireChannelRead给客户端去对这个ByteBuf进行操作。
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();//配置的ByteBufAllocator
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);//分配一个ByteBuf,这个ByteBuf的初始大小是通过历史的大小计算的
allocHandle.lastBytesRead(doReadBytes(byteBuf));//从javaChannel读取数据到ByteBuf中
if (allocHandle.lastBytesRead() <= 0) {//没有数据可读
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;//收到了EOF
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//评估能否继续读
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//读完成
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
下面是AbstractNioMessageUnsafe中的read方法,和上面的方法逻辑差不多,主要的区别是此处是doReadMessages,其获取的msg是NioSocketChannel。
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);//NioServerSocketChannel
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
close
close方法很多操作失败都可能会调用的方法,它主要是清理outboundBuffer,关闭对应的SocketChannel,并且会进行deregister操作解除绑定。
//close的方法来源一个是channel的close最终会调用到,一个是flush0的close最终会调用到
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) {
//可能因为当前的close事件而触发了其他的事件,而另外的事件而再次触发了这个close事件
//最终导致了递归调用,这里对这个递归调用进行处理
if (closeFuture.isDone()) {//当前closeFuture已经结束过
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) {
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//这里直接将outboundBuffer设置为空,不让再有消息写入buffer或者buffer写出消息
this.outboundBuffer = null;
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {//这里的closeExecutor可能不是eventLoop
@Override
public void run() {
try {
doClose0(promise);//这里表示close操作是在另外一个
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
doClose0(promise);//关闭channel
} finally {
if (outboundBuffer != null) {
outboundBuffer.failFlushed(cause, notify);//清理掉flushedEntry到unflushedEntry之间的消息
outboundBuffer.close(closeCause);//清理从unflushedEntry之后的消息
}
}
if (inFlush0) {//正在flush中表示这个close是因为flush0失败而导致的close,所以让其先走完原先的flush0的操作
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);//调用deregister从eventLoop中解绑
}
}
}
//这个方法可能会是多线程调用
private void doClose0(ChannelPromise promise) {
try {
doClose();//关闭对应的channel
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
最后
以上就是清秀自行车为你收集整理的netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法)的全部内容,希望文章能够帮你解决netty源码阅读之十一(unsafe中的register,deregister,connect,bind等方法)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复