8.1 CountDownLatch CountDownLatch
计数器
特点:内部计数器递减。
功能:主线程等待所有子线程执行完毕进行汇总。
8.1.1 案例 任务分解,第三个任务需要等待第一个任务和第二个任务执行计算后进行汇总。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4j(topic = "CountDownLatch") public class CountDownLatchDemo { private static int total = 0 ; private static final CountDownLatch countDownLatch = new CountDownLatch(2 ); private static final ExecutorService executorService = Executors.newFixedThreadPool(3 ); private static void sleep (int timeout, TimeUnit unit) { try { unit.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main (String[] args) { executorService.submit(() -> { log.debug("state = {}" , countDownLatch.getCount()); total += 1 ; sleep(1 , TimeUnit.SECONDS); log.debug("{} run over" , Thread.currentThread().getName()); countDownLatch.countDown(); log.debug("state = {}" , countDownLatch.getCount()); }); executorService.submit(() -> { log.debug("state = {}" , countDownLatch.getCount()); total += 2 ; sleep(1 , TimeUnit.SECONDS); log.debug("{} run over" , Thread.currentThread().getName()); countDownLatch.countDown(); log.debug("state = {}" , countDownLatch.getCount()); }); executorService.submit(() -> { log.debug("state = {}" , countDownLatch.getCount()); try { countDownLatch.await(); log.debug("result: total = {}" , total); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{} run over" , Thread.currentThread().getName()); }); executorService.shutdown(); sleep(3 , TimeUnit.SECONDS); executorService.shutdownNow(); } }
运行结果:
1 2 3 4 5 6 7 8 9 2021 -05 -08 12 :14 :58.374 [pool -1 -thread -1 ] DEBUG CountDownLatch - state = 2 2021 -05 -08 12 :14 :58.374 [pool -1 -thread -2 ] DEBUG CountDownLatch - state = 2 2021 -05 -08 12 :14 :58.374 [pool -1 -thread -3 ] DEBUG CountDownLatch - state = 2 2021 -05 -08 12 :14 :59.379 [pool -1 -thread -2 ] DEBUG CountDownLatch - pool-1 -thread -2 run over2021 -05 -08 12 :14 :59.379 [pool -1 -thread -1 ] DEBUG CountDownLatch - pool-1 -thread -1 run over2021 -05 -08 12 :14 :59.380 [pool -1 -thread -2 ] DEBUG CountDownLatch - state = 1 2021 -05 -08 12 :14 :59.380 [pool -1 -thread -1 ] DEBUG CountDownLatch - state = 0 2021 -05 -08 12 :14 :59.380 [pool -1 -thread -3 ] DEBUG CountDownLatch - result: total = 3 2021 -05 -08 12 :14 :59.380 [pool -1 -thread -3 ] DEBUG CountDownLatch - pool-1 -thread -3 run over
8.1.2 原理 类图
(1)构造方法
入参:count
,会将计数器值count
赋给了AQS的状态变量state
。
1 2 3 4 5 6 7 8 9 10 public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException("count < 0" ); this .sync = new Sync(count); } Sync(int count) { setState(count); }
(2)void await()
方法
当线程调用CountDownLatch
的await
方法后,当前线程就会阻塞,直到:
所有线程都调用了CountDownLatch
对象的countDown
方法后,即计数器值为0时
其他线程调用了当前线程的interrupt
方法中断了当前线程,当前线程抛出InterruptedException
异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; }
(3)boolean await(long timeout, TimeUnit unit)
方法
当线程调用了CountDownLatch
的该方法后,当前线程会被阻塞,直到:
所有线程都调用了CountDownLatch
对象的countDown
方法后,即计数器值为0时
设置的timeout
时间到了,因为超时返回false
其他线程调用了当前线程的interrupt
方法中断了当前线程,当前线程抛出InterruptedException
异常
1 2 3 4 5 public boolean await (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); }
(4)void countDown()
方法
线程调用该方法后,计数器的值递减,递减后如果计数器的值为0则唤醒所有因调用await方法而被阻塞的线程,否则什么都不做。
如果state原始值为n,有(n+1)个线程调用了countDown
方法,那么第(n+1)个线程调用无效。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void countDown () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
(5)long getCount()
方法
获取当前计数器的值,即AQS的state
值,一般用于测试。
1 2 3 4 5 6 7 8 9 public long getCount () { return sync.getCount(); } int getCount () { return getState(); }
8.1.3 小结 CountDownLatch
相比于使用线程的join
方法来实现线程间同步,前者更具有灵活性和方便性,因为在ExecutorService
线程池中无法直接调用其他线程的join
方法。
CountDownLatch
使用AQS的状态变量state
来存放计数器的值。首先在初始化设置计数器值(AQS状态值),多个线程调用countDown
方法实际是原子性递减AQS的状态值。当线程调用await
方法后当前线程会被放入AQS的阻塞队列等待,待计数器为0再返回。其他线程调用countDown
方法让计数器值递减1,当计数器值变成0时,当前线程还要调用AQS的doReleaseSShared
方法来激活由于调用await
方法而被阻塞的线程。
8.2 CyclicBarrier CyclicBarrier
回环^1 屏障^2
特点:内部计数器递减。
功能:让一组线程全部达到一个状态后再全部同时执行。
[^ 1]: 回环是因为当所有线程执行完毕,并重置CylicBarrier
的状态以便重用。
[^ 2]:线程调用await
方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await
方法后,线程们就会冲破屏障,继续向下运行。
8.2.1 案例 一个任务需要三步完成,需要执行多个任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j(topic = "CyclicBarrier") public class CyclicBarrierDemo { private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(3 , () -> {log.debug("==========================" );}); private static final ExecutorService executorService = Executors.newFixedThreadPool(10 ); private static void sleep (int timeout, TimeUnit unit) { try { unit.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main (String[] args) { for (int i = 0 ; i < 3 ; i++) { executorService.submit(() -> { try { log.debug("{} first step" , Thread.currentThread().getName()); sleep(1 , TimeUnit.SECONDS); cyclicBarrier.await(); log.debug("{} second step" , Thread.currentThread().getName()); sleep(1 , TimeUnit.SECONDS); cyclicBarrier.await(); log.debug("{} third step" , Thread.currentThread().getName()); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
运行结果:
1 2 3 4 5 6 7 8 9 10 11 2021 -05 -08 16 :28 :58.549 [pool -1 -thread -2 ] DEBUG CyclicBarrier - pool-1 -thread -2 first step2021 -05 -08 16 :28 :58.549 [pool -1 -thread -1 ] DEBUG CyclicBarrier - pool-1 -thread -1 first step2021 -05 -08 16 :28 :58.549 [pool -1 -thread -3 ] DEBUG CyclicBarrier - pool-1 -thread -3 first step2021 -05 -08 16 :28 :59.558 [pool -1 -thread -2 ] DEBUG CyclicBarrier - ==========================2021 -05 -08 16 :28 :59.559 [pool -1 -thread -2 ] DEBUG CyclicBarrier - pool-1 -thread -2 second step2021 -05 -08 16 :28 :59.559 [pool -1 -thread -3 ] DEBUG CyclicBarrier - pool-1 -thread -3 second step2021 -05 -08 16 :28 :59.559 [pool -1 -thread -1 ] DEBUG CyclicBarrier - pool-1 -thread -1 second step2021 -05 -08 16 :29 :00.570 [pool -1 -thread -2 ] DEBUG CyclicBarrier - ==========================2021 -05 -08 16 :29 :00.570 [pool -1 -thread -2 ] DEBUG CyclicBarrier - pool-1 -thread -2 third step2021 -05 -08 16 :29 :00.570 [pool -1 -thread -1 ] DEBUG CyclicBarrier - pool-1 -thread -1 third step2021 -05 -08 16 :29 :00.570 [pool -1 -thread -3 ] DEBUG CyclicBarrier - pool-1 -thread -3 third step
8.2.2 原理 类图
CyclicBarrier
基于独占锁实现,本质底层还是基于AQS的。
属性:
lock
:独占锁。
trip
:条件变量。
barrierCommand
:到达屏障点执行的任务。
parties
:线程计数器,这里表示多少线程调用await
后,所有线程才会冲破屏障继续向下允许。
count
:执行记录器,一开始等于parties
,每当有线程调用await
就递减1,当count
为0时就表示所有线程都到了屏障点。
parties
始终用来记录总的线程个数,当count
计数器值变为0后,会将parties
的值赋给count
,进而进行服用。
内部类Generation
仅有一个属性broken
,用来记录当前屏障是否被打破。是在锁内使用变量,所以并没有声明为volatile
。
(1)构造方法
入参:parties
(必选)、barrierAction
(可选)
1 2 3 4 5 6 public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException(); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; }
(2)int await()
方法
当前线程调用了CyclicBarrier
的该方法时会被阻塞,直到:
parties
个线程都调用了await
方法,线程到达屏障点
其他线程调用了当前线程的interrupt
方法中断了当前线程,则当前线程抛出InterruptedExcetion
异常
与当前屏障点关联的Generation
对象的broken
标志被设置为true
时,会抛出BrokenBarrierException
异常
内部调用dowait
方法,第一个参数为false说明不设置超时时间
1 2 3 4 5 6 7 public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error(toe); } }
(3)boolean await(long timeout, TimeUnit unit)
方法
当先线程调用了CyclicBarrier
的该方法时会被阻塞,直到:
parties
个线程都调用了await
方法,也就是线程都到了屏障点,这时候返回true
设置的timeout
时间到了,因为超时返回false
其他线程调用当前线程的interrupt
方法中断了当前线程,则当前线程会抛出InterruptedException
与当前屏障点关联的Generation
对象的broken
标志被设置为true
时,会抛出BrokenBarrierException
异常
内部调用dowait
方法,第一个参数说明设置超时,第二个参数是超时时间
1 2 3 public int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true , unit.toNanos(timeout)); }
(3)int dowait(boolean timed, long nanos)
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { ... int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { ... } ... } } finally { lock.unlock(); } } private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation(); }
8.2.3 小结 CycleBarrier
与CountDownLatch
的不同之处在于,前者是可以复用的,并且前者特别适合分段任务有序执行的场景。
内部通过ReentrantLock
独占锁实现计数器原子性更新,并使用条件变量队列来实现线程同步。
8.3 Semaphore Semaphore
信号量
特点:内部计数器递增。
功能:限制能同时访问共享资源的线程上限。
8.3.1 案例 主线程等待两个子任务执行完毕。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Slf4j(topic = "CountDownLatch") public class CountDownLatchDemo { private static final CountDownLatch countDownLatch = new CountDownLatch(2 ); private static final ExecutorService executorService = Executors.newFixedThreadPool(2 ); private static void sleep (int timeout, TimeUnit unit) { try { unit.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main (String[] args) throws InterruptedException { executorService.submit(() -> { sleep(1 , TimeUnit.SECONDS); countDownLatch.countDown(); }); executorService.submit(() -> { sleep(1 , TimeUnit.SECONDS); countDownLatch.countDown(); }); log.debug("wait all child thread over" ); countDownLatch.await(); log.debug("all child thread over" ); } }
8.3.2 原理 类图
Semaphore
还是使用AQS实现的,Sync
只是对AQS的一个修饰,并且Sync
有两个实现类,用来指定获取信号量时是否采用公平策略。
(1)构造方法
入参:permits
(必选)、fair
(可选)
Semphore
默认采用非公平策略,如果需要使用公平策略需要使用双参构造方法。初始化信号量个数permits
被赋给了AQS的状态变量state
。
1 2 3 4 5 6 7 8 9 public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } Sync(int permits) { setState(permits); }
(2)void acquire()
方法
当前线程调用该方法的目的是希望获取一个信号量资源。
如果当前信号量个数大于0,则当前信号量的计数会减1,然后直接返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
对于tryAcquireShared
,分两种情况讨论:
非公平锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
先获取当前信号量,然后减去需要获取的值,得到剩余信号量个数,如果剩余信号量小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。
另外,由于NonfairSync
是非公平获取的,也就是说先调用acquire
方法获取信号量的线程不一定比后来者先获取到信号量。
公平锁
1 2 3 4 5 6 7 8 9 10 11 12 protected int tryAcquireShared (int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
AQS公平性的保证就靠hasQueuedPredecessors
这个方法,如果当前线程节点的前驱结点是否也在等待获取该资源,是则放弃自己获取信号量的资格。
(3)void acquire(int permits)
方法
该方法与acquire()
方法不同,后者只需要获取一个信号量值,而前者则获取perimits
个。
1 2 3 4 public void acquire (int permits) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
(4)void acquireUninterruptibly()
方法
该方法与acquire()
类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了acquireUninterruptibly
获取资源时,其他线程调用了当前线程的interrupt
方法设置了当前线程的中断标志,此时当前线程并不会抛出InterruptedException
异常而返回。
(5)void accquireUninterruptibly(int permits)
方法
该方法与acquire(int permits)
方法的不同之处在于,该方法对中断不响应。
1 2 3 4 public void acquireUninterruptibly (int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireShared(permits); }
(6)void release()
方法
该方法的作用是把当前Semaphore
对象的信号量增加1
,如果当前有线程因为调用acquire
方法被阻塞而被放入AQS的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } }
(7)void release(int permits)
方法
该方法与release()
的不同之处在于,前者让信号量加permits
,后者加1
。
8.3.3 小结 Semaphore
完全可以达到CountDownLatch
的效果,但是Semaphore
的计数器是不可以自动重置的,不过通过变相的改变acquire
方法的参数还是可以实现CyclicBarrier
的功能的。
8.4 经典题目 8.4.1 交错执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf4j(topic = "Alternately") public class AlternatelyDemo { private static final Semaphore[] s = { new Semaphore(1 ), new Semaphore(1 ), new Semaphore(1 )}; private static final int size = 3 ; private static final char [] arr = {'A' , 'B' , 'C' }; private static final ExecutorService executorService = Executors.newFixedThreadPool(3 ); public static void main (String[] args) throws InterruptedException { s[1 ].acquire(); s[2 ].acquire(); for (int i = 0 ; i < size; i++) { final int finalI = i; executorService.submit(() -> { while (true ) { try { s[finalI].acquire(); TimeUnit.MILLISECONDS.sleep(100 ); log.debug("{} => [{}]" , arr[finalI], System.nanoTime()); } catch (InterruptedException e) { e.printStackTrace(); } finally { s[(finalI + 1 ) % size].release(); } } }); } } }
8.4.2 生产者-消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 @Slf4j(topic = "SemaphoreProducerConsumerModel") public class ProducerConsumerDemo { private final static Queue<Character> QUEUE = new LinkedList<>(); private final static int QUEUE_MAX_SIZE = 5 ; private final static ThreadLocalRandom RANDOM = ThreadLocalRandom.current(); private final static Semaphore FULL = new Semaphore(0 ); private final static Semaphore EMPTY = new Semaphore(QUEUE_MAX_SIZE); private final static Semaphore MUTEX = new Semaphore(1 ); private static Character randomChar () { int choice = RANDOM.nextInt(3 ); if (choice == 0 ) return (char ) (RANDOM.nextInt(10 ) + 48 ); else if (choice == 1 ) return (char ) (RANDOM.nextInt(26 ) + 65 ); else return (char ) (RANDOM.nextInt(26 ) + 97 ); } private static class Producer extends Thread { public Producer () { super ("生产者" ); } @SneakyThrows @Override public void run () { while (true ) { Character c = randomChar(); TimeUnit.SECONDS.sleep(1 ); EMPTY.acquire(); MUTEX.acquire(); log.debug("生产 => [{}]" , c); QUEUE.add(c); MUTEX.release(); FULL.release(); } } } private static class Consumer extends Thread { public Consumer () { super ("消费者" ); } @SneakyThrows @Override public void run () { while (true ) { FULL.acquire(); MUTEX.acquire(); TimeUnit.SECONDS.sleep(1 ); Character c = QUEUE.poll(); log.debug("消费 => [{}]" , c); MUTEX.release(); EMPTY.release(); } } } public static void main (String[] args) { new Producer().start(); new Consumer().start(); } }
8.4.3 读写者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Slf4j(topic = "ReaderWriterDemo") public class ReaderWriterDemo { private final static Semaphore FILE_MUTEX = new Semaphore(1 ); private static int READER_COUNT = 0 ; private final static Semaphore COUNT_MUTEX = new Semaphore(1 ); private static class Writer extends Thread { public Writer () { super ("写者" ); } @SneakyThrows @Override public void run () { while (true ) { FILE_MUTEX.acquire(); log.debug("写入文件" ); TimeUnit.SECONDS.sleep(1 ); FILE_MUTEX.release(); } } } private static class Reader extends Thread { public Reader () { super ("读者" ); } @SneakyThrows @Override public void run () { while (true ) { COUNT_MUTEX.acquire(); if (READER_COUNT == 0 ) FILE_MUTEX.acquire(); READER_COUNT++; COUNT_MUTEX.release(); log.debug("阅读文件" ); TimeUnit.SECONDS.sleep(1 ); COUNT_MUTEX.acquire(); READER_COUNT--; if (READER_COUNT == 0 ) FILE_MUTEX.release(); COUNT_MUTEX.release(); } } } public static void main (String[] args) { new Writer().start(); new Reader().start(); } }
8.4.4 哲学家就餐 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Slf4j(topic = "PhilosopherDinnerDemo") public class PhilosopherDinnerDemo { private final static Semaphore[] CHOPSTICKS = {new Semaphore(1 ), new Semaphore(1 ), new Semaphore(1 ), new Semaphore(1 ), new Semaphore(1 )}; private final static Semaphore MUTEX = new Semaphore(1 ); private static class Philosopher extends Thread { private final int index; public Philosopher (int index) { super ("哲学家-" + index); this .index = index; } @SneakyThrows @Override public void run () { while (true ) { MUTEX.acquire(); CHOPSTICKS[index].acquire(); CHOPSTICKS[(index + 1 ) % 5 ].acquire(); MUTEX.release(); log.debug("进餐" ); CHOPSTICKS[index].release(); CHOPSTICKS[(index + 1 ) % 5 ].release(); log.debug("思考" ); } } } public static void main (String[] args) { Philosopher[] philosophers = new Philosopher[5 ]; for (int i = 0 ; i < philosophers.length; i++) { philosophers[i] = new Philosopher(i); philosophers[i].start(); } } }
8.4.5 和尚和水 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 @Slf4j(topic = "MonkAndWaterDemo") public class MonkAndWaterDemo { private static final Semaphore WELL = new Semaphore(1 ); private static final Semaphore VAT = new Semaphore(1 ); private static final Semaphore EMPTY = new Semaphore(10 ); private static final Semaphore FULL = new Semaphore(0 ); private static final Semaphore BUCKET = new Semaphore(3 ); private static class OldMonk extends Thread { public OldMonk () { super ("老和尚" ); } @SneakyThrows @Override public void run () { while (true ) { FULL.acquire(); BUCKET.acquire(); VAT.acquire(); log.debug("从水缸中打一桶水" ); TimeUnit.SECONDS.sleep(1 ); VAT.release(); EMPTY.release(); log.debug("喝水水" ); TimeUnit.SECONDS.sleep(1 ); BUCKET.release(); } } } private static class YoungMonk extends Thread { public YoungMonk () { super ("小和尚" ); } @SneakyThrows @Override public void run () { while (true ) { EMPTY.acquire(); BUCKET.acquire(); WELL.acquire(); log.debug("从水井中打一桶水" ); TimeUnit.SECONDS.sleep(1 ); WELL.release(); VAT.acquire(); log.debug("倒水水" ); TimeUnit.SECONDS.sleep(1 ); VAT.release(); FULL.release(); BUCKET.release(); } } } public static void main (String[] args) { new OldMonk().start(); new YoungMonk().start(); } }