我是靠谱客的博主 柔弱蜡烛,最近开发中收集的这篇文章主要介绍Netty源码解析-NioSocketChannel之write,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言:

    前一篇中介绍了NioSocketChannel连接远程Server(connect),并接收信息(read)事件的处理过程。

    那么NioSocketChannel的主要功能中,还剩下写出(write)事件的处理了。我们一起来了解下该功能的实现。

1.write方法的分类

    write方法的入口在哪里呢?我们在之前Bootstrap的示例中有以下代码

Channel ch = b.connect(HOST, PORT).sync().channel();
// 控制台输入
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
    String line = in.readLine();
    if (line == null) {
        continue;
    }
    // 这里将msg写出
    ch.writeAndFlush(line + "rn");
}

可以看到,是通过ChannelOutboundInvoker的相关write方法写出的。那ChannelOutboundInvoker还提供了哪些方法呢?我们来看下其API

public interface ChannelOutboundInvoker {
    /**
     * Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
     * This method will not request to actual flush, so be sure to call {@link #flush()}
     * once you want to request to flush all pending data to the actual transport.
     */
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    /**
     * Request to flush all pending messages via this ChannelOutboundInvoker.
     */
    ChannelOutboundInvoker flush();
    /**
     * Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
}

提供了三类方法:

write:只是将数据写入到内存队列中,

flush:将内存队列中的数据发送到对端(此时才是真正将数据写出)

writeAndFlush:将数据写入到内存队列后,立即刷新,将数据发送到对端

在分析write方法时,我们按照其方法调用顺序来介绍下,这一点不同于read相关方法的分析,write方法相对而言更复杂些

2.NioSocketChannel.writeAndFlush()

// 直接调用了AbstractChannel的方法
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
	public ChannelFuture writeAndFlush(Object msg) {
        // 交由ChannelPipeline来执行
        return pipeline.writeAndFlush(msg);
    }
}

2.1 ChannelPipeline.writeAndFlush()

public class DefaultChannelPipeline implements ChannelPipeline {
	public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }
}
// TailHeader.writeAndFlush()
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        write(msg, true, promise);
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        ...

        // 获取下一个合适的ChannelHandler,交由其来处理
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 交由下一个Handler执行
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }
    
    // writeAndFlush
    void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        // 当前Handler状态正常,则直接调用其write和flush方法
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    
    // 执行当前Handler的write方法
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
    // 执行当前Handler的flush方法
    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    }
}

我们知道,针对write相关事件执行,ChannelHandler的执行顺序为Tail-->Handler... -->Head

所以,在执行完成中间的一系列ChannelHandler的write方法之后,最终会到达HeadContext

2.2 HeadContext.write

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
 
    // 写出
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 直接交由NioSocketChannelUnsafe来执行
        unsafe.write(msg, promise);
    }
    // 刷新
    public void flush(ChannelHandlerContext ctx) {
        // 直接交由NioSocketChannelUnsafe来执行
        unsafe.flush();
    }
}

2.3 NioSocketChannelUnsafe.write & NioSocketChannelUnsafe.flush

// 交由AbstractUnsafe.write方法来自执行
protected abstract class AbstractUnsafe implements Unsafe {
 
    // 2.3.1 写出
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();

        // ChannelOutboundBuffer作为一个ByteBuf的内部缓存队列
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            // 对msg信息进行过滤,AbstractNioByteChannel对filterOutboundMessage有重写
            msg = filterOutboundMessage(msg);
            // 计算msg信息长度
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            ...
        }

        // 将信息添加到缓存队列中    
        outboundBuffer.addMessage(msg, size, promise);
    }
    
    // 2.3.2 将队列中的数据刷新到对端
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }

        // 添加一个刷新标志位
        outboundBuffer.addFlush();
        flush0();
    }
        
    protected void flush0() {
        ...

        ...
        try {
            // 交由子类实现,默认是AbstractNioByteChannel来实现
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                initialCloseCause = t;
                close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    initialCloseCause = t;
                    close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }
        
    // AbstractNioByteChannel.filterOutboundMessage()
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }
            // 非direct类型,需要转换为direct类型
            return newDirectBuffer(buf);
        }
        if (msg instanceof FileRegion) {
            return msg;
        }
        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }
}

2.4 AbstractNioByteChannel.doWrite()刷新方法

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
	protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        int writeSpinCount = config().getWriteSpinCount();
        do {
            // 获取缓存队列中的当前msg
            Object msg = in.current();
            if (msg == null) {
                clearOpWrite();
                return;
            }
            // 调用doWriteInternal执行
            writeSpinCount -= doWriteInternal(in, msg);
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

	// doWriteInternal
	private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            ...
			// 交由doWriteBytes执行,在NioSocketChannel中实现了这个方法
            final int localFlushedAmount = doWriteBytes(buf);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                if (!buf.isReadable()) {
                    in.remove();
                }
                return 1;
            }
        } else if (msg instanceof FileRegion) {
            ..
        } else {
            // Should not reach here.
            throw new Error();
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }
}

2.5 NioSocketChannel.doWriteBytes()

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
 
    protected int doWriteBytes(ByteBuf buf) throws Exception {
        final int expectedWrittenBytes = buf.readableBytes();
        // 通过直接调用ByteBuf.readBytes方法,将数据写出到channel中
        return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }
}

一路走到现在,我们发现了write相关方法只是将ByteBuf信息暂存在outboundBuffer队列中;flush方法才是真正的将outboundBuffer中的消息发送到对端。

注意:有关于ChannelOutboundBuffer的相关知识点,本文中不再多介绍,后续会专门有文章来介绍。

3.总结

    Netty真正的写出分为两步,write和flush。同样,通过一个时序图来展示下全过程

 

最后

以上就是柔弱蜡烛为你收集整理的Netty源码解析-NioSocketChannel之write的全部内容,希望文章能够帮你解决Netty源码解析-NioSocketChannel之write所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(68)

评论列表共有 0 条评论

立即
投稿
返回
顶部