4.1 问题提出

4.1.1 提取款问题

有如下需求,保证account、withdraw取款方法的线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
interface Account {
// 获取余额
Integer geBalance();
// 取款
void withdraw(Integer amount);

/**
* 方法内会启动1000个线程,每个线程做-10元操作
* 如果初始余额为10000,那么正确的结果应该是0
* @param account 账户
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
long start = System.nanoTime();
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.printf("final balance: %d, spend time(ms): %d] ", account.geBalance(), (end - start) / 1000_000);
}
}

显然以上做法无法保证线程安全。

4.1.2 锁解决方案

保护共享变量,给临界区加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class AccountSafe implements Account {
// 余额
private Integer balance;

public AccountSafe(Integer balance) {
this.balance = balance;
}

@Override
public Integer geBalance() {
return this.balance;
}

@Override
public void withdraw(Integer amount) {
synchronized (this) {
this.balance -= amount;
}
}
}

4.1.2 无锁解决方案

使用原子整数进行CAS操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class AccountCAS implements Account {
private AtomicInteger balance;

public AccountCAS(Integer balance) {
this.balance = new AtomicInteger(balance);
}

@Override
public Integer geBalance() {
return balance.get();
}

@Override
public void withdraw(Integer amount) {
while (true) {
// 获取余额的最新值
int prev = this.balance.get();
// 修改后的余额
int next = prev - amount;
// 同步到主存
// CAS(预期值,修改值) => boolean(是否修改成功)
if (this.balance.compareAndSet(prev, next)) {
break;
}
}
}
}

4.1.3 测试

编写以下代码进行测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class AccountDemo {
public static void main(String[] args) {
System.out.print("[Unsafe => ");
Account a1 = new AccountUnsafe(10000);
Account.demo(a1);
System.out.print("[synchronized => ");
Account a2 = new AccountSafe(10000);
Account.demo(a2);
System.out.print("[compareAndSet => ");
Account a3 = new AccountCAS(10000);
Account.demo(a3);
System.out.println();
}
}

测试脚本:

1
for ($i=0; $i -le 10; $i++) { java top.parak.none.AccountDemo }

运行结果:


4.1.5 compareAndSet

compareAndSet,简称CAS(也有Compare And Swap的说法),它必须是原子操作。

1
2
3
4
5
6
7
8
9
10
11
while (true) {
// 获取余额的最新值
int prev = this.balance.get();
// 修改后的余额
int next = prev - amount;
// 同步到主存
// CAS(预期值,修改值) => boolean(是否修改成功)
if (this.balance.compareAndSet(prev, next)) {
break;
}
}

4.2 CAS与volatile

4.2.1 volatile

获取共享变量时,为了保证该变量的可见性,需要使用volatile修饰。

它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是操作主存。即一个线程对volatile变量的修改,对另一个线程可见。

CAS必须借助volatile才能读取到共享变量的最新值来实现【比较并交换】的效果。

4.2.2 为什么CAS+重试效率高?

  • 在无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
  • 但是,在无锁情况下,因为线程要保持运行,需要额外CPU的支持,CPU在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然也不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

4.2.3 CAS应用场景

结合CAS和volatile可以实现无锁并发,适用于线程数少、多核CPU的场景下:

  • CAS是基于乐观锁的思想:非常乐观,假设没有别的线程来修改共享变量,如果其他线程修改了当前线程就再次重试。
  • synchronized是基于悲观锁的思想:非常悲观,提防其他线程来修改共享变量,当前线程获取资源就立马上锁,其他争抢资源失败的线程进入阻塞状态,修改结束才开锁,
  • CAS体现的是无锁并发、无阻塞并发
    • 因为没有使用无锁并发、无阻塞并发,所以线程不会陷入阻塞。
    • 但是如果竞争激烈,重试必然 频繁发生,反而效率会收到影响。

4.2.4 CAS特点

优点:

  • 可以保证变量操作的原子性
  • 并发量低时,CAS效率高于synchronized
  • 在线程对共享资源占用时间较短的情况下,使用CAS机制效率也会较高

缺点:

  • 无法解决ABA问题
  • 可能会消耗较高的CPU
  • 不能保证代码块的原子性

4.3 原子整数

JUC提供如下原子整数类:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

AtomicInteger常用API如下:


4.4 原子引用

JUC提供如下原子引用类:

  • AtomicReference:普通原子引用类型,对对象进行原子操作
  • AtomicStampedReference:带int类型版本戳的原子引用类型,记录更改次数
  • AtomicMarkableReference:带boolean类型版本戳的原子引用类型,记录是否更改

作用:保证引用类型的共享变量是线程安全的。

使用BigDemical实现提取款问题的线程安全解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class DecimalAccountCAS implements DecimalAccount {
private AtomicReference<BigDecimal> balance;

public DecimalAccountCAS(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}

@Override
public BigDecimal getBalance() {
return balance.get();
}

@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = balance.get();
BigDecimal next = prev.subtract(amount);
if (balance.compareAndSet(prev, next)) {
break;
}
}
}
}

interface DecimalAccount {
BigDecimal getBalance();
void withdraw(BigDecimal amount);
}

4.4.1 ABA问题

如下程序所示,虽然在other方法中存在两个线程对共享变量进行了修改,但是经过了两轮修改又变成了原值,main线程对修改共享变量的过程是不可见的,这种操作对业务代码并无影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j(topic = "ABAAtomicReference")
public class ABAAtomicReferenceDemo {
private static AtomicReference<String> ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
String prev = ref.get();
other();
TimeUnit.SECONDS.sleep(1);
log.debug("change A -> K ? {}", ref.compareAndSet(prev, "K"));
}

private static void other() {
new Thread(() -> log.debug("change A -> B ? {}", ref.compareAndSet(ref.get(), "B")), "B").start();
new Thread(() -> log.debug("change B -> A ? {}", ref.compareAndSet(ref.get(), "A")), "A").start();
}
}

运行结果:

1
2
3
4
2021-04-27 17:29:12.538 [main] DEBUG ABAAtomicReference - main start...
2021-04-27 17:29:12.575 [A] DEBUG ABAAtomicReference - change B -> A ? true
2021-04-27 17:29:12.575 [B] DEBUG ABAAtomicReference - change A -> B ? true
2021-04-27 17:29:13.580 [main] DEBUG ABAAtomicReference - change A -> K ? true

虽然ABA对业务没有影响,但是如何让主线程感知到其他线程的修改呢?

4.4.2 AtomicStampedReference

解决ABA问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j(topic = "ABAAtomicStampedReference")
public class ABAAtomicStampedReferenceDemo {
private static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
int stamp = ref.getStamp();
other();
TimeUnit.SECONDS.sleep(1);
boolean res = ref.compareAndSet(ref.getReference(), "K", stamp, stamp + 1);
log.debug("change A -> K ? {}", res);
}

private static void other() {
new Thread(() -> {
int stamp = ref.getStamp();
boolean res = ref.compareAndSet(ref.getReference(), "B", stamp, stamp + 1);
log.debug("change A -> B ? {}", res);
}).start();
new Thread(() -> {
int stamp = ref.getStamp();
boolean res = ref.compareAndSet(ref.getReference(), "A", stamp, stamp + 1);
log.debug("change B -> A ? {}", res);
}).start();
}
}

运行结果:

1
2
3
4
2021-04-27 18:18:00.754 [main] DEBUG ABAAtomicStampedReference - main start...
2021-04-27 18:18:00.787 [A] DEBUG ABAAtomicStampedReference - change B -> A ? true
2021-04-27 18:18:00.787 [B] DEBUG ABAAtomicStampedReference - change A -> B ? true
2021-04-27 18:18:01.792 [main] DEBUG ABAAtomicStampedReference - change A -> K ? false

4.4.3 AtomicMarkableReference

不关心引用变量更改了几次,只是单纯的关心是否更改过。

案例:

家里有清洁机器人和保洁阿姨,垃圾袋满时,需要更换,机器人换了阿姨则不需要换,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Slf4j(topic = "ABAAtomicMarkableReference")
public class ABAAtomicMarkableReferenceDemo {
private static GarbageBag bag = new GarbageBag("装满垃圾");
private static AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);

public static void main(String[] args) throws InterruptedException {
log.debug("家里需要换垃圾袋...");
GarbageBag prev = ref.getReference();
robot();
TimeUnit.MILLISECONDS.sleep(10);
aunt();
TimeUnit.MILLISECONDS.sleep(10);
log.debug(ref.getReference().toString());
}

// 清洁机器人
private static void robot() {
new Thread(() -> {
log.debug("清洁机器人开始打扫卫生...");
boolean res = ref.compareAndSet(ref.getReference(), new GarbageBag("新垃圾袋"),
true, false);
log.debug("机器人是否换了垃圾袋 ? {}", res);
}, "robot").start();
}

// 保洁阿姨
private static void aunt() {
new Thread(() -> {
log.debug("保洁阿姨开始打扫卫生...");
bag.setDesc("空垃圾袋");
boolean res = ref.compareAndSet(ref.getReference(), new GarbageBag("新垃圾袋"),
true, false);
log.debug("阿姨是否换了垃圾袋 ? {}", res);
}, "aunt").start();
}
}

class GarbageBag {
String desc;

public GarbageBag(String desc) { this.desc = desc; }

public void setDesc(String desc) { this.desc = desc; }

@Override
public String toString() { return "GarbageBag[desc='" + desc + "']"; }
}

运行结果:

1
2
3
4
5
6
2021-04-27 20:01:20.764 [main] DEBUG ABAAtomicMarkableReference - 需要换垃圾袋...
2021-04-27 20:01:20.796 [robot] DEBUG ABAAtomicMarkableReference - 清洁机器人开始打扫卫生...
2021-04-27 20:01:20.796 [robot] DEBUG ABAAtomicMarkableReference - 机器人是否换了垃圾袋 ? true
2021-04-27 20:01:20.809 [aunt] DEBUG ABAAtomicMarkableReference - 保洁阿姨开始打扫卫生...
2021-04-27 20:01:20.809 [aunt] DEBUG ABAAtomicMarkableReference - 阿姨是否换了垃圾袋 ? false
2021-04-27 20:01:20.823 [main] DEBUG ABAAtomicMarkableReference - GarbageBag[desc='新垃圾袋']

4.5 原子数组

JUC提供如下原子数组类:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

作用:保证数组内的元素的线程安全。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Slf4j(topic = "AtomicArray")
public class AtomicArrayDemo {
public static void main(String[] args) {
demo(
() -> new int[10],
array -> array.length,
(array, index) -> array[index]++,
array -> log.debug("普通数组:{}", Arrays.toString(array))
);
demo(
() -> new AtomicIntegerArray(10),
AtomicIntegerArray::length,
AtomicIntegerArray::getAndIncrement,
array -> log.debug("安全数组:{}", array)
);
}

/**
* @param arraySupplier 提供数组
* @param lengthFunction 获取数组长度的方法
* @param putConsumer 指定元素的自增方法
* @param printConsumer 打印数组元素的方法
* @apiNote
* <p> Supplier 提供者 无中生有 () -> 结果 </p>
* <p> Function 函数 一个参数一个结果 (参数) -> 结果 | BiFunction (参数1,参数2) -> 结果 </p>
* <p> Consumer 消费者 一个参数没有结果 (参数) -> Void | BiConsumer (参数1,参数2) -> Void </p>
*/
private static <T> void demo(Supplier<T> arraySupplier, Function<T, Integer> lengthFunction,
BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer) {
List<Thread> list = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFunction.apply(array);

for (int i = 0; i < length; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) { // 正确结果应该是数组元素都为10000
putConsumer.accept(array, j % length);
}
}));
}

list.forEach(Thread::start);
list.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}

运行结果:

1
2
2021-04-27 21:09:01.160 [main] DEBUG AtomicArray - 普通数组:[6531, 6533, 6501, 6566, 6515, 6508, 6499, 6519, 6489, 6527]
2021-04-27 21:09:01.166 [main] DEBUG AtomicArray - 安全数组:[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

4.6 字段更新器

JUC提供如下字段更新器:

  • AtomicReferenceFeildUpdater:引用类型的属性
  • AtomicIntegerFieldUpdater:整形的属性
  • AtomicLongFeildUpdater:长整形的属性

注意:利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合volatile修饰的字段使用,否则会出现异常。

1
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j(topic = "AtomicFieldUpdater")
public class AtomicFieldUpdaterDemo {
public static void main(String[] args) {
Student stu = new Student();
AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
log.debug("update ? {}", updater.compareAndSet(stu, null, "RubbishK"));
log.debug("update ? {}", updater.compareAndSet(stu, stu.getName(), "FlowerK"));
log.debug(stu.toString());
}
}

class Student {
volatile String name;

public String getName() { return name; }

@Override
public String toString() { return "Student[name='" + name + "']"; }
}

运行结果:

1
2
3
2021-04-27 21:36:51.784 [main] DEBUG AtomicFieldUpdater - update ? true
2021-04-27 21:36:51.786 [main] DEBUG AtomicFieldUpdater - update ? true
2021-04-27 21:36:51.786 [main] DEBUG AtomicFieldUpdater - Student[name='FlowerK']

4.7 原子累加器

JUC提供如下原子累加器:

  • LongAddr
  • LongAccumulator
  • DouleAddr
  • DoubleAccumulator

4.7.1 累加性能比较

累加性能比较AtomicLongLongAddr

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j(topic = "Compare")
public class PerformanceCompareDemo {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(AtomicLong::new, AtomicLong::getAndIncrement);
}
for (int i = 0; i < 5; i++) {
demo(LongAdder::new, LongAdder::increment);
}
}

private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) {
T adder = supplier.get();
long start = System.nanoTime();
List<Thread> list = new ArrayList<>();
for (int i = 0; i < 40; i++) {
list.add(new Thread(() -> {
for (int k = 0; k < 50_0000; k++) {
consumer.accept(adder);
}
}));
}
list.forEach(Thread::start);
list.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
log.debug("{} cost: {}(ns)", adder.getClass().getSimpleName(), (end - start));
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
2021-04-27 22:34:29.842 [main] DEBUG Compare - AtomicLong cost: 363865900(ns)
2021-04-27 22:34:30.176 [main] DEBUG Compare - AtomicLong cost: 331326300(ns)
2021-04-27 22:34:30.565 [main] DEBUG Compare - AtomicLong cost: 388361700(ns)
2021-04-27 22:34:30.961 [main] DEBUG Compare - AtomicLong cost: 396090500(ns)
2021-04-27 22:34:31.349 [main] DEBUG Compare - AtomicLong cost: 386800900(ns)
2021-04-27 22:34:31.404 [main] DEBUG Compare - LongAdder cost: 53539000(ns)
2021-04-27 22:34:31.438 [main] DEBUG Compare - LongAdder cost: 33946400(ns)
2021-04-27 22:34:31.479 [main] DEBUG Compare - LongAdder cost: 40203000(ns)
2021-04-27 22:34:31.511 [main] DEBUG Compare - LongAdder cost: 32314300(ns)
2021-04-27 22:34:31.546 [main] DEBUG Compare - LongAdder cost: 34245400(ns)

可以发现,LongAddr的速度要比AtomicLong高出一个数量级。

4.7.2 LongAdder源码分析

先贴一下前辈的主页:http://gee.cs.oswego.edu

作为并发大师@Doug lea 的作品LongAdder,它的设计非常精巧。

LongAdder类有几个关键域:

1
2
3
4
5
6
7
8
// 累加单元数组,懒惰初始化
transient volatile Cell[] cells;

// 基础值,如果没有竞争,则用CAS累加这个域
transient volatile long base;

// 在cells创建或者扩容时,置为1,表示加锁
transient volatile int cellsBusy;

4.7.2.1 CAS锁

切勿使用生产环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j(topic = "LockCAS")
public class LockCASDemo {
// 0表示没加锁
// 1表示加了锁
private final AtomicInteger state = new AtomicInteger(0);

private void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}

public void unlock() {
log.debug("unlock...");
state.set(0);
}
}

4.7.2.1 原理之伪共享

Cell即为累加单元。

1
2
3
4
5
6
7
8
9
10
11
12
// 防止缓存行伪共享(一个缓存行容纳多个Cell对象)
@Sun.misc.Contented
static final class Cell {
volatile long value;
Cell(long x) { value = x; }

// 最重要的方法,用CAS方式进行累加,prev表示旧值,next表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// ...
}

从缓存说起,缓存与内存的速度比较:


从CPU到 大约需要的时钟周期
寄存器 1 cycle(4GHz的CPU约为0.25ns)
L1 3~4 cycle
L2 10~20 cycle
L3 40~45 cycle
内存 120~240 cycle

因为CPU与内存的速度差异很大,需要靠预读数据至缓存来提升效率。

而缓存以缓存行为单位,每个缓存对应着一块内存,一般是64 byte(8个long)。

缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中。

CPU要保证数据的一致性,如果某个CPU核心更改了数据,其他CPU核心对应的整个缓存行必须失效。


因为Cell是数组形式,在内存中是连续存储的,一个Cell为24字节(16字节的对象头和8字节的value),因此缓存行可以存下2个的Cell对象。这样问题来了;

  • Core-0要修改Cell[0]
  • Core-1要修改Cell[1]

无论谁修改成功,都会导致对方Core的缓存行失效,比如Core-0中Cell[0] = 6000,Cell[1] = 8000。要累加Cell[0] = 6001,Cell[1] = 8000,这时会让Core-1缓存行失效。

@sun.misc.Contented用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,从而让CPU将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效。


4.7.2.2 主要方法

add方法:

1
2
3
4
5
6
7
8
9
10
11
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null /* cells是否为空 */
|| !casBase(b = base, b + x) /* cas base累加,成功则不会进入if代码块 */) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null || /* 当前线程cell是否创建 */
!(uncontended = a.cas(v = a.value, v + x))) /* cas base累加,失败则进入 */
longAccumulate(x, null, uncontended);
}
}

longAccumulate方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) { // 不断尝试
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
// (1)cells已创建,Cell未创建
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // 尝试获取cells数组
Cell r = new Cell(x); // 乐观创建Cell对象
if (cellsBusy == 0 && casCellsBusy()) {
// 当前无人上锁,自己尝试上锁
boolean created = false; // 是否已创建
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 再次检查数组数组不为空且长度大于0
// 且数组中空置的槽位为空
rs[j] = r; // 将创建的新的Cell对象填充到数组的空槽位
created = true;
}
} finally {
cellsBusy = 0; // 释放锁
}
if (created)
break; // 创建成功则退出循环
continue; // Slot is now non-empty
}
}
collide = false;
}
// (2)cells已创建。Cell已创建
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 对累加单元a进行CAS+x,操作成功则退出
break;
else if (n >= NCPU || cells != as)
// 检查当前n是否超过机器的CPU上限
collide = false; // At max size or stale
else if (!collide)
// 上个条件匹配之后,下次循环就走这个判断,不会进入下面的扩容逻辑
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
// 扩容
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1]; // 左移一位,扩容两倍
for (int i = 0; i < n; ++i)
rs[i] = as[i]; // 将旧数组拷贝到新数组
cells = rs; // 替换cells
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h); // 以上条件均不匹配,改变线程对应的cell对象
}
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// (3)cells数组未创建,三个条件如下
// cellsBusy是标记位,0代表未加锁,1代表已加锁
// cells == as代表没有其他线程改变cells数组,as是第一次if判断读取到的数组引用
// casCellsBusy()这个方法的作用就是尝试通过CAS将cellsBusy从0改为1,成功说明加锁成功
boolean init = false; // 是否初始化
try { // Initialize table
if (cells == as) { // 再次判断是否有其他线程修改了cells
Cell[] rs = new Cell[2]; // 初始大小为2
rs[h & 1] = new Cell(x); // 赋初始值1
cells = rs; // 将刚刚创建的数组赋值给成员变量
init = true;
}
} finally {
cellsBusy = 0; // 将标记位设为0,代表解锁
}
if (init) // 初始化成功则退出循环
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
// 加锁失败,进行cas base累加,成功则break,失败则继续循环
break; // Fall back on using base
}
}



sum方法:

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) { // 判空
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) // 对每个元素判空
sum += a.value; // 累加每个单元值
}
}
return sum;
}

4.8 Unsafe

4.8.1 概述

Unsafe对象提供了非常底层的,操作内存、线程的办法,Unsafe对象不能直接调用,只能通过反射获得:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class UnsafeAccessor {
private static Unsafe unsafe;

static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}

static Unsafe getUnsafe() {
return unsafe;
}
}

4.8.2 CAS操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class UnsafeDemo {
public static void main(String[] args) throws NoSuchFieldException {
Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = User.class.getDeclaredField("id");
Field name = User.class.getDeclaredField("name");
// 获得域的偏移地址
long idOffset = unsafe.objectFieldOffset(id);
long nameOffset = unsafe.objectFieldOffset(name);
// 使用CAS方法替换成员变量
User user = new User();
unsafe.compareAndSwapInt(user, idOffset, 0, 1);
unsafe.compareAndSwapObject(user, nameOffset, null, "KHighness");
System.out.println(user);
}
}

class User {
volatile int id;
volatile String name;

public int getId() { return id; }

public void setId(int id) { this.id = id; }

public String getName() { return name; }

public void setName(String name) { this.name = name; }

@Override
public String toString() {
return "User[" +
"id=" + id +
", name='" + name + '\'' +
']';
}
}

运行结果:

1
User[id=1, name='KHighness']

4.8.3 自定义原子实现类

使用自定义的AtomicData实现之前线程安全的原子整数Account实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class KAtomicInteger implements Account{
private volatile int value;
private static long valueOffset;
private static final Unsafe UNSAFE;

static {
UNSAFE = UnsafeAccessor.getUnsafe();
try {
valueOffset = UNSAFE.objectFieldOffset(KAtomicInteger.class.getDeclaredField("value"));
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}

public KAtomicInteger(int value) {
this.value = value;
}

public int getValue() {
return value;
}

public void decrement(int amount) {
while (true) {
int prev = this.value;
int next = prev - amount;
if (UNSAFE.compareAndSwapInt(this, valueOffset, prev, next)) {
break;
}
}
}

@Override
public Integer geBalance() {
return getValue();
}

@Override
public void withdraw(Integer amount) {
decrement(amount);
}
}