ConcurrentLinkedQueue
🔍 基于JDK1.8的ConcurrentLinkedQueue源码分析。
基本知识
始于:JDK 1.5
作者:Doug Lea
特点:线程安全,无界非阻塞
结构:单向链表
类图:
主要属性
1 | // 节点类 |
构造函数
1 | // 无参构造 |
offer(E e)
用途:在队列末尾添加一个元素。
1 | public boolean offer(E e) { |
(1)单线程执行offer(item)
首先进行参数判空,空则抛出NPE异常,使用item作为参数构造一个新节点item
。
执行代码(1),这时候head
、tail
、p
、t
同时指向了哨兵节点。同时q
指向哨兵节点的后继。队列状态如下:

执行代码(2),发现 q == null
则执行代码(3),通过CAS操作将p.next
设置为item
。
这里 p == t
成功,所以没有设置尾节点,直接返回true
。队列状态如下:

(2)双线程执行
假设两个线程同时执行代码(3),线程1调用offer(item1)
,线程2调用offer(item2)
。
假设线程1先执行CAS操作,那么更新p
的next
节点为item1
,那么线程2执行CAS操作的时候,发现p
的next
节点并不是null
,然后执行重新循环,执行代码(1)。队列状态如下:

线程2执行代码(2),由于q != null
,于是线程2执行代码(6),p = q
。队列状态如下:

线程2重新循环,执行代码(1)。队列状态如下:

线程执行代码(2),由于q == null
,接着执行代码(3),通过CAS操作更新p
的next
节点为item2
。
然后执行代码(4),由于p != t
,所以通过CAS操作重新设置尾节点为item2
。队列状态如下:

(3)第三情况
在以上的两种情况中,还差代码(5)的执行情况没有出现。
在poll
操作后可能会存在的一种情况。如下图所示:

执行代码(2)时,队列状态如下:

此时q != null
并且p == q
(tail
结点的item
为空,但是tail
不为空),所以执行代码(5),由于t == tail
,所以p = head
。
然后重新循环,执行代码(1),并且执行到(2)。队列状态如下:

此时由于q == null
,执行代码(3)进行CAS操作,如果没有竞争线程,则CAS成功,p
的next
结点设置为新节点,然后执行代码(4),由于p != t
,所以设置新节点为新的尾节点。队列状态如下:

这里自引用的结点则会被GC。
可见,offer
操作的关键步骤是代码(3),通过CAS操作来控制某一时刻只能有1个线程可以追加元素到队列末尾。进行CAS竞争失败的线程会通过循环不断尝试CAS操作,直到CAS成功才会返回,也就是通过无线循环不断进行CAS尝试方式来代替阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。
poll()
用途:在队列头部获取并移除一个元素,如果队列为空则返回null。
1 | public E poll() { |
(1)poll
操作从队列头获取元素,所以循环从head
节点开始,执行代码(3)获取当前队列的头节点。队列开始为空的时候状态如下:

由于head
节点为空,所以会执行到代码(6),如果这个过程中没有线程调用offer
方法,则此时q == null
。队列状态如下:

所以会执行updateHead
方法,由于h == p
,所以不会执行casHead
,直接返回null。
(2)假设执行到代码(6)时已经有其他线程调用了offer
方法并成功添加一个元素到队列,这时候q
指向的是新增元素的节点。队列状态如下:

执行代码(6),q != null
,继续执行代码(7),p != q
,最后执行代码(8),p = q
。队列状态如下:

继续循环,执行代码(3)后执行代码(4),此时p.item
不为null,于是通过CAS操作尝试将p.item
设置为null,假设此时没有其他线程进行poll
操作,则CAS操作成功后执行代码(5),由于此时p != h
,所以设置头节点为p
,并且设置h
的next
节点为h
自己,返回从移除节点值item
。队列状态如下:

这个状态就是offer
操作执行代码(5)的状态。
(3)假设现在一个线程调用了poll
操作,则在执行代码(4)时队列状态如下:

这时候执行代码(6)返回。
(4)现在还有代码分支(7)没有执行过。
假设线程1执行poll
操作时当前队列状态如下:

那么执行代码(4)时,通过CAS操作设置p.item
为null,假设CAS设置成功则标记该节点并从队列中将其清除,队列状态如下:

然后,由于p != h
,所以会执行uploadHead
方法,假如线程1执行uploadHead
方法前,另外一个线程2开始poll
操作,这时候线程2的p
指向head
节点,但是还未执行到代码(6),队列状态如下:

然后线程1执行updateHead
方法,执行完毕后线程1退出,队列状态如下:

然后线程2继续执行代码(6),由于p
自引用,所以p == q
,于是跳转到外层循环restartFromHead
,重新获取head
,队列状态如下:

总结:poll
方法在移除一个元素时,只是简单地使用CAS操作把当前节点的item
设置为null,然后通过设置头节点将该元素从队列中移除,被移除的节点就成了孤立节点从而被GC。另外,如果执行分支中发现头节点被修改了,要跳转到外层循环重新获取新的头节点。
peek()
用途:获取队列头部的一个元素,如果队列为空则返回null。
1 | public E peek() { |
第一次循环时会发现item
为空,第二次循环时p
指向队列第一个元素,执行代码(4),q == null
成立。
(1)队列为空时,队列状态如下:

这时候执行updateHead
方法,由于p == h
,所以不进行任何操作,直接返回null。
(2)队列至少有一个元素时,队列状态如下:

这时候执行代码(6),p = q
,队列状态如下:

重新循环,执行代码(4)时,item != null
,所以执行updateHead
方法,由于h != p
,所以重新头节点为p
,队列状态如下:

即剔除了哨兵节点。
总结:peek
操作和poll
操作类似,只是前者只获取头节点但不删除。另外,在第一次调用peek
方法时会删除哨兵节点,并让头节点指向队列中的第一个元素或者null。
size()
用途:计算当前队列值不为空的元素个数。
1 | public int size() { |
remove(Object o)
用途:删除指定值的元素,队列中存在一个则直接删除,存在多个则删除第一个,并返回true,否则返回false。
1 | public boolean remove(Object o) { |
最后小结
ConcurrentLinkedQueue
的底层使用单向链表来保存队列元素,每个元素给封装成节点。队列靠头尾节点来维护,创建队列时头尾节点都指向一个item
为null的哨兵节点。第一次执行peek
或者first
时会把删除哨兵节点,把头节点指向第一个真正的节点。由于使用非阻塞CAS算法,没有加锁,所以在计算size
时有可能进行了offer
、poll
或者remove
操作,导致计算的元素个数不精确,所以在并发情况下size
方法不推荐使用。
入队、出队都是操作volatile
修饰的tail
和head
节点,要保证在多线程下出入队线程安全,只需要保证这个两个节点操作的可见性和原子性即可。volatile
保证操作可见性,CASS保证操作原子性。

offer
操作是在tail
后面增加元素,也就是调用tail.cadNext
方法,而这个方法使用的是CAS操作,只有一个线程会成功,然后失败的线程会循环,重新获取tail
,再执行casNext
方法。poll
操作也通过类似CAS的算法保证出队时移除节点操作的原子性。