我是靠谱客的博主 体贴楼房,最近开发中收集的这篇文章主要介绍netty连接nbiot_Netty源码分析之处理新连接,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Netty服务端处理新连接的流程:

1.检测新连接

2.基于NioServerSocketChannel创建客户端的NioSocketChannel

3.分配客户端channel的线程,注册线程所对应的selector

4.向selector注册读事件

新连接检测

服务端在创建完服务端的NioServerSocketChannel之后,绑定完端口号之后,会注册accept事件。当有新连接进入的时候,会触发accpet事件。之前博客有分析过EventLoop的thread的run方法会循环select检测是否有新的IO事件,如果检测到有IO事件,就通过processSelectedKey来处理对应的IO事件,这里的IO事件是accept,就会调用channel内部聚合的UnSafe类的read()方法。

这里循环调用doReadMessage()方法的条件是是否自动读,读取的连接数是否小于最大连接数,服务端channel默认一次最多读取16个新连接。

当没有超过最大连接数,并且是自动读的状态时候,就会循环调用doReadMessage,直到没有读到新连接,跳出while循环,

public void read() {

assert AbstractNioMessageChannel.this.eventLoop().inEventLoop();

ChannelConfig config = AbstractNioMessageChannel.this.config();

if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) {

this.removeReadOp();

} else {

int maxMessagesPerRead = config.getMaxMessagesPerRead();

ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();

boolean closed = false;

Throwable exception = null;

try {

int size;

try {

do {

size = AbstractNioMessageChannel.this.doReadMessages(this.readBuf);

if(size == 0) {

break;

}

if(size < 0) {

closed = true;

break;

}

} while(config.isAutoRead() && this.readBuf.size() < maxMessagesPerRead);

} catch (Throwable var11) {

exception = var11;

}

AbstractNioMessageChannel.this.setReadPending(false);

size = this.readBuf.size();

int i = 0;

while(true) {

if(i >= size) {

this.readBuf.clear();

pipeline.fireChannelReadComplete();

if(exception != null) {

if(exception instanceof IOException && !(exception instanceof PortUnreachableException)) {

closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);

}

pipeline.fireExceptionCaught(exception);

}

if(closed && AbstractNioMessageChannel.this.isOpen()) {

this.close(this.voidPromise());

}

break;

}

pipeline.fireChannelRead(this.readBuf.get(i));

++i;

}

} finally {

if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) {

this.removeReadOp();

}

}

}

}

创建NioSocketChannel

这里read()方法是通过循环调用NioServerSocket的doReadMessage(byteBuf)方法进行实现channel的读取新连接。而doReadMessage是通过java nio的channel的accept获取当前新连接的channel,这里获取的channel也是java nio中的channel,然后将这个channel封装成NioSocketChannel,将NioServerSocketChannel和javaChannel都作为参数构造NioSocketChannel,放到buf中去,返回1,表示已经读取一条连接。

protected int doReadMessages(List buf) throws Exception {

SocketChannel ch = this.javaChannel().accept();

try {

if(ch != null) {

buf.add(new NioSocketChannel(this, ch));

return 1;

}

} catch (Throwable var6) {

logger.warn("Failed to create a new channel from an accepted socket.", var6);

try {

ch.close();

} catch (Throwable var5) {

logger.warn("Failed to close a socket.", var5);

}

}

return 0;

}

NioSocketChannel的构造函数。

//配置Config类

public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) {

super(parent, socket);

this.config = new NioSocketChannel.NioSocketChannelConfig(this, socket.socket(), null);

}

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {

super(parent, ch, 1);

}

//保存channel感兴趣的读事件,并将channel设置为非阻塞的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {

super(parent);

this.ch = ch;

this.readInterestOp = readInterestOp;

try {

ch.configureBlocking(false);

} catch (IOException var7) {

try {

ch.close();

} catch (IOException var6) {

if(logger.isWarnEnabled()) {

logger.warn("Failed to close a partially initialized socket.", var6);

}

}

throw new ChannelException("Failed to enter non-blocking mode.", var7);

}

}

这里配置channel的Config类使用了setTcpNoDelay(true),这里禁止了Nagle算法,Nagle算法的目的是让小的数据包尽量集合成大的数据包发送出去,Netty为了使数据能够及时发出去,禁止了Nagle算法。

public DefaultSocketChannelConfig(io.netty.channel.socket.SocketChannel channel, Socket javaSocket) {

super(channel);

if(javaSocket == null) {

throw new NullPointerException("javaSocket");

} else {

this.javaSocket = javaSocket;

if(PlatformDependent.canEnableTcpNoDelayByDefault()) {

try {

this.setTcpNoDelay(true);

} catch (Exception var4) {

;

}

}

}

}

public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {

try {

this.javaSocket.setTcpNoDelay(tcpNoDelay);

return this;

} catch (SocketException var3) {

throw new ChannelException(var3);

}

}

新连接NioEventLoop的分配和selector的注册

在读取完新连接之后,会调用fireChannelRead方法,而服务端的NioServerSocketChannel在初始化阶段,在上面的pipeline添加了连接处理器ServerBootstrap.ServerBootstrapAcceptor,read事件会从head传送到serverBootstrapAcceptor,serverBootstrapAcceptor也是一个ChannelHandler,它会对新连接进行处理。

处理流程:

1.设置客户端channel的childHandler

添加channelHandler,这里的channelHandler一般是一个ChannelInitializer,他可以获取channel的pipeline,并且在上面添加一系列的Handler,最后再将ChannelInitializer这个Handler删除。

2.设置options和attrs

options是底层tcp读写的相关参数,attrs可以在客户端channel上面绑定一些属性。这里的options和attrs都是用户通过代码设置的。比如

bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)

设置的这些都会保存到ServerBootstrap这个类,然后在initChannel的时候会将这些参数都传入,构造一个ServerBootstrapAcceptor,这样当连接器接受到新的连接之后,新建子channel,就会带有这些属性。

3.选择NioEventLoop,并且注册selector

public void channelRead(ChannelHandlerContext ctx, Object msg) {

final Channel child = (Channel)msg;

//添加ChannelHandler

child.pipeline().addLast(new ChannelHandler[]{this.childHandler});

Map.Entry[] t = this.childOptions;

int len$ = t.length;

int i$;

Map.Entry e;

for(i$ = 0; i$ < len$; ++i$) {

e = t[i$];

try {

if(!child.config().setOption((ChannelOption)e.getKey(), e.getValue())) {

ServerBootstrap.logger.warn("Unknown channel option: " + e);

}

} catch (Throwable var10) {

ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10);

}

}

t = this.childAttrs;

len$ = t.length;

for(i$ = 0; i$ < len$; ++i$) {

e = t[i$];

child.attr((AttributeKey)e.getKey()).set(e.getValue());

}

try {

this.childGroup.register(child).addListener(new ChannelFutureListener() {

public void operationComplete(ChannelFuture future) throws Exception {

if(!future.isSuccess()) {

ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());

}

}

});

} catch (Throwable var9) {

forceClose(child, var9);

}

}

这里注册是使用用户传进来的workerGroup线程池,使用register方法完成注册。

public ChannelFuture register(Channel channel) {

return this.next().register(channel);

}

这里的next()函数返回一个NioEventLoop,相当于从线程池里面挑选一个线程与这个channel进行绑定。最后通过层层调用,还是调用了java nio中channel的register方法,这时注册的时候,不关心任何事件。

public ChannelFuture register(Channel channel, ChannelPromise promise) {

if(channel == null) {

throw new NullPointerException("channel");

} else if(promise == null) {

throw new NullPointerException("promise");

} else {

channel.unsafe().register(this, promise);

return promise;

}

}

//AbstractChannel

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

if(eventLoop == null) {

throw new NullPointerException("eventLoop");

} else if(AbstractChannel.this.isRegistered()) {

promise.setFailure(new IllegalStateException("registered to an event loop already"));

} else if(!AbstractChannel.this.isCompatible(eventLoop)) {

promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

} else {

AbstractChannel.this.eventLoop = eventLoop;

if(eventLoop.inEventLoop()) {

this.register0(promise);

} else {

try {

eventLoop.execute(new OneTimeTask() {

public void run() {

AbstractUnsafe.this.register0(promise);

}

});

} catch (Throwable var4) {

AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);

this.closeForcibly();

AbstractChannel.this.closeFuture.setClosed();

this.safeSetFailure(promise, var4);

}

}

}

}

private void register0(ChannelPromise promise) {

try {

if(!promise.setUncancellable() || !this.ensureOpen(promise)) {

return;

}

AbstractChannel.this.doRegister();

AbstractChannel.this.registered = true;

this.safeSetSuccess(promise);

AbstractChannel.this.pipeline.fireChannelRegistered();

if(AbstractChannel.this.isActive()) {

AbstractChannel.this.pipeline.fireChannelActive();

}

} catch (Throwable var3) {

this.closeForcibly();

AbstractChannel.this.closeFuture.setClosed();

this.safeSetFailure(promise, var3);

}

}

protected void doRegister() throws Exception {

boolean selected = false;

while(true) {

try {

this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);

return;

} catch (CancelledKeyException var3) {

if(selected) {

throw var3;

}

this.eventLoop().selectNow();

selected = true;

}

}

}

NioSocketChannel读事件的注册

通过传播channelActive方法,最终会调用channel的read()方法,channel在创建的时候都是默认自动读的。

public ChannelPipeline fireChannelActive() {

this.head.fireChannelActive();

if(this.channel.config().isAutoRead()) {

this.channel.read();

}

return this;

}

会将channel的Active状态在pipeline上面传播,调用read方法,最后会调用doBeginRead,去注册感兴趣的事件,NioSocketChannel感兴趣的事件是读事件,而NioServerSocketChannel感兴趣的事件则是Accept事件。

public ChannelHandlerContext read() {

final AbstractChannelHandlerContext next = this.findContextOutbound();

EventExecutor executor = next.executor();

if(executor.inEventLoop()) {

next.invokeRead();

} else {

Runnable task = next.invokeReadTask;

if(task == null) {

next.invokeReadTask = task = new Runnable() {

public void run() {

next.invokeRead();

}

};

}

executor.execute(task);

}

return this;

}

protected void doBeginRead() throws Exception {

if(!this.inputShutdown) {

SelectionKey selectionKey = this.selectionKey;

if(selectionKey.isValid()) {

this.readPending = true;

int interestOps = selectionKey.interestOps();

if((interestOps & this.readInterestOp) == 0) {

selectionKey.interestOps(interestOps | this.readInterestOp);

}

}

}

}

这里要记住的是当channel注册到一个eventloop上的selector上面的时候,这时并没有绑定任何感兴趣的事件,仅仅完成了register,会调用fireChannelRegister()函数通知pipeline注册完成。但是当channel绑定到ip,端口后会调用fireChannelActive()函数,channel会调用read()函数,此时会将channel中感兴趣的事件注册到selector上面。而每一个eventloop对应线程的启动,都是在第一次调动该eventloop.execute()方法时进行启动的。

最后

以上就是体贴楼房为你收集整理的netty连接nbiot_Netty源码分析之处理新连接的全部内容,希望文章能够帮你解决netty连接nbiot_Netty源码分析之处理新连接所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部