阻塞和非阻塞队列下两种生产者消费者实现

Source

队列可分为两种,一种是阻塞队列,一种是非阻塞队列。

阻塞队列和非阻塞队列的区别:阻塞队列可以阻塞,非阻塞队列不能阻塞,只能使用队列wait(),notify()进行队列消息传送。而阻塞队列当队列里面没有值时,会阻塞直到有值输入。输入也一样,当队列满的时候,会阻塞,直到队列不为空。

自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

ArrayBlockingQueue:
基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列

LinkedBlockingQueue
基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

PriorityBlockingQueue:
它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列

DelayQueue:
一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

以下五个方法,在阻塞与非阻塞队列中均可使用。对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

  1. add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
  2. remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
  3. offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
  4. poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
  5. peek():获取队首元素,若成功,则返回队首元素;否则返回null

阻塞队列包括了非阻塞队列中的大部分方法,上面列举的5个方法在阻塞队列中都存在,但是要注意这5个方法在阻塞队列中都进行了同步措施。
除此之外,阻塞队列提供了另外4个非常有用的方法:

  1. put(E e):put方法用来向队尾存入元素,如果队列满,则等待;
  2. take():take方法用来从队首取元素,如果队列为空,则等待
  3. offer(E e,long timeout, TimeUnit unit):offer方法用来向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
  4. poll(long timeout, TimeUnit unit): poll方法用来从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取到,则返回null;否则返回取得的元素;

关于阻塞队列和非阻塞队列的一个经典的使用场景就是生产者和消费者模型,思考一下,按照上面的对两种队列的描述,其实现有何差异呢?

很明显,阻塞队列不需要外力干扰,只要生产者和消费者使用的是同一个队列,生产者生产完数据,就会自动阻塞在那里,等待消费者消费,即队列只要不为空,生产中能和就会一直生产数据,

反观非阻塞队列,生产者生产完毕数据,由于是非阻塞的,其他任何线程都有可能参与到消费队列数据的可能,同时由于非阻塞,线程就有终止的风险,就不能像阻塞队列那样一直运转,就需要借助非阻塞队列的wait()和notify()方法,

下面我们用代码来实现这两种队列下的生产–消费模型,

1、先看阻塞队列的代码,代码比较简单,都有注释,大家可参照注释一起理解,

package com.youfan.common.util;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * 阻塞队列
 * @author asus
 *
 */
public class Test1 {

	public static void main(String[] args) {
		Test1 t = new Test1();
		Producer p = t.new Producer();
		Consumer c = t.new Consumer();
		p.start();
		c.start();
	}

	private int queueSize = 10;

	private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

	// 生产者
	class Producer extends Thread {

		@Override
		public void run() {
			producer();
		}

		private void producer() {
			while (true) {
				try {
					queue.put(1);
					System.out.println("生产者向队列中添加了一个元素,队列的剩余空间是:" + (10 - queue.size()));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}

	// 生产者
	class Consumer extends Thread {

		@Override
		public void run() {
			consumer();
		}
		
		private void consumer() {
			while (true) {
				try {
					queue.take();
					System.out.println("消费者从队列中取出一个元素,队列中剩余的元素个数是:" + (queue.size()));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

运行上述代码,我们看到结果是:
在这里插入图片描述

可以看到,生产者生产数据以及消费者消费数据的过程一直在进行;

2、非阻塞队列实现生产消费模型:

package com.youfan.common.util;

import java.util.PriorityQueue;

public class Test2 {
	
	public static void main(String[] args) {
		Test2 t = new Test2();
		Producer p = t.new Producer();
		Consumer c = t.new Consumer();
		p.start();
		c.start();
	}
	
	private int queueSize = 10;
	private PriorityQueue<Integer> queue = new PriorityQueue<Integer>();
	
	//对于生产者来说,一旦队列填满了,就需要等待,即wait(),一旦消费者消费了,队列存在容量了,则生产者线程被notify(),就继续生产
	class Producer extends Thread{
		
		@Override
		public void run() {
			produce();
		}
		
		public void produce(){
			while(true){
				synchronized (queue) {
					while(queue.size() == queueSize){
						System.out.println("队列满了,需要等待");
						try {
							queue.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
					queue.offer(1);
					queue.notify();  	//生产者生产出了数据就通知消费者线程取出数据
					System.out.println("生产者向队列中添加了一个元素,队列的剩余空间是:" + (10 - queue.size()));
				}
			}
		}
	}
	
	//对于消费者来说,如果队列为空,则需要等待,否则,从队列中取出一条数据
	class Consumer extends Thread{
		
		@Override
		public void run() {
			consume();
		}
		
		public void consume(){
			while(true){
				synchronized (queue) {
					while(queue.size() == 0){
						System.out.println("队列为空,请等一会儿再消费");
						try {
							queue.wait();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
					queue.poll();
					//消费者取出数据后,就需要唤醒生产者线程,可以进行数据放入了
					queue.notify();
					System.out.println("消费者从队列中取出一个元素,队列中剩余的元素个数是:" + (queue.size()));
				}
			}
		}
		
	}
	
}	

运行上述代码,控制台打印结果同样实现了上述功能:
在这里插入图片描述

实际业务中,生产者消费者模型使用的场景非常多,借助队列,可以带来意想不到结果,大家可以在此基础上做进一步的思考,希望对看到的小伙伴们有所帮助哦