1、阻塞队列的原理
阻塞队列与普通队列的区别在于:阻塞队列为空时,从队列中获取元素的操作将会被阻塞,当队列为满时,往队列里添加元素的操作会被阻塞。
试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来
2、阻塞队列的简单实现
/** * 基于链表实现的一个阻塞队列 * * @author jade * */ public class BlockingQueue { private int capacity ; // 阻塞队列容量 private List queue = new LinkedList(); // 基于链表实现的一个阻塞队列 public BlockingQueue() { this(Integer.MAX_VALUE); } public BlockingQueue(int capacity) { this.capacity = capacity; } /** * 入队列 * * @param item * @throws InterruptedException */ public synchronized void enqueue(Object item) throws InterruptedException { while (this.queue.size() == this.capacity) { wait(); } if (this.queue.size() == 0) { notifyAll(); } this.queue.add(item); } /** * 出队列 * * @return * @throws InterruptedException */ public synchronized Object dequeue() throws InterruptedException { while (this.queue.size() == 0) { wait(); } if (this.queue.size() == this.capacity) { notifyAll(); } return this.queue.remove(0); } }
注意:
1)在enqueue和dequeue方法内部,只有队列的大小等于上限(capacity)或者下限(0)时,才调用notifyAll方法,小于时不调用。
如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,就不需要唤醒,都能够正常的往队列中添加或者移除元素。
2)enqueue和dequeue方法都加了synchronized ,在进入synchronized的时候获取锁,退出的时候释放锁。而如果没有synchronized,直接使用wait/notify时,无法确认哪个锁。
3、LinkedBlockingQueue
在Java.util.concurrent包下提供了若干个阻塞队列,其中LinkedBlockingQueue基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
首先看一下LinkedBlockingQueue类中的几个成员变量:
private static final long serialVersionUID = -6903933977591709194L; private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = this.takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = this.putLock.newCondition();
可以看出,LinkedBlockingQueue中用来存储元素的实际上是一个链表,head和last分别表示链表的头节点和下一节点, capacity表示队列的容量。
takelock,putLock 是可重入锁,notEmpty和notFull是等待条件。
下面看一下LinkedBlockingQueue的构造器,构造器有三个重载版本:
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int paramInt) { if (paramInt <= 0) { throw new IllegalArgumentException(); } this.capacity = paramInt; this.last = (this.head = new Node(null)); } public LinkedBlockingQueue(Collection<? extends E> paramCollection) { this(Integer.MAX_VALUE); ReentrantLock localReentrantLock = this.putLock; localReentrantLock.lock(); try { int i = 0; Iterator localIterator = paramCollection.iterator(); while (localIterator.hasNext()) { Object localObject1 = localIterator.next(); if (localObject1 == null) { throw new NullPointerException(); } if (i == this.capacity) { throw new IllegalStateException("Queue full"); } enqueue(new Node(localObject1)); i++; } this.count.set(i); } finally { localReentrantLock.unlock(); } }
第一个构造器默认容量是Integer.MAX_VALUE,第二个构造器只有一个参数用来指定容量,第三个构造器可以指定一个集合进行初始化。
然后看它的两个关键方法的实现:put()和take():
public void put(E paramE) throws InterruptedException {
// 确保放入的元素是非空的 if (paramE == null) { throw new NullPointerException(); } int i = -1; Node localNode = new Node(paramE); ReentrantLock localReentrantLock = this.putLock; AtomicInteger localAtomicInteger = this.count;
// 响应中断 localReentrantLock.lockInterruptibly(); try {
while (localAtomicInteger.get() == this.capacity) { this.notFull.await(); // 阻塞 } enqueue(localNode); i = localAtomicInteger.getAndIncrement(); if (i + 1 < this.capacity) { this.notFull.signal(); ///使用了signal()提高性能, put的时候对notFull 条件队列中阻塞的线程进行唤醒。 } } finally { localReentrantLock.unlock(); } if (i == 0) { signalNotEmpty(); // put 的时候,会在i == 0 的情况下对 notEmpty 条件队列中阻塞的线程进行唤醒,但是这个需要获取takeLock锁。
}
}
public E take() throws InterruptedException { int i = -1; AtomicInteger localAtomicInteger = this.count; ReentrantLock localReentrantLock = this.takeLock; localReentrantLock.lockInterruptibly(); Object localObject1; try { while (localAtomicInteger.get() == 0) { this.notEmpty.await(); } localObject1 = dequeue(); i = localAtomicInteger.getAndDecrement(); if (i > 1) { this.notEmpty.signal(); } } finally { localReentrantLock.unlock(); } if (i == this.capacity) { signalNotFull(); } return (E)localObject1; }
跟put方法实现很类似,只不过put方法等待的是notFull信号,而take方法等待的是notEmpty信号
LinkedBlockingQueue内部维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。
而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue内部使用ReentrantLock实现插入锁(putLock)和取出锁(takeLock)。putLock上的条件变量是notFull,即可以用notFull唤醒阻塞在putLock上的线程。takeLock上的条件变量是notEmtpy,即可用notEmpty唤醒阻塞在takeLock上的线程。
本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-03/141978.htm