// 容量构造 publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
// 集合构造 publicLinkedBlockingQueue(Collection<? extends E> c){ this(Integer.MAX_VALUE); // 获取生产锁,并加锁 final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; for (E e : c) { if (e == null) thrownew NullPointerException(); if (n == capacity) thrownew IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { // 释放生产锁 putLock.unlock(); } }
public E poll(){ // (1) 如果队列为空则返回null final AtomicInteger count = this.count; if (count.get() == 0) returnnull; E x = null; int c = -1; // (2) 获取独占锁,加锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // (3) 队列不空则出队,递减元素数量 if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); // (4) c大于1,当前线程移除队列里面一个元素后队列不为空 if (c > 1) notEmpty.signal(); } } finally { // (5) 释放锁 takeLock.unlock(); } // (6) c为移除节点前的队列元素数量 // c == capacity表示移除后产生一个空位,唤醒入队条件队列的阻塞线程进行生成 if (c == capacity) signalNotFull(); return x; }
// 移除队首节点head private E dequeue(){ Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }