🔍 基于JDK1.8的LinkedBlockingQueue源码分析。

基本知识

  • 始于:JDK 1.5

  • 作者:Doug Lea

  • 特点:线程安全,有界并阻塞,生产-消费模型

  • 结构:单向链表

  • 类图:

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** 容量 */
private final int capacity;

/** 节点数量 */
private final AtomicInteger count = new AtomicInteger();

/** 头节点 */
transient Node<E> head;

/** 尾节点 */
private transient Node<E> last;

/** 执行take、poll(消费性操作)需要获取的锁,简称消费锁 */
private final ReentrantLock takeLock = new ReentrantLock();

/** 当队列为空时,执行出队操作的线程会被放入的条件队列 */
private final Condition notEmpty = takeLock.newCondition();

/** 执行put、offer(生产性操作)需要获取的锁,简称生产锁 */
private final ReentrantLock putLock = new ReentrantLock();

/** 当队列为满时,执行入队操作的线程会被放入的条件队列 */
private final Condition notFull = putLock.newCondition();

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 无参构造,默认容量为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

// 容量构造
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

// 集合构造
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
// 获取生产锁,并加锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
// 释放生产锁
putLock.unlock();
}
}

offer(E e)

用途:向队列尾部添加一个元素。如果队列中有空闲位置则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public boolean offer(E e) {
// (1) 参数验空
if (e == null) throw new NullPointerException();
// (2) 如果当前队列满则丢弃将要放入的元素,返回false
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
// (3) 构造新节点
Node<E> node = new Node<E>(e);
// (4) 获取putLock独占锁,并加锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// (5) 如果队列不满则添加到队列尾部,并递增元素计数
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
// (6) 如果新元素入队后还有剩余空间,则唤醒入队阻塞队列中的线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// (7) 释放putLock锁
putLock.unlock();
}
// (8) c为新节点添加前的队列节点元素数量
// c == 0表示添加后队列中至少有一个元素,唤醒出队条件队列中的阻塞线程进行消费
if (c == 0)
signalNotEmpty();
return c >= 0;
}

// 添加元素到队列尾部
private void enqueue(Node<E> node) {
last = last.next = node;
}

// 唤醒出队条件队列中的阻塞线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

offer(E e, long timeout, TimeUnit unit)

用途:向队列尾部添加一个元素。如果队列中有空闲位置则插入成功后返回true,如果队列已满等待指定时间看是否有空余位置,有空闲位置则插入成功后返回true,超时后返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// (1) 参数验空
if (e == null) throw new NullPointerException();
// (2) 记录需要等待时间
long nanos = unit.toNanos(timeout);
int c = -1;
// (3) 获取putLock独占锁
final ReentrantLock putLock = this.putLock;
// (4) 获取队列元素数量
final AtomicInteger count = this.count;
// (5) 加锁
putLock.lockInterruptibly();
try {
// (6) 队列已满,看在超时时间之内是否有消费操作产生空余位置
while (count.get() == capacity) {
// (7) 超时,直接返回false
if (nanos <= 0)
return false;
// (8) awaitNanos返回提供时间的剩余时间,小于0代表超时
nanos = notFull.awaitNanos(nanos);
}
// (9) 将新节点添加到队列尾部,队列元素数量递增
enqueue(new Node<E>(e));
c = count.getAndIncrement();
// (10) 队列中尚有空位,唤醒入队条件队列中的阻塞线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// (11) 释放锁
putLock.unlock();
}
// (12) 唤醒出队条件队列中的阻塞线程
if (c == 0)
signalNotEmpty();
return true;
}

poll()

用途:从队列头部获取并移除一个元素,如果队列为空则返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public E poll() {
// (1) 如果队列为空则返回null
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// (2) 获取独占锁,加锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// (3) 队列不空则出队,递减元素数量
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
// (4) c大于1,当前线程移除队列里面一个元素后队列不为空
if (c > 1)
notEmpty.signal();
}
} finally {
// (5) 释放锁
takeLock.unlock();
}
// (6) c为移除节点前的队列元素数量
// c == capacity表示移除后产生一个空位,唤醒入队条件队列的阻塞线程进行生成
if (c == capacity)
signalNotFull();
return x;
}

// 移除队首节点head
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

// 唤醒入队队列中阻塞线程
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

remove(Object o)

用途:删除队列中指定的元素,有则删除并返回true,没有则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public boolean remove(Object o) {
// (1) 参数验空
if (o == null) return false;
// (2) 双重加锁
fullyLock();
try {
// (3) 遍历队列
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// (4) 找到节点,则删除节点并返回true
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
// (5) 找不到则返回false
return false;
} finally {
// (6) 双重解锁
fullyUnlock();
}
}

// 删除节点p
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
// 如果删除前队列为满,那么删除后则产生一个空位,唤醒入队条件队列的阻塞线程进行生产
if (count.getAndDecrement() == capacity)
notFull.signal();
}

// 给生产锁和消费锁都加锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}

// 生产锁和消费锁都释放锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

size()

用途:获取当前队列元素个数。

1
2
3
public int size() {
return count.get();
}

由于进行出队、入队操作时的count是加锁的,所以结果相比ConcurrentLinkedQueue的size方法比较准确的。

为什么ConcurrentLinkedQueue只能通过遍历链表而不能使用原子变量呢?

因为使用原子变量保存队列节点个数需要保证入队、出队操作和原子变量操作都是原子性操作,而ConcurrentLinkedQueue使用的是CAS无锁算法,无法保证。

最后小结

LinkedBlockingQueue的内部是通过单向链表实现的,使用头、尾节点来进行入队和出队操作,即入队操作都是对头节点进行操作,出队操作都是对尾节点进行操作。

如上图所示,对头、尾节点的操作分别使用了单独的独占锁从而保证了原子性,所以出队和入队操作是可以同时进行的。另外对头、尾节点都配备了一个条件队列,用来存放被阻塞的线程,并结合入队、出队操作实现了一个生产消费模型。