概述
前言:
前一篇中介绍了NioSocketChannel连接远程Server(connect),并接收信息(read)事件的处理过程。
那么NioSocketChannel的主要功能中,还剩下写出(write)事件的处理了。我们一起来了解下该功能的实现。
1.write方法的分类
write方法的入口在哪里呢?我们在之前Bootstrap的示例中有以下代码
Channel ch = b.connect(HOST, PORT).sync().channel();
// 控制台输入
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (; ; ) {
String line = in.readLine();
if (line == null) {
continue;
}
// 这里将msg写出
ch.writeAndFlush(line + "rn");
}
可以看到,是通过ChannelOutboundInvoker的相关write方法写出的。那ChannelOutboundInvoker还提供了哪些方法呢?我们来看下其API
public interface ChannelOutboundInvoker {
/**
* Request to write a message via this {@link ChannelHandlerContext} through the {@link ChannelPipeline}.
* This method will not request to actual flush, so be sure to call {@link #flush()}
* once you want to request to flush all pending data to the actual transport.
*/
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
/**
* Request to flush all pending messages via this ChannelOutboundInvoker.
*/
ChannelOutboundInvoker flush();
/**
* Shortcut for call {@link #write(Object, ChannelPromise)} and {@link #flush()}.
*/
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
提供了三类方法:
write:只是将数据写入到内存队列中,
flush:将内存队列中的数据发送到对端(此时才是真正将数据写出)
writeAndFlush:将数据写入到内存队列后,立即刷新,将数据发送到对端
在分析write方法时,我们按照其方法调用顺序来介绍下,这一点不同于read相关方法的分析,write方法相对而言更复杂些
2.NioSocketChannel.writeAndFlush()
// 直接调用了AbstractChannel的方法
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
public ChannelFuture writeAndFlush(Object msg) {
// 交由ChannelPipeline来执行
return pipeline.writeAndFlush(msg);
}
}
2.1 ChannelPipeline.writeAndFlush()
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
}
// TailHeader.writeAndFlush()
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
...
// 获取下一个合适的ChannelHandler,交由其来处理
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 交由下一个Handler执行
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
// writeAndFlush
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
// 当前Handler状态正常,则直接调用其write和flush方法
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
// 执行当前Handler的write方法
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
// 执行当前Handler的flush方法
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
}
我们知道,针对write相关事件执行,ChannelHandler的执行顺序为Tail-->Handler... -->Head
所以,在执行完成中间的一系列ChannelHandler的write方法之后,最终会到达HeadContext
2.2 HeadContext.write
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
// 写出
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// 直接交由NioSocketChannelUnsafe来执行
unsafe.write(msg, promise);
}
// 刷新
public void flush(ChannelHandlerContext ctx) {
// 直接交由NioSocketChannelUnsafe来执行
unsafe.flush();
}
}
2.3 NioSocketChannelUnsafe.write & NioSocketChannelUnsafe.flush
// 交由AbstractUnsafe.write方法来自执行
protected abstract class AbstractUnsafe implements Unsafe {
// 2.3.1 写出
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
// ChannelOutboundBuffer作为一个ByteBuf的内部缓存队列
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// 对msg信息进行过滤,AbstractNioByteChannel对filterOutboundMessage有重写
msg = filterOutboundMessage(msg);
// 计算msg信息长度
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
...
}
// 将信息添加到缓存队列中
outboundBuffer.addMessage(msg, size, promise);
}
// 2.3.2 将队列中的数据刷新到对端
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 添加一个刷新标志位
outboundBuffer.addFlush();
flush0();
}
protected void flush0() {
...
...
try {
// 交由子类实现,默认是AbstractNioByteChannel来实现
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
}
// AbstractNioByteChannel.filterOutboundMessage()
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
// 非direct类型,需要转换为direct类型
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
}
2.4 AbstractNioByteChannel.doWrite()刷新方法
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
// 获取缓存队列中的当前msg
Object msg = in.current();
if (msg == null) {
clearOpWrite();
return;
}
// 调用doWriteInternal执行
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
// doWriteInternal
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
...
// 交由doWriteBytes执行,在NioSocketChannel中实现了这个方法
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
..
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
}
2.5 NioSocketChannel.doWriteBytes()
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
// 通过直接调用ByteBuf.readBytes方法,将数据写出到channel中
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
}
一路走到现在,我们发现了write相关方法只是将ByteBuf信息暂存在outboundBuffer队列中;flush方法才是真正的将outboundBuffer中的消息发送到对端。
注意:有关于ChannelOutboundBuffer的相关知识点,本文中不再多介绍,后续会专门有文章来介绍。
3.总结
Netty真正的写出分为两步,write和flush。同样,通过一个时序图来展示下全过程
最后
以上就是柔弱蜡烛为你收集整理的Netty源码解析-NioSocketChannel之write的全部内容,希望文章能够帮你解决Netty源码解析-NioSocketChannel之write所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复