我是靠谱客的博主 欢喜面包,最近开发中收集的这篇文章主要介绍NIO之Pipe,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

引言

Pipe(管道)一般用于Java多线程通信,其优点是可以利用Java的I/O机制来解决多线程数据交互的同步问题。管道类似生活中的水管,一端进水一端出水,其使用方法也是一端写数据一端读数据,写读数据时均有可能进入阻塞。

Java不同的管道形式

Java库中提供了两种管道的使用方法,即BIO(Blocking IO)型与NIO(Non-blocking IO)型(本文只介绍NIO型管道),BIO为常见的IO类型例如普通的文件输入输出,而NIO是一种非阻塞模式的IO类型,何为非阻塞?我们知道IO是系统与外界进行信息交互的统称,与外界交互就免不了进入阻塞来等待外界满足正常IO需求,此时线程处于等待状态,只要一个IO没有完成就无法进行其他工作,如果同时要进行多个IO操作,就必须要开多个线程,造成资源浪费,NIO机制就能够一定程度缓解这些问题,其使用通道和选择器可以在一个线程中处理多个IO请求,NIO机制以后再详细介绍,本文只介绍其一个具体应用----管道。

管道类

// 抽象类
public abstract class Pipe
//静态函数用于构造对象
public static Pipe open()
//获取读取端通道
public abstract Pipe.SourceChannel source()
//获取写入端通道
public abstract Pipe.SinkChannel sink()
//Channel类设置通道是否阻塞,默认为true阻塞,这种设置的通道使用方法与普通IO相同,如果要使用选择器处理多路IO,就需要将其设置为false非阻塞模式。
public final SelectableChannel configureBlocking(boolean block)

java原代码中Pipe类为一个抽象类,没有直接提供Pipe的实现方法,这是因为不同系统底层的NIO实现方式不同,所以实例对象时会根据不同的系统实例不同的Pipe,实际使用的类为

class PipeImpl extends Pipe

管道类的简单例子

主要实现两个线程使用管道进行通信,直接上代码

public class myPipeTest {
private static Object myThr;
public static void main(String[] args) throws IOException, InterruptedException {
Pipe aPipe = Pipe.open();
Thread myTask1 = new Thread(new myThr1(aPipe));
Thread myTask2 = new Thread(new myThr2(aPipe));
myTask1.start();
myTask2.start();
Thread.sleep(5000);
}
}
class myThr1 implements Runnable {
private Pipe aPipe;
public myThr1(Pipe aPipe) {
this.aPipe = aPipe;
}
@Override
public void run() {
System.out.println("write");
ByteBuffer abbf = ByteBuffer.allocate(1024);
Pipe.SinkChannel aSc = null;
int time = 0;
try {
while(true){
String str = "i am " + Thread.currentThread().getName() + time++;
abbf.clear();
abbf.put(str.getBytes());
abbf.flip();
aSc = aPipe.sink();
aSc.write(abbf);
if(time == 2)
{
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
aSc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class myThr2 implements Runnable {
private Pipe aPipe;
public myThr2(Pipe aPipe) {
this.aPipe = aPipe;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("read");
ByteBuffer abbf = ByteBuffer.allocate(1024);
try {
Pipe.SourceChannel aSc = aPipe.source();
aSc.read(abbf);
System.out.println(new String(abbf.array()));
aSc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

运行结果

write
read
i am Thread-0 0 i am Thread-0 1

代码说明

1、myThr1线程写入端,myThr2是读取端
2、写入端连续写入两次,读取端读取了一次可以看出写入会接着上次未读出的信息继续写入,并未是直接清空之前数据。
3、不使用选择器单独使用通道进行读写数据时也能进入阻塞

Pipe的常用例子

简单方法并不常用,使用Pipe类一般都会用到选择器实现一个线程处理多组IO请求,本次实例主要使用选择器同时处理网络套接字IO请求与管道IO请求,并进行简单的异常处理。

public class myTest {
public static void main(String[] args) throws IOException {
/*选择器*/
Selector aSel = Selector.open();
SelectionKey aKeySocket = null;
SocketChannel aSockChn = null;
/*SocketServer 通道*/
ByteBuffer buf = ByteBuffer.allocate(1024);
ServerSocketChannel lSockChn = ServerSocketChannel.open();
lSockChn.socket().bind(new InetSocketAddress(10086));
lSockChn.configureBlocking(false); //注册到选择器的通道需要设置为非阻塞模式
SelectionKey aKeyServerSocket = lSockChn.register(aSel, SelectionKey.OP_ACCEPT);//注册此通道的accept事件,除此之外还可以注册read write connect事件
/*pipeSocket 通道*/
Pipe aPipe = Pipe.open();
Pipe.SourceChannel pipeSrcChn = aPipe.source();
pipeSrcChn.configureBlocking(false);
SelectionKey aKeyPipe = pipeSrcChn.register(aSel, SelectionKey.OP_READ);
/*创建启动写线程*/
Thread myTask1 = new Thread(new myThrPipe(aPipe));
Thread myTask2 = new Thread(new myThrSocket());
myTask1.start();
myTask2.start();
/*Select*/
while(true)
{
aSel.select(10000);
/*选择器可能会进入阻塞, 每10s自动退出阻塞*/
int error = 0;
for (SelectionKey Key:aSel.selectedKeys()) {
if(Key.isAcceptable()){
AbstractSelectableChannel sChn = (AbstractSelectableChannel)Key.channel();
if(Key.equals(aKeyServerSocket)){
/*向选择器注册Socket 通道*/
ServerSocketChannel aServerSockChn = (ServerSocketChannel)sChn;
aSockChn = aServerSockChn.accept();
if(aSockChn == null){
continue;
}
aSockChn.configureBlocking(false);
aKeySocket = aSockChn.register(aSel, SelectionKey.OP_READ);
}
else {
error = 1;
break;
}
}
if(Key.isReadable()){
buf.clear();
AbstractSelectableChannel sChn = (AbstractSelectableChannel)Key.channel();
if(Key.equals(aKeySocket)){
//读取套接字
aSockChn = (SocketChannel)sChn;
int readNum = aSockChn.read(buf);
if (readNum < 0){
Key.cancel();/*出错认为客户端已经断开连接,从选择器中移除*/
break;
}
System.out.println("Socket mes:" + new String(buf.array()));
}
else if(Key.equals(aKeyPipe)){
//读取管道
Pipe.SourceChannel aPipeSrcChn = (Pipe.SourceChannel) sChn;
int readNum = aPipeSrcChn.read(buf);
if (readNum < 0){
Key.cancel();/*出错管道已经无效,从选择器中移除*/
break;
}
System.out.println("Pipe mes:" + new String(buf.array()));
}
else{
error = 2;
break;
}
}
}
/*选择器已空退出循环*/
if(aSel.selectedKeys().isEmpty()) {
break;
}
/*出错退出循环*/
if (error != 0) {
break;
}
}
/*释放资源*/
if(aSockChn != null){
aSockChn.close();
}
if(pipeSrcChn != null){
pipeSrcChn.close();
}
if(lSockChn != null){
lSockChn.close();
}
if(aSel != null){
aSel.close();
}
}
}
//pipe写入线程
class myThrPipe implements Runnable {
private Pipe aPipe;
public myThrPipe(Pipe aPipe) {
this.aPipe = aPipe;
}
@Override
public void run() {
ByteBuffer abbf = ByteBuffer.allocate(1024);
abbf.clear();
abbf.put("i am Pipe Writer".getBytes());
abbf.flip();
try {
Pipe.SinkChannel
aSc = aPipe.sink();
System.out.println("Pipe write");
aSc.write(abbf);
aSc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//socket写入线程
class myThrSocket implements Runnable {
@Override
public void run() {
try {
Socket aSocket = new Socket("hostName", 10086); //此处可以使用电脑主机名或者IP地址
OutputStream os = aSocket.getOutputStream();
System.out.println("Socket send");
os.write("I am Socket Client".getBytes());
aSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

打印结果

Pipe write
Socket send
Pipe mes: i am Pipe Writer
Socket mes: I am Socket Client

最后

以上就是欢喜面包为你收集整理的NIO之Pipe的全部内容,希望文章能够帮你解决NIO之Pipe所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部