🔍 基于JDK1.8的ArrayBlockingQueue源码分析。

基本知识

  • 始于:JDK 1.5

  • 作者:Doug Lea

  • 特点:线程安全,有界并阻塞

  • 结构:数组

  • 类图:

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** 存储元素的数组 */
final Object[] items;

/** 队列头部位置,take, poll, peek, remove操作的位置 */
int takeIndex;

/** 队列尾部位置,put, offer, add操作的位置 */
int putIndex;

/** 队列元素的数量 */
int count;

/** 可重入独占锁 */
final ReentrantLock lock;

/** 当队列为空时,执行出队操作的线程会被放入的条件队列 */
private final Condition notEmpty;

/** 当队列为满时,执行入队操作的线程会被放入的条件队列 */
private final Condition notFull;

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 容量构造
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

// 公平性构造
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

// 集合构造
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
// 获取独占锁,并加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
// 释放独占锁
lock.unlock();
}
}

默认使用非公平锁。

offer(E e)

用途:向队列尾部插入一个元素,如果队列有空闲空间则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。如果e为null则抛出NPE异常。该方法是非阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public boolean offer(E e) {
// (1) 参数验空
checkNotNull(e);
// (2) 获取独占锁,并加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// (3) 如果队列已满,返回false
if (count == items.length)
return false;
// (4) 队列未满,插入元素,返回true
else {
enqueue(e);
return true;
}
} finally {
// (5) 释放独占锁
lock.unlock();
}
}

// 向队列尾部插入元素
private void enqueue(E x) {
// (6) 将元素放入相应位置
final Object[] items = this.items;
items[putIndex] = x;
// (7) 如果当前队列已满,下一次存放位置为起始点
if (++putIndex == items.length)
putIndex = 0;
// (8) 队列元素数量递增
count++;
// (9) 添加了元素,唤醒出队条件队列的阻塞进行消费
notEmpty.signal();
}

代码(1)进行参数校验,如果为空则抛出NullPointerException异常。

代码(2)获取独占锁,当前线程获取该锁后,其他入队和出队操作的线程都会被阻塞挂起而后被放入lock的AQS阻塞队列。

代码(3)判断队列是否已满,满则直接返回false,否则调用enqueue方法后返回true。

在enqueue方法中,首先把当前元素放入items数组,然后计算下一个元素应该存放的下标位置,并递增元素个数计数器,添加元素之后,激活因为进行消费操作而被阻塞的线程。这里由于在操作共享变量count前加了独占锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是从CPU缓存或者寄存器获取。

代码(5)释放锁后,会把修改的共享变量值刷新到主内存中,这样其他线程通过加锁再次读取这些共享变量时,就可以看到最新的值。

put(E e)

用途:向队列尾部插入一个元素,如果队列有空闲位置则插入后直接返回true,如果队列已满则阻塞当前线程知道队列有空闲并插入成功后返回true,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。如果e为null则抛出NPE异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void put(E e) throws InterruptedException {
// (1) 参数验空
checkNotNull(e);
// (2) 获取独占锁,并加锁(可打断)
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// (3) 如果队列已满,则把当前线程放入notFull条件队列
while (count == items.length)
notFull.await();
// (4) 队列产生空闲位置,插入元素
enqueue(e);
} finally {
// (5) 释放独占锁
lock.unlock();
}
}

poll()

用途:从队列头部获取并移除一个元素,如果队列为空则返回null,该方法是不阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public E poll() {
// (1) 获取独占锁,并加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// (2) 当前队列为空则返回null
// 非空则获取并移除队尾元素,并返回队尾元素的值
return (count == 0) ? null : dequeue();
} finally {
// (3) 释放独占锁
lock.unlock();
}
}

// 获取并移除队列尾部元素
private E dequeue() {
final Object[] items = this.items;
// (4) 获取元素值
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
// (5) 数组中的值为null
items[takeIndex] = null;
// (6) 如果当前队列已满,下一次获取位置为起始点
if (++takeIndex == items.length)
takeIndex = 0;
// (7) 队列元素数量递减
count--;
// (8) 更新迭代器状态
if (itrs != null)
itrs.elementDequeued();
// (9) 移除了元素,队列产生空位,唤醒入队条件队列的阻塞线程进行生产
notFull.signal();
return x;
}

take()

用途:获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E take() throws InterruptedException {
// (1) 获取独占锁,并加锁(可打断)
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// (2) 如果队列为空,则把当前线程放入notEmpty条件队列
while (count == 0)
notEmpty.await();
// (3) 获取队头元素
return dequeue();
} finally {
// (4) 释放独占锁
lock.unlock();
}
}

peek()

用途:获取队列头部元素但是不从队列里面移除,如果队列为空则返回null,该方法是不阻塞的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E peek() {
// (1) 获取独占锁,并加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// (2) 从数组中获取队头下标
return itemAt(takeIndex);
} finally {
// (3) 释放独占锁
lock.unlock();
}
}

@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}

size()

用途:获取队列元素个数。

1
2
3
4
5
6
7
8
9
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

size方法比较简单,上锁后直接返回count,并在返回前释放锁。

因为count并没有被声明为volatile,所以无法保证线程可见性。

最后小结

ArrayBlockingQueue通过全局独占锁实现了同时只有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似于在方法上添加synchronized的意思。

其中offer和poll操作通过简单的加速搜进行入队、出队操作,而put、take操作则使用条件变量实现,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。另外,相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的。