阻塞队列BlockingQueue详解

阻塞队列是生产者消费者模式的经典体现。

我们在曾在Java线程池详解中自己实现过一个阻塞队列,这篇文章我们来研究一下JDK中的阻塞队列:

BlockingQueue接口主要方法

抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

其中抛出异常特殊值都是来自父接口的方法,它们都是非阻塞的,分别代表如果操作不成功抛出 IllegalStateException 异常、如果操作不成功返回false。

而“阻塞”一列会一直保持等待直到操作成功,但是等待过程中可以被interrupt打断。

“超时”一列只等待一个限定时间,时间内没有操作成功则返回false。

ArrayBlockingQueue

内部结构

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

ArrayBlockingQueue内部使用数组来实现队列操作,有两个指针takeIndex和putIndex分别指向要取出的位置(队列头)和要放入的位置(队列尾),实现了一个类似于循环队列的结构。使用一把锁管理出队、入队操作。

入队、出队原理

入队通过enqueue函数实现:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    //如果队列尾超过数组尾部指向数组头
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

入队完成之后调用notEmpty.signal()唤醒一个消费者线程。

出队通过dequeue函数实现:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

出队完成后调用notFull.signal();唤醒一个生产者线程。

构造方法及特点总结

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
  • ArrayBlockingQueue的锁可以设置为公平或非公平的,默认为非公平的
  • ArrayBlockingQueue构造时必须设置容量,容量不会变化。
  • ArrayBlockingQueue的入队、出队共用一把锁,不能同时进行

LinkedBlockingQueue

内部结构

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

transient Node<E> head;
private transient Node<E> last;
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue内部使用链表来实现队列操作,有两个节点指针head和last分别指向哨兵节点、数组的尾节点。与ArrayBlockingQueue不同的是使用了两把锁分别管理入队和出队。

入队、出队原理

入队:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //由于count使用的是AtomicInteger,即使出队操作冲突了也没关系
        while (count.get() == capacity) {
            notFull.await();
        }
        //enqueue函数中就是简单的把节点连接到链表尾
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //如果插入后容量未满,则通知一个生产者
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果插入前容量是0则通知一个消费者
    if (c == 0)
        signalNotEmpty();
}

出队:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        //dequeue中就是直接将head.next指向了next.next
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            //如果出队前容量大于1,则通知一个消费者
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //如果出队前容量是满的,则通知一个生产者
        signalNotFull();
    return x;
}

可以看出LinkedBlockingQueue的并发粒度相较于ArrayBlockingQueue有所提高,可以让入队出队同时进行,signal的条件和时机也是考虑到了入队出队的并发执行。

构造方法及特点总结

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}


public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
  • LinkedBlockingQueue的head始终指向哨兵节点
  • LinkedBlockingQueue构造时可以设置容量,默认为int的最大值
  • LinkedBlockingQueue的入队、出队分别用了一把锁,可以同时进行。

思考:为什么ArrayBlockingQueue的入队出队不能分别使用一把锁?

由于ArrayBlockingQueue的内部结构是使用数组实现的,类似于循环队列的结构,意味着可能出现入队的指针+1==出队的指针的情况,那么一旦先入队,要出队的元素就会丢失,并把刚才入队的元素出队了,造成极严重的后果。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e9%98%bb%e5%a1%9e%e9%98%9f%e5%88%97blockingqueue%e8%af%a6%e8%a7%a3/

发表评论

电子邮件地址不会被公开。