8.1 CountDownLatch

CountDownLatch计数器

特点:内部计数器递减。

功能:主线程等待所有子线程执行完毕进行汇总。

8.1.1 案例

任务分解,第三个任务需要等待第一个任务和第二个任务执行计算后进行汇总。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
* @author KHighness
* @since 2021-05-07
*/

@Slf4j(topic = "CountDownLatch")
public class CountDownLatchDemo {
private static int total = 0;
private static final CountDownLatch countDownLatch = new CountDownLatch(2);
private static final ExecutorService executorService = Executors.newFixedThreadPool(3);

private static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
executorService.submit(() -> {
log.debug("state = {}", countDownLatch.getCount());
total += 1;
sleep(1, TimeUnit.SECONDS);
log.debug("{} run over", Thread.currentThread().getName());
countDownLatch.countDown();
log.debug("state = {}", countDownLatch.getCount());
});
executorService.submit(() -> {
log.debug("state = {}", countDownLatch.getCount());
total += 2;
sleep(1, TimeUnit.SECONDS);
log.debug("{} run over", Thread.currentThread().getName());
countDownLatch.countDown();
log.debug("state = {}", countDownLatch.getCount());
});
executorService.submit(() -> {
log.debug("state = {}", countDownLatch.getCount());
try {
countDownLatch.await();
log.debug("result: total = {}", total);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{} run over", Thread.currentThread().getName());
});
executorService.shutdown();
sleep(3, TimeUnit.SECONDS);
executorService.shutdownNow();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
2021-05-08 12:14:58.374 [pool-1-thread-1] DEBUG CountDownLatch - state = 2
2021-05-08 12:14:58.374 [pool-1-thread-2] DEBUG CountDownLatch - state = 2
2021-05-08 12:14:58.374 [pool-1-thread-3] DEBUG CountDownLatch - state = 2
2021-05-08 12:14:59.379 [pool-1-thread-2] DEBUG CountDownLatch - pool-1-thread-2 run over
2021-05-08 12:14:59.379 [pool-1-thread-1] DEBUG CountDownLatch - pool-1-thread-1 run over
2021-05-08 12:14:59.380 [pool-1-thread-2] DEBUG CountDownLatch - state = 1
2021-05-08 12:14:59.380 [pool-1-thread-1] DEBUG CountDownLatch - state = 0
2021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - result: total = 3
2021-05-08 12:14:59.380 [pool-1-thread-3] DEBUG CountDownLatch - pool-1-thread-3 run over

8.1.2 原理

类图


(1)构造方法

入参:count,会将计数器值count赋给了AQS的状态变量state

1
2
3
4
5
6
7
8
9
10
// CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

// Sync
Sync(int count) {
setState(count);
}

(2)void await()方法

当线程调用CountDownLatchawait方法后,当前线程就会阻塞,直到:

  • 所有线程都调用了CountDownLatch对象的countDown方法后,即计数器值为0时
  • 其他线程调用了当前线程的interrupt方法中断了当前线程,当前线程抛出InterruptedException异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

// AQS
// 获取共享资源时可被中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 如果线程被中断即抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 查看当前计数器值是否为0,为0则直接返回,否则进入AQS的队列等待
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// Sync
// 实现的AQS接口
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

(3)boolean await(long timeout, TimeUnit unit)方法

当线程调用了CountDownLatch的该方法后,当前线程会被阻塞,直到:

  • 所有线程都调用了CountDownLatch对象的countDown方法后,即计数器值为0时
  • 设置的timeout时间到了,因为超时返回false
  • 其他线程调用了当前线程的interrupt方法中断了当前线程,当前线程抛出InterruptedException异常
1
2
3
4
5
// CountDownLatch
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

(4)void countDown()方法

线程调用该方法后,计数器的值递减,递减后如果计数器的值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。

如果state原始值为n,有(n+1)个线程调用了countDown方法,那么第(n+1)个线程调用无效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// CountDownLatch
public void countDown() {
sync.releaseShared(1);
}

// AQS
public final boolean releaseShared(int arg) {
// 调用Sync的实现,成功则唤醒阻塞的线程
if (tryReleaseShared(arg)) {
// AQS释放资源
doReleaseShared();
return true;
}
return false;
}

// Sync
protected boolean tryReleaseShared(int releases) {
// 循环CAS使计数器(状态值state)减1并更新,直到成功
for (;;) {
int c = getState();
// 防止state变成负数
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

(5)long getCount()方法

获取当前计数器的值,即AQS的state值,一般用于测试。

1
2
3
4
5
6
7
8
9
// CountDownLatch
public long getCount() {
return sync.getCount();
}

// AQS
int getCount() {
return getState();
}

8.1.3 小结

CountDownLatch相比于使用线程的join方法来实现线程间同步,前者更具有灵活性和方便性,因为在ExecutorService线程池中无法直接调用其他线程的join方法。

CountDownLatch使用AQS的状态变量state来存放计数器的值。首先在初始化设置计数器值(AQS状态值),多个线程调用countDown方法实际是原子性递减AQS的状态值。当线程调用await方法后当前线程会被放入AQS的阻塞队列等待,待计数器为0再返回。其他线程调用countDown方法让计数器值递减1,当计数器值变成0时,当前线程还要调用AQS的doReleaseSShared方法来激活由于调用await方法而被阻塞的线程。

8.2 CyclicBarrier

CyclicBarrier回环^1屏障^2

特点:内部计数器递减。

功能:让一组线程全部达到一个状态后再全部同时执行。

[^ 1]: 回环是因为当所有线程执行完毕,并重置CylicBarrier的状态以便重用。

[^ 2]:线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。

8.2.1 案例

一个任务需要三步完成,需要执行多个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j(topic = "CyclicBarrier")
public class CyclicBarrierDemo {
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {log.debug("==========================");});
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);

private static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
executorService.submit(() -> {
try {
log.debug("{} first step", Thread.currentThread().getName());
sleep(1, TimeUnit.SECONDS);
cyclicBarrier.await();
log.debug("{} second step", Thread.currentThread().getName());
sleep(1, TimeUnit.SECONDS);
cyclicBarrier.await();
log.debug("{} third step", Thread.currentThread().getName());
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
2021-05-08 16:28:58.549 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 first step
2021-05-08 16:28:58.549 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 first step
2021-05-08 16:28:58.549 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 first step
2021-05-08 16:28:59.558 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================
2021-05-08 16:28:59.559 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 second step
2021-05-08 16:28:59.559 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 second step
2021-05-08 16:28:59.559 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 second step
2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - ==========================
2021-05-08 16:29:00.570 [pool-1-thread-2] DEBUG CyclicBarrier - pool-1-thread-2 third step
2021-05-08 16:29:00.570 [pool-1-thread-1] DEBUG CyclicBarrier - pool-1-thread-1 third step
2021-05-08 16:29:00.570 [pool-1-thread-3] DEBUG CyclicBarrier - pool-1-thread-3 third step

8.2.2 原理

类图


CyclicBarrier基于独占锁实现,本质底层还是基于AQS的。

属性:

  • lock:独占锁。

  • trip:条件变量。

  • barrierCommand:到达屏障点执行的任务。

  • parties:线程计数器,这里表示多少线程调用await后,所有线程才会冲破屏障继续向下允许。

  • count:执行记录器,一开始等于parties,每当有线程调用await就递减1,当count为0时就表示所有线程都到了屏障点。

parties始终用来记录总的线程个数,当count计数器值变为0后,会将parties的值赋给count,进而进行服用。

内部类Generation仅有一个属性broken,用来记录当前屏障是否被打破。是在锁内使用变量,所以并没有声明为volatile

(1)构造方法

入参:parties(必选)、barrierAction(可选)

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

(2)int await()方法

当前线程调用了CyclicBarrier的该方法时会被阻塞,直到:

  • parties个线程都调用了await方法,线程到达屏障点
  • 其他线程调用了当前线程的interrupt方法中断了当前线程,则当前线程抛出InterruptedExcetion异常
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常

内部调用dowait方法,第一个参数为false说明不设置超时时间

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

(3)boolean await(long timeout, TimeUnit unit)方法

当先线程调用了CyclicBarrier的该方法时会被阻塞,直到:

  • parties个线程都调用了await方法,也就是线程都到了屏障点,这时候返回true
  • 设置的timeout时间到了,因为超时返回false
  • 其他线程调用当前线程的interrupt方法中断了当前线程,则当前线程会抛出InterruptedException
  • 与当前屏障点关联的Generation对象的broken标志被设置为true时,会抛出BrokenBarrierException异常

内部调用dowait方法,第一个参数说明设置超时,第二个参数是超时时间

1
2
3
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

(3)int dowait(boolean timed, long nanos)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
...

// (1)如果index=0则说明所有线程都到了屏障点,此时执行初始化时传递的任务
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
// (2) 执行任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// (3)激活其他因调用await方法而被阻塞的线程,并且重置CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// (4)如果index!=0
for (;;) {
try {
// (5)未设置超时时间
if (!timed)
trip.await();
// (6)设置了超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
...
}
...
}
} finally {
lock.unlock();
}
}

private void nextGeneration() {
// (7)唤醒条件队列中的阻塞线程
trip.signalAll();
// (8)重置CyclicBarrier
count = parties;
generation = new Generation();
}

8.2.3 小结

CycleBarrierCountDownLatch的不同之处在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。

内部通过ReentrantLock独占锁实现计数器原子性更新,并使用条件变量队列来实现线程同步。

8.3 Semaphore

Semaphore信号量

特点:内部计数器递增。

功能:限制能同时访问共享资源的线程上限。

8.3.1 案例

主线程等待两个子任务执行完毕。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j(topic = "CountDownLatch")
public class CountDownLatchDemo {
private static final CountDownLatch countDownLatch = new CountDownLatch(2);
private static final ExecutorService executorService = Executors.newFixedThreadPool(2);

private static void sleep(int timeout, TimeUnit unit) {
try {
unit.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws InterruptedException {
executorService.submit(() -> {
sleep(1, TimeUnit.SECONDS);
countDownLatch.countDown();
});
executorService.submit(() -> {
sleep(1, TimeUnit.SECONDS);
countDownLatch.countDown();
});
log.debug("wait all child thread over");
countDownLatch.await();
log.debug("all child thread over");
}
}

8.3.2 原理

类图


Semaphore还是使用AQS实现的,Sync只是对AQS的一个修饰,并且Sync有两个实现类,用来指定获取信号量时是否采用公平策略。

(1)构造方法

入参:permits(必选)、fair(可选)

Semphore默认采用非公平策略,如果需要使用公平策略需要使用双参构造方法。初始化信号量个数permits被赋给了AQS的状态变量state

1
2
3
4
5
6
7
8
9
// Semophore
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

// Sync
Sync(int permits) {
setState(permits);
}

(2)void acquire()方法

当前线程调用该方法的目的是希望获取一个信号量资源。

如果当前信号量个数大于0,则当前信号量的计数会减1,然后直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void acquire() throws InterruptedException {
// 传递参数为1,说明要获取一个信号量资源
sync.acquireSharedInterruptibly(1);
}

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// (1)如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// (2)否则调用Sync子类方法尝试获取,这里根据构造方法确定NonfairSync还是FairSync
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

对于tryAcquireShared,分两种情况讨论:

  1. 非公平锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // NonfairSync
    protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
    }

    // Sync
    final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
    // 获取当前信号量
    int available = getState();
    // 计算当前剩余量
    int remaining = available - acquires;
    // 如果当前剩余值小于0或者CAS设置成功则返回
    if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    }
    }

    先获取当前信号量,然后减去需要获取的值,得到剩余信号量个数,如果剩余信号量小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。

    另外,由于NonfairSync是非公平获取的,也就是说先调用acquire方法获取信号量的线程不一定比后来者先获取到信号量。

  2. 公平锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    protected int tryAcquireShared(int acquires) {
    for (;;) {
    // 先检查阻塞队列中是否有前驱结点
    if (hasQueuedPredecessors())
    return -1;
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||
    compareAndSetState(available, remaining))
    return remaining;
    }
    }

    AQS公平性的保证就靠hasQueuedPredecessors这个方法,如果当前线程节点的前驱结点是否也在等待获取该资源,是则放弃自己获取信号量的资格。

(3)void acquire(int permits)方法

该方法与acquire()方法不同,后者只需要获取一个信号量值,而前者则获取perimits个。

1
2
3
4
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

(4)void acquireUninterruptibly()方法

该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly获取资源时,其他线程调用了当前线程的interrupt方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException异常而返回。

(5)void accquireUninterruptibly(int permits)方法

该方法与acquire(int permits)方法的不同之处在于,该方法对中断不响应。

1
2
3
4
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

(6)void release()方法

该方法的作用是把当前Semaphore对象的信号量增加1,如果当前有线程因为调用acquire方法被阻塞而被放入AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Semaphore
public void release() {
// (1)arg=1
sync.releaseShared(1);
}

// AQS
public final boolean releaseShared(int arg) {
// (2) 尝试释放资源锁
if (tryReleaseShared(arg)) {
// (3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
doReleaseShared();
return true;
}
return false;
}

// Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// (4)获取当前信号量值
int current = getState();
// (5)当前信号量值+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// (6)通过CAS更新信号量的值
if (compareAndSetState(current, next))
return true;
}
}

(7)void release(int permits)方法

该方法与release()的不同之处在于,前者让信号量加permits,后者加1

8.3.3 小结

Semaphore完全可以达到CountDownLatch的效果,但是Semaphore的计数器是不可以自动重置的,不过通过变相的改变acquire方法的参数还是可以实现CyclicBarrier的功能的。

8.4 经典题目

8.4.1 交错执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* A,B,C三个线程有序交替执行
*
* @author KHighness
* @since 2021-05-07
*/
@Slf4j(topic = "Alternately")
public class AlternatelyDemo {
/**
* 三个信号量分别控制A,B,C的打印
*/
private static final Semaphore[] s = { new Semaphore(1), new Semaphore(1), new Semaphore(1)};
private static final int size = 3;
private static final char[] arr = {'A', 'B', 'C'};
private static final ExecutorService executorService = Executors.newFixedThreadPool(3);

public static void main(String[] args) throws InterruptedException {
s[1].acquire();
s[2].acquire();
for (int i = 0; i < size; i++) {
final int finalI = i;
executorService.submit(() -> {
while (true) {
try {
s[finalI].acquire();
TimeUnit.MILLISECONDS.sleep(100);
log.debug("{} => [{}]", arr[finalI], System.nanoTime());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s[(finalI + 1) % size].release();
}
}
});
}
}
}

8.4.2 生产者-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/**
* 生产者-消费者模型
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "SemaphoreProducerConsumerModel")
public class ProducerConsumerDemo {
/** 数据队列 */
private final static Queue<Character> QUEUE = new LinkedList<>();
/** 最大容量 */
private final static int QUEUE_MAX_SIZE = 5;
/** 随机变量 */
private final static ThreadLocalRandom RANDOM = ThreadLocalRandom.current();
/** 表示可消费的资源数 */
private final static Semaphore FULL = new Semaphore(0);
/** 表示队列的剩余空间 */
private final static Semaphore EMPTY = new Semaphore(QUEUE_MAX_SIZE);
/** 产品队列访问互斥的信号量 */
private final static Semaphore MUTEX = new Semaphore(1);

/**
* 生成随机数字/字母字符
* ascii char
* 48-57 0~9
* 65-90 A~Z
* 97-122 a~z
* @return ascii码字符
*/
private static Character randomChar() {
int choice = RANDOM.nextInt(3);
if (choice == 0)
return (char) (RANDOM.nextInt(10) + 48);
else if (choice == 1)
return (char) (RANDOM.nextInt(26) + 65);
else
return (char) (RANDOM.nextInt(26) + 97);
}

/**
* 生产者
*/
private static class Producer extends Thread {
public Producer() {
super("生产者");
}

@SneakyThrows
@Override
public void run() {
while (true) {
Character c = randomChar(); // 生产数据
TimeUnit.SECONDS.sleep(1); // 模拟延时
EMPTY.acquire(); // 请求空间
MUTEX.acquire(); // 请求访问
log.debug("生产 => [{}]", c); // 输出日志
QUEUE.add(c); // 放入队列
MUTEX.release(); // 释放队列
FULL.release(); // 数据增加
}
}
}

/**
* 消费者
*/
private static class Consumer extends Thread {
public Consumer() {
super("消费者");
}

@SneakyThrows
@Override
public void run() {
while (true) {
FULL.acquire(); // 请求数据
MUTEX.acquire(); // 请求访问
TimeUnit.SECONDS.sleep(1); // 模拟延时
Character c = QUEUE.poll(); // 消费数据
log.debug("消费 => [{}]", c); // 输出日志
MUTEX.release(); // 释放队列
EMPTY.release(); // 空间增加
}
}
}

public static void main(String[] args) {
new Producer().start();
new Consumer().start();
}
}

8.4.3 读写者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 读写者问题
* 有读者和写者两组并发进程,共享一个文件,当两个或以上的读进程同时访问共享数据时
* 不会产生副作用,但若某个写进程和其他进程(读进程或写进程)同时访问共享数据时则
* 可能导致数据不一致的错误。
*
* <p>要求:
* <li>(1) 允许多个读者可以同时对文件执行读操作
* <li>(2) 只允许一个写者往文件中写信息
* <li>(3) 任一写者在完成写操作之前不允许其他读者或写者工作
* <li>(4) 写者执行写操作前,应让已有的读者和写者全部退出
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "ReaderWriterDemo")
public class ReaderWriterDemo {
/** 用于读写者互斥访问文件 */
private final static Semaphore FILE_MUTEX = new Semaphore(1);
/** 当前读者数量 */
private static int READER_COUNT = 0;
/** 对READER_COUNT操作的互斥变量 */
private final static Semaphore COUNT_MUTEX = new Semaphore(1);

/**
* 写者
*/
private static class Writer extends Thread {
public Writer() {
super("写者");
}

@SneakyThrows
@Override
public void run() {
while (true) {
FILE_MUTEX.acquire(); // 请求访问文件
log.debug("写入文件"); // 输出日志
TimeUnit.SECONDS.sleep(1);
FILE_MUTEX.release(); // 释放共享文件
}
}
}

private static class Reader extends Thread {
public Reader() {
super("读者");
}

@SneakyThrows
@Override
public void run() {
while (true) {
COUNT_MUTEX.acquire(); // 请求访问COUNT
if (READER_COUNT == 0) // 第一个读进程
FILE_MUTEX.acquire(); // 阻止写进程写
READER_COUNT++; // 读者计数器递增
COUNT_MUTEX.release(); // 恢复访问COUNT
log.debug("阅读文件"); // 输出日志
TimeUnit.SECONDS.sleep(1);
COUNT_MUTEX.acquire(); // 请求访问COUNT
READER_COUNT--; // 读者计数器递减
if (READER_COUNT == 0) // 最后一个读进程
FILE_MUTEX.release(); // 允许写进程写
COUNT_MUTEX.release(); // 恢复访问COUNT
}
}
}

public static void main(String[] args) {
new Writer().start();
new Reader().start();
}
}

8.4.4 哲学家就餐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 哲学家就餐问题
* 当一名哲学家左右两边的筷子都可用时,才允许拿起筷子。
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "PhilosopherDinnerDemo")
public class PhilosopherDinnerDemo {
/** 筷子 */
private final static Semaphore[] CHOPSTICKS = {new Semaphore(1), new Semaphore(1),
new Semaphore(1), new Semaphore(1), new Semaphore(1)};
/** 互斥取筷子 */
private final static Semaphore MUTEX = new Semaphore(1);

/**
* 哲学家
*/
private static class Philosopher extends Thread {
private final int index;

public Philosopher(int index) {
super("哲学家-" + index);
this.index = index;
}

@SneakyThrows
@Override
public void run() {
while (true) {
MUTEX.acquire();
CHOPSTICKS[index].acquire();
CHOPSTICKS[(index + 1) % 5].acquire();
MUTEX.release();
log.debug("进餐");
CHOPSTICKS[index].release();
CHOPSTICKS[(index + 1) % 5].release();
log.debug("思考");
}
}
}

public static void main(String[] args) {
Philosopher[] philosophers = new Philosopher[5];
for (int i = 0; i < philosophers.length; i++) {
philosophers[i] = new Philosopher(i);
philosophers[i].start();
}
}
}

8.4.5 和尚和水

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 和尚与水问题
* 寺庙有小和尚和老和尚若干,有一水缸,由小和尚提水入缸供老和尚引用。
* 水缸可容10桶水,井每次只能容一个桶取水。水桶总数为3个,每次入缸仅取1桶水。
*
* @author KHighness
* @since 2021-08-03
*/
@Slf4j(topic = "MonkAndWaterDemo")
public class MonkAndWaterDemo {
/** 用于互斥地访问水井*/
private static final Semaphore WELL = new Semaphore(1);
/** 用于互斥地访问水缸 */
private static final Semaphore VAT = new Semaphore(1);
/** 用于表示水缸中剩余空间所能容纳的水的桶数 */
private static final Semaphore EMPTY = new Semaphore(10);
/** 表示水缸中水的桶数 */
private static final Semaphore FULL = new Semaphore(0);
/** 表示有多少个水桶可用 */
private static final Semaphore BUCKET = new Semaphore(3);

/**
* 老和尚
*/
private static class OldMonk extends Thread {
public OldMonk() {
super("老和尚");
}

@SneakyThrows
@Override
public void run() {
while (true) {
FULL.acquire(); // 请求水缸的水
BUCKET.acquire(); // 请求水桶资源
VAT.acquire(); // 请求水缸资源
log.debug("从水缸中打一桶水");
TimeUnit.SECONDS.sleep(1);
VAT.release(); // 释放水缸资源
EMPTY.release(); // 消耗水资源
log.debug("喝水水");
TimeUnit.SECONDS.sleep(1);
BUCKET.release(); // 释放水桶资源
}
}
}

/**
* 小和尚
*/
private static class YoungMonk extends Thread {
public YoungMonk() {
super("小和尚");
}

@SneakyThrows
@Override
public void run() {
while (true) {
EMPTY.acquire(); // 请求水缸空间
BUCKET.acquire(); // 请求桶资源
WELL.acquire(); // 请求水井资源
log.debug("从水井中打一桶水");
TimeUnit.SECONDS.sleep(1);
WELL.release(); // 释放水井资源
VAT.acquire(); // 请求水缸资源
log.debug("倒水水");
TimeUnit.SECONDS.sleep(1);
VAT.release(); // 释放水缸资源
FULL.release(); // 增加水缸的水
BUCKET.release(); // 释放水桶资源
}
}
}

public static void main(String[] args) {
new OldMonk().start();
new YoungMonk().start();
}
}