并发笔记-ThreadPoolExecutor详解

image_副本.png

简介

ThreadPoolExecutor一种ExecutorService,使用可能的多个池线程之一执行每个提交的任务,通常使用Executors工厂方法进行配置。
线程池解决了两个不同的问题:它们通常在执行大量异步任务时提供更好的性能,这是因为减少了每个任务的调用开销;它们还提供了一种方法来限制和管理执行任务集合时消耗的资源,包括线程。每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

为了在广泛的上下文中有用,这个类提供了许多可调整的参数和可扩展性挂钩。然而,我们敦促程序员使用更方便的Executors工厂方法Executors。newCachedThreadPool(无边界线程池,具有自动线程回收),执行器。newFixedThreadPool(固定大小的线程池)和执行器。newSingleThreadExecutor(单后台线程),为最常见的使用场景预配置设置。否则,在手动配置和调优此类时,请使用以下指南:

image.png

七大核心参数详解

  • int corePoolSize,//线程池的核心线程数量
  • int maximumPoolSize,//线程池的最大线程数
  • long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
  • TimeUnit unit,//时间单位
  • BlockingQueue workQueue,//任务队列,用来储存等待执行任务的队列
  • ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
  • RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务

BlockingQueue

image.png

RejectedExecutionHandler

AbortPolicy(抛出异常)

AbortPolicy,缺省策略,处理程序在拒绝时抛出runtime RejectedExecutionException。

CallerRunsPolicy(自身执行)

CallerRunPolicy,调用自身执行的线程运行任务。这提供了一种简单的反馈控制机制,可以降低提交新任务的速度。

DiscardPolicy(丢弃当前)

DiscardPolicy,将简单地删除无法执行的任务。

DiscardOldestPolicy(丢弃队头)

DiscardOldestPolicy,如果未关闭执行器,将丢弃工作队列头部的任务,然后重试执行(可能再次失败,导致重复执行)

如何合理配置线程池参数

一般采用如下方式进行配置,但是这种只是理论,在实际的生产环境表现并不是那么令人满意,更加推荐后面两种方式

普通配置

1.CPU密集型

单核:当我们是单核CPU的时候,不建议使用线程池
多核:当我们是多核CPU的时候,理论上应该是 线程数=CPU核数,但实际上,一般会设置为 核数+1,这是因为万一某个线程因为某个未知的错误停止的时候,可以确保有一个“额外”线程可以执行任务,让CPU持续工作
简单来说:线程数=CPU核数+1

2.IO密集型

IO密集型与CPU密集型相对,一个完整的请求,CPU执行完后需要进行很多IO操作,也就是IO操作需要大量时间,却不占用CPU资源,所以理论上我们一般保证有多少个IO大型任务(n),线程数就是这个任务数的两倍(2n),但实际上,一般是2n+1个

线程数=2*CPU核数+1

结合任务配置

我们来分析一下哪些因素影响我们对线程池参数的配置

  • 任务量
  • 任务执行耗时
  • 允许最大的响应时间
  • 机器硬件大小(CPU核心数)

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
tasks :每秒的任务数,假设为500~1000

taskcost:每个任务花费时间,假设为0.1s

responsetime:系统允许容忍的最大响应时间,假设为1s

做几个计算

corePoolSize = 每秒需要多少个线程处理?

threadcount = tasks/(1/taskcost) = tasks*taskcout = (500 ~ 1000)*0.1 = 50~100 个线程。

corePoolSize设置应该大于50

根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可。

queueCapacity = (coreSizePool/taskcost)*responsetime

计算可得 queueCapacity = 80/0.1*1 = 800。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行。

切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。

maxPoolSize 最大线程数在生产环境上我们往往设置成corePoolSize一样,这样可以减少在处理过程中创建线程的开销。

rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理。

keepAliveTime和allowCoreThreadTimeout采用默认通常能满足。

动态配置

提供了setCorePoolSize、setMaximumPoolSize的方法进行动态的配置核心线程数和最大线程池大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize) interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) break;
}
}
}
1
2
3
4
5
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize) interruptIdleWorkers();
}

一般可以通过搭建一个线程池控制平台,对运行中的线程池进行动态的配置其参数的大小,结合业务量的实际情况达到动态的伸缩效果!

资料

  1. https://people.apache.org/~tellison/classlib_doc/html/classjava_1_1util_1_1concurrent_1_1ThreadPoolExecutor.html
  2. https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
  3. https://blog.csdn.net/riemann_/article/details/104704197

并发笔记-ThreadPoolExecutor详解
https://mikeygithub.github.io/2022/05/30/yuque/并发笔记-ThreadPoolExecutor详解/
作者
Mikey
发布于
2022年5月30日
许可协议