阻塞队列BlockingQueue详解

阻塞队列是生产者消费者模式的经典体现。

我们在曾在Java线程池详解中自己实现过一个阻塞队列,这篇文章我们来研究一下JDK中的阻塞队列:

BlockingQueue接口主要方法

抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

其中抛出异常特殊值都是来自父接口的方法,它们都是非阻塞的,分别代表如果操作不成功抛出 IllegalStateException 异常、如果操作不成功返回false。

而“阻塞”一列会一直保持等待直到操作成功,但是等待过程中可以被interrupt打断。

“超时”一列只等待一个限定时间,时间内没有操作成功则返回false。

ArrayBlockingQueue

内部结构

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

ArrayBlockingQueue内部使用数组来实现队列操作,有两个指针takeIndex和putIndex分别指向要取出的位置(队列头)和要放入的位置(队列尾),实现了一个类似于循环队列的结构。使用一把锁管理出队、入队操作。

入队、出队原理

入队通过enqueue函数实现:

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    //如果队列尾超过数组尾部指向数组头
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

入队完成之后调用notEmpty.signal()唤醒一个消费者线程。

出队通过dequeue函数实现:

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

出队完成后调用notFull.signal();唤醒一个生产者线程。

构造方法及特点总结

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
  • ArrayBlockingQueue的锁可以设置为公平或非公平的,默认为非公平的
  • ArrayBlockingQueue构造时必须设置容量,容量不会变化。
  • ArrayBlockingQueue的入队、出队共用一把锁,不能同时进行

LinkedBlockingQueue

内部结构

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

transient Node<E> head;
private transient Node<E> last;
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue内部使用链表来实现队列操作,有两个节点指针head和last分别指向哨兵节点、数组的尾节点。与ArrayBlockingQueue不同的是使用了两把锁分别管理入队和出队。

入队、出队原理

入队:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //由于count使用的是AtomicInteger,即使出队操作冲突了也没关系
        while (count.get() == capacity) {
            notFull.await();
        }
        //enqueue函数中就是简单的把节点连接到链表尾
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //如果插入后容量未满,则通知一个生产者
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    //如果插入前容量是0则通知一个消费者
    if (c == 0)
        signalNotEmpty();
}

出队:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        //dequeue中就是直接将head.next指向了next.next
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            //如果出队前容量大于1,则通知一个消费者
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        //如果出队前容量是满的,则通知一个生产者
        signalNotFull();
    return x;
}

可以看出LinkedBlockingQueue的并发粒度相较于ArrayBlockingQueue有所提高,可以让入队出队同时进行,signal的条件和时机也是考虑到了入队出队的并发执行。

构造方法及特点总结

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}


public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
  • LinkedBlockingQueue的head始终指向哨兵节点
  • LinkedBlockingQueue构造时可以设置容量,默认为int的最大值
  • LinkedBlockingQueue的入队、出队分别用了一把锁,可以同时进行。

思考:为什么ArrayBlockingQueue的入队出队不能分别使用一把锁?

由于ArrayBlockingQueue的内部结构是使用数组实现的,类似于循环队列的结构,意味着可能出现入队的指针+1==出队的指针的情况,那么一旦先入队,要出队的元素就会丢失,并把刚才入队的元素出队了,造成极严重的后果。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e9%98%bb%e5%a1%9e%e9%98%9f%e5%88%97blockingqueue%e8%af%a6%e8%a7%a3/

(0)
彭晨涛彭晨涛管理者
上一篇 2020年2月14日
下一篇 2020年2月14日

相关推荐

  • Java中的四种内部类

    我发现最近真是越来越没有东西写了。。。不可能天天学习新知识啊,最近在复习阶段了,复习的东西大多数是博客里写过的/(ㄒoㄒ)/ 复习Java基础的时候认真看了一下Java的内部类,这…

    Java 2020年5月23日
    0110
  • Java反射机制和动态代理详解

    反射 概念 反射机制: 指的是可以于运行时加载、探知、使用编译期间完全未知的类。 程序在运行状态中,可以动态加载一个只有名称的类,对于任意一个已加载的类,都能够知道这个类的所有属性…

    Java 2020年2月15日
    0630
  • JavaIO-缓冲流与转换流

    缓冲流 概述 缓冲流,也叫高效流,是对4个基本的FileXxx 流的增强,所以也是4个流,按照数据类型分类: 字节缓冲流:BufferedInputStream,BufferedO…

    Java 2020年2月4日
    0140
  • 对象的输入输出-Java序列化机制

    对象序列化和反序列化,在Java中体现为两种字节流: ObjectInputStream、ObjectOutputStream 序列化的概念 指堆内存中的java对象数据,通过某种…

    Java 2019年12月21日
    0170
  • JDK8-Stream流库详解

    流提供了一种让我们可以在比集合更高的概念级别上指定计算的数据视图。通过使用流,我们可以说明想要完成什么任务,而不是说明如何去实现它。 流的创建 Collection.stream(…

    Java 2020年2月11日
    0150
  • Java日志框架JUL和Log4j介绍

    日志文件是用于记录系统操作事件的文件集合,可分为事件日志和消息日志。具有处理历史数据、诊断问题的追踪以及理解系统的活动等重要作用。 日志实现:JUL、logback、log4j、l…

    2020年2月14日
    0340
  • LinkedList源码分析

    相关文章:ArrayList源码分析 List家族一览: LinkedList简介 LinkedList的超类有List、Queue,说明它既有List的性质也有Queue的性质,…

    2019年12月2日
    080
  • 深入理解java虚拟机第三版读书笔记07

    附: Java虚拟机规范-Java虚拟机指令集:JDK8 Java虚拟机规范-Java虚拟机指令集:JDK13 续深入理解java虚拟机第三版读书笔记06 字节码指令简介 Java…

    Java 2020年1月21日
    0230
  • Java线程池详解

    线程池就是享元模式和生产者消费者模式的应用 动手实现线程池 步骤1:自定义拒绝策略接口 @FunctionalInterface // 拒绝策略 interface RejectP…

    2020年2月3日
    0340
  • 按键排序的Map-TreeMap源码分析

    总结 总结放前面(这篇挺短的)1. TreeMap基于红黑树实现,可以对Map中的key排序2. 它的排序和定位需要依赖比较器或实现 Comparable 接口,也因此不需要key…

    Java 2020年2月16日
    0140

发表回复

登录后才能评论