Overview
Once the server has bound its port, it can serve requests. Netty uses the reactor pattern to improve concurrent request handling: the boss thread listens for new connection requests, and when a new connection arrives it assigns the channel to a worker thread. The worker thread then handles all reads and writes on that channel.
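For reference, this boss/worker split is what the classic Netty 3.x bootstrap configures. A minimal sketch (the port is arbitrary, and EchoServerHandler is the handler discussed in step 5 below):

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class EchoServer {
    public static void main(String[] args) {
        // One executor feeds the boss threads (accepting), the other the worker threads (I/O)
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),   // boss threads
                        Executors.newCachedThreadPool())); // worker threads
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(new EchoServerHandler());
            }
        });
        // bind() starts the boss thread; everything below follows from here
        bootstrap.bind(new InetSocketAddress(8080));
    }
}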
I. The Boss Thread
1. Blocking select
for (;;) {
    try {
        // The boss thread's only job is to watch for new connections, so it blocks in select()
        selector.select();
        // Clear the selected keys first; new connections are pulled via accept() below
        selector.selectedKeys().clear();
        // Drain the accept queue, registering every pending connection
        for (;;) {
            SocketChannel acceptedSocket = channel.socket.accept();
            if (acceptedSocket == null) {
                break;
            }
            registerAcceptedChannel(acceptedSocket, currentThread);
        }
        // ...
    }
2. Registering the new connection
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
    // ...
    // Create a pipeline from the user-supplied PipelineFactory
    ChannelPipeline pipeline =
        channel.getConfig().getPipelineFactory().getPipeline();
    // Pick a worker thread; the default strategy is round-robin over the worker array
    NioWorker worker = nextWorker();
    // Register the new connection with that worker, which handles all subsequent reads and writes.
    // The new channel is a child of the server channel and shares its PipelineSink.
    worker.register(new NioAcceptedSocketChannel(
            channel.getFactory(), pipeline, channel,
            NioServerSocketPipelineSink.this, acceptedSocket,
            worker, currentThread), null);
    // ...
}
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
    synchronized (startStopLock) {
        // ...
        // Create a task that registers the channel with the worker's selector
        Runnable registerTask = createRegisterTask(channel, future);
        // Enqueue the task on the worker's register task queue
        boolean offered = registerTaskQueue.offer(registerTask);
        // Wake up the selector so the task is picked up promptly
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
}
3. Creating the register task
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
    boolean server = !(channel instanceof NioClientSocketChannel);
    return new RegisterTask((NioSocketChannel) channel, future, server);
}
II. The Worker Thread
A worker thread handles the reads and writes of its assigned channels. Each worker owns one selector and multiplexes many channels over it.
1. The main loop
for (;;) {
    wakenUp.set(false);
    // ...
    // If a wakeup was requested between select() and here, wake the selector again
    // so the next select() returns immediately instead of blocking
    if (wakenUp.get()) {
        wakenupFromLoop = true;
        selector.wakeup();
    } else {
        wakenupFromLoop = false;
    }
    cancelledKeys = 0;
    // Process channel-registration tasks
    processRegisterTaskQueue();
    // Process asynchronous events, e.g. writeComplete
    processEventQueue();
    // Process write tasks; application threads that write asynchronously enqueue WriteTasks here
    processWriteTaskQueue();
    // Process the channels whose I/O is ready
    processSelectedKeys(selector.selectedKeys());
    // ...
}
2. Executing the RegisterTask
public void run() {
    // ...
    // Server-side channels are switched to non-blocking mode
    if (server) {
        channel.channel.configureBlocking(false);
    }
    // Register the new channel with this worker's selector; by default it listens for READ
    synchronized (channel.interestOpsLock) {
        channel.channel.register(
                selector, channel.getRawInterestOps(), channel);
    }
    // ...
    // Fire the BOUND upstream event; it runs through the user-defined pipeline
    // (EchoServerHandler ignores it by default)
    if (server || !((NioClientSocketChannel) channel).boundManually) {
        fireChannelBound(channel, localAddress);
    }
    // Fire the CONNECTED upstream event; it also runs through the user-defined pipeline
    // (EchoServerHandler ignores it by default)
    fireChannelConnected(channel, remoteAddress);
    // ...
}
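fireChannelBound and fireChannelConnected are static helpers on Channels: they wrap the state change in an upstream event and push it into the pipeline, roughly as below. This is a sketch from memory of the Netty 3 helper, mirroring the write() helper shown in step 6:

// Sketch of Channels.fireChannelConnected in Netty 3: wrap the state
// change in an upstream event and send it up the channel's pipeline
public static void fireChannelConnected(Channel channel, SocketAddress remoteAddress) {
    channel.getPipeline().sendUpstream(
            new UpstreamChannelStateEvent(
                    channel, ChannelState.CONNECTED, remoteAddress));
}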
3. Processing the channels whose I/O is ready
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
    SelectionKey k = i.next();
    i.remove();
    try {
        int readyOps = k.readyOps();
        // If the channel is read-ready (readyOps == 0 is also treated as readable), read from it
        if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
            if (!read(k)) {
                // Connection already closed - no need to handle write.
                continue;
            }
        }
        // If the channel is write-ready, write to it
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            writeFromSelectorLoop(k);
        }
    } catch (CancelledKeyException e) {
        close(k);
    }
    // ...
}
4. Reading
// Read from the channel into an internal buffer, wrap it in Netty's own ChannelBuffer,
// and fire a messageReceived event
protected boolean read(SelectionKey k) {
    final SocketChannel ch = (SocketChannel) k.channel();
    final NioSocketChannel channel = (NioSocketChannel) k.attachment();
    // Predict the buffer size for the next read. The default adaptive predictor grows
    // the size when the last read filled the buffer, shrinks it after two consecutive
    // reads that did not, and otherwise keeps it unchanged; the initial size is 1024.
    final ReceiveBufferSizePredictor predictor =
        channel.getConfig().getReceiveBufferSizePredictor();
    final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
    // The default BufferFactory is HeapChannelBufferFactory, with big-endian byte order
    final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

    int ret = 0;
    int readBytes = 0;
    boolean failure = true;

    // Borrow a buffer of the predicted size from the shared pool and read into it
    ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
    try {
        while ((ret = ch.read(bb)) > 0) {
            readBytes += ret;
            if (!bb.hasRemaining()) {
                break;
            }
        }
        failure = false;
    }
    // ...

    // If data was read, copy it into a ChannelBuffer and fire a messageReceived event,
    // which runs through the user-defined pipeline
    if (readBytes > 0) {
        bb.flip();
        // Build a ChannelBuffer, backed by a heap array by default
        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        // Copy the data into the ChannelBuffer
        buffer.setBytes(0, bb);
        // Advance the writer index past the copied bytes
        buffer.writerIndex(readBytes);
        // Feed the actual read size back to the predictor for the next read
        predictor.previousReceiveBufferSize(readBytes);
        // Fire the messageReceived event
        fireMessageReceived(channel, buffer);
    }
    // ...
}
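The adaptive behavior described in the comments above can be sketched like this. It is an illustration of the idea only: the real AdaptiveReceiveBufferSizePredictor steps through a precomputed table of sizes rather than doubling and halving, and the class and field names here are made up:

// Simplified sketch of adaptive receive-buffer prediction (hypothetical class;
// the real AdaptiveReceiveBufferSizePredictor walks a precomputed size table)
final class SimpleAdaptivePredictor {
    private int next = 1024;      // default initial size
    private boolean shrinkHint;   // set after one "buffer too large" observation

    int nextReceiveBufferSize() {
        return next;
    }

    void previousReceiveBufferSize(int actual) {
        if (actual >= next) {
            // Last read filled the buffer: grow immediately
            next = Math.min(next * 2, 65536);
            shrinkHint = false;
        } else if (actual < next / 2) {
            // Shrink only after two consecutive small reads
            if (shrinkHint) {
                next = Math.max(next / 2, 64);
                shrinkHint = false;
            } else {
                shrinkHint = true;
            }
        } else {
            shrinkHint = false;
        }
    }
}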
5. EchoServerHandler receives the message
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    // Echo the data straight back through the channel
    e.getChannel().write(e.getMessage());
}
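For completeness, the full handler in the Netty 3 API is just this method plus boilerplate. A minimal sketch (the exceptionCaught override is our addition, not shown in the original):

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class EchoServerHandler extends SimpleChannelUpstreamHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        // Write the received ChannelBuffer straight back to the client
        e.getChannel().write(e.getMessage());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        // On error, just close the connection
        e.getChannel().close();
    }
}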
6. Writing the data back: the write method actually just fires a downstream event
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
    ChannelFuture future = future(channel);
    channel.getPipeline().sendDownstream(
            new DownstreamMessageEvent(channel, future, message, remoteAddress));
    return future;
}
7. Handling in the ChannelPipeline
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
    // If there is no downstream handler left, hand the event to the ChannelSink;
    // for an NIO server that sink is NioServerSocketPipelineSink
    if (tail == null) {
        try {
            getSink().eventSunk(this, e);
            return;
        } catch (Throwable t) {
            notifyHandlerException(e, t);
            return;
        }
    }
    // Otherwise keep walking the remaining downstream handlers
    sendDownstream(tail, e);
}
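A downstream handler placed in the pipeline would see this event before it reaches the sink. A minimal sketch of such an interceptor (this logging handler is our own illustration, not part of the echo server):

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;

// Hypothetical handler showing where a downstream handler sits in the flow:
// write() -> sendDownstream() -> this handler -> ... -> ChannelSink
public class OutboundLoggingHandler extends SimpleChannelDownstreamHandler {
    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("writing to " + e.getChannel().getRemoteAddress());
        // Pass the event on toward the sink
        super.writeRequested(ctx, e);
    }
}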
8. NioServerSocketPipelineSink handles the channel event
else if (e instanceof MessageEvent) {
    MessageEvent event = (MessageEvent) e;
    NioSocketChannel channel = (NioSocketChannel) event.getChannel();
    // First enqueue the event on the channel's write buffer queue
    boolean offered = channel.writeBufferQueue.offer(event);
    assert offered;
    // The actual write still has to go through the worker
    channel.worker.writeFromUserCode(channel);
}
9. Handling in the worker thread
void writeFromUserCode(final AbstractNioChannel<?> channel) {
    // ...
    // If the caller is an application thread writing asynchronously,
    // just drop a WriteTask into this worker's write task queue and return
    if (scheduleWriteIfNecessary(channel)) {
        return;
    }
    // ...
    // Otherwise we are already on the Netty worker thread, so write directly
    write0(channel);
}
10. The worker thread writes directly
protected void write0(AbstractNioChannel<?> channel) {
    boolean open = true;
    boolean addOpWrite = false;
    boolean removeOpWrite = false;
    // ... (writtenBytes, iothread, ch, writeBuffer, writeSpinCount, etc. are initialized here)
    synchronized (channel.writeLock) {
        channel.inWriteNowLoop = true;
        // Write in a loop; once everything is written, stop watching OP_WRITE
        // for this channel on the selector
        for (;;) {
            MessageEvent evt = channel.currentWriteEvent;
            SendBuffer buf = null;
            ChannelFuture future = null;
            try {
                if (evt == null) {
                    // Take the next outgoing message from the queue;
                    // an empty queue means everything has been written
                    if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                        removeOpWrite = true;
                        channel.writeSuspended = false;
                        break;
                    }
                    future = evt.getFuture();
                    // Convert the ChannelBuffer into a ByteBuffer; a PooledSendBuffer is used here
                    channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                } else {
                    future = evt.getFuture();
                    buf = channel.currentWriteBuffer;
                }

                long localWrittenBytes = 0;
                for (int i = writeSpinCount; i > 0; i --) {
                    // Write the buffer out. The channel is non-blocking, so write() returns 0
                    // when the socket send buffer is full; spin and retry
                    localWrittenBytes = buf.transferTo(ch);
                    // As soon as any bytes go out, stop spinning,
                    // whether or not the buffer was fully written
                    if (localWrittenBytes != 0) {
                        writtenBytes += localWrittenBytes;
                        break;
                    }
                    if (buf.finished()) {
                        break;
                    }
                }

                // Fully written: release the buffer and notify the caller via the future
                if (buf.finished()) {
                    // Successful write - proceed to the next message.
                    buf.release();
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    evt = null;
                    buf = null;
                    future.setSuccess();
                }
                // Not fully written: have the selector watch OP_WRITE on this channel
                // so writing resumes when the socket becomes writable again
                else {
                    // Not written fully - perhaps the kernel buffer is full.
                    addOpWrite = true;
                    channel.writeSuspended = true;
                    // ...
                }
            }
            // ...
        }
        channel.inWriteNowLoop = false;
        // Have the selector listen for OP_WRITE on this channel
        if (addOpWrite) {
            setOpWrite(channel);
        }
        // Everything was written: stop listening for OP_WRITE
        else if (removeOpWrite) {
            clearOpWrite(channel);
        }
    }
    // On the worker (I/O) thread: fire the writeComplete upstream event directly,
    // letting the pipeline handlers process it
    if (iothread) {
        fireWriteComplete(channel, writtenBytes);
    }
    // From an application thread: writeComplete is fired later, asynchronously,
    // via the worker's event queue
    else {
        fireWriteCompleteLater(channel, writtenBytes);
    }
}
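setOpWrite and clearOpWrite toggle the OP_WRITE bit on the channel's SelectionKey. A simplified sketch of setOpWrite, reconstructed from memory of the Netty 3 source rather than quoted from it (the real method also handles cancelled keys and calls made off the selector thread; setRawInterestOpsNow is assumed here to record the new mask on the channel):

// Simplified sketch of setOpWrite (assumption: real NioWorker also deals
// with cancelled keys and with being called off the selector thread)
protected void setOpWrite(AbstractNioChannel<?> channel) {
    SelectionKey key = channel.channel.keyFor(selector);
    if (key == null || !key.isValid()) {
        return;
    }
    synchronized (channel.interestOpsLock) {
        int interestOps = channel.getRawInterestOps();
        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
            interestOps |= SelectionKey.OP_WRITE;   // add OP_WRITE to the mask
            key.interestOps(interestOps);
            channel.setRawInterestOpsNow(interestOps);
        }
    }
}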
11. Asynchronous writes: when the application handles messages on its own threads, the write-back is asynchronous from the worker's point of view
12. The application thread enqueues a write task
// At most one WriteTask per channel is queued at a time
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
    boolean offered = writeTaskQueue.offer(channel.writeTask);
    assert offered;
}
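This fragment sits inside NioWorker's scheduleWriteIfNecessary. A sketch of the enclosing logic, reconstructed from the fragment and the main loop above rather than quoted from the source (thread is assumed to be the worker's own Thread reference):

// Sketch of the enclosing scheduleWriteIfNecessary logic (reconstructed,
// not quoted verbatim from NioWorker)
protected boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel) {
    if (Thread.currentThread() != thread) {   // not on the worker thread
        // Enqueue at most one WriteTask per channel at a time
        if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
            boolean offered = writeTaskQueue.offer(channel.writeTask);
            assert offered;
        }
        // Wake the selector so processWriteTaskQueue() runs soon
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
        return true;   // write deferred to the worker
    }
    return false;      // already on the worker: caller writes directly
}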
13. The worker thread runs the queued write tasks
private void processWriteTaskQueue() throws IOException {
    for (;;) {
        final Runnable task = writeTaskQueue.poll();
        if (task == null) {
            break;
        }
        task.run();
        cleanUpCancelledKeys();
    }
}
14. Executing the WriteTask
private final class WriteTask implements Runnable {

    WriteTask() {
    }

    public void run() {
        // Allow the next WriteTask for this channel to be queued
        writeTaskInTaskQueue.set(false);
        worker.writeFromTaskLoop(AbstractNioChannel.this);
    }
}
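writeFromTaskLoop converges on the same write0 seen in step 10; roughly (a sketch from memory of the Netty 3 source):

// Sketch of NioWorker.writeFromTaskLoop: skip channels whose write is
// currently suspended (they will resume via OP_WRITE), otherwise write now
void writeFromTaskLoop(final AbstractNioChannel<?> ch) {
    if (!ch.writeSuspended) {
        write0(ch);
    }
}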
15. The worker resumes the write when the selector reports OP_WRITE ready (the path from step 3)
void writeFromSelectorLoop(final SelectionKey k) {
    AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
    // The socket is writable again: clear the suspended flag and resume writing
    ch.writeSuspended = false;
    write0(ch);
}