概述
上一篇我们讲解了大部分的理论知识,这一篇会讲解一些实际代码部分。
首先让我们熟悉一下几个方法:
- 创建一个服务器对象:ServerSocketChannel.open()
- 服务器对象需要绑定ip和端口,使用bind(InetSocketAddress )方法,需要使用传入InetSocketAddress,只需传入一个端口号即可
- 服务器调用accept()方法获取客户端的连接请求
- 通过接收到的客户端连接对象read(buffer)方法获取客户端发送的消息
- 创建客户端:SocketChannel.open()
- 客户端使用connect(InetSocketAddress server)方法,连接对应的服务器
- 通过write(buffer)方法发送消息到连接的服务器
- 设置阻塞模式:configureBlocking(false),如果为true就是阻塞的,false为非阻塞
- 通过Selector.open()创建selector对象管理多个channel
还有剩下的一些方法的使用都会在注释中呈现出来
阻塞模式
首先是阻塞模式的一个引用,用于衬托下文非阻塞模式的好处。
服务器:
public static void main(String[] args) throws IOException {
//1.创建一个服务器,ServerSocketChannel的open方法
ServerSocketChannel ssc=ServerSocketChannel.open();
//2.绑定监听端口
ssc.bind(new InetSocketAddress(8080));
//用于接收消息,这里限制50字节
ByteBuffer buffer=ByteBuffer.allocate(50);
//用于保存所有的客户端Socket对象
List<SocketChannel> channels=new ArrayList<SocketChannel>();
while(true){
//3.接收客户端的请求 (这里是阻塞的)
SocketChannel sc = ssc.accept();
System.out.println("连接..."+sc);
//加入集合里,以便查看对应的连接是否发消息过来了
channels.add(sc);
//遍历List集合,查看已经建立连接的客户端有人发消息了吗
for (int i = 0; i < channels.size(); i++) {
SocketChannel scTemp = channels.get(i);
//ByteArrayOutputStream方便接收完整的消息
ByteArrayOutputStream baos=new ByteArrayOutputStream();
//read方法也是阻塞的
while (scTemp.read(buffer)!=-1){
buffer.flip(); //由写模式变为读模式
//读出数据
while (buffer.hasRemaining()){
baos.write(buffer.get());
}
buffer.clear();
}
System.out.println("接收到消息:" +baos.toString() + scTemp);
baos.close();
}
}
}
客户端:
SocketChannel sc=SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8080));
sc.write(StandardCharsets.UTF_8.encode("ihaveadream"));//转为ByteBuffer
System.out.println("waiting....");
//注意:这里如果不关闭的话,服务器就会一直等待你发送消息
//sc.shutdownOutput();
sc.close();
结果:服务器和多个客户端通信的时候,发生了阻塞,他必须有一个新的连接来到时,才会遍历查看有没有新的消息。会卡在accept上,或者read上。为了避免这种情况,下面使用非阻塞模式。
非阻塞模式
服务器:
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocketChannel ssc = ServerSocketChannel.open();
//改成非阻塞的,默认为true即阻塞的状态,ssc非阻塞影响的是accept方法
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
ByteBuffer buffer = ByteBuffer.allocate(50);
List<SocketChannel> channels = new ArrayList<SocketChannel>();
while (true) {
//现在是非阻塞的,没有连接也会继续运行,没有连接则返回一个null,继续向下跑
SocketChannel sc = ssc.accept();
//加一个判断,表示有客户端连接时才执行
if (sc != null) {
System.out.println("连接..." + sc);
//也可以将SocketChannel设为非阻塞模式,影响的是下面的read方法
sc.configureBlocking(false);
channels.add(sc);
}
for (int i = 0; i < channels.size(); i++) {
SocketChannel scTemp = channels.get(i);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
//加一个判断
int len;
//如果read方法没有读取到数据,返回0,现在是非阻塞的了
len = scTemp.read(buffer);
//加一个判断,如果len为0或者读到末尾了,则不做任何事,这样的的话,客户端每发一条数据都能及时的显示出来
//没发的话不显示没用的数据
if (len != 0&&len!=-1) {
buffer.flip();
while (buffer.hasRemaining()) {
baos.write(buffer.get());
}
buffer.clear();
System.out.println("接收到消息:" + baos.toString() + scTemp);
}
baos.close();
}
}
}
总结:此方法,服务器不断循环,cpu占用高,为了解决这种问题下面采用Selector加以改进。
Selector
多路复用。管理多个channel
(1)与Selector一起使用时,Channel必须处于非阻塞模式下,否则将抛出异常IllegalBlockingModeException。
(2)一个通道,并没有一定要支持所有的四种操作。比如服务器通道ServerSacketChannel.支持 Accept接受操作,而 SocketChannel客户端通道则不支持。可以通过通道上的 validOps()方法,来获取特定通道下所有支持的操作集合。
Selector监听以下四种事件(重点)
-
accept :在有连接请求时触发
-
connect:是客户端连接建立后
-
read:可读事件,表明某个channel中有可读的数据了,一般是客户端在SocketChannel写了数据发给服务器
-
write:可写事件
调用select()的方法时,selector会进行阻塞,select何时不阻塞
-
事件发生:
-
客户端发起连接请求,触发accept事件
-
客户端发送数据,或者客户端正常或异常关闭都会触发read事件,如果发送数据大于buffer缓冲区,触发多次read事件
-
channel可写,会触发write事件
-
在linux下nio bug发生
-
-
调用selector的wakeup()方法
-
调用selector.close()方法
-
selector所在线程interrupt
解决上面的CPU占用过高问题:
public static void main(String[] args) throws IOException {
//1.创建selector管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
//首先需要转为非阻塞模式,才能关联selector
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
//2.建立selector和channel的联系(注册)
//SelectionKey(观察者) 事件发生后,通过它可以知道发生什么事件和哪个channel发生的事件
//SelectionKey当做管理这个ssc的管理员 0表示什么事件都不感 兴趣
//当这个ssc发生感兴趣的事件后,就会把这个SelectionKey(sscKey)给到selector,放在集合里
SelectionKey sscKey = ssc.register(selector, 0, null);
System.out.println("ssc的SelectionKey对象为:" + sscKey);
//设置它的感兴趣的事件,是个常量,可以用 +或| 选多个事件
//现在这个key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
//3.selector方法,没有事件发生,线程阻塞,有事件线程才会恢复运行,
// 事件要么处理要么cancel取消 否则会一直处于非阻塞,
selector.select();//有三个方法重载,一个带超时,一个不阻塞
//4.处理事件,需要手动删除
//里面实际包含了SelectionKey还有他对应的哪个事件,当我们把对应的事件处理完后,就会自动去除该具体事件
//但是对应的SelectionKey还在,如果不删除的话下次再循环到很可能就会报空指针异常,因为该SelectionKey就是个空壳子了
Set<SelectionKey> keys = selector.selectedKeys();
//5.用迭代器遍历,因为涉及到了删除操作
Iterator<SelectionKey> iterator = keys.iterator();
//6.处理事件
while (iterator.hasNext()) {
//取到channel关联的SelectionKey
SelectionKey key = iterator.next();
//已经取到对应的SelectionKey了,可以删除了。
iterator.remove();
System.out.println("key:" + key);
//区分事件类型
//1) ssc的accept事件
if (key.isAcceptable()) {
//拿到 是哪个channel发生的事件 即发生事件的channel,需要强转,这里即ssc对象
ServerSocketChannel sscRef = (ServerSocketChannel) key.channel();
//获取客户端
SocketChannel sc = sscRef.accept();
System.out.println("接收到" + sc.getLocalAddress() + "的一个连接请求"); //修改客户端为非阻塞,因为客户端也要关联Selector
sc.configureBlocking(false);
//这个SelectionKey是给sc准备的,用于监听读和写事件,和ssc共用一个selector
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, null);
} else if (key.isReadable()) {//2) read事件发生
//因为这里服务器并没有设置监听read事件,所以只可能是SocketChannel类型,且是客户端通过客户端的channel发送数据
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(5);
try {
//当客户端关闭后,会引发一个read,此read会进入SelectionKey
//现在的缓冲区只有5字节,如果消息有12字节的话会触发3次read事件
int len=channel.read(buffer);
buffer.flip();
CharBuffer decode = StandardCharsets.UTF_8.decode(buffer);//解码buffer
String msg = decode.toString();
//读到-1表示客户端正常关闭,客户端关闭会触发一次读事件
if (len == -1) {
//另外,这个事件会不断发生,就算从已准备好的集合移除也没有,必须将该channel关闭或者调用哪个该 key的cancel方法,
// 因为SelectionKey代表的是Selector和Channel之间的联系,所以在Channel关闭了之后,对于 Selector来说,
// 这个Channel永远都会发出关闭这个事件,表明自己关闭了,直到从该Selector移除去
key.cancel();
}else {
System.out.println("接收到一个消息" + channel + "的一个消息: " + msg);
}
} catch (IOException e) {
//客户端不是close关闭的,会抛异常,导致服务器也停掉,所以需要捉住异常。
System.out.println("一个客户端非正常关闭了");
key.cancel();
}
// channel.read(buffer);
//如果在取消SelectionKey(这时候只是加入取消的键集合,下一次select才会执行)后没有调用到selector的 select方法(因为Client一般在取消key后,
// 我们都会终止调用select的循环,
//当然,server关闭一个注册的channel我们是不会终止select循环的),那么本地socket将进入CLOSE-WAIT 状态(等待本地Socket关闭)。
//简单的解决办法是在 SelectableChannel.close方法之后调用Selector.selectNow方法
// selector.selectNow();
}
}
}
}
客户端代码不变。
这里面涉及到的几个细节:
- 每处理一个事件都必须remove掉,否则下一次会继续执行该方法,如服务器accept到了客户端,但对应的事件却没删除,下一次没有客户端请求连接依旧会被上次残留的事件唤醒,所以服务器去到一个null,再给null注册事件时,会报空指针异常。
- 客户端正常关闭连接会触发一个read事件,读该事件的时候会读到-1,客户端非正常断开会抛异常,为了不影响服务器的运行需要捕捉该异常,客户端断开连接的read事件需要cancel掉,否则会发生无限循环打印异常信息或者其他信息。
进一步优化
对上面的服务器进行一些功能上的优化
处理消息边界
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
(1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;
(2)服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;
(3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;
(4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。 如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。
解决方法:
-
客户端和服务器约定一个包的固定大小,服务器按预定长度读取,缺点是浪费宽带。
-
客户端发送消息的时候指定一个分隔符,如n,但是此方法效率较低,需要一个一个去找分隔符。
-
(常用)跟TCP一样,TLV类型,(Type,Length,Value),数据包含有一个固定大小的包头,指定后面的数据有多大,分配合适的buffer,缺点是buffer需提前分配,如果内容过大,影响server吞吐量
-
Http1.1:TLV格式
-
Http2.0:LTV格式
-
附件与扩容
如果一次发送的内容太长,则需要两次read事件读取,但时两次读取就把一个整体消息给分隔开了,每次接收同一个消息的ByteBuffer因为是局部变量所以是两个不同的buffer块,因此没办法去共享结合这两次读取的消息了。如果把buffer块变成全局的,会造成混乱。所以提出了新的attachment属性,使得每个socketchannel即客户端都拥有一个独立的ByteBuffer。
对上面代码进行修改,以前的注释删掉了,新的注释需要留意
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
//1) ssc的accept事件
if (key.isAcceptable()) {
ServerSocketChannel sscRef = (ServerSocketChannel) key.channel();
SocketChannel sc = sscRef.accept();
System.out.println("接收到" + sc.getLocalAddress() + "的一个连接请求");
//建立一个缓冲区,用于关联新连接的SocketChannel
ByteBuffer buffer = ByteBuffer.allocate(5);
sc.configureBlocking(false);
//注册方法的第三个参数写与之关联的buffer块,保证每个SocketChannel有独立的ByteBuffer
//buffer块实际关联的是SelectionKey,SelectionKey再关联SocketChannel
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ, buffer);
} else if (key.isReadable()) {//2) read事件发生
SocketChannel channel = (SocketChannel) key.channel();
//通过SocketChannel关联到的scKey获得对应的关联的attachment即buffer块
//attachment是Object类型,需要强转。Object类型我们以后也可以关联其他的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
try {
//当客户端关闭后,会引发一个read,此read会进入SelectionKey
int len=channel.read(buffer);
//读到-1表示客户端正常关闭,客户端关闭会触发一次读事件
if (len == -1) {
key.cancel();
}else {
split(buffer);
//表明spilt方法里,没有碰到一个消息的结尾,表明消息太长,需要扩容
//注意此position是写模式下的,表示写指针到最后不能再写了
if(buffer.position()==buffer.limit()){
ByteBuffer newBuffer=ByteBuffer.allocate(buffer.capacity()*2);
buffer.flip();
newBuffer.put(buffer);
//替换掉旧buffer,下次read事件channel.read(buffer)会从position的位置继续读未取到的消息。
key.attach(newBuffer);
}
}
} catch (IOException e) {
//客户端不是close关闭的,会抛异常,导致服务器也停掉,所以需要捉住异常。
System.out.println("一个客户端非正常关闭了");
key.cancel();
}
}
}
}
}
//按分隔符split每一条消息,并打印,半包的消息让他继续保存在buffer里
public static void split(ByteBuffer buffer){
buffer.flip();
ByteArrayOutputStream bao=new ByteArrayOutputStream();
for (int i = 0; i < buffer.limit(); i++) {
//get(int index)方法并不会移动position指针
byte b = buffer.get(i);
//得到一条完整消息
if(b =='n'){ //"abcdn123"
//不包含后面的n
int len=i-buffer.position();
for (int j=0; j < len; j++) {
bao.write(buffer.get());
}
//消耗掉末尾的n标记
buffer.get();
//输出这条完整消息
System.out.println(new String(bao.toByteArray(), 0, bao.size()));
bao.reset();
}
}
//末尾还有一些半包的数据,让他提到前面去
buffer.compact();
}
ByteBuffer大小分配
-
每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护一个独立的 ByteBuffer。
-
ByteBuffer不能太大,比如一个ByteBuffer 1MB的话,要支持百万连接就要1TB内存,因此需要设计大小可变的ByteBuffer
-
一种思路是首先分配一个较小的 buffer,例如4,如果发现数据不够,再分配8k的buffer,将4buffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现
-
另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
-
可写事件
可写事件就是buffer里面还有空的位子 可以写入 就触发可写,可写事件当一次写不完的情况下才需要。
OP_WRITE事件是在Socket发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT时发生。正常情况下,都是可写的,因此一般不注册写事件。
演示:
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8081));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
StringBuffer sb=new StringBuffer();
//连接成功后向客户端发送大量数据,使他一次写不完
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
//此时这里的buffer很大很大,底层的缓冲区可能一次写不了全部
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
//第一次写入给客户端的channel
int len=sc.write(buffer);
System.out.println(len);
//这里表示buffer里还有没有写出去的数据
if (buffer.hasRemaining()) {
//在该channel加上一个可写事件,使下次再继续写剩余的数据
scKey.interestOps(scKey.interestOps()|SelectionKey.OP_WRITE);
//把未写完的数据挂到该scKey上
scKey.attach(buffer);
}
}else if(key.isWritable()){
//取到那个未写完的buffer和要写到的channel
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
//继续写
int len = sc.write(buffer);
System.out.println(len);
//如果写完了就删除可写事件,否则没写完下次该channel继续触发可写事件
if (!buffer.hasRemaining()){
key.attach(null);//清除buffer,帮助GC
//删掉可写事件,否则就会一直触发可写事件
key.interestOps(key.interestOps()-SelectionKey.OP_WRITE);
}
}
}
}
}
客户端接收消息:
SocketChannel sc=SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1",8081));
int count=0;
while (true){
ByteBuffer buffer=ByteBuffer.allocate(1024*1024);
count+=sc.read(buffer);
System.out.println(count);
}
}
多线程优化
上面的服务器已经能完成基本的功能了,现在为了提高性能,充分利用好cpu的核心数,这里我将用多线程来优化该服务器(netty的原理),先看如下图
我们的boss线程只接受accept事件,worker线程关注客户端的读时间,达到分工合作。
问题:我们设置多少个worker线程才能发挥最大利用率?
答案:你现在的cpu核心数减去一个boss线程。获取当前物理机的核心数方法如下
Runtime.getRuntime().availableProcessors()
注意:如果工作在Docker容器下因为容器不是物理隔离的,会拿到物理cpu的核心数。而不是申请容器时的个数。这个问题在jdk10才修复。
上代码:
/**
* @author lxc 2021/11/6
*/
public class MultiTheadServer {
public static void main(String[] args) throws IOException {
//当前主线程用来充当boss,管理accept事件
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
//1.创建固定数量的worker
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()-1];
for (int i = 0; i < workers.length; i++) {
workers[i]=new Worker("work" + i);
}
//计数器,配合round robin负载均衡
AtomicInteger count = new AtomicInteger();
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
System.out.println("connecting...");
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
System.out.println(sc.getRemoteAddress());
sc.configureBlocking(false);
//这里有一个坑就是selector的register和select方法同时只能运行一个
//如果selector阻塞于select,你再执行register,register也会阻塞等select,当前主线程就会阻塞
//所以work的register方法采用了任务队列,使得word的执行不影响boss的执行
// work.register(sc);
//只增不减,为了防止溢出变为负数,需要加一个求绝对值
workers[Math.abs(count.incrementAndGet() % workers.length)].register(sc);
}
}
}
}
}
class Worker implements Runnable {
private final Thread thread;//每一个该对象,创建一个线程和一个Selector
private final Selector selector;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
public Worker(String name) throws IOException {
selector = Selector.open();
this.thread = new Thread(this, name);
thread.start();
}
//客户端第一次连接selector,把注册进去
public void register(SocketChannel sc) throws ClosedChannelException {
//把register这件任务从boss线程解耦到worker线程
queue.offer(() -> {
try {
sc.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
//立即唤醒selector,防止它正阻塞在select方法上迟迟连接不了新来的客户端
//该方法可以提前唤醒selector,和LockSupport的unpack一个原理
selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
selector.select();
//用于注册新来的客户端
while (!queue.isEmpty()) {
System.out.println("连接selector...");
Runnable task = queue.poll();
task.run();
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel sc = (SocketChannel) key.channel();
try {
int len = sc.read(buffer);
if (len == -1) {
System.out.println("一个客户端正常关闭");
key.cancel();
} else {
buffer.flip();
System.out.println("收到消息" + StandardCharsets.UTF_8.decode(buffer));
}
} catch (IOException e) {
System.out.println("一个客户端非正常关闭");
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
测试:
最后
以上就是洁净音响为你收集整理的Netty基础 NIO SocketChannel 网络编程 自制小型服务器,多线程优化*的全部内容,希望文章能够帮你解决Netty基础 NIO SocketChannel 网络编程 自制小型服务器,多线程优化*所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复