概述
什么是IO
-
对数据(Socket、file)的读写,对输入Input和输出Output的处理
-
用户程序进行IO的读写,用到read&write两大系统调用
-
- read系统调用:把数据从内核缓冲区复制到进程缓冲区,并不是直接从物理设备读取数据到内存
- write系统调用:把数据从进程缓冲区复制到内核缓冲区,并不是把数据直接写入到物理设备
-
缓冲区
-
- 目的:为了减少频繁的系统IO调用。
- 系统使用read函数把数据从内核缓冲区复制到进程缓冲区,write函数把数据从进程缓冲区复制到内核缓冲区。等待缓冲区达到限制时,再进行IO的调用。
-
IO读写流程
IO模型
同步与异步
- 同步IO:用户程序主动发起请求,内核空间是接收请求方
- 异步IO:内核主动发起请求,用户程序是接收请求方
阻塞与非阻塞
- 阻塞IO:需要内核IO操作完成后,回到用户空间继续执行用户的操作
- 非阻塞IO:用户程序不需要等待内核IO操作完成后,内核会直接返回给用户程序一个状态值,可以立即返回用户空间执行用户的操作
同步阻塞IO(Blocking IO)
- 用户程序从IO系统调用开始,一直到系统调用返回,这段时间是阻塞的
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
public class TestSocket {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(8090);
System.out.println("Server start at port 8090 ...");
while (true) {
Socket client = server.accept();
System.out.println("client connect at port: " + client.getPort());
new Thread(new Runnable() {
Socket c;
public Runnable setClient(Socket c) {
c = c;
return this;
}
public void run() {
try {
InputStream in = c.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
while (true) {
reader.readLine();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}.setClient(client))
.start();
}
}
}
启动服务端
- 此时文件目录
- 查看主线程信息
启动客户端
- 此时服务端信息
- 此时文件目录
- 查看主线程信息
- 查看客户端连接信息
客户端发送消息
- 此时客户端连接信息
BIO流程
优劣
- 优势:程序简单,每连接每线程
- 劣势:内存、线程切换开销大
同步非阻塞IO(NonBlocking IO)
-
Java中NIO是New IO;OS(Linux)中NIO是NonBlocking IO
-
需要内核支持非阻塞
-
系统调用非阻塞IO会出现两种情况
-
- 在内核缓冲区没有数据的情况下,系统调用立即返回-1
- 在内核缓冲区有数据的情况下,数据从内核缓冲区复制到用户进程缓冲,复制完成之后,系统调用返回,用户线程开始处理用户空间的数据。
1)NIO 用户空间轮询
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
public class SocketNIO {
public static void main(String[] args) throws Exception {
LinkedList<SocketChannel> clients = new LinkedList<>();
ServerSocketChannel ss = ServerSocketChannel.open();
ss.bind(new InetSocketAddress(8091));
ss.configureBlocking(false);// OS NONBLOCKING
while (true) {
Thread.sleep(1000);
// 接收客户端连接
SocketChannel client = ss.accept();
// accept 调用内核
// 若没有客户端连接进来=>BIO中一直阻塞着;NIO不阻塞,返回-1,此时Java中返回null
// 若有客户端连接,则返回客户端的文件描述符FD
// NONBLOCKING就是代码能继续执行,不过会有不同的状态返回
if (client == null) {
System.out.println("no client access... ");
} else {
client.configureBlocking(false);
int port = client.socket().getPort();
System.out.println("client connect at port: " + port);
clients.add(client);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
// 遍历已经连接的客户端能不能读写数据
for (SocketChannel c : clients) {
// read不会阻塞
int num = c.read(buffer);// num返回值:>0 or -1 or 0
if (num > 0) {
buffer.flip();
byte[] dataBytes = new byte[buffer.limit()];
buffer.get(dataBytes);
String data = new String(dataBytes);
System.out.println(c.socket().getPort() + ": " + data);
buffer.clear();
}
}
}
}
}
启动服务端
- 当前目录信息
- 查看主线程信息
启动客户端
- 此时服务端信息
- 此时主线程信息
客户端发送消息
- 此时服务端信息
- 此时主线程信息
流程
优劣
- 优势:规避多线程问题,每次发起IO系统调用,内核的等待数据的过程中可以立即返回;用户线程不阻塞,实时性好
- 劣势:C10K问题,每循环一次,必须向系统发送客户端连接数的系统调用,将占用大量的CPU时间,系统资源利用率低
2)IO多路复用模型(IO Multiplexing)
- 通过一种系统调用,一个进程可以监视多个文件描述符,一旦某个描述符就绪(内核缓冲区可读/写),内核通知程序进行相应的read/write系统调用
- 支持多路复用的系统调用:select、poll、epoll等
select
-
在内核中开辟一块数组空间用来存储用户空间传递的文件描述符进行监控,监控由内核负责;用户程序只需要传递文件描述符,在内核空间进行检测(循环遍历),当有了就绪事件就返回用户空间进行处理。
-
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
-nfds:监控的描述符中最大描述符+1
-fd_set:描述符集合(实际上是一个位图),位图的大小取决于FD_SETSIZE=1024,所以我们最多监控的文件描述符最大只能是1024;查看路径:/usr/include/linux/posix_types.h
-readfds、writefds、exceptfds:分别是读就绪事件、写就绪事件、异常就绪事件的集合。返回之前会将没有就绪的描述符清理掉。
void FD_ZERO(fd_set *set); 清空fdset与所有文件句柄的联系。
void FD_SET(int fd, fd_set *set); 建立文件句柄与fdset的联系。
void FD_CLR(int fd, fd_set *set); 清除文件句柄与fdset的联系。
int FD_ISSET(int fd, fd_set *set); 检查fdset联系的文件句柄fd是否可读写,>0表示可读写。
-timeout:阻塞等待超时时间
调用select函数后会阻塞,直到有描述符就绪(有数据可读、可写或有异常),或超时(timeout=0,select立即返回;timeout=NULL,select一直阻塞)返回。
当select函数返回后,可遍历fdset找到就绪的描述符。 -
流程
-
- 定义一个文件描述符集合fd_set(位图,最大值1024)
- 将集合拷贝到内核中进行监控:对所有的描述符轮询
- 当有描述符就绪时返回,在返回前把未就绪的描述符剔除掉
- 用户程序对所有描述符进行遍历,匹配上集合中的文件描述符就是已就绪的
- 重新将fd_set拷贝到内核中进行监控
-
优劣
-
- 优势:遵循POSIX标准,可以跨平台,基本所有的系统都支持
- 劣势:所监控的文件描述符有上限,默认1024,根据FD_SETSIZE而定;在内核中轮询遍历状态,因此随着文件描述符的增多而性能下降;每次都会修改监控集合中的值。
poll
-
与select类似,都需要在内核中开辟一个空间,poll监控的是事件结构化的事件集合。
-
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
pollfd结构包含了要监控的事件(events)和就绪的事件(revents)。
与select函数一样,poll返回后需要轮询fds来获取就绪的描述符 -
流程
-
- 用户定义事件数组,对描述符添加关心的事件
- 监控也是将数据拷贝到内核中,然后轮询遍历
- 用户程序根据返回的revents判断哪一个事件就绪
- 不会告诉用户程序哪一个描述符就绪,只告诉用户程序就绪事件,需要用户程序遍历查找
-
优劣
-
- 优势:采用了事件结构化的方式对描述符进行监控,简化了多个事件结合的监控方式;没有描述的具体上限
- 劣势:不能跨平台;随着描述符增多而性能下降
epoll
-
epoll是在Linux 2.6内核中提出的,是select和poll的增强版本。epoll在内核中使用红黑树进行节点的删除和添加,使用rdlist双向链表来存储就绪事件
-
epoll操作过程需要3个接口
-
- int epoll_create(int size);
从Linux2.6.8后,size被忽略,传入一个大于0的数即可;
在内核中创建一个结构体eventpoll,结构体中只关心两个参数,一个是红黑树,一个双向链表rdlist;
返回一个描述符,epoll的操作句柄;
使用完epoll之后必须调用close()关闭,否则可能导致描述符被耗尽。 - int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
向epoll红黑树上添加、删除、修改节点
-epfd:是epoll_create()返回的eventpoll结构体的操作句柄
-op:合法的监听事件操作:EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL,即:向内核eventpoll添加、修改、删除对fd的监听事件
-fd:op操作需要监听的文件描述符
-event:告诉内核需要监听什么事
EPOLLIN:表示对应的文件描述符可以读(包括对socket的正常关闭)
EPOLLOUT:表示对应的文件描述符可以写数据
EPOLLRDHUP:Linux 2.6.17加入;流式socket对等关闭连接或关闭半写的连接(当使用边缘触发模式时,此标记可用来检测对等关闭)
EPOLLPRI:表示对应的文件描述符有紧急的数据可读
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂起
EPOLLET:将epoll设为边缘触发模式,默认是水平触发模式
EPOLLONESHOT:Linux 2.6.2加入;只监听一次事件;当监听这次事件之后,对应的文件描述符将不可用,epoll也不会上报任何事件 - int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
开始监控eventpoll中红黑树上保存的事件结构节点,若有就绪事件就拷贝到当前的双向链表rdlist中
-epfd:是epoll_create()的返回值
-events:用来从内核得到事件的集合
-maxevents:用于确定最多获取的就绪事件个数,必须大于0
-timeout:epoll_wait阻塞等待的时间。-1=>epoll_wait将一直阻塞;0=>epoll_wait立即返回
- int epoll_create(int size);
-
epoll的工作模式:Level Trigger(LT,水平触发模式,默认)和Edge Trigger(ET,边缘触发模式)
-
- LT:支持block和no-block socket。内核会告知一个文件描述符已就绪,可以对这个就绪的文件描述符进行IO操作;若不操作,内核还是会继续通知(重复通知,直至IO操作完成)
即:只要缓冲区中的数据大小大于低水位标记的话,就会触发可读或可写事件 - ET:仅支持non-blovk socket。当内核告知一个文件描述符已就绪之后,不会再为那个文件描述符发送就绪通知,直到某些操作导致那个文件描述符不再是就绪状态(一次通知)
即:对可读事件,每次只要有新的事件到来就会触发一次可读事件;对于可写事件,每次只要缓冲区空间从0到大于低水位标记的时候就会触发可写事件
- LT:支持block和no-block socket。内核会告知一个文件描述符已就绪,可以对这个就绪的文件描述符进行IO操作;若不操作,内核还是会继续通知(重复通知,直至IO操作完成)
-
流程
-
- 告诉内核要开始对文件描述符进行监控
- 内核对描述符进行监控,采用的是事件触发方式进行监控,为每一个要监控的描述符定义一个事件,并且对这个事件都有回调函数(ep_poll_callback)
- 事件回调函数将就绪的描述符所对应的epoll_event事件结构添加到双向链表中(将事件结构地址添加到双向链表中)
- epoll_wait并没有立即返回,每隔一段时间就查看双向链表中是否为空,进而判断是否有描述符就绪
- 若链表不为空就表示有文件描述符就绪,则epoll_wait直接返回
- 在返回之前将就绪的描述对应事件结构向用户空间拷贝一份,用户程序可以直接操作就绪的描述符,不需循环遍历
-
优劣
-
- 优势:epoll没有监控的上限;采用事件结构简化了select监控结合的监控流程;epoll采用事件回调函数对文件描述符进行监控,避免了轮询遍历,性能不会随着文件描述符的增多而下降;epoll描述符的事件结构,只需要向内核中拷贝一次,不必每次都拷贝
- 劣势:不能跨平台
单线程
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class SocketMultiplexingV1 {
private ServerSocketChannel server = null;
private Selector selector = null;
int port = 8092;
public static void main(String[] args) throws IOException {
SocketMultiplexingV1 socketServer = new SocketMultiplexingV1();
socketServer.start();
}
public void initServer() {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
// 若在epoll模型下,open=>epoll_create->fd3
selector = Selector.open();// select poll epoll 优先选择epoll
// server 约等于 listen状态的fd4
/*
register
select、poll:在JVM开辟一个数组,fd4放进去
epoll:epoll_ctl(fd3, ADD, fd4, EPOLLIN...)
*/
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() throws IOException {
initServer();
System.out.println("server start at port:" + port);
while (true) {
Set<SelectionKey> keys = selector.keys();
System.out.println("num of event monitor: " + keys.size());
// 调用多路复用器(select、poll、epoll(epoll_wait))
/*
select():
1.select、poll 其实是内核的select(fd4)、poll(fd4);
2.epoll 其实是内核的epoll_wait()
参数可以带时间
select:null 一直阻塞到有就绪事件; 0 立即返回
poll:负值 一直阻塞到有就绪事件;0 立即返回
epoll_wait:-1 一直阻塞到有就绪事件; 0 立即返回
*/
while (selector.select(500) > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();// 返回就绪状态的fd集合
Iterator<SelectionKey> iter = selectionKeys.iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();// 不移除会重复循环处理
if (key.isAcceptable()) {
// select、poll 因为内核没有开辟空间,在JVM中保存,包含listen状态的fd4
// epoll 通过epoll_ctl将新的客户端fd注册到内核空间
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
}
}
}
}
}
public void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept();
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(8192);
/*
select、poll:JVM中开辟一个数组,fd7放进去
epoll:epoll_ctl(fd3, ADD, fd7, EPOLLIN...)
*/
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("--------------------------");
System.out.println("new client: " + client.getRemoteAddress());
System.out.println("--------------------------");
} catch (IOException e) {
e.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
try {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(8192);
client.read(readBuffer);
readBuffer.flip();
// 接收信息
byte[] dataBytes = new byte[readBuffer.remaining()];
readBuffer.get(dataBytes);
String data = new String(dataBytes);
String info = "receive a new msg: " + data;
System.out.println(info);
// 返回信息
byte[] writeBytes = info.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(writeBytes.length);
writeBuffer.put(writeBytes);
writeBuffer.flip();
client.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
启动服务端
- 当前目录
- 查询主线程信息
启动客户端
- 此时服务端信息
- 此时主线程信息
客户端传入消息
- 此时服务端信息
- 此时主线程信息
多线程
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
public class SocketMultiplexingV2 {
private ServerSocketChannel server = null;
private Selector selector1 = null;
private Selector selector2 = null;
private Selector selector3 = null;
int port = 8093;
public void initServer() {
try {
server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(port));
// selector1关注OP_ACCEPT事件
selector1 = Selector.open();
// selector2、selector3关注OP_READ事件
selector2 = Selector.open();
selector3 = Selector.open();
server.register(selector1, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
SocketMultiplexingV2 service = new SocketMultiplexingV2();
service.initServer();
// boss
NioThread t1 = new NioThread(service.selector1, 2);
// worker
NioThread t2 = new NioThread(service.selector2);
NioThread t3 = new NioThread(service.selector3);
t1.start();
try {
// 确保t1启动完成
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
t3.start();
System.out.println("server start....");
}
}
class NioThread extends Thread {
Selector selector = null;
static int selectors = 0;
int id = 0;
// 存放新接入的客户端连接
volatile static BlockingDeque<SocketChannel>[] queue;
// 分配队列
static AtomicInteger idx = new AtomicInteger();
NioThread(Selector selector, int n) {
this.selector = selector;
this.selectors = n;
queue = new LinkedBlockingDeque[selectors];
for (int i = 0; i < n; i++) {
queue[i] = new LinkedBlockingDeque<>();
}
System.out.println("Boss start...");
}
NioThread(Selector selector) {
this.selector = selector;
id = idx.getAndIncrement() % selectors;
System.out.println("worker " + id + " start");
}
public void run() {
try {
while (true) {
while(selector.select(10) > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// select、poll 因为他们内核没有空间,那么在JVM中保存,和前面的listen状态的fd4一起
// epoll 通过epoll_ctl将新的客户端fd注册到内核空间
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
}
}
}
if (!queue[id].isEmpty()) {
ByteBuffer buffer = ByteBuffer.allocate(8192);
SocketChannel client = queue[id].take();
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("--------------------------");
System.out.println("new client: " + client.socket().getPort() + " distributed " + id);
System.out.println("--------------------------");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept();
client.configureBlocking(false);
int num = idx.getAndDecrement() % selectors;
queue[num].add(client);
} catch (IOException e) {
e.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
try {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(8192);
client.read(readBuffer);
readBuffer.flip();
// 接收信息
byte[] dataBytes = new byte[readBuffer.remaining()];
readBuffer.get(dataBytes);
String data = new String(dataBytes);
String info = "receive a new msg: " + data;
System.out.println(info);
// 返回信息
byte[] writeBytes = info.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(writeBytes.length);
writeBuffer.put(writeBytes);
writeBuffer.flip();
client.write(writeBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
启动服务端
-
当前目录
-
查看主线程信息
-
查看线程信息
-
- mio.27700 -> accept
- mio.27706 -> read
- mio.27707 -> read
- mio.27700 -> accept
启动客户端
- 启动客户端1
- 此时服务端信息
- mio.27700 (accpet)
- mio.27706(read)
- 启动客户端2
- 此时服务端信息
- mio.27700 (accpet)
- mio.27707(read)
- 启动客户端3
- 此时服务端信息
- mio.27700 (accpet)
- mio.27706(read)
客户端传入消息
- 客户端1消息
- 此时服务端信息
- mio.27706(read)
- 客户端2消息
- 此时服务端信息
- mio.27707(read)
- 客户端3消息
- 此时服务端消息
- mio.27706(read)
类比Netty的实现
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyIO {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(2);
NioEventLoopGroup worker = new NioEventLoopGroup(2);
ServerBootstrap boot = new ServerBootstrap();
try {
boot.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, Boolean.FALSE)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyInBound());
}
})
.bind(8094)
// 阻塞当前线程到服务启动起来
.sync()
.channel()
.closeFuture()
// 阻塞当前线程到服务停止
.sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyInBound extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
ByteBuf buffer = (ByteBuf) msg;
System.out.println("Thread: " + Thread.currentThread().getName() + " receive a msg: " + buffer.toString());
buffer.release();
}
}
异步IO(Asynchronous IO)
流程
- 用户程序通过系统调用,告知内核启动某个IO操作,用户程序返回
- 内核在整个IO操作(数据准备、数据复制)完成之后,通知用户程序,用户程序执行后续的业务操作
-
- 数据准备:内核将苏剧从网络物理设备(网卡)读到缓冲区
- 数据复制:内核将数据从内核缓冲区拷贝到用户程序空间缓冲区
内核对AIO的支持
- int aio_read(struct aiocb *aiocbp);
对一个有效的文件描述符进行异步读操作;
在请求进行排队之后会立即返回(如果执行成功,返回值就为 0;如果出现错误,返回值就为 -1,并设置 errno 的值) - int aio_write(struct aiocb *aiocbp);
对一个有效的文件描述符进行异步写操作;
在请求进行排队之后会立即返回(如果执行成功,返回值就为 0;如果出现错误,返回值就为 -1,并设置 errno 的值) - int aio_fsync(int op, struct aiocb *aiocbp);
与aiocbp相关的所有未完成的异步I/O操作进行同步 - int aio_error(const struct aiocb *aiocbp);
检查异步请求的状态,返回值如下:
-EINPROGRESS:请求尚未完成
-ECANCELED:请求被取消
-0:请求已处理完成
-负值:操作失败 - ssize_t aio_return(struct aiocb *aiocbp);
返回aiocbp指向的控制块的异步I/O请求的最终返回值
在异步IO完成时,返回一个同步read/write/fsync/fdatasync中的数据,在异步IO未完成时,返回值和aio_return()效果是undefined - int aio_suspend(const struct aiocb * const aiocb_list[], int nitems, const struct timespec *timeout);
挂起线程直到以下情况发生:注册事件有一个或多个完成;收到一个信号;超时设置不空且已超时
-aiocb_list:aiocb块地址的数组,可以向其内添加想要等待阻塞的异步事件
-nitems:向cblist注册的aiocb个数
-timeout:等待阻塞的超时时间,NULL为无限等待 - int aio_cancel(int fd, struct aiocb *aiocbp);
尝试取消文件描述符fd的未完成事件请求,返回值如下:
-AIO_CANCELED:所有请求成功取消
-AIO_NOTCANCELED:至少有一个请求因为在运行中而未能取消
-AIO_ALLDONE:在请求之前所有的请求都已完成
–1:出现错误,并设置 errno 的值 - int lio_listio(int mode, struct aiocb *const aiocb_list[],int nitems, struct sigevent *sevp);
启用一个数组aiocb_list来描述IO操作
-mode:LIO_WAIT:阻塞至所有操作完成,此时忽略sevp;LIO_NOWAIT:请求进入执行队列之后立即返回,请求完成之后将会通过sevp异步回调,若sevp为null,无异步回调
-aiocb_list:IO操作数组
-nitems:指定数组的大小
aiocb_list中的每个控制块中的aio_lio_opcode取值有:LIO_READ,启用读操作(调用aio_read);LIO_WRITE,启用写操作(调用aio_write);LIO_NOP:忽略控制块
优劣
- 优势:在内核kernel的等待数据和复制数据,用户线程都是非阻塞的;只需要注册IO操作完成的回调函数,到操作系统的内核
- 劣势:需要完成事件的注册与传递,这里边需要底层操作系统提供大量的支持;目前Linux对AIO支持并不完全,如下示例在Linux 3.10.0运行,通过strace追踪发现底层还是使用epoll实现
示例
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
public class SocketAIO {
public static void main(String[] args) {
new Thread(new AsyncServerHandler(), "AIO-Server").start();
}
}
class AsyncServerHandler implements Runnable {
// 服务端启动端口号
private int port = 8093;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncServerHandler(){
try {
//创建一个AsynchronousServerSocketChannel对象,工厂方法
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
//绑定端口号
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("server start on port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
// 为了防止主线程启动完成后关闭
latch = new CountDownLatch(1);
//该方法是异步的接受来自该通道的客户端的连接请求,连接成功后调用CompletionHandler的completed或者failed方法
asynchronousServerSocketChannel.accept(this, new ServerAcceptCompletionHandler());
try {
//阻塞直到count down to 0
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ServerAcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment,this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 异步读操作,参数的定义:
// 第一个参数:接收缓冲区,用于异步从channel读取数据包;
// 第二个参数:异步channel携带的附件,通知回调的时候作为入参参数,这里是作为ClientReadCompletionHandler的入参
// 通知回调的业务handler,也就是数据从channel读到ByteBuffer完成后的回调handler,这里是ClientReadCompletionHandler
result.read(buffer, buffer, new ClientReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncServerHandler attachment) {
exc.printStackTrace();
AsyncServerHandler result = (AsyncServerHandler) attachment;
result.latch.countDown();
}
}
class ClientReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ClientReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null)
this.channel = channel;
}
/**
* 业务处理,从ByteBuffer读取业务数据,做业务操作
*
* @param result
* @param attachment
*/
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("server receive order:" + req);
String currentTime = new Date(System.currentTimeMillis()).toString();
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**
* 往客户端的写操作
*
* @param currentTime
*/
public void doWrite(String currentTime) {
if (null != currentTime && currentTime.trim().length() > 0) {
byte[] bytes = currentTime.getBytes();
//分配一个写缓存
ByteBuffer write = ByteBuffer.allocate(bytes.length);
System.out.println("reponsbody=" + currentTime);
//将返回数据写入缓存
write.put(bytes);
write.flip();
//将缓存写进channel
channel.write(write, write, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果发现还有数据没写完,继续写
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
//写失败,关闭channel,并释放与channel相关联的一切资源
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
//读,关闭channel,并释放与channel相关联的一切资源
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void completed(Integer result, ByteBuffer buffer) {
//如果发现还有数据没写完,继续写
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
//写失败,关闭channel,并释放与channel相关联的一切资源
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
//读,关闭channel,并释放与channel相关联的一切资源
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
最后
以上就是激动篮球为你收集整理的IO演进什么是IOIO模型的全部内容,希望文章能够帮你解决IO演进什么是IOIO模型所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复