ConcurrentLinkedQueue
🔍 基于JDK1.8的ConcurrentLinkedQueue源码分析。
基本知识
始于:JDK 1.5
作者:Doug Lea
特点:线程安全,无界非阻塞
结构:单向链表
类图:
主要属性
1 | // 节点类 |
构造函数
1 | // 无参构造 |
offer(E e)
用途:在队列末尾添加一个元素。
1 | public boolean offer(E e) { |
(1)单线程执行offer(item)
首先进行参数判空,空则抛出NPE异常,使用item作为参数构造一个新节点item。
执行代码(1),这时候head、tail、p、t同时指向了哨兵节点。同时q指向哨兵节点的后继。队列状态如下:
执行代码(2),发现 q == null则执行代码(3),通过CAS操作将p.next设置为item。
这里 p == t成功,所以没有设置尾节点,直接返回true。队列状态如下:
(2)双线程执行
假设两个线程同时执行代码(3),线程1调用offer(item1),线程2调用offer(item2)。
假设线程1先执行CAS操作,那么更新p的next节点为item1,那么线程2执行CAS操作的时候,发现p的next节点并不是null,然后执行重新循环,执行代码(1)。队列状态如下:
线程2执行代码(2),由于q != null,于是线程2执行代码(6),p = q。队列状态如下:
线程2重新循环,执行代码(1)。队列状态如下:
线程执行代码(2),由于q == null,接着执行代码(3),通过CAS操作更新p的next节点为item2。
然后执行代码(4),由于p != t,所以通过CAS操作重新设置尾节点为item2。队列状态如下:
(3)第三情况
在以上的两种情况中,还差代码(5)的执行情况没有出现。
在poll操作后可能会存在的一种情况。如下图所示:
执行代码(2)时,队列状态如下:
此时q != null 并且p == q(tail结点的item为空,但是tail不为空),所以执行代码(5),由于t == tail,所以p = head。
然后重新循环,执行代码(1),并且执行到(2)。队列状态如下:
此时由于q == null,执行代码(3)进行CAS操作,如果没有竞争线程,则CAS成功,p的next结点设置为新节点,然后执行代码(4),由于p != t,所以设置新节点为新的尾节点。队列状态如下:
这里自引用的结点则会被GC。
可见,offer操作的关键步骤是代码(3),通过CAS操作来控制某一时刻只能有1个线程可以追加元素到队列末尾。进行CAS竞争失败的线程会通过循环不断尝试CAS操作,直到CAS成功才会返回,也就是通过无线循环不断进行CAS尝试方式来代替阻塞算法挂起调用线程。相比阻塞算法,这是使用CPU资源换取阻塞所带来的开销。
poll()
用途:在队列头部获取并移除一个元素,如果队列为空则返回null。
1 | public E poll() { |
(1)poll操作从队列头获取元素,所以循环从head节点开始,执行代码(3)获取当前队列的头节点。队列开始为空的时候状态如下:
由于head节点为空,所以会执行到代码(6),如果这个过程中没有线程调用offer方法,则此时q == null。队列状态如下:
所以会执行updateHead方法,由于h == p,所以不会执行casHead,直接返回null。
(2)假设执行到代码(6)时已经有其他线程调用了offer方法并成功添加一个元素到队列,这时候q指向的是新增元素的节点。队列状态如下:
执行代码(6),q != null,继续执行代码(7),p != q,最后执行代码(8),p = q。队列状态如下:
继续循环,执行代码(3)后执行代码(4),此时p.item不为null,于是通过CAS操作尝试将p.item设置为null,假设此时没有其他线程进行poll操作,则CAS操作成功后执行代码(5),由于此时p != h,所以设置头节点为p,并且设置h的next节点为h自己,返回从移除节点值item。队列状态如下:
这个状态就是offer操作执行代码(5)的状态。
(3)假设现在一个线程调用了poll操作,则在执行代码(4)时队列状态如下:
这时候执行代码(6)返回。
(4)现在还有代码分支(7)没有执行过。
假设线程1执行poll操作时当前队列状态如下:
那么执行代码(4)时,通过CAS操作设置p.item为null,假设CAS设置成功则标记该节点并从队列中将其清除,队列状态如下:
然后,由于p != h,所以会执行uploadHead方法,假如线程1执行uploadHead方法前,另外一个线程2开始poll操作,这时候线程2的p指向head节点,但是还未执行到代码(6),队列状态如下:
然后线程1执行updateHead方法,执行完毕后线程1退出,队列状态如下:
然后线程2继续执行代码(6),由于p自引用,所以p == q,于是跳转到外层循环restartFromHead,重新获取head,队列状态如下:
总结:poll方法在移除一个元素时,只是简单地使用CAS操作把当前节点的item设置为null,然后通过设置头节点将该元素从队列中移除,被移除的节点就成了孤立节点从而被GC。另外,如果执行分支中发现头节点被修改了,要跳转到外层循环重新获取新的头节点。
peek()
用途:获取队列头部的一个元素,如果队列为空则返回null。
1 | public E peek() { |
第一次循环时会发现item 为空,第二次循环时p指向队列第一个元素,执行代码(4),q == null成立。
(1)队列为空时,队列状态如下:
这时候执行updateHead方法,由于p == h,所以不进行任何操作,直接返回null。
(2)队列至少有一个元素时,队列状态如下:
这时候执行代码(6),p = q,队列状态如下:
重新循环,执行代码(4)时,item != null,所以执行updateHead方法,由于h != p,所以重新头节点为p,队列状态如下:
即剔除了哨兵节点。
总结:peek操作和poll操作类似,只是前者只获取头节点但不删除。另外,在第一次调用peek方法时会删除哨兵节点,并让头节点指向队列中的第一个元素或者null。
size()
用途:计算当前队列值不为空的元素个数。
1 | public int size() { |
remove(Object o)
用途:删除指定值的元素,队列中存在一个则直接删除,存在多个则删除第一个,并返回true,否则返回false。
1 | public boolean remove(Object o) { |
最后小结
ConcurrentLinkedQueue的底层使用单向链表来保存队列元素,每个元素给封装成节点。队列靠头尾节点来维护,创建队列时头尾节点都指向一个item为null的哨兵节点。第一次执行peek或者first时会把删除哨兵节点,把头节点指向第一个真正的节点。由于使用非阻塞CAS算法,没有加锁,所以在计算size时有可能进行了offer、poll或者remove操作,导致计算的元素个数不精确,所以在并发情况下size方法不推荐使用。
入队、出队都是操作volatile修饰的tail和head节点,要保证在多线程下出入队线程安全,只需要保证这个两个节点操作的可见性和原子性即可。volatile保证操作可见性,CASS保证操作原子性。
offer操作是在tail后面增加元素,也就是调用tail.cadNext方法,而这个方法使用的是CAS操作,只有一个线程会成功,然后失败的线程会循环,重新获取tail,再执行casNext方法。poll操作也通过类似CAS的算法保证出队时移除节点操作的原子性。




