解读Java并发队列BlockingQueue
解读Java并发队列BlockingQueue最近得空,想写篇文章好好说说 java 线程池问题,我相信很多人都一知半解的,包括我自己在仔仔细细看源码之前,也有许多的不解,甚至有些地方我一直都没有理解到位。说到线程池实现,那么就不得不涉及到各种 BlockingQueue 的实现,那么我想就 BlockingQueue 的问题和大家分享分享我了解的一些知识。本文没有像之前分析 AQS 那样一行一行源码分析了,不过还是把其中最重要和最难理解的代码说了一遍,所以不免篇幅略长。本文涉及到比较多的 Doug Lea 对 BlockingQueue 的设计思想,希望有心的读者真的可以有一些收获,我觉得自己还是写了一些干货的。本文直接参考 Doug Lea 写的 java doc 和注释,这也是我们在学习 java 并发包时最好的材料了。希望大家能有所思、有所悟,学习 Doug Lea 的代码风格,并将其优雅、严谨的作风应用到我们写的每一行代码中。BlockingQueue开篇先介绍下 BlockingQueue 这个接口的规则,后面再看其实现。首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null 或 true/false,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间。总结如下:BlockingQueue 的各个实现都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的注释来选取合适的方法即可。www.gw638.cn对于 BlockingQueue,我们的关注点应该在 put(e) 和 take() 这两个方法,因为这两个方法是带阻塞的。BlockingQueue 不接受 null 值的插入,相应的方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表失败,还是获取的值就是 null 值。一个 BlockingQueue 可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的×××队列也不是说真正的×××,而是它的容量是 Integer.MAX_VALUE(21亿多)。BlockingQueue 是设计用来实现生产者-消费者队列的,当然,你也可以将它当做普通的 Collection 来用,前面说了,它实现了 java.util.Collection 接口。例如,我们可以用 remove(x) 来删除任意一个元素,但是,这类操作通常并不高效,所以尽量只在少数的场合使用,比如一条消息已经入队,但是需要做取消操作的时候。BlockingQueue 的实现都是线程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,这个是允许的,取决于具体的实现。BlockingQueue 不支持 close 或 shutdown 等关闭操作,因为开发者可能希望不会有新的元素添加进去,此特性取决于具体的实现,不做强制约束。最后,BlockingQueue 在生产者-消费者的场景中,是支持多消费者和多生产者的,说的其实就是线程安全问题。相信上面说的每一句都很清楚了,BlockingQueue 是一个比较简单的线程安全容器,下面我会分析其具体的在 JDK 中的实现,这里又到了 Doug Lea 表演时间了。BlockingQueue 实现之 ArrayBlockingQueueArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。ArrayBlockingQueue 共有以下几个属性:/ 用于存放元素的数组final Object items;/ 下一次读取操作的位置int takeIndex;/ 下一次写入操作的位置int putIndex;/ 队列中的元素数量int count; / 以下几个就是控制并发用的同步器final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull; 我们用个示意图来描述其同步机制:ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:队列容量,其限制了队列中最多允许的元素个数;指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。www.f-1.ccBlockingQueue 实现之 LinkedBlockingQueue底层基于单向链表实现的阻塞队列,可以当做×××队列也可以当做有界队列来使用。看构造方法:/ 传说中的×××队列public LinkedBlockingQueue() this(Integer.MAX_VALUE);/ 传说中的有界队列public LinkedBlockingQueue(int capacity) if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);我们看看这个类有哪些属性:/ 队列容量private final int capacity; / 队列中的元素数量private final AtomicInteger count = new AtomicInteger(0); / 队头private transient Node<E> head; / 队尾private transient Node<E> last; / take, poll, peek 等读操作的方法需要获取到这个锁private final ReentrantLock takeLock = new ReentrantLock(); / 如果读操作的时候队列是空的,那么等待 notEmpty 条件private final Condition notEmpty = takeLock.newCondition(); / put, offer 等写操作的方法需要获取到这个锁private final ReentrantLock putLock = new ReentrantLock(); / 如果写操作的时候队列是满的,那么等待 notFull 条件private final Condition notFull = putLock.newCondition();这里用了两个锁,两个 Condition,简单介绍如下:takeLock 和 notEmpty 怎么搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。putLock 需要和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。首先,这里用一个示意图来看看 LinkedBlockingQueue 的并发读写控制,然后再开始分析源码:看懂这个示意图,源码也就简单了,读操作是排好队的,写操作也是排好队的,唯一的并发问题在于一个写操作和一个读操作同时进行,只要控制好这个就可以了。先上构造方法:public LinkedBlockingQueue(int capacity) if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);注意,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也总是获取头节点后面的一个节点。count 的计数值不包括这个头节点。我们来看下 put 方法是怎么将元素插入到队尾的:public void put(E e) throws InterruptedException if (e = null) throw new NullPointerException(); / 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。 int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; / 必须要获取到 putLock 才可以进行插入操作 putLock.lockInterruptibly(); try / 如果队列满,等待 notFull 的条件满足。 while (count.get() = capacity) notFull.await(); / 入队 enqueue(node); / count 原子加 1,c 还是加 1 前的值 c = count.getAndIncrement(); / 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。 / 哪些线程会等待在 notFull 这个 Condition 上呢? if (c + 1 < capacity) notFull.signal(); finally / 入队后,释放掉 putLock putLock.unlock(); / 如果 c = 0,那么代表队列在这个元素入队前是空的(不包括head空节点), / 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作 if (c = 0) signalNotEmpty();