阻塞队列是生产者消费者模式的经典体现。
我们在曾在Java线程池详解中自己实现过一个阻塞队列,这篇文章我们来研究一下JDK中的阻塞队列:
BlockingQueue接口主要方法
抛出异常 | 特殊值 | 阻塞 | 超时 | ||
---|---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) | |
移除 | remove() | poll() | take() | poll(time, unit) | |
检查 | element() | peek() | 不可用 | 不可用 |
其中抛出异常和特殊值都是来自父接口的方法,它们都是非阻塞的,分别代表如果操作不成功抛出 IllegalStateException
异常、如果操作不成功返回false。
而“阻塞”一列会一直保持等待直到操作成功,但是等待过程中可以被interrupt打断。
“超时”一列只等待一个限定时间,时间内没有操作成功则返回false。
ArrayBlockingQueue
内部结构
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
ArrayBlockingQueue内部使用数组来实现队列操作,有两个指针takeIndex和putIndex分别指向要取出的位置(队列头)和要放入的位置(队列尾),实现了一个类似于循环队列的结构。使用一把锁管理出队、入队操作。
入队、出队原理
入队通过enqueue函数实现:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//如果队列尾超过数组尾部指向数组头
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
入队完成之后调用notEmpty.signal()
唤醒一个消费者线程。
出队通过dequeue函数实现:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
出队完成后调用notFull.signal();
唤醒一个生产者线程。
构造方法及特点总结
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
- ArrayBlockingQueue的锁可以设置为公平或非公平的,默认为非公平的
- ArrayBlockingQueue构造时必须设置容量,容量不会变化。
- ArrayBlockingQueue的入队、出队共用一把锁,不能同时进行
LinkedBlockingQueue
内部结构
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
transient Node<E> head;
private transient Node<E> last;
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue内部使用链表来实现队列操作,有两个节点指针head和last分别指向哨兵节点、数组的尾节点。与ArrayBlockingQueue不同的是使用了两把锁分别管理入队和出队。
入队、出队原理
入队:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//由于count使用的是AtomicInteger,即使出队操作冲突了也没关系
while (count.get() == capacity) {
notFull.await();
}
//enqueue函数中就是简单的把节点连接到链表尾
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
//如果插入后容量未满,则通知一个生产者
notFull.signal();
} finally {
putLock.unlock();
}
//如果插入前容量是0则通知一个消费者
if (c == 0)
signalNotEmpty();
}
出队:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
//dequeue中就是直接将head.next指向了next.next
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
//如果出队前容量大于1,则通知一个消费者
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
//如果出队前容量是满的,则通知一个生产者
signalNotFull();
return x;
}
可以看出LinkedBlockingQueue的并发粒度相较于ArrayBlockingQueue有所提高,可以让入队出队同时进行,signal的条件和时机也是考虑到了入队出队的并发执行。
构造方法及特点总结
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
- LinkedBlockingQueue的head始终指向哨兵节点
- LinkedBlockingQueue构造时可以设置容量,默认为int的最大值
- LinkedBlockingQueue的入队、出队分别用了一把锁,可以同时进行。
思考:为什么ArrayBlockingQueue的入队出队不能分别使用一把锁?
由于ArrayBlockingQueue的内部结构是使用数组实现的,类似于循环队列的结构,意味着可能出现入队的指针+1==出队的指针
的情况,那么一旦先入队,要出队的元素就会丢失,并把刚才入队的元素出队了,造成极严重的后果。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e9%98%bb%e5%a1%9e%e9%98%9f%e5%88%97blockingqueue%e8%af%a6%e8%a7%a3/