概述
概述
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景。
生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不
直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻
塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了
生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
举个例子:
就和包饺子一样,生产者负责包饺子,消费者负责吃饺子,当消费者吃饱后,生产者多余的饺子就会进入等待,直到消费者又饿了,再去吃~
我们借助生产者消费者模型,来进行“削峰”,削弱请求峰值对服务器的冲击力,接受的大量数据被放进阻塞队列中,在里面缓存着,不会消耗太多的CPU资源,然后服务器按照固定的节奏从阻塞队列中获取数据~
下面我们基于数组实现一个阻塞队列,方便理解:
初始情况下 head 和 tail 为 0
入队时,每次放入新元素,就放在tail位置上,然后tail++,当tail >= 数组的长度的时候,让tail = 0;从头再开始。
出队时,让head++,同理,head >= 数组长度的时候,也让head=0,从头开始。
这样就保证队列中的元素一直在head 到tail的区间里,左闭右开的区间
但是这里还会发现一个问题,队列为空的时候 和 队列为满的时候,head 和 tail 都是在同一位置,所以我们还需要一个变量size来判断队列中元素的个数。(也可以浪费一个区间来进行判断,但这里是简单实现)。
然后我们实现阻塞,
通过Object类,引入一个锁对象。
当我们队列为空时,就通过加锁,wait方法,就要阻塞等待,直到生产者线程生产元素后唤醒消费者线程,队列不为空,可以继续消费。
同理当我们队列满了的时候,也通过加锁的方式,就要阻塞等待,直到消费者线程,消费完元素后,队列不在为满的时候,就可以唤醒生产者线程来进行生产。
为什么要用while循环来判断size == 数组长度呢?
举个例子:
如果当前线程已经满了,由于是多线程的环境下,可能只消费了一个元素,但是有多个线程进行生产,此时多个线程共同竞争一把锁,竞争到的可以生产,那么没有竞争到的就还需要判断对列是否满了,满了依然进入阻塞等待。
wait被唤醒后要再次判断当前对列是否为满~
这两个等待不会同时等待~
public class TestBlockingQueue {
//基于数组的方式实现一个普通队列
//在改进成阻塞对列
static class BlockingQueue{
private int [] items = new int[10];
//从head取元素
private int head = 0;
//从tail添加元素
private int tail = 0;
//表示队列中的元素
private int size = 0;
//引入一个锁对象
private Object locker = new Object();
//入队列
public void put(int value) throws InterruptedException {
synchronized (locker){
//队列满后,阻塞等待,等到队列不满的时候,继续插入
while(size == items.length){
locker.wait();
}
items[tail] = value;
tail++;
if(tail >= items.length){
tail = 0;
}
size++;
locker.notifyAll();
}
}
//出队列
public Integer take() throws InterruptedException {
int ret = 0;
synchronized (locker){
//队列为空同样进入阻塞等待,直到有元素时,继续出元素
while(size == 0){
locker.wait();
}
ret = items[head];
head++;
if(head >= items.length){
head = 0;
}
size --;
locker.notifyAll();
}
return ret;
}
}
}
测试:
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new BlockingQueue();
Thread producer = new Thread(){
@Override
public void run() {
for (int i = 0; i < 30; i++) {
try {
System.out.println("生产了元素 " + i);
queue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread customer = new Thread(){
@Override
public void run() {
while(true){
try {
int ret = queue.take();
System.out.println("消费了 " + ret);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
producer.start();
customer.start();
}
如果去掉producer线程里的Thread.sleep(1000); 就会发现生成的速度很快,但是消费的速度很慢。之后就产生了阻塞,因为我设置的阻塞队列大小为10,所以当producer线程生产到11的时候,进入WAITING状态。直到customer线程,消费完之后,队列不在为满的时候,唤醒producer线程继续生产。
以上设计的只是一个简单的阻塞队列,而Java库中的阻塞队列 BlockingDeque ,更加强大,功能也更加完善和细致,只针对局部加锁,降低锁的力度~
最后
以上就是朴素夕阳为你收集整理的Java多线程中的阻塞队列的全部内容,希望文章能够帮你解决Java多线程中的阻塞队列所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复