线程池的意义:
降低资源消耗,复用已创建的线程,降低开销、控制最大并发数;
隔离线程环境,可以配置独立线程池,将较慢的线程与较快的隔离开,避免相互影响;
实现任务线程队列缓冲策略和拒绝策略;
实现某些与时间相关的功能,如定时执行和周期执行等。
6.1 自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;import java.util.Deque;import java.util.HashSet;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "ParaKThreadPoolDemo") public class ParaKThreadPoolDemo { public static void main (String[] args) { ParaKThreadPool threadPool = new ParaKThreadPool ( 1 , 1000 , TimeUnit.MILLISECONDS, 1 , (queue, task) -> { queue.offer(task, 1500 , TimeUnit.MILLISECONDS); } ); for (int i = 0 ; i < 3 ; i++) { int k = i; threadPool.execute(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("{}" , k); }); } } } @Slf4j(topic = "ParaKThreadPool") class ParaKThreadPool { private BlockingQueue<Runnable> taskQueue; private final HashSet<Worker> workers = new HashSet <>(); private final int coreSize; private final long timeout; private final TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; public ParaKThreadPool (int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .timeUnit = timeUnit; this .taskQueue = new BlockingQueue <>(queueCapacity); this .rejectPolicy = rejectPolicy; } public void execute (Runnable task) { synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker (task); log.debug("执行线程新增: {}" , worker); workers.add(worker); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null ) { try { log.debug("正在执行 => {}" , task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (workers) { log.debug("执行线程移除: {}" , this ); workers.remove(this ); } } } public static void main (String[] args) { } } @Slf4j(topic = "BlockingQueue") class BlockingQueue <T> { private final Deque<T> queue = new ArrayDeque <>(); private final ReentrantLock lock = new ReentrantLock (); private final int capacity; private final Condition fullWaitSet = lock.newCondition(); private final Condition emptyWaitSet = lock.newCondition(); public BlockingQueue (int capacity) { this .capacity = capacity; } public T poll (long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0 ) { return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public T take () { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public void put (T element) { lock.lock(); try { while (queue.size() == capacity) { log.debug("等待加入任务队列: {}..." , element); fullWaitSet.await(); } log.debug("加入任务队列: {}" , element); queue.addLast(element); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public boolean offer (T element, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capacity) { if (nanos <= 0 ) { return false ; } log.debug("等待加入任务队列: {}..." , element); try { nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列: {}" , element); queue.addLast(element); emptyWaitSet.signal(); return true ; }finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capacity) { rejectPolicy.reject(this , task); } else { log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } } @FunctionalInterface interface RejectPolicy <T> { void reject (BlockingQueue<T> queue, T task) ; }
6.2 ThreadPoolExecutor 6.2.1 线程池的继承关系
6.2.2 Executor框架结构 三大部分:
任务类(Runnable / Callable):执行任务需要实现的Runnable
或者Callable
接口。
任何的执行(Executor):任务执行机制的核心接口Executor
,以及继承自Executor
接口的ExecutorService
接口。
异步计算结果(Future):Future
接口以及FutureTask
类都可以代表异步计算的结果。
使用示意:
6.2.3 线程池状态 ThreadPoolExecutor
使用int的高3位来表示线程池状态,低29位表示线程数量。
状态名
高3位
接收新任务
处理阻塞队列任务
说明
RUNNING
111
Y
Y
接收新任务,同时处理任务队列中的任务
SHUTDOWN
000
N
Y
不会接收新任务,但会处理阻塞队列剩余任务
STOP
001
N
N
会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING
010
-
-
任务全执行完毕,活动线程为0即将进入终结
TERMINATED
011
-
-
终结状态
从数字上比较(第一位是符号位),TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING。
这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS原子操作进行赋值。
1 2 3 4 5 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))); private static int ctlOf (int rs, int wc) { return rs | wc; }
6.2.4 线程池属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; } private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock ();private final HashSet<Worker> workers = new HashSet <Worker>();
6.2.5 构造函数和参数 最全构造函数:
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数解释:
corePoolSize
:核心线程数量
maximumPoolSize
:最大线程数量
maximumPool - corePoolSize
:救急线程数量
在没有空闲的核心线程和任务队列满了的情况下才使用救急线程
keepAliveTime
:最大生存时间(对于救急线程)
unit
:时间单位
workQueue
:阻塞队列(存放任务)
有界阻塞队列ArrayBlockingQueue
无界阻塞队列LinkedBlockingQueue
最多只有一个同步元素的SynchronousQueue
优先队列PriorityBlockingQueue
threadFactory
:线程工厂
handler
:拒绝策略
工作流程:
6.2.6 拒绝策略和场景 如果线程数量达到maximumPoolSize
仍然有新人物时会执行拒绝策略,JDK提供了四种实现。
具体:
(1)AbortPolicy
终止策略:丢弃任务并抛出RejectedExecutionException
异常,这是默认策略。
详述:这是线程池默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。
功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程。
使用场景:这个就没有特殊的场景了,但是有一点要正确处理抛出的异常。ThreadPoolExecutor中默认的策略就是AbortPolicy,ExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。
(2)DiscardPolicy
丢弃策略:丢弃任务,但是不抛出异常。如果线程队列已满,则后续提交的任务都会被丢弃,且静默丢弃。
详述:使用此策略,可能使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。
功能:直接静静地丢弃这个任务,不触发任何动作。
使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了。
(3)DiscardOldestPolicy
弃老策略:丢弃队列最前面的任务,然后重新提交被拒绝的任务。
详述:喜新厌旧的拒绝策略。是否采用还是得根据实际业务是否允许丢弃老任务来认真衡量。
功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行。
使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,想到的场景就是,发布消息和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。
(4)CallerRunsPolicy
调用者运行策略:由调用线程处理该任务。
功能:当触发拒绝策略时,只要线程没有关闭,就由提交任务的当前线程处理。
使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。
其他框架实现
Dubbo:在抛出RejectedExecutionException
异常之前会记录日志和dump线程栈信息,方便定位问题
Netty:创建一个新的线程来执行任务
ActiveMQ:超时等待(60s),尝试放入队列
PinPoint:使用了一种拒绝策略链,会逐一尝试策略链中每种拒绝策略
口诀:拒终丢老调(线程池拒绝策略:终止策略、丢弃策略、弃老策略、调用者运行策略)
场景:
终止策略:无特殊场景
丢弃策略:无关紧要的任务
弃老策略:发布消息
调用者运行策略:不允许失败
过程:
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到corePoolSize
并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue
队列,直到有空闲的线程。
如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximumPoolSize
-corePoolSize
数目的线程来救急。
如果线程达到maximumPoolSize
仍然有新任务时会执行拒绝策略。
当高峰过去后,超过corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间kepAliveTime
和unit
来控制。
6.2.7 固定大小线程池 newFixedThreadPool
构造方法的参数:
核心线程数:nThreads
线程工厂:threadFactory
1 2 3 4 5 6 public static ExecutorService newFixedThreadPool (int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>(), threadFactory); }
特点:
核心线程数 = 最大线程数,没有救急线程,无需设置超时时间。
阻塞队列是无界的,可以放任意数量的任务。
场景:适用于任务量已知,相对耗时的任务。
6.2.8 无界限制线程池 newCachedThreadPool
构造方法的参数:
1 2 3 4 5 6 public static ExecutorService newCachedThreadPool (ThreadFactory threadFactory) { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>(), threadFactory); }
特点:
核心线程是0
,最大线程数是Integer.MAX_VALUE
,救急线程的空闲生存时间是60S
,意味着
全部都是救急线程(空闲60S后可以回收)
救急线程可以无限创建
任务队列采用SyncheonousQueue
实现特点是,它没有容量,没有线程来取是放不进去的
场景:
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
适合任务数比较密集,但每个任务执行时间较短的情况。
SyncheonousQueue
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Slf4j(topic = "SynchronousQueue") public class SynchronousQueueDemo { public static void main (String[] args) throws InterruptedException { SynchronousQueue<Integer> integers = new SynchronousQueue <>(); new Thread (() -> { try { log.debug("putting {}" , 1 ); integers.put(1 ); log.debug("{} finish" , 1 ); log.debug("putting {}" , 2 ); integers.put(2 ); log.debug("{} finish" , 2 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ).start(); TimeUnit.SECONDS.sleep(1 ); new Thread (() -> { try { log.debug("taking {}" , 1 ); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2" ).start(); TimeUnit.SECONDS.sleep(1 ); new Thread (() -> { try { log.debug("taking {}" , 2 ); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } }, "t3" ).start(); } }
输出:
1 2 3 4 5 6 2021-05-01 15:19:10.611 [t1] DEBUG SynchronousQueue - putting 1 2021-05-01 15:19:11.621 [t2] DEBUG SynchronousQueue - taking 1 2021-05-01 15:19:11.621 [t1] DEBUG SynchronousQueue - 1 finish 2021-05-01 15:19:11.621 [t1] DEBUG SynchronousQueue - putting 2 2021-05-01 15:19:12.632 [t3] DEBUG SynchronousQueue - taking 2 2021-05-01 15:19:12.633 [t1] DEBUG SynchronousQueue - 2 finish
6.2.9 始终如一线程池 newSingleThreadExecutor
构造方法的参数:
1 2 3 4 5 6 7 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
场景:希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
区别:
对比:创建一个单线程串行执行任务
单线程如果任务执行失败而终止那么没有任何补救措施。
newSingleThreadExecutor
线程池还会新建一个线程,保证线程池的正常工作。
对比:Executors.newFixedThreadPool(1)
newFixedThreadPool
对外暴露的是ThreadPoolExecutor
对象,可以强转后调用setCoreSize
等方法进行修改。
FinalizableDelegatedExecutorService
应用的是装饰器模式,只对外暴露了ExecutorService
接口,因此不能ThreadPoolExecutor
中特有的方法。
6.2.10 执行/提交任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void execute (Runnable command) ;<T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) sthrows InterruptedException, ExecutionException, TimeoutException;
6.2.11 关闭线程池 shutdown
将线程池的状态改为SHUTDOWN
不再接收新任务,但是会将阻塞队列中的任务执行完
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow
将线程池的状态改为STOP
不再接收新任务,也不会再执行阻塞队列中的任务
会将阻塞队列中未执行的任务作为方法返回值
并用interrupt
的方式中断正在执行的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
其他方法:
1 2 3 4 5 6 7 8 public boolean isShutdown () ;public boolean isTerminated () ;public boolean awaitTermination (long timeout, TimeUnit unit) ;
6.3 异步模式之工作线程 6.3.1 概述 定义 :让有限的工作线程(Worker Thread)来轮流移除处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
例如 :海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了。(对比另一种多线程设计模式:Thread-Per-Message)
注意 :不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。
例如 :如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然还有更细的分工。
饥饿 :固定大小线程池会有饥饿现象
两个工人是同一个线程池中的两个线程
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
后厨做菜:做就完了
如果只有一个客人,工人A处理了点餐任务,接下来它要等着工人B去把菜做好,然后上菜
如果同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时没人做饭了,饥饿
解决 :
可以增加线程池的大小,但是不是根本解决之道。不同的任务类型,创建不同的线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j(topic = "Starvation") public class StarvationDemo { static final List<String> MENU = Arrays.asList("鱼香肉丝" , "宫保鸡丁" , "红烧肉" , "烤鸡翅" ); static final Random RANDOM = new Random (); static String cooking () { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main (String[] args) { ExecutorService waiterPool = Executors.newFixedThreadPool(1 ); ExecutorService cookPool = Executors.newFixedThreadPool(1 ); for (int i = 0 ; i < 10 ; i++) { waiterPool.execute(() -> { log.debug("处理点餐" ); Future<String> future = cookPool.submit(() -> { log.debug("做菜" ); return cooking(); }); try { log.debug("上菜: {}" , future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } } }
6.3.2 创建多少线程池合适
CPU密集型运算
例如:加密、解密和计算等。
一般:CPU核数 + 1
+1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费。
I/O密集型运算
例如:文件读写、网络通信。
一般:CPU核数 * 2
经验公式:线程数 = 核数 * 期望CPU利用率 * 总时间(CPU计算时间 + 等待时间) / CPU计算时间
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行IO操作时、远程RPC调用时,包括进行数据操作时,这时候CPU就闲下来了,你可以利用多线程提高它的利用率。
6.3.3 任务调度线程池 在『任务调度线程池』功能加入之前,可以使用java.util.Timer
来实现定时功能,Timer
的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务再执行,前一个任务的延迟或异常都将会影响到之后的任务。
例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf4j(topic = "Timer") public class TimerDemo { public static void main (String[] args) { Timer timer = new Timer (); TimerTask task1 = new TimerTask () { @Override public void run () { log.debug("task1" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } } }; TimerTask task2 = new TimerTask () { @Override public void run () { log.debug("task2" ); } }; log.debug("start..." ); timer.schedule(task1, 1000 ); timer.schedule(task2, 1000 ); } }
运行结果:
1 2 3 2021-05-01 22:34:08.514 [main] DEBUG Timer - start... 2021-05-01 22:34:09.528 [Timer-0] DEBUG Timer - task1 2021-05-01 22:34:11.541 [Timer-0] DEBUG Timer - task2
由于task1的原因导致task2被延迟执行。
使用ScheduledThreadPool
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Slf4j(topic = "NewScheduledThreadPool") public class NewScheduledThreadPoolDemo { public static void main (String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(2 ); pool.schedule(() -> { log.debug("task1" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } int i = 1 / 0 ; }, 1 , TimeUnit.SECONDS); pool.schedule(() -> { log.debug("task2" ); }, 1 , TimeUnit.SECONDS); } }
运行结果:
1 2 2021-05-01 22:46:23.053 [pool-1-thread-2] DEBUG NewScheduledThreadPool - task2 2021-05-01 22:46:23.053 [pool-1-thread-1] DEBUG NewScheduledThreadPool - task1
task1出现延迟或者抛出异常都不会影响到task2的执行。
scheduleAtFixedRate
:创建并执行周期性操作。该操作在给定的初始延迟initialDelay后首先启用,随后在给定的周期period内启用。
作用:给一个执行的开始和下一个执行的开始加上给定的延迟period。
异常:如果任务的任何执行遇到异常,则会抑制后续执行。否则,任务将仅通过取消或终止执行程序来终止。如果此任务的任何执行时间超过其周期,则后续执行可能会较晚开始,但不会并发执行。『任务的异常被catch则不影响后续执行』
1 2 3 4 5 6 7 8 9 pool.scheduleAtFixedRate(() -> { log.debug("running" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } }, 1 , 1 , TimeUnit.SECONDS);
scheduleWithFixedDelay
:创建并执行周期性操作。该操作在给定的初始延迟initialDelay后首先启用,随后在给定的周期period内启用。
作用:给一个执行的终止和下一个执行的开始加上给定的延迟period。
异常:如果任务的任何执行遇到异常,则会抑制后续执行。否则,任务将仅通过取消或终止执行程序来终止。『任务的异常被catch则不影响后续执行』
1 2 3 4 5 6 7 8 9 pool.scheduleWithFixedDelay(() -> { log.debug("running" ); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } }, 1 , 1 , TimeUnit.SECONDS);
6.3.4 正确处理异常 线程池中的线程执行任务时,如果任务抛出了异常,默认是中断该任务而不是抛出异常或者打印异常信息。
6.3.5 实现定时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Slf4j(topic = "Schedule") public class ScheduleDemo { private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1 ); public static void main (String[] args) { solution(DayOfWeek.SUNDAY, 15 , 16 , 00 , () -> { log.debug("KHighness" ); }); } public static void solution (DayOfWeek day, int hour, int minute, int second, Runnable task) { LocalDateTime now = LocalDateTime.now(); LocalDateTime tar = now.withHour(hour).withMinute(minute).withSecond(second).with(day); if (now.compareTo(tar) > 0 ) { tar.plusWeeks(1 ); } long initialDelay = Duration.between(now, tar).toMillis(); long period = 7 * 24 * 3600 * 1000 ; pool.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS); } }
6.4 Tomcat线程池 6.4.1 总体结构
LimitLatch用来限流,可以控制最大连接个数,类似JUC中的Semaphore
Acceptor只负责接收新的socket连接
Poller只负责监听socket channel是否有新的可读的I/O事件
一旦可读,封装一个任务对象socketProcessor,提交给Executor线程池处理
Executor线程池中的工作线程最终负责处理请求
6.4.2 不同之处 Tomcat线程池扩展了ThreadPoolExecutor,行为稍有不同
如果总线程数达到maximumPoolSize
这时不会立刻抛RejectedExecutionException
而是尝试将任务放入队列,如果还失败,才抛出RejectedExecutionException
tomcat-7.0.42
源码 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException ("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException (x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
1 2 3 4 5 6 7 public boolean force (Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent.isShutdown() ) throw new RejectedExecutionException ( "Executor not running, can't force a command into the queue" ); return super .offer(o,timeout,unit); }
6.4.3 可配置项 Connector配置
配置项
默认值
说明
acceptorThreadCount
1
accter线程数量
pollerThreadCount
1
poller线程数量
minSpareThreads
10
核心线程数,即corePoolSize
maxThreads
200
最大线程数,即maximumPoolSize
executor
-
Executor名称,用来引用下面的Executor
Executor配置
配置项
默认值
说明
threadPriority
5
线程优先级
daemon
true
是否守护线程
minSpareThreads
25
核心线程数,即corePoolSize
maxThreads
200
最大线程数,即maximumPoolSize
maxIdleTime
60000
线程生存时间,单位是毫秒,默认值即1分钟
maxQueueSize
Integer.MAX_VALUE
队列长度
prestartminSpareThreads
false
核心线程是否在服务器启动时启动
6.5 Fork/Join 6.5.1 概述 Fork/Join是JDK1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的CPU密集型运算。
所谓的任务拆分,是将一个大任务拆分成算法上相同的小人物,直至不能拆分成可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列,都可以用分治思想进行求解。
Fork/Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork/Join默认会创建与CPU核心数大小相同的线程池。
6.5.2 使用 提交给Fork/Join线程池的任务需要继承RecursiveTask
(有返回值)或RecuisiveAction
(没有返回值),例如下面定义了一个对1~n之间的整数求和的任务。
如下面定义了一个对1~n之间的整数求和的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf4j(topic = "ForkJoin") public class ForkJoinDemo { public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (4 ); System.out.println(pool.invoke(new Task (5 ))); } } @Slf4j(topic = "Task") class Task extends RecursiveTask <Integer> { private final int n; public Task (int n) { this .n = n; } @Override public String toString () { return "『" + n + "』" ; } @Override protected Integer compute () { if (n == 1 ) return 1 ; Task tas = new Task (n - 1 ); tas.fork(); log.debug("fork: {} {}" , n, tas); int res = n + tas.join(); log.debug("join: {} + {} = {}" , n, tas, res); return res; } }
运行结果:
1 2 3 4 5 6 7 8 9 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-2] DEBUG Task - fork: 4 『3』 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-1] DEBUG Task - fork: 5 『4』 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-0] DEBUG Task - fork: 2 『1』 2021-05-02 17:04:41.631 [ForkJoinPool-1-worker-3] DEBUG Task - fork: 3 『2』 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-0] DEBUG Task - join: 2 + 『1』 = 3 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-3] DEBUG Task - join: 3 + 『2』 = 6 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-2] DEBUG Task - join: 4 + 『3』 = 10 2021-05-02 17:04:41.634 [ForkJoinPool-1-worker-1] DEBUG Task - join: 5 + 『4』 = 15 15
流程:
改进:任务拆分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j(topic = "AddTask") class AddTask extends RecursiveTask <Integer> { int begin; int end; public AddTask (int begin, int end) { this .begin = begin; this .end = end; } @Override public String toString () { return "『" + begin + ", " + end + "』" ; } @Override protected Integer compute () { if (begin == end) { log.debug("join: {}" , begin); return begin; } if (end - begin == 1 ) { log.debug("join: {} + {} = {}" , begin, end, begin + end); return begin + end; } int mid = begin + (end - begin >> 1 ); AddTask task1 = new AddTask (begin, mid); task1.fork(); AddTask task2 = new AddTask (mid + 1 , end); task2.fork(); log.debug("fork: {} + {} = ?" , task1, task2); int res = task1.join() + task2.join(); log.debug("join: {} + {} = {}" , task1, task2, res); return res; } }
流程:
1 2 3 4 5 6 7 8 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-0] DEBUG AddTask - join: 1 + 2 = 3 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-2] DEBUG AddTask - fork: 『1, 2』 + 『3, 3』 = ? 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-3] DEBUG AddTask - join: 4 + 5 = 9 2021-05-02 17:39:00.721 [ForkJoinPool-1-worker-1] DEBUG AddTask - fork: 『1, 3』 + 『4, 5』 = ? 2021-05-02 17:39:00.724 [ForkJoinPool-1-worker-0] DEBUG AddTask - join: 3 2021-05-02 17:39:00.724 [ForkJoinPool-1-worker-2] DEBUG AddTask - join: 『1, 2』 + 『3, 3』 = 6 2021-05-02 17:39:00.724 [ForkJoinPool-1-worker-1] DEBUG AddTask - join: 『1, 3』 + 『4, 5』 = 15 15
流程: