
7.1 AbstractQueuedSynchronizer
7.1.1 AQS概述
AbstractQueuedSynchronizer
(抽象同步队列),是阻塞式锁和相关的同步器工具的框架。
- 使用
state
属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁。
getState
:获取state状态
setState
:设置state状态
compareAndSetState
:乐观锁机制设置state状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于FIFO的等待队列,在其内部通过节点
head
和tail
记录对手和队尾元素,队列的元素类型为Node
。
SHARED
:用来标记该线程是是否是获取共享资源时被阻塞挂起后放入AQS队列的
EXCLUSIVE
:用来标记线程是获取独占资源时被挂起后放入AQS队列的
waitStatus
:记录当前线程等待状态
- 1:CANCELLED,线程结点已释放(超时、中断),已取消的节点不会再阻塞
- -1:SIGNAL,该线程的后续线程需要阻塞,即只要前驱结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程
- -2:CONDITION,该线程在condition队列中阻塞
- -3:PROPAGATE,释放共享资源时需要通知其他节点
prev
记录当前节点的前驱结点,next
记录当前阶段的后继结点
- 内部类
ConditionObject
,用来结合锁实现线程同步,是条件变量,每个条件变量对应一个条件队列(单向链表队列),用来存放调用条件变量的await
方法后被阻塞的线程,队列的头、尾元素分别是firstWaiter
和lastWaiter
。
7.1.2 获取与释放资源
对于AQS来说,线程同步的关键是对状态值state
进行操作。根据state
是否属于一个线程,操作state
的方式分为独占方式和共享方式。
- 独占方式:
void acquire(int arg)
void acquireInterruptibly(int arg)
boolean release(int arg)
- 共享方式:
void acquireShared(int arg)
void acquireSharedInterruptibly(int arg)
boolean releaseShared(int arg)
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记到是这个线程获取到了,其他线程再尝试操作state
获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。比如独占锁ReentrantLock
的实现,当一个线程获取了ReentrantLock
的锁后,在AQS内部会首先使用CAS操作把state
状态值从0变为1,然后设置当前锁的持有者exclusiveOwnerThread
为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值state
从1变为2,即设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。比如Semaphore
信号量,当一个线程通过acquire()
方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。
7.1.2.1 独占方式
(1)当一个线程调用acquire(int arg)
方法获取独占资源时,会首先使用tryAcquire
方法尝试获取资源,具体是设置状态变量state
的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE
的Node
节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)
方法挂起自己。
1 2 3 4 5
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
|
(2)当一个线程调用release(int arg)
方法时会尝试使用tryRelease
操作释放资源,这里是设置状态变量state
的值,然后调用LockSupport.unpark(thread)
方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryAcquire
尝试,看当前状态变量state
的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
1 2 3 4 5 6 7 8 9
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
需要注意的是,AQS类并没有提供可用的tryAcquire
和tryRelease
方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire
和tryRelease
需要由具体的子类来实现。子类在实现tryAcquire
和tryRelease
时要根据具体场景使用CAS算法尝试修改state
状态值,成功则返回true
,否则返回false
。子类还需要定义,在调用acquire
和release
方法时state
状态值的增减代表什么含义。
7.1.2.2 共享方式
(1)当线程调用acquireShared(int arg)
获取共享资源时,会首先使用tryAcquireShared
尝试获取资源,具体是设置状态state
的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED
的Node
节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)
方法挂起自己。
1 2 3 4
| public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
|
(2)当一个线程调用releaseShared(int arg)
时会尝试使用tryReleaseShared
操作释放资源,这里是设置状态变量state
的值,然后使用LockSupport.unpark(thread)
激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryReleaseShared
查看当前状态变量state
的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
1 2 3 4 5 6 7
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
同样需要注意的是,AQS类并没有提供可用的tryAcquireShared
和tryReleaseShared
方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire
和tryRelease
需要由具体的子类来实现。子类在实现tryAcquireShared
和tryReleaseShared
时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。
7.1.2.3 入队操作
当一个线程获取锁失败后该线程会被转换为Node
节点,然后就会使用enq(final Node node)
方法将该节点插入到AQS的阻塞队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
|

在第一次循环中,在队列尾部插入元素时,队列状态如图(default)所示,队列头尾节点都指向null;
执行代码(1)后节点t指向了尾部节点,此时队列状态如图(I)所示;
执行代码(2),使用CAS设置一个哨兵节点为头结点,如果设置成功,则让尾部节点也指向哨兵节点,此时队列状态如图(II)所示。

第一次循环之后只插入了一个哨兵节点,还需要插入node节点。
在第二次循环中,执行代码(1)后节点t指向了哨兵节点,此时队列状态如图(III)所示;
执行代码(3),设置node的前驱结点为哨兵节点,此时队列状态如图(IV)所示;
执行代码(4),通过CAS设置node节点为尾部节点,如果设置成功,则队列状态如图(V)所示;
然后设置哨兵节点的后继结点为node结点,此时队列状态如图(VI)所示。
7.1.2.4 补充
基于AQS实现的锁除了需要重写上面方法以外,还需要重写isHeldExclusively
方法,来判断是被当前线程独占还是被共享。
此外,与acquire
和release
对应的都有一个带有Interruptibly
关键字的函数。
区别如下:
不带interruptibly
关键字的方法的意思是不对中断进行响应,也就是线程在调用不带interruptibly
关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。
而带interruptibly
关键字的方法要对中断进行响应,也就是线程在调用带interruptibly
关键字的方法获取资源时或者获取资源失败而被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException
异常而返回。
7.2 ReentrantLock
7.2.1 概述与应用
ReentrantLock
是可重入的独占锁,同时只能有一个线程可以获取该锁,其他获取该锁的线程会被阻塞而被放入该锁的AQS阻塞队列中。
示例1:基于AQS实现的不可重入的独占锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| import java.io.Serializable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock;
public class ParaKLock implements Lock, Serializable { private static final long serialVersionUID = 7379874578553124018L;
private static class ParaKSync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6278547169976524823L;
@Override protected boolean isHeldExclusively() { return getState() == 1; }
@Override protected boolean tryAcquire(int arg) { if (Thread.currentThread() == this.getExclusiveOwnerThread()) throw new IllegalMonitorStateException("不支持锁重入"); if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
@Override protected boolean tryRelease(int arg) { assert arg == 1; if (getState() == 0) throw new IllegalMonitorStateException("当前state不为0"); setExclusiveOwnerThread(null); setState(0); return true; }
Condition newCondition() { return new ConditionObject(); } }
private final ParaKSync sync = new ParaKSync();
@Override public void lock() { sync.acquire(1); }
@Override public boolean tryLock() { return sync.tryAcquire(1); }
@Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); }
@Override public void unlock() { sync.release(1); }
public boolean isLocked() { return sync.isHeldExclusively(); }
@Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
@Override public Condition newCondition() { return sync.newCondition(); } }
|
示例2:基于上面自定义独占锁实现生产-消费模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| import lombok.extern.slf4j.Slf4j;
import java.util.Queue; import java.util.Random; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition;
@Slf4j(topic = "ProducerConsumerModel") public class ProducerConsumerModel { private final static ParaKLock LOCK = new ParaKLock(); private final static Condition FULL = LOCK.newCondition(); private final static Condition EMPTY = LOCK.newCondition(); private final static Queue<Character> QUEUE = new LinkedBlockingDeque<>(); private final static int QUEUE_MAX_SIZE = 5; private final static Random RANDOM = new Random();
private static Character randomChar() { int choice = RANDOM.nextInt(3); if (choice == 0) return (char) (RANDOM.nextInt(10) + 48); else if (choice == 1) return (char) (RANDOM.nextInt(26) + 65); else return (char) (RANDOM.nextInt(26) + 97); }
static class Producer extends Thread { public Producer() { super.setName("Producer"); }
@Override public void run() { while (true) { LOCK.lock(); try { while (QUEUE.size() == QUEUE_MAX_SIZE) { FULL.await(); } Character character = randomChar(); TimeUnit.SECONDS.sleep(1); log.debug("生产 => [{}]", character); QUEUE.add(character); EMPTY.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); } } } }
static class Consumer extends Thread { public Consumer() { super.setName("Consumer"); }
@Override public void run() { while (true) { LOCK.lock(); try { while (QUEUE.isEmpty()) { EMPTY.await(); } Character character = QUEUE.poll(); TimeUnit.SECONDS.sleep(1); log.debug("消费 => [{}]", character); FULL.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { LOCK.unlock(); } } } }
public static void main(String[] args) { Producer producer = new Producer(); Consumer consumer = new Consumer(); producer.start(); consumer.start(); } }
|
7.2.2 非公平锁原理
先结合ReentrantLock
的内部类Sync
、NonfailSync
和AbstractQueuedSynchronizer
中的方法进行分析。
对于Sync
中的抽象方法lock
,公平锁与非公平锁分别有不同的实现:
非公平锁NonfairSync
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
|
Sync
中的nonfairTryAcquire
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
AbstractQueuedSynchronizer
中涉及到的主要方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
static void selfInterrupt() { Thread.currentThread().interrupt(); }
|
非公平抢锁过程总结:
调用ReentrantLock
的lock
方法,会执行AQS的acquire
方法,先执行NonFairSync
的tryAcquire
方法尝试获取锁,失败则执行AQS的acquireQueued
方法在队列中循环获取锁。
tryAcquire
:检查state
状态值,为0说明还未有线程获取锁,那么自己就直接尝试通过CAS操作将state
设置为1,成功则官宣自己独占;不为0且当前线程是独占线程,表示锁冲入,那么state
+1。以上两种情况成功都返回true
,从而结束抢锁过程,其他情况都代表获取失败,返回false
。
获取失败后,先执行addWaiter
方法将当前线程封装为节点添加到队列末尾,然后进入acquireQueued
方法,循环获取直到成功,执行selfInterrupt
方法。
acquireQueued
:设置一个标志interrupted = false
,代表线程获取锁的过程中是否产生了中断,第一个if分支,判断当前节点的前驱节点是否为哨兵节点,是则拥有获取锁的资格,执行tryAcquire
尝试获取锁,成功则将前驱节点剔除,并且返回interrupted
;往下执行,第二个if分支,说明当前节点不是第二节点,没有获取锁的资格,或者是tryAcquire
失败,没有抢占到锁,执行shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
,执行成功后将interrupted
设置为true
。返回false
则直接退出acquireQueued
过程,返回true
则说明获取锁的过程中产生了中断,但是这个中断标记在parkAndCheckInterrupt
清除了,所以需要填补中断标记。
shouldParkAfterFailedAcquire
:顾名思义,获取失败后应该挂起,如果当前节点的前驱节点的状态为SIGNAL
,那么前驱节点释放锁后会通知自己,自己就可以安心阻塞,并返回true
;如果当前节点的前驱节点的状态为CANCELLED
即取消排队,那么需要寻找当前节点的有效前驱,即跳过状态为CANCELLED
的节点,找到一个状态为SIGNAL
的节点;否则当前节点的前驱节点的状态为0,则通过CAS操作将前驱节点的状态设置为SIGNAL
,后两种都会返回false
,那么会在acquireQueued
的for循环中进行下一轮,第二次会按照第一种情况返回true
。返回true
后,执行parkAndCheckInterrupt
。
parkAndCheckInterrupt
:先调用LockSupport.park
挂起当前线程(有三种唤醒方式:①unpark ②interrupt ③return),那么当前线程等待唤醒(被唤醒的时候会进入acquireQueued
的第一个if分支),后调用Thread.interrupted
方法返回并重置中断状态,
这里为什么需要清除中断状态呢?请看知乎回答: https://www.zhihu.com/question/399039232/answer/1260843911
如果不清除中断状态,也就是返回Thread.currentThread.isInterrupt()
方法,那么当前线程挂起后依然是中断状态,那么下一轮循环的时候LockSupport.lock
方法将会失效,使得当前线程持续运行,在acquireQueued
的for循环中空转,导致CPU100%。
下面开始图示详解:
没有竞争时

第一个竞争出现时

Thread-1执行了
CASC
尝试将state
由0
改为1
,结果失败
- 进入
tryAcquire
逻辑,这时state
已经是1
,结果仍然失败
- 接下来进入
addWaiter
逻辑,构造Node
队列
- 图中黄色三角表示该
Node
的waitStatus
状态,其中0为默认正常状态
- Node的创建是懒惰的
- 其中第一个
Node
称为Dummy
(哑元)或哨兵,用来占位,并不关联线程

进入acquireQueued
逻辑
acquireQueued
会在一个死循环中不断尝试获得锁,失败后进入park
阻塞
- 如果自己是紧邻着
head
(排第二位),那么再次tryAcquire
尝试获取锁,当然这时state
仍为1
,失败
- 进入
shouldParkAfterFailedAcquire
逻辑,将前驱node
,即head
的waitStatus
改为-1
,这次返回false

shouldParkAfterFailedAcquire
执行完毕回到acquireQueued
,再次tryAcquire
尝试获取锁,当然这时state
仍然为1,失败
当再次进入shouldParkAfterFailedAcquire
时,这时因为其前驱node
的waitStatus
已经是-1
,这次返回true
进入parkAndCheckInterrupt
,Thread-1 park(灰色表示)

再次有多个线程经历上述过程竞争失败,变成这个样子

Thread-0释放锁
- 设置
exclusiveOwnerThread
为null
state
= 0

当前队列不为null
,并且head
的waitStatus
,进入unparkSuccessor
流程
找到队列中离head
最近的一个node
(没取消的),unpark
恢复其运行,本例中即为Thread-1。
回到Thread-1的acquireQueued
流程

如果加锁成功(没有竞争),会设置
exclusiveOwnerThread
为Thread-1,state
= 1
head
指向刚刚Thread-1所在的Node
,该node
清空Thread
- 原本的
head
因为从链表断开,而可被垃圾回收
如果这时候有其他线程来竞争(非公平的体现),例如这时有Thrtead-4来了

如果不巧又被Thread-4占了先
- Thread-4被设置为
exclusiveOwnerThread
,state
= 1
- Thread-1再次进入
acquireQueued
流程,获取锁失败,重新进入park
阻塞
7.2.3 公平锁原理
ReentrantLock-Fair
的Sync
覆盖方法tryAcquire
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
AbstractQueuedSynchronizer
中涉及到的方法:
1 2 3 4 5 6 7 8 9
| public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
|
7.2.4 可重入原理
ReentrantLock
的内部类Sync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
|
7.2.5 可打断原理
不可打断模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
static void selfInterrupt() { Thread.currentThread().interrupt(); }
|
可打断模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
7.2.6 条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject
。
内部属性:
1 2 3 4 5 6
| private transient Node firstWaiter; private transient Node lastWaiter;
private static final int REINTERRUPT = 1; private static final int THROW_IE = -1;
|
await流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private Node addConditionWaiter()
final int fullyRelease(Node node)
final boolean isOnSyncQueue(Node node)
private int checkInterruptWhileWaiting(Node node)
final boolean acquireQueued(final Node node, int arg)
private void unlinkCancelledWaiters()
private void reportInterruptAfterWait(int interruptMode)
|
开始Thread-0持有锁,调用await
,进入ConditionObject
的addConditionWaiter
流程。
创建新的Node
状态为-2(Node.CONDITION
),关联Thread-0,加入等待队列尾部。

接下来进入AQS的fullRelease
流程,释放同步器上的锁
unpark AQS队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread-1竞争成功

park阻塞Thread-0

signal流程
1 2 3 4 5 6 7 8
| public final void signal()
protected boolean isHeldExclusively()
private void doSignal(Node first)
final boolean transferForSignal(Node node)
|
假设Thread-1要来唤醒Thread-0

进入ConditionObject
的doSignal
流程,取得等待队列中第一个Node
,Thread-0所在Node

执行transferForSignal
流程,将该Node
加入到AQS队列尾部,将Thread-0的waitStatus
改为0,Thread-3的waitStatus
改为-1

Thread-1释放锁,进入unlock
流程。
7. 3 ReentrantReadWriteLock
7.3.1 概述与应用
当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。
- 读锁不支持条件变量
- 重入时升级不支持:持有读锁的情况下去获取写锁,会导致写锁永久等待
- 重入时降级支持:持有写锁的情况下去获取读锁
示例1:数据容器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import lombok.extern.slf4j.Slf4j;
import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j(topic = "DataContainer") class DataContainer { private Object data; private final static Random RANDOM = new Random(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private static Character randomChar() { int choice = RANDOM.nextInt(3); if (choice == 0) return (char) (RANDOM.nextInt(10) + 48); else if (choice == 1) return (char) (RANDOM.nextInt(26) + 65); else return (char) (RANDOM.nextInt(26) + 97); }
public void read() { log.debug("获取读锁..."); readLock.lock(); try { log.debug("读取数据 => [{}]", data); } finally { log.debug("释放读锁..."); readLock.unlock(); } }
public void write() { log.debug("获取写锁..."); writeLock.lock(); try { log.debug("写入数据 => [{}]", data = randomChar()); } finally { log.debug("释放写锁..."); writeLock.unlock(); } } }
|
示例2:文档中的应用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import java.util.Random; import java.util.concurrent.locks.ReentrantReadWriteLock;
class CachedData { Object data; volatile boolean cacheValid; final static Random RANDOM = new Random(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
void processCachedData() { lock.readLock().lock(); if (!cacheValid) { lock.readLock().unlock(); lock.writeLock().lock(); try { if (!cacheValid) { data = write(); cacheValid = true; } lock.readLock().lock(); } finally { lock.writeLock().unlock(); } } try { use(data); } finally { lock.readLock().unlock(); } } }
|
示例3:Redis的Java缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
| import lombok.extern.slf4j.Slf4j; import redis.clients.jedis.Jedis;
import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock;
@Slf4j(topic = "GenericRedisDemo") public class GenericRedisDemo { public static void main(String[] args) { GenericCached cached = new GenericCached(); cached.update(new User(1, "KHighness", "parakovo@gmail.com")); cached.update(new User(2, "RubbishK", "rubbish@gmail.com")); cached.update(new User(3, "FlowerK", "flower@gmail.com")); log.debug(cached.query(User.class, 1).toString()); log.debug(cached.query(User.class, 1).toString()); log.debug(cached.query(User.class, 2).toString()); } }
class User { private Integer id; private String name; private String email; public User(Integer id, String name, String email) { this.id = id; this.name = name; this.email = email; } }
interface Generic { String update(Object obj); Map<String, String> query(Class<?> clazz, Integer id); }
class GenericRedis implements Generic { private final Jedis jedis = new Jedis("127.0.0.1", 6379);
public String update(Object obj) { Class<?> clazz = obj.getClass(); Field[] declaredFields = clazz.getDeclaredFields(); Map<String, String> map = new HashMap<>(); Integer id = null; try { Field idField = clazz.getDeclaredField("id"); idField.setAccessible(true); id = (Integer) idField.get(obj); } catch (NoSuchFieldException | IllegalAccessException e) { e.printStackTrace(); } for (Field field : declaredFields) { field.setAccessible(true); try { map.put(field.getName(), field.get(obj).toString()); } catch (IllegalAccessException e) { e.printStackTrace(); } } return jedis.hmset(clazz.getSimpleName() + ":" + id, map); }
public Map<String, String> query(Class<?> clazz, Integer id) { return jedis.hgetAll(clazz.getSimpleName() + ":" + id); } }
@Slf4j(topic = "GenericCached") class GenericCached implements Generic { private final GenericRedis redis = new GenericRedis(); private final Map<Key, Map<String, String>> cache = new HashMap<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
static class Key { Class<?> clazz; Integer id;
public Key(Class<?> clazz, Integer id) { this.clazz = clazz; this.id = id; }
@Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; return Objects.equals(clazz, key.clazz) && Objects.equals(id, key.id); }
@Override public int hashCode() { return Objects.hash(clazz, id); }
@Override public String toString() { return "Key[" + "clazz=" + clazz + ", id=" + id + ']'; } }
@Override public String update(Object obj) { String res = null; lock.writeLock().lock(); try { res = redis.update(obj); Field idField = obj.getClass().getDeclaredField("id"); idField.setAccessible(true); Key key = new Key(obj.getClass(), (int) idField.get(obj)); if (cache.remove(key) != null) { log.debug("清除缓存: {}", key); } } catch (IllegalAccessException | NoSuchFieldException e) { e.printStackTrace(); } finally { lock.writeLock().unlock(); } return res; }
@Override public Map<String, String> query(Class<?> clazz, Integer id) { Key key = new Key(clazz, id); Map<String, String> res = null; lock.readLock().lock(); try { if (cache.containsKey(key)) { log.debug("缓存查询: {}", res = cache.get(key)); return res; } } finally { lock.readLock().unlock(); } lock.writeLock().lock(); try { if (cache.containsKey(key)) { log.debug("缓存查询: {}", res = cache.get(key)); return res; } cache.put(key, res = redis.query(clazz, id)); log.debug("添加缓存: {}", key); } finally { lock.writeLock().unlock(); } return res; } }
|
7.3.2 内部类Sync概述
读写锁的内部维护了一个ReadLock
和一个WriteLock
,它们依赖Sync
实现具体功能,Sync
继承自AQS,并且也提供了公平锁和非公平锁的实现。
AQS中只维护了一个state
状态,那么如何表示读和写两种状态呢?
Sync
巧妙地使用state
的高16位表示读获取到读锁的次数,低16位表示获取到写锁的线程的可重入次数。
1 2 3 4 5 6 7 8 9 10 11 12
| static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
|
Sync
的成员:
firstReader
:记录第一个获取到读锁的线程
firstReaderHoldCount
:记录第一个获取到读锁的线程获取读锁的可重入次数
cacheHoldCounter
:记录最后一个获取读锁的线程获取读锁的可重入次数
readHolds
:存放除去第一个获取锁线程外的其他线程获取读锁的可重入次数
1 2 3 4 5 6 7 8 9 10 11
| static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
|
7.3.3 写锁的获取与释放
在ReentrantReadWriteLock
中写锁使用WriteLock
实现。
获取
写锁是个独占锁,某时只有一个线程可以获取该锁。
如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。
如果当前已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。
另外,写锁是可重入锁,再次获取只是简单地把可重入次数加1后直接返回。
(1)WriteLock
的lock
方法:
1 2 3
| public void lock() { sync.acquire(1); }
|
(2)AbstractQueuedSynchronizer
的acquire
方法:
1 2 3 4 5
| public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
|
(3)Sync
重写的tryAcquire
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
|
(4)writerShouldBlock
方法:
- 公平锁
NonfairSync
实现:return false
,即抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回true。
- 非公平锁
FairSync
的实现:return hashQueuedPredecessors()
,判断当前线程节点是否有前驱结点,有则放弃获取写锁的权限,直接返回false。
释放
如果当前线程持有该所,unlock
会让该线程对该线程持有的AQS状态值减1,如果减1后当前状态值为0则当前线程会释放该锁,否则仅仅减1而已。
如果当前线程没有持有该锁而调用unlock
抛出IllegalMonitorStateException
异常。
(1)WriteLock
的unlock
方法:
1 2 3
| public void unlock() { sync.release(1); }
|
(2)AbstractQueuedSynchronizer
的release
方法:
1 2 3 4 5 6 7 8 9 10
| public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
|
(3)Sync
重写的tryRelease
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
|
7.3.4 读锁的获取与释放
在ReentrantReadWriteLock
中读锁使用ReadLock
实现。
获取
如果当前没有线程持有写锁,则当前线程可以获取锁,AQS的状态值state
的高16位的值会加1,然后方法返回。
如果其他一个线程持有锁,则当前线程会阻塞。
(1)Read
的lock
方法:
1 2 3
| public void lock() { sync.acquireShared(1); }
|
(2)AbstractQueuedSynchronizer
的acquireShared
方法:
1 2 3 4
| public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
|
(3)Sync
重写的tryAcquireShared
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
|
(4)readerShouldBlock
方法:
公平锁NonfairStnc
实现:return apparentlyFirstQueuedIsExclusive()
,实现如下:
1 2 3 4 5 6 7
| final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
|
上述代码的作用是,如果队列里面存在一个元素,咋判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否到达了最大值,最后执行CAS操作将AQS状态值的高16位值加1。
非公平锁FairSync
的实现:return hashQueuedPredecessors()
,判断当前线程节点是否有前驱结点,有则放弃获取读锁的权限。
(5)Sync
的fullTryAcquireShared
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1; } } }
|
释放
(1)ReadLock
的unlock
方法:
1 2 3
| public void unlock() { sync.releaseShared(1); }
|
(2)AbstractQueuedSynchronizer
的releaseShared
方法:
1 2 3 4 5 6 7
| public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
|
(3)Sync
的tryReleaseShared
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
|
7.4 StampedLock
StampedLock
是JDK1.8新增的一个锁,该锁提供了三种模式的读写控制,但是不支持条件变量和锁重入。
当调用获取锁的系列函数时,会返回一个long
型的变量,称为戳记(stamp),代表了锁的状态。其中try系列获取锁的函数,当获取失败后会返回为0的stamp值。当调用释放锁和转换锁的方法时需要传入获取锁时返回的stamp值。
StampedLock
提供了三种读写模式的锁:
写锁writeLock
:是一个独占锁,任意时刻只有一个线程可以获取到该锁,当一个线程获取该锁后,其他请求读锁和写锁的线程必须等待,这类似于ReentrantReadWriteLock
的写锁(不同的是这里的写锁是不可重入锁);当目前没有线程持有读锁或者写锁时才可以获取到该锁。请求该锁成功后会返回一个stamp
变量用来表示该锁的版本,当解释该锁时需要调用unlockWrite
方法并传递获取锁时的stamp
参数。并且它提供了非阻塞的tryWriteLock
方法。
1 2
| long stamp = lock.writeLock(); lock.unlockWrite(stamp);
|
悲观读锁readLock
:是一个共享锁,在没有线程获取独占写锁的情况下,多个线程可以同时获取该锁。如果已经有线程持有写锁,则其他线程请求获取该锁会被阻塞,这类似于ReentrantReadWriteLock
的读锁(不同的是这里的读锁是不可重入锁)。这里说的悲观是指具体操作数据前会悲观地认为其他线程可以要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑。请求该所成功后会返回一个stamp
变量用来表示该锁的版本,当释放该锁时需要调用unlockRead
方法并传递stamp
参数。并且它提供了非阻塞的tryReadLock
方法。
1 2
| long stamp = lock.readLock(); lock.unlockRead(stamp);
|
乐观读锁tryOptimisticRead
:它是相对于悲观锁来说的,在操作数据前并没有通过CAS设置锁的状态,仅仅通过位运算测试。如果当前没有线程持有写锁,则简单地返回一个非0的stamp
值。获取该stamp
后在具体操作数据前还需要调用validate
方法验证该stamp
是否已经不可用,也就是看当调用tryOptimisticRead
返回stamp
后到当前时间期间是否有其他线程持有了写锁,如果是则validate
会返回0,否则就可以使用该stamp
版本的锁对数据进行操作。由于tryOptimisticRead
并没有使用CAS设置锁状态,所以不需要显式地释放该锁。该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用位操作进行检验,不涉及CAS操作,所以效率上会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要复制一份要操作的变量到方法栈,并且在操作数据时可能其他写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。
1 2 3 4
| long stamp = lock.tryOptimisticRead(); if (!lock.validate(stamp)) { }
|
示例:笛卡尔二维点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| import java.util.concurrent.locks.StampedLock;
public class Point {
private double x;
private double y;
private final StampedLock lock = new StampedLock();
void move(double deltaX, int deltaY) { long stamp = lock.writeLock(); try { x += deltaX; y += deltaY; } finally { lock.unlockWrite(stamp); } }
double distanceFromOrigin() { long stamp = lock.tryOptimisticRead(); double currentX = x, currentY = y; if (!lock.validate(stamp)) { stamp = lock.readLock(); try { currentX = x; currentY = y; } finally { lock.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); }
void moveIfAtOrigin(double newX, double newY) { long stamp = lock.readLock(); try { while (x == 0.0 && y == 0.0) { long ws = lock.tryConvertToWriteLock(stamp); if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { lock.unlockRead(stamp); stamp = lock.writeLock(); } } } finally { lock.unlockWrite(stamp); } } }
|