🔍 基于JDK1.8的ConcurrentLinkedQueue源码分析。

基本知识

  • 始于:JDK 1.5

  • 作者:Doug Lea

  • 特点:线程安全,无界非阻塞

  • 结构:单向链表

  • 类图:

主要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 节点类
private static class Node<E> {
// 节点值
volatile E item;
// 后继结点
volatile Node<E> next;

// 构造函数调用unsafe.putObject
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}

// 节点值和后继节点的修改操作全部使用CAS实现

boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}

boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
...
}

// 头节点(null,伪节点)
private transient volatile Node<E> head;

// 尾节点
private transient volatile Node<E> tail;

// unsafe对象,用于CAS操作
private static final sun.misc.Unsafe UNSAFE;

// 头节点的域偏移地址
private static final long headOffset;

// 尾节点的域偏移地址
private static final long tailOffset;

// 静态初始化
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail"));
} catch (Exception e) {
throw new Error(e);
}
}

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 无参构造
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

// 集合构造
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}

offer(E e)

用途:在队列末尾添加一个元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public boolean offer(E e) {
// e为null抛出NullPointerException
checkNotNull(e);
// 创建新节点,构造函数内部调用unsafe.putObject
final Node<E> newNode = new Node<E>(e);

// (1) 遍历链表,无限循环+CAS重试,直到从(3)返回
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// (2) 当前节点的后继节点为空,说明当前为最后一个节点
if (q == null) {
// (3) 使用CAS操作设置p的后继节点为新节点
if (p.casNext(null, newNode)) {
// CAS成功,则说明新节点已经插入链表
// 然后通过CAS操作设置当前尾节点
// (4) 当前节点不为尾节点,说明有线程抢先一步更新了尾节点
// 把尾节点原子更新为新节点
if (p != t)
casTail(t, newNode); // Failure is OK.
return true;
}
}
// (5) 当前节点被自引用,说明当前节点已经被删除 重新设置p的值
else if (p == q)
// 多线程操作,由于poll操作后 有可能把head变为自引用
// head.next = next,所以这里需要重新找新的head
// 因为head后面的节点才是正常的节点
p = (t != (t = tail)) ? t : head;
// (6) 当前节点不是尾节点,重新寻找尾节点
else
// p == t || t = tail => 没有其他线程更新tail,
// p != t && t != tail => 有其他线程更新tail,那么最新的tail节点,即q节点
p = (p != t && t != (t = tail)) ? t : q;
}
}

(1)单线程执行offer(item)

首先进行参数判空,空则抛出NPE异常,使用item作为参数构造一个新节点item

执行代码(1),这时候headtailpt同时指向了哨兵节点。同时q指向哨兵节点的后继。队列状态如下:

执行代码(2),发现 q == null则执行代码(3),通过CAS操作将p.next设置为item

这里 p == t成功,所以没有设置尾节点,直接返回true。队列状态如下:

(2)双线程执行

假设两个线程同时执行代码(3),线程1调用offer(item1),线程2调用offer(item2)

假设线程1先执行CAS操作,那么更新pnext节点为item1,那么线程2执行CAS操作的时候,发现pnext节点并不是null,然后执行重新循环,执行代码(1)。队列状态如下:

线程2执行代码(2),由于q != null,于是线程2执行代码(6),p = q。队列状态如下:

线程2重新循环,执行代码(1)。队列状态如下:

线程执行代码(2),由于q == null,接着执行代码(3),通过CAS操作更新pnext节点为item2

然后执行代码(4),由于p != t,所以通过CAS操作重新设置尾节点为item2。队列状态如下:

(3)第三情况

在以上的两种情况中,还差代码(5)的执行情况没有出现。

poll操作后可能会存在的一种情况。如下图所示:

执行代码(2)时,队列状态如下:

此时q != null 并且p == qtail结点的item为空,但是tail不为空),所以执行代码(5),由于t == tail,所以p = head

然后重新循环,执行代码(1),并且执行到(2)。队列状态如下:

此时由于q == null,执行代码(3)进行CAS操作,如果没有竞争线程,则CAS成功,pnext结点设置为新节点,然后执行代码(4),由于p != t,所以设置新节点为新的尾节点。队列状态如下:

这里自引用的结点则会被GC。

可见,offer操作的关键步骤是代码(3),通过CAS操作来控制某一时刻只能有1个线程可以追加元素到队列末尾。进行CAS竞争失败的线程会通过循环不断尝试CAS操作,直到CAS成功才会返回,也就是通过无线循环不断进行CAS尝试方式来代替阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。

poll()

用途:在队列头部获取并移除一个元素,如果队列为空则返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public E poll() {
// goto标记
restartFromHead:
// (1) 无限循环
for (;;) {
// (2) 从头节点开始遍历
for (Node<E> h = head, p = h, q;;) {
// (3) 保存当前节点值
E item = p.item;
// (4) 当前节点值不为空,则通过CAS操作将其设置为null
if (item != null && p.casItem(item, null)) {
// (5) CAS成功则标记当前节点并从链表中移除
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// (6) 当前队列为空,则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// (7) 当前节点自引用,重新寻找头节点
else if (p == q)
continue restartFromHead;
// (8) 当前节点指向下个节点
else
p = q;
}
}
}

// 更新头节点
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
// 自引用,从而孤立,被GC
h.lazySetNext(h);
}

(1)poll操作从队列头获取元素,所以循环从head节点开始,执行代码(3)获取当前队列的头节点。队列开始为空的时候状态如下:

由于head节点为空,所以会执行到代码(6),如果这个过程中没有线程调用offer方法,则此时q == null。队列状态如下:

所以会执行updateHead方法,由于h == p,所以不会执行casHead,直接返回null。

(2)假设执行到代码(6)时已经有其他线程调用了offer方法并成功添加一个元素到队列,这时候q指向的是新增元素的节点。队列状态如下:

执行代码(6),q != null,继续执行代码(7),p != q,最后执行代码(8),p = q。队列状态如下:

继续循环,执行代码(3)后执行代码(4),此时p.item不为null,于是通过CAS操作尝试将p.item设置为null,假设此时没有其他线程进行poll操作,则CAS操作成功后执行代码(5),由于此时p != h,所以设置头节点为p,并且设置hnext节点为h自己,返回从移除节点值item。队列状态如下:

这个状态就是offer操作执行代码(5)的状态。

(3)假设现在一个线程调用了poll操作,则在执行代码(4)时队列状态如下:

这时候执行代码(6)返回。

(4)现在还有代码分支(7)没有执行过。

假设线程1执行poll操作时当前队列状态如下:

那么执行代码(4)时,通过CAS操作设置p.item为null,假设CAS设置成功则标记该节点并从队列中将其清除,队列状态如下:

然后,由于p != h,所以会执行uploadHead方法,假如线程1执行uploadHead方法前,另外一个线程2开始poll操作,这时候线程2的p指向head节点,但是还未执行到代码(6),队列状态如下:

然后线程1执行updateHead方法,执行完毕后线程1退出,队列状态如下:

然后线程2继续执行代码(6),由于p自引用,所以p == q,于是跳转到外层循环restartFromHead,重新获取head,队列状态如下:

总结:poll方法在移除一个元素时,只是简单地使用CAS操作把当前节点的item设置为null,然后通过设置头节点将该元素从队列中移除,被移除的节点就成了孤立节点从而被GC。另外,如果执行分支中发现头节点被修改了,要跳转到外层循环重新获取新的头节点。

peek()

用途:获取队列头部的一个元素,如果队列为空则返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public E peek() {
// goto标记
restartFromHead:
// (1) 无限循环
for (;;) {
// (2) 从头节点开始遍历
for (Node<E> h = head, p = h, q;;) {
// (3) 保存当前节点的值
E item = p.item;
// (4) 当前节点值不为空且下一个节点值为空
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
// (5) 当前节点自引用,重新寻找头节点
else if (p == q)
continue restartFromHead;
// (6) 当前结点指向下个结点
else
p = q;
}
}
}

第一次循环时会发现item 为空,第二次循环时p指向队列第一个元素,执行代码(4),q == null成立。

(1)队列为空时,队列状态如下:

这时候执行updateHead方法,由于p == h,所以不进行任何操作,直接返回null。

(2)队列至少有一个元素时,队列状态如下:

这时候执行代码(6),p = q,队列状态如下:

重新循环,执行代码(4)时,item != null,所以执行updateHead方法,由于h != p,所以重新头节点为p,队列状态如下:

即剔除了哨兵节点。

总结:peek操作和poll操作类似,只是前者只获取头节点但不删除。另外,在第一次调用peek方法时会删除哨兵节点,并让头节点指向队列中的第一个元素或者null。

size()

用途:计算当前队列值不为空的元素个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// 最大值Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}

// 获取第一个队列元素,无则为null
Node<E> first() {
// goto标记
restartFromHead:
// (1) 无限循环
for (;;) {
// (2) 从头节点开始遍历
for (Node<E> h = head, p = h, q;;) {
// (3) 标记当前节点是否有值
boolean hasItem = (p.item != null);
// (4) 当前节点有值,或者当前节点是最后一个元素
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
// (5) 当前节点自引用,重新寻找头节点
else if (p == q)
continue restartFromHead;
// (6) 当前节点指向下个节点
else
p = q;
}
}
}

// 获取当前节点的下个节点,如果是自引用则返回真正的头节点
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}

remove(Object o)

用途:删除指定值的元素,队列中存在一个则直接删除,存在多个则删除第一个,并返回true,否则返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean remove(Object o) {
// (1) 判空
if (o != null) {
Node<E> next, pred = null;
// (2) 遍历链表
for (Node<E> p = first(); p != null; pred = p, p = next) {
// (3) 标记是否移除
boolean removed = false;
// (4) 当前节点的值
E item = p.item;
// (5) 匹配成功,则使用CAS设置当前节点的值设置为null
// 失败的线程继续循环查找队列中是否其他匹配的元素
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
removed = p.casItem(item, null);
}

// (6) 获取下一个节点
next = succ(p);
// (7) 如果前驱节点和后继节点均不为空,将前驱节点的后继指向后继节点
if (pred != null && next != null)
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}

最后小结

ConcurrentLinkedQueue的底层使用单向链表来保存队列元素,每个元素给封装成节点。队列靠头尾节点来维护,创建队列时头尾节点都指向一个item为null的哨兵节点。第一次执行peek或者first时会把删除哨兵节点,把头节点指向第一个真正的节点。由于使用非阻塞CAS算法,没有加锁,所以在计算size时有可能进行了offerpoll或者remove操作,导致计算的元素个数不精确,所以在并发情况下size方法不推荐使用。

入队、出队都是操作volatile修饰的tailhead节点,要保证在多线程下出入队线程安全,只需要保证这个两个节点操作的可见性和原子性即可。volatile保证操作可见性,CASS保证操作原子性。

offer操作是在tail后面增加元素,也就是调用tail.cadNext方法,而这个方法使用的是CAS操作,只有一个线程会成功,然后失败的线程会循环,重新获取tail,再执行casNext方法。poll操作也通过类似CAS的算法保证出队时移除节点操作的原子性。