🔍 基于JDK1.8的ArrayBlockingQueue源码分析。
基本知识
始于:JDK 1.5
作者:Doug Lea
特点:线程安全,有界并阻塞
结构:数组
类图:
主要属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
|
构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public ArrayBlockingQueue(int capacity) { this(capacity, false); }
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
|
默认使用非公平锁。
offer(E e)
用途:向队列尾部插入一个元素,如果队列有空闲空间则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。如果e
为null则抛出NPE异常。该方法是非阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
|
代码(1)进行参数校验,如果为空则抛出NullPointerException
异常。
代码(2)获取独占锁,当前线程获取该锁后,其他入队和出队操作的线程都会被阻塞挂起而后被放入lock的AQS阻塞队列。
代码(3)判断队列是否已满,满则直接返回false,否则调用enqueue
方法后返回true。
在enqueue
方法中,首先把当前元素放入items
数组,然后计算下一个元素应该存放的下标位置,并递增元素个数计数器,添加元素之后,激活因为进行消费操作而被阻塞的线程。这里由于在操作共享变量count
前加了独占锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是从CPU缓存或者寄存器获取。
代码(5)释放锁后,会把修改的共享变量值刷新到主内存中,这样其他线程通过加锁再次读取这些共享变量时,就可以看到最新的值。
put(E e)
用途:向队列尾部插入一个元素,如果队列有空闲位置则插入后直接返回true,如果队列已满则阻塞当前线程直到队列有空闲并插入成功后返回true,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException
异常而返回。如果e
为null则抛出NPE异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
|
poll()
用途:从队列头部获取并移除一个元素,如果队列为空则返回null,该方法是不阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
|
take()
用途:获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException
异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
peek()
用途:获取队列头部元素但是不从队列里面移除,如果队列为空则返回null,该方法是不阻塞的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
@SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; }
|
size()
用途:获取队列元素个数。
1 2 3 4 5 6 7 8 9
| public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
|
size
方法比较简单,上锁后直接返回count
,并在返回前释放锁。
因为count
并没有被声明为volatile
,所以无法保证线程可见性。
最后小结
ArrayBlockingQueue
通过全局独占锁实现了同时只有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似于在方法上添加synchronized
的意思。
其中offer
和poll
操作通过简单的加锁进行入队、出队操作,而put
、take
操作则使用条件变量实现,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。另外,相比LinkedBlockingQueue
,ArrayBlockingQueue
的size
操作的结果是精确的。