概述
Channel是netty网络操作抽象类,包括网络的读,写,链路关闭,发起连接等。我们拿出NioServerSocketChannel来进行分析,NioServerSocketChannel的类图如下所示:
Netty使用了聚合的方使来实现channel的功能,先看看看AbstractChannel 都聚合了那些功能,源码如下,关键代码会加上相应的注释:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
//这是channel的父channel:所谓的父channel是指处理读写的channel有个连接的父channel
private final Channel parent;
// id表示channel的唯一标识
private final ChannelId id;
//unsafe类里实现了具体的连接与写数据,之所以命名为unsafe是不希望外部使用,并非是不安全的
private final Unsafe unsafe;
//管理channelHandler的pipeline。
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
//地址信息
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
//eventLoop就是react线程对象啦
private volatile EventLoop eventLoop;
private volatile boolean registered;
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
//可以看出下面的方法都会触发pipeline的链路调用
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture disconnect() {
return pipeline.disconnect();
}
@Override
public ChannelFuture close() {
return pipeline.close();
}
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return pipeline.write(msg, promise);
}
}
AbstractNioChannel 集成了 JDK内部的channel, SelectionKey。 这个类里实现了将Channel注册到对应的Selector上。源码如下:
public abstract class AbstractNioChannel extends AbstractChannel {
//jdk内部的 channel对象
private final SelectableChannel ch;
//设置读操作位,
protected final int readInterestOp;
// 这里定义了selectionKey,里面包含了发生了的事件
volatile SelectionKey selectionKey;
// promise相关
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
//请求的地址信息
private SocketAddress requestedRemoteAddress;
//构造方法,需要传入下面几个参数
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//通过SelectionKey 设置注册读事件
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
//将channel注册到 Selector上
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
}
NioServerSocketChannel 提供了主要功能是绑定某个端口与接收client端的连接,关键源码如下:
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
//这个方法属属于NioServerSocketChannel 类方法 调用provider生成jdk底层的ServerSocketChannel 对象
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
*
Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
*
{@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
*
See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
//执行bind操作
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
//javaChannel()方法返回的实际上就是上一个方法生成的ServerSocketChannel 对象
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
//接收客户端的请求会调用到的方法
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//调用java底层的accept方法
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
//可以看出这里新建了个NioSocketChannel对象,并把当前对象 当成新创建对象的父对象
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
//将channel注册到Selector上
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
}
从上面源码上分析Netty在设计Channel这个对象的时候主要做了如下几个功能:
- 对JDK自带Channel进行了包装, 内部持有JDK Channel的引用
- Channel对象作为中间层,读写操作会触发pipeline调用链。内部通过unsafe对象实现读写的调用。
Unsafe对象
Unsafe对象作为channel的内部类,承担着channel网络相关的功能,比如具体的读写操作。其中AbstractUnsafe是AbstractChannel的内部类,部分源码如下:
protected abstract class AbstractUnsafe implements Unsafe {
//下面是绑定方法的逻辑 传入的是SocketAddress
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 确认当前channel已经注册
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
//验证传入的参数
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
//具体的绑定操作在doBind方法里执行 这个方法是channel的方法,也就是我们在上面NioServerSocketChannel 里分析的逻辑
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//触发 active事件,在pipline链里传播
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
//往外写数据的操作(这里只往缓存里写数据)
@Override
public final void write(Object msg, ChannelPromise promise) {
//验证是否已经注册并且react线程是否已经准备好
assertEventLoop();
//ChannelOutboundBuffer
表示要往外写数据的缓存
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
//对需要写的数据进行过滤
msg = filterOutboundMessage(msg);
//对需要写的数据进行大小的预估
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//将数据增加到缓存中
outboundBuffer.addMessage(msg, size, promise);
}
// flush方法用于将数据写入到网络中
@Override
public final void flush() {
assertEventLoop();
//往外发送的缓存对象不能为空
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
//最终会调用到这个方法
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
//对channel的状态进行验证
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
//会调用到Channel的doWrite方法,具体实现的源码可以看NioSocketChannel
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
//开始读方法的逻辑
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
//会调用channel的doBeginRead方法 可以看上面AbstractNioChannel方法里的注释
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
//注册方法,channel会往selector里注册关注的事件
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//验证数据
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
//调用下面的方法
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//会调用到channel里的方法 可以参考上面的AbstractNioChannel类
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
Netty服务端接收客户端数据的调用流程
读数据的调用流程
NioEventLoop的run方法会监控SelectionKey对象,当有读事件时,会调用unsafe对象的read()方法,在read方法的逻辑里会触发pipeline对象链的调用,最终调用到设置的各种ChannelHandler
写数据的调用流程
通过Channel的writeAndFlush会调用到pipeline的writeAndFlush方法里,在pipeline的调用链里会调用链中的各种ChannelHandler(各以对需要写入的数据进行格式转换)最终通过HeadContext的write方法调用到unsafe里的write逻辑。这里只是把数据写入到ByteBuffer里。通过调用unsafe的flash方法才能最终将数据写入到网络中,也就是上面的分析过程。
最后
以上就是诚心星月为你收集整理的Netty中Channel与Unsafe源码解读的全部内容,希望文章能够帮你解决Netty中Channel与Unsafe源码解读所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复