概述
MINA源码的SVN地址https://svn.apache.org/repos/asf/mina/mina/tags/2.0.7
MINA源码结构紧凑,各类分工明确,以处理Socket的相关类为例,主要有:NioSocketAcceptor、NioSocketConnector、NioSocketSession、NioProcessor
1、NiosocketAcceptor用户服务端初始化,监听服务端地址上来自客户端建立连接请求OP_ACCEPT。与之对应的是,客户端的NioSocketConnector,向服务端发起建立连接的请求后,监听通道上来自服务端完成建立连接的请求OP_CONNECT。
2、双方的请求建立完后,双方各初始化一个NioSocketSession,并与之前初始化完成的NioProcessor关联。然后启动NioProcessor父类AbstractPollingIoProcessor的内部类Processor。用于监听通道上的数据OP_READ。
3、当监听到数据到来后,先走一遍过滤器链,再进入处理模块(如服务端的handler)。NioSession主要处理数据的读写。
那么,在一个使用mina的系统中,NioSocketAcceptor对象只有一个,NioSocketConnector对象和NioSocketSession的数量一致,可以多个。NioProcessor对象的数量默认是CPU核数+1。
服务端简要的代码如下:(详见Mina2.0入门——例子)
package mina;
import java.net.InetSocketAddress;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class TestMinaServer {
private NioSocketAcceptor acceptor;
private IoFilter filter = new ProtocolCodecFilter(new TextLineCodecFactory());
private InetSocketAddress address = new InetSocketAddress("localhost",12340);
public void config() {
//创建NioSocketAcceptor对象
acceptor = new NioSocketAcceptor();
// 业务处理类
acceptor.setHandler(new TestMinaHandler());
// 编码过滤器
acceptor.getFilterChain().addLast("codec", filter);
acceptor.setDefaultLocalAddress(address);
try{
acceptor.bind();
System.out.println("bind:"+12340);
} catch(Exception e){
e.printStackTrace();
}
}
public static void main(String[] args){
TestMinaServer server = new TestMinaServer();
server.config();
}
}
1.acceptor = new NioSocketAcceptor();NioSocketAcceptor初始化时在其父类AbstractPollingIoAcceptor的构造方法中new了个SimpleIoProcessorPool,其字段IoProcessor<S>[] pool存放N个NioProcessor对象(N默认为CPU核数+1个)。
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
boolean createdProcessor) {
super(sessionConfig, executor);
if (processor == null) {
throw new IllegalArgumentException("processor");
}
this.processor = processor;
this.createdProcessor = createdProcessor;
try {
// Initialize the selector
init();
// The selector is now ready, we can switch the
// flag to true so that incoming connection can be accepted
selectable = true;
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeIoException("Failed to initialize.", e);
} finally {
if (!selectable) {
try {
destroy();
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}
}
这个IoProcessor<S> processor字段就是SimpleIoProcessorPool,init()方法在其子类NioSocketAcceptor中实现,只为其字段selector初始化:selector = Selector.open();
2.acceptor.bind();
AbstractPollingIoAcceptor类的bindInternal()方法中。
startupAcceptor();
// As we just started the acceptor, we have to unblock the select()
// in order to process the bind request we just have added to the
// registerQueue.
try {
lock.acquire();
// Wait a bit to give a chance to the Acceptor thread to do the select()
Thread.sleep(10);
wakeup();
} finally {
lock.release();
}
startupAcceptor()往后会,运行其内部类Acceptor.run()。
while (selectable) {
try {
// Detect if we have some keys ready to be processed
// The select() will be woke up if some new connection
// have occurred, or if the selector has been explicitly
// woke up
int selected = select();
// this actually sets the selector to OP_ACCEPT,
// and binds to the port on which this class will
// listen on
nHandles += registerHandles();
// Now, if the number of registred handles is 0, we can
// quit the loop: we don't have any socket listening
// for incoming connection.
if (nHandles == 0) {
acceptorRef.set(null);
if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
assert (acceptorRef.get() != this);
break;
}
if (!acceptorRef.compareAndSet(null, this)) {
assert (acceptorRef.get() != this);
break;
}
assert (acceptorRef.get() == this);
}
if (selected > 0) {
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}
// check to see if any cancellation request has been made.
nHandles -= unregisterHandles();
}
……
}
Acceptor.run()方法刚一进来就select()给阻塞了。不过Acceptor这是个多线程类。run线程阻塞了,AbstractPollingIoAcceptor.bindInternal()方法还是会接着执行的,
其后有wakeup()方法将Selector唤醒Acceptor.run()接着执行nHandles += registerHandles()。
AbstractPollingIoAcceptor.registerHandles()方法中会遍历、绑定监听所有设置的IP、端口。channel.register(selector, SelectionKey.OP_ACCEPT);熟悉吧。
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
// Creates the listening ServerSocket
ServerSocketChannel channel = ServerSocketChannel.open();
boolean success = false;
try {
// This is a non blocking socket channel
channel.configureBlocking(false);
// Configure the server socket,
ServerSocket socket = channel.socket();
// Set the reuseAddress flag accordingly with the setting
socket.setReuseAddress(isReuseAddress());
// and bind.
socket.bind(localAddress, getBacklog());
// Register the channel within the selector for ACCEPT event
channel.register(selector, SelectionKey.OP_ACCEPT);
success = true;
} finally {
if (!success) {
close(channel);
}
}
return channel;
}
因为Selector.select()是被wakeup()的,所以Acceptor.run()中selected=0,并不会执行到processHandles(selectedHandles());
初始化后,在Acceptor.run()中的selector就真的阻塞着监听:服务端通道上的OP_ACCEPT请求了。
开头的的config方法里,往NioSocketAcceptor中设置的过滤器链和业务处理类。将在处理请求的时候调用到。
最后
以上就是背后口红为你收集整理的[MINA2.0源码](一)服务端建立监听——NioSocketAccepter的全部内容,希望文章能够帮你解决[MINA2.0源码](一)服务端建立监听——NioSocketAccepter所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复