🔍 基于JDK1.8的LinkedBlockingQueue源码分析。
基本知识
始于:JDK 1.5
作者:Doug Lea
特点:线程安全,有界并阻塞,生产-消费模型
结构:单向链表
类图:
主要属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
|
构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
|
offer(E e)
用途:向队列尾部添加一个元素。如果队列中有空闲位置则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
private void enqueue(Node<E> node) { last = last.next = node; }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
|
offer(E e, long timeout, TimeUnit unit)
用途:向队列尾部添加一个元素。如果队列中有空闲位置则插入成功后返回true,如果队列已满等待指定时间看是否有空余位置,有空闲位置则插入成功后返回true,超时后返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
|
poll()
用途:从队列头部获取并移除一个元素,如果队列为空则返回null。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null; return x; }
private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
|
remove(Object o)
用途:删除队列中指定的元素,有则删除并返回true,没有则返回false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; if (count.getAndDecrement() == capacity) notFull.signal(); }
void fullyLock() { putLock.lock(); takeLock.lock(); }
void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
|
size()
用途:获取当前队列元素个数。
1 2 3
| public int size() { return count.get(); }
|
由于进行出队、入队操作时的count是加锁的,所以结果相比ConcurrentLinkedQueue
的size
方法比较准确的。
为什么ConcurrentLinkedQueue
只能通过遍历链表而不能使用原子变量呢?
因为使用原子变量保存队列节点个数需要保证入队、出队操作和原子变量操作都是原子性操作,而ConcurrentLinkedQueue
使用的是CAS无锁算法,无法保证。
最后小结
LinkedBlockingQueue
的内部是通过单向链表实现的,使用头、尾节点来进行入队和出队操作,即入队操作都是对头节点进行操作,出队操作都是对尾节点进行操作。
如上图所示,对头、尾节点的操作分别使用了单独的独占锁从而保证了原子性,所以出队和入队操作是可以同时进行的。另外对头、尾节点都配备了一个条件队列,用来存放被阻塞的线程,并结合入队、出队操作实现了一个生产消费模型。