线程池 (Thread Pool)¶
深入理解Executor框架和ThreadPoolExecutor的实现原理,掌握线程池的使用和最佳实践
目录¶
- 1. 为什么需要线程池
- 2. Executor框架
- 3. ThreadPoolExecutor详解
- 4. 线程池类型
- 5. 线程池执行流程
- 6. 拒绝策略
- 7. 线程池监控
- 8. 最佳实践
- 9. 面试高频问题
1. 为什么需要线程池 (Why Thread Pool?)¶
1.1 直接创建线程的问题¶
/**
* 直接创建线程的问题
* Problems with Direct Thread Creation
*/
public class DirectThreadCreation {
public static void main(String[] args) {
// 问题1:频繁创建和销毁线程,开销大
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// 执行任务
doTask();
}).start();
}
// 问题2:无法控制线程数量,可能导致资源耗尽
// 问题3:缺乏统一管理,难以监控和调优
}
}
直接创建线程的问题: 1. 资源消耗大 - 创建和销毁线程开销大 2. 资源不可控 - 无法限制线程数量,可能导致系统资源耗尽 3. 缺乏管理 - 难以统一管理、监控和调优
1.2 线程池的优势¶
- 资源复用 - 线程可以重复使用,减少创建和销毁的开销
- 资源可控 - 可以限制线程数量,防止资源耗尽
- 统一管理 - 便于监控、调优和故障排查
- 提高响应速度 - 任务到达时,线程已创建好,可以立即执行
1.3 在算力平台中的应用¶
在算力平台中,线程池的应用场景:
/**
* 算力平台中的线程池应用
* Thread Pool Usage in Computing Platform
*/
public class PlatformThreadPool {
// 任务调度线程池:处理Nomad任务状态同步
private ExecutorService taskSchedulerPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
// 节点监控线程池:并发采集多个节点信息
private ExecutorService nodeMonitorPool = Executors.newCachedThreadPool();
// 结算处理线程池:批量处理钱包操作
private ExecutorService billingPool = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("billing-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public void scheduleTask() {
taskSchedulerPool.submit(() -> {
// 从Nomad获取任务状态
// 更新数据库和ES
});
}
}
2. Executor框架 (Executor Framework)¶
2.1 框架结构¶
2.2 核心接口¶
Executor接口¶
/**
* Executor接口:执行任务的抽象
* Executor Interface: Task Execution Abstraction
*/
public interface Executor {
void execute(Runnable command);
}
ExecutorService接口¶
/**
* ExecutorService接口:扩展Executor,提供更多功能
* ExecutorService Interface: Extended Executor
*/
public interface ExecutorService extends Executor {
// 提交任务(有返回值)
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 关闭线程池
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
// 等待终止
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
// 批量提交
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
}
2.3 使用示例¶
/**
* ExecutorService使用示例
* ExecutorService Usage Example
*/
public class ExecutorServiceDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交Runnable任务
Future<?> future1 = executor.submit(() -> {
System.out.println("执行任务1");
});
// 提交Callable任务(有返回值)
Future<String> future2 = executor.submit(() -> {
Thread.sleep(1000);
return "任务2完成";
});
// 获取返回值
String result = future2.get(); // 阻塞直到任务完成
System.out.println("结果: " + result);
// 关闭线程池
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
3. ThreadPoolExecutor详解 (ThreadPoolExecutor Details)¶
3.1 核心参数¶
/**
* ThreadPoolExecutor构造方法
* ThreadPoolExecutor Constructor
*/
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)
3.2 参数详解¶
corePoolSize(核心线程数)¶
- 定义:线程池中保持存活的最小线程数
- 特点:即使空闲也不会被回收(除非allowCoreThreadTimeOut=true)
- 设置建议:CPU密集型任务:CPU核心数;IO密集型任务:CPU核心数 * 2
maximumPoolSize(最大线程数)¶
- 定义:线程池允许的最大线程数
- 特点:当队列满且核心线程都在忙时,会创建新线程(不超过最大值)
- 设置建议:根据业务场景和系统资源设置
keepAliveTime(空闲线程存活时间)¶
- 定义:非核心线程空闲时的存活时间
- 特点:超过此时间,非核心线程会被回收
- 设置建议:根据任务特点设置,避免频繁创建销毁线程
workQueue(任务队列)¶
- 定义:用于存放待执行任务的阻塞队列
- 常用队列:
LinkedBlockingQueue- 无界队列(可能导致OOM)ArrayBlockingQueue- 有界队列(需要指定大小)SynchronousQueue- 同步队列(不存储元素)PriorityBlockingQueue- 优先级队列
threadFactory(线程工厂)¶
- 定义:用于创建线程的工厂
- 作用:可以自定义线程名称、优先级、是否为守护线程等
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("task-%d")
.setDaemon(false)
.setPriority(Thread.NORM_PRIORITY)
.build();
handler(拒绝策略)¶
- 定义:当线程池和队列都满时,如何处理新任务
- 策略类型:见6. 拒绝策略
3.3 线程池状态¶
/**
* 线程池状态(使用AtomicInteger的ctl字段存储)
* Thread Pool States
*/
private static final int RUNNING = -1 << COUNT_BITS; // 运行中
private static final int SHUTDOWN = 0 << COUNT_BITS; // 关闭中
private static final int STOP = 1 << COUNT_BITS; // 停止
private static final int TIDYING = 2 << COUNT_BITS; // 整理中
private static final int TERMINATED = 3 << COUNT_BITS; // 已终止
状态转换:
4. 线程池类型 (Thread Pool Types)¶
4.1 FixedThreadPool(固定线程池)¶
/**
* FixedThreadPool:固定大小的线程池
* FixedThreadPool: Fixed Size Thread Pool
*/
ExecutorService executor = Executors.newFixedThreadPool(5);
特点:
- 核心线程数 = 最大线程数 = 指定大小
- 使用无界队列LinkedBlockingQueue
- 适用于CPU密集型任务
源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // 无界队列
);
}
4.2 CachedThreadPool(缓存线程池)¶
/**
* CachedThreadPool:可缓存的线程池
* CachedThreadPool: Cached Thread Pool
*/
ExecutorService executor = Executors.newCachedThreadPool();
特点:
- 核心线程数为0,最大线程数为Integer.MAX_VALUE
- 使用SynchronousQueue(不存储元素)
- 线程空闲60秒后回收
- 适用于短时任务、IO密集型任务
源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>() // 同步队列
);
}
4.3 SingleThreadExecutor(单线程池)¶
/**
* SingleThreadExecutor:单线程执行器
* SingleThreadExecutor: Single Thread Executor
*/
ExecutorService executor = Executors.newSingleThreadExecutor();
特点:
- 核心线程数 = 最大线程数 = 1
- 使用无界队列LinkedBlockingQueue
- 保证任务顺序执行
- 适用于需要顺序执行任务的场景
4.4 ScheduledThreadPool(定时线程池)¶
/**
* ScheduledThreadPool:定时任务线程池
* ScheduledThreadPool: Scheduled Task Thread Pool
*/
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
// 延迟执行
executor.schedule(() -> {
System.out.println("延迟执行");
}, 5, TimeUnit.SECONDS);
// 定时执行
executor.scheduleAtFixedRate(() -> {
System.out.println("定时执行");
}, 0, 1, TimeUnit.SECONDS);
特点:
- 支持延迟执行和定时执行
- 使用DelayedWorkQueue(优先级队列)
- 适用于定时任务、周期性任务
4.5 线程池类型对比¶
| 线程池类型 | 核心线程数 | 最大线程数 | 队列类型 | 适用场景 |
|---|---|---|---|---|
| FixedThreadPool | 固定 | 固定 | LinkedBlockingQueue | CPU密集型 |
| CachedThreadPool | 0 | Integer.MAX_VALUE | SynchronousQueue | 短时任务、IO密集型 |
| SingleThreadExecutor | 1 | 1 | LinkedBlockingQueue | 顺序执行 |
| ScheduledThreadPool | 指定 | Integer.MAX_VALUE | DelayedWorkQueue | 定时任务 |
5. 线程池执行流程 (Execution Flow)¶
5.1 执行流程图¶
提交任务 (execute/submit)
↓
核心线程是否已满?
├─ 否 → 创建核心线程执行
└─ 是 → 任务队列是否已满?
├─ 否 → 加入队列等待
└─ 是 → 最大线程数是否已满?
├─ 否 → 创建非核心线程执行
└─ 是 → 执行拒绝策略
5.2 源码分析¶
/**
* ThreadPoolExecutor.execute()方法核心逻辑
* Core Logic of ThreadPoolExecutor.execute()
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 如果当前线程数 < 核心线程数,创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果线程池运行中且任务可以加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查线程池状态
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果队列满,尝试创建非核心线程
else if (!addWorker(command, false))
// 4. 如果创建失败,执行拒绝策略
reject(command);
}
5.3 示例说明¶
/**
* 线程池执行流程示例
* Execution Flow Example
*/
public class ExecutionFlowDemo {
public static void main(String[] args) {
// 创建线程池:核心2,最大4,队列容量2
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy()
);
// 提交6个任务
for (int i = 1; i <= 6; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("任务" + taskId + "执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
// 执行流程:
// 任务1、2 → 核心线程执行
// 任务3、4 → 加入队列
// 任务5、6 → 创建非核心线程执行
// 如果还有任务 → 拒绝策略
}
}
6. 拒绝策略 (Rejection Policy)¶
6.1 内置拒绝策略¶
AbortPolicy(默认策略)¶
行为: 直接抛出RejectedExecutionException异常
CallerRunsPolicy(调用者运行策略)¶
/**
* CallerRunsPolicy:调用者线程执行
* CallerRunsPolicy: Caller Runs
*/
new ThreadPoolExecutor.CallerRunsPolicy()
行为: 在调用者线程中执行任务(会阻塞调用者)
DiscardPolicy(丢弃策略)¶
行为: 静默丢弃任务,不抛异常
DiscardOldestPolicy(丢弃最老任务)¶
/**
* DiscardOldestPolicy:丢弃队列最老的任务
* DiscardOldestPolicy: Discard Oldest Task
*/
new ThreadPoolExecutor.DiscardOldestPolicy()
行为: 丢弃队列头部的任务,然后尝试执行新任务
6.2 自定义拒绝策略¶
/**
* 自定义拒绝策略:记录日志并保存到数据库
* Custom Rejection Policy
*/
public class CustomRejectionPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志
logger.warn("任务被拒绝: " + r.toString());
// 保存到数据库(异步)
saveToDatabase(r);
// 或者发送告警
sendAlert("线程池任务被拒绝");
}
}
6.3 拒绝策略对比¶
| 策略 | 行为 | 适用场景 |
|---|---|---|
| AbortPolicy | 抛异常 | 需要知道任务被拒绝 |
| CallerRunsPolicy | 调用者执行 | 可以接受降级处理 |
| DiscardPolicy | 静默丢弃 | 可以容忍任务丢失 |
| DiscardOldestPolicy | 丢弃最老任务 | 优先处理新任务 |
7. 线程池监控 (Thread Pool Monitoring)¶
7.1 监控指标¶
/**
* 线程池监控示例
* Thread Pool Monitoring Example
*/
public class ThreadPoolMonitor {
public void monitor(ThreadPoolExecutor executor) {
// 核心线程数
int corePoolSize = executor.getCorePoolSize();
// 最大线程数
int maximumPoolSize = executor.getMaximumPoolSize();
// 当前线程数
int poolSize = executor.getPoolSize();
// 活跃线程数
int activeCount = executor.getActiveCount();
// 已完成任务数
long completedTaskCount = executor.getCompletedTaskCount();
// 总任务数
long taskCount = executor.getTaskCount();
// 队列大小
int queueSize = executor.getQueue().size();
// 打印监控信息
System.out.println(String.format(
"线程池监控 - 核心:%d, 最大:%d, 当前:%d, 活跃:%d, 已完成:%d, 总任务:%d, 队列:%d",
corePoolSize, maximumPoolSize, poolSize, activeCount,
completedTaskCount, taskCount, queueSize
));
}
}
7.2 在算力平台中的应用¶
/**
* 算力平台线程池监控
* Computing Platform Thread Pool Monitoring
*/
@Component
public class PlatformThreadPoolMonitor {
@Scheduled(fixedRate = 60000) // 每分钟监控一次
public void monitor() {
// 监控任务调度线程池
monitorThreadPool(taskSchedulerPool, "任务调度线程池");
// 监控结算处理线程池
monitorThreadPool(billingPool, "结算处理线程池");
}
private void monitorThreadPool(ThreadPoolExecutor executor, String name) {
int queueSize = executor.getQueue().size();
int activeCount = executor.getActiveCount();
// 告警:队列积压
if (queueSize > 1000) {
logger.warn("{}队列积压: {}", name, queueSize);
}
// 告警:线程池满载
if (activeCount == executor.getMaximumPoolSize()) {
logger.warn("{}线程池满载", name);
}
}
}
8. 最佳实践 (Best Practices)¶
8.1 线程池参数设置¶
/**
* 线程池参数设置建议
* Thread Pool Parameter Configuration
*/
public class ThreadPoolConfig {
// CPU密集型任务
public static ThreadPoolExecutor createCpuIntensivePool() {
int corePoolSize = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
corePoolSize,
corePoolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("cpu-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
// IO密集型任务
public static ThreadPoolExecutor createIoIntensivePool() {
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
return new ThreadPoolExecutor(
corePoolSize,
corePoolSize * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("io-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
8.2 正确关闭线程池¶
/**
* 正确关闭线程池
* Proper Thread Pool Shutdown
*/
public void shutdownThreadPool(ExecutorService executor) {
executor.shutdown(); // 停止接收新任务
try {
// 等待已提交的任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 超时后强制关闭
executor.shutdownNow();
// 再次等待
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
logger.error("线程池未能正常关闭");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
8.3 使用有界队列¶
// ❌ 不推荐:无界队列可能导致OOM
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>() // 无界队列
);
// ✅ 推荐:使用有界队列
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
8.4 自定义线程工厂¶
/**
* 自定义线程工厂:便于问题排查
* Custom Thread Factory
*/
ThreadFactory factory = new ThreadFactoryBuilder()
.setNameFormat("task-pool-%d") // 线程名称格式
.setDaemon(false) // 非守护线程
.setPriority(Thread.NORM_PRIORITY) // 正常优先级
.setUncaughtExceptionHandler((t, e) -> {
logger.error("线程{}未捕获异常", t.getName(), e);
})
.build();
9. 面试高频问题 (Interview Questions)¶
Q1: 线程池的核心参数有哪些?¶
答案: corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler
Q2: 线程池的执行流程?¶
答案: 见5. 线程池执行流程
Q3: 如何合理设置线程池参数?¶
答案: - CPU密集型:核心线程数 = CPU核心数 - IO密集型:核心线程数 = CPU核心数 * 2 - 使用有界队列,设置合理的拒绝策略
Q4: 线程池的拒绝策略有哪些?¶
答案: AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
Q5: 如何监控线程池?¶
答案: 监控核心线程数、最大线程数、当前线程数、活跃线程数、队列大小、已完成任务数等指标
📖 扩展阅读¶
返回: 07-Java并发编程
上一章: 07-04 - 并发工具类
下一章: 07-06 - 并发集合 →