跳转至

线程池 (Thread Pool)

深入理解Executor框架和ThreadPoolExecutor的实现原理,掌握线程池的使用和最佳实践

目录


1. 为什么需要线程池 (Why Thread Pool?)

1.1 直接创建线程的问题

/**
 * 直接创建线程的问题
 * Problems with Direct Thread Creation
 */
public class DirectThreadCreation {
    public static void main(String[] args) {
        // 问题1:频繁创建和销毁线程,开销大
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                // 执行任务
                doTask();
            }).start();
        }

        // 问题2:无法控制线程数量,可能导致资源耗尽
        // 问题3:缺乏统一管理,难以监控和调优
    }
}

直接创建线程的问题: 1. 资源消耗大 - 创建和销毁线程开销大 2. 资源不可控 - 无法限制线程数量,可能导致系统资源耗尽 3. 缺乏管理 - 难以统一管理、监控和调优

1.2 线程池的优势

  1. 资源复用 - 线程可以重复使用,减少创建和销毁的开销
  2. 资源可控 - 可以限制线程数量,防止资源耗尽
  3. 统一管理 - 便于监控、调优和故障排查
  4. 提高响应速度 - 任务到达时,线程已创建好,可以立即执行

1.3 在算力平台中的应用

在算力平台中,线程池的应用场景:

/**
 * 算力平台中的线程池应用
 * Thread Pool Usage in Computing Platform
 */
public class PlatformThreadPool {

    // 任务调度线程池:处理Nomad任务状态同步
    private ExecutorService taskSchedulerPool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors()
    );

    // 节点监控线程池:并发采集多个节点信息
    private ExecutorService nodeMonitorPool = Executors.newCachedThreadPool();

    // 结算处理线程池:批量处理钱包操作
    private ExecutorService billingPool = new ThreadPoolExecutor(
        5, 10, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        new ThreadFactoryBuilder().setNameFormat("billing-%d").build(),
        new ThreadPoolExecutor.CallerRunsPolicy()
    );

    public void scheduleTask() {
        taskSchedulerPool.submit(() -> {
            // 从Nomad获取任务状态
            // 更新数据库和ES
        });
    }
}

2. Executor框架 (Executor Framework)

2.1 框架结构

Executor (接口)
ExecutorService (接口)
AbstractExecutorService (抽象类)
ThreadPoolExecutor (实现类)

2.2 核心接口

Executor接口

/**
 * Executor接口:执行任务的抽象
 * Executor Interface: Task Execution Abstraction
 */
public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口

/**
 * ExecutorService接口:扩展Executor,提供更多功能
 * ExecutorService Interface: Extended Executor
 */
public interface ExecutorService extends Executor {
    // 提交任务(有返回值)
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    // 关闭线程池
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();

    // 等待终止
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    // 批量提交
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
    <T> T invokeAny(Collection<? extends Callable<T>> tasks);
}

2.3 使用示例

/**
 * ExecutorService使用示例
 * ExecutorService Usage Example
 */
public class ExecutorServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        // 提交Runnable任务
        Future<?> future1 = executor.submit(() -> {
            System.out.println("执行任务1");
        });

        // 提交Callable任务(有返回值)
        Future<String> future2 = executor.submit(() -> {
            Thread.sleep(1000);
            return "任务2完成";
        });

        // 获取返回值
        String result = future2.get(); // 阻塞直到任务完成
        System.out.println("结果: " + result);

        // 关闭线程池
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

3. ThreadPoolExecutor详解 (ThreadPoolExecutor Details)

3.1 核心参数

/**
 * ThreadPoolExecutor构造方法
 * ThreadPoolExecutor Constructor
 */
public ThreadPoolExecutor(
    int corePoolSize,              // 核心线程数
    int maximumPoolSize,           // 最大线程数
    long keepAliveTime,            // 空闲线程存活时间
    TimeUnit unit,                 // 时间单位
    BlockingQueue<Runnable> workQueue,  // 任务队列
    ThreadFactory threadFactory,   // 线程工厂
    RejectedExecutionHandler handler     // 拒绝策略
)

3.2 参数详解

corePoolSize(核心线程数)

  • 定义:线程池中保持存活的最小线程数
  • 特点:即使空闲也不会被回收(除非allowCoreThreadTimeOut=true)
  • 设置建议:CPU密集型任务:CPU核心数;IO密集型任务:CPU核心数 * 2

maximumPoolSize(最大线程数)

  • 定义:线程池允许的最大线程数
  • 特点:当队列满且核心线程都在忙时,会创建新线程(不超过最大值)
  • 设置建议:根据业务场景和系统资源设置

keepAliveTime(空闲线程存活时间)

  • 定义:非核心线程空闲时的存活时间
  • 特点:超过此时间,非核心线程会被回收
  • 设置建议:根据任务特点设置,避免频繁创建销毁线程

workQueue(任务队列)

  • 定义:用于存放待执行任务的阻塞队列
  • 常用队列
  • LinkedBlockingQueue - 无界队列(可能导致OOM)
  • ArrayBlockingQueue - 有界队列(需要指定大小)
  • SynchronousQueue - 同步队列(不存储元素)
  • PriorityBlockingQueue - 优先级队列

threadFactory(线程工厂)

  • 定义:用于创建线程的工厂
  • 作用:可以自定义线程名称、优先级、是否为守护线程等
ThreadFactory factory = new ThreadFactoryBuilder()
    .setNameFormat("task-%d")
    .setDaemon(false)
    .setPriority(Thread.NORM_PRIORITY)
    .build();

handler(拒绝策略)

  • 定义:当线程池和队列都满时,如何处理新任务
  • 策略类型:见6. 拒绝策略

3.3 线程池状态

/**
 * 线程池状态(使用AtomicInteger的ctl字段存储)
 * Thread Pool States
 */
private static final int RUNNING    = -1 << COUNT_BITS; // 运行中
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 关闭中
private static final int STOP       =  1 << COUNT_BITS; // 停止
private static final int TIDYING    =  2 << COUNT_BITS; // 整理中
private static final int TERMINATED =  3 << COUNT_BITS; // 已终止

状态转换:

RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED


4. 线程池类型 (Thread Pool Types)

4.1 FixedThreadPool(固定线程池)

/**
 * FixedThreadPool:固定大小的线程池
 * FixedThreadPool: Fixed Size Thread Pool
 */
ExecutorService executor = Executors.newFixedThreadPool(5);

特点: - 核心线程数 = 最大线程数 = 指定大小 - 使用无界队列LinkedBlockingQueue - 适用于CPU密集型任务

源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()  // 无界队列
    );
}

4.2 CachedThreadPool(缓存线程池)

/**
 * CachedThreadPool:可缓存的线程池
 * CachedThreadPool: Cached Thread Pool
 */
ExecutorService executor = Executors.newCachedThreadPool();

特点: - 核心线程数为0,最大线程数为Integer.MAX_VALUE - 使用SynchronousQueue(不存储元素) - 线程空闲60秒后回收 - 适用于短时任务、IO密集型任务

源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>()  // 同步队列
    );
}

4.3 SingleThreadExecutor(单线程池)

/**
 * SingleThreadExecutor:单线程执行器
 * SingleThreadExecutor: Single Thread Executor
 */
ExecutorService executor = Executors.newSingleThreadExecutor();

特点: - 核心线程数 = 最大线程数 = 1 - 使用无界队列LinkedBlockingQueue - 保证任务顺序执行 - 适用于需要顺序执行任务的场景

4.4 ScheduledThreadPool(定时线程池)

/**
 * ScheduledThreadPool:定时任务线程池
 * ScheduledThreadPool: Scheduled Task Thread Pool
 */
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

// 延迟执行
executor.schedule(() -> {
    System.out.println("延迟执行");
}, 5, TimeUnit.SECONDS);

// 定时执行
executor.scheduleAtFixedRate(() -> {
    System.out.println("定时执行");
}, 0, 1, TimeUnit.SECONDS);

特点: - 支持延迟执行和定时执行 - 使用DelayedWorkQueue(优先级队列) - 适用于定时任务、周期性任务

4.5 线程池类型对比

线程池类型 核心线程数 最大线程数 队列类型 适用场景
FixedThreadPool 固定 固定 LinkedBlockingQueue CPU密集型
CachedThreadPool 0 Integer.MAX_VALUE SynchronousQueue 短时任务、IO密集型
SingleThreadExecutor 1 1 LinkedBlockingQueue 顺序执行
ScheduledThreadPool 指定 Integer.MAX_VALUE DelayedWorkQueue 定时任务

5. 线程池执行流程 (Execution Flow)

5.1 执行流程图

提交任务 (execute/submit)
核心线程是否已满?
    ├─ 否 → 创建核心线程执行
    └─ 是 → 任务队列是否已满?
            ├─ 否 → 加入队列等待
            └─ 是 → 最大线程数是否已满?
                    ├─ 否 → 创建非核心线程执行
                    └─ 是 → 执行拒绝策略

5.2 源码分析

/**
 * ThreadPoolExecutor.execute()方法核心逻辑
 * Core Logic of ThreadPoolExecutor.execute()
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();

    // 1. 如果当前线程数 < 核心线程数,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 2. 如果线程池运行中且任务可以加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池状态
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 如果队列满,尝试创建非核心线程
    else if (!addWorker(command, false))
        // 4. 如果创建失败,执行拒绝策略
        reject(command);
}

5.3 示例说明

/**
 * 线程池执行流程示例
 * Execution Flow Example
 */
public class ExecutionFlowDemo {
    public static void main(String[] args) {
        // 创建线程池:核心2,最大4,队列容量2
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(2),
            new ThreadPoolExecutor.AbortPolicy()
        );

        // 提交6个任务
        for (int i = 1; i <= 6; i++) {
            final int taskId = i;
            executor.execute(() -> {
                System.out.println("任务" + taskId + "执行");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // 执行流程:
        // 任务1、2 → 核心线程执行
        // 任务3、4 → 加入队列
        // 任务5、6 → 创建非核心线程执行
        // 如果还有任务 → 拒绝策略
    }
}

6. 拒绝策略 (Rejection Policy)

6.1 内置拒绝策略

AbortPolicy(默认策略)

/**
 * AbortPolicy:直接抛出异常
 * AbortPolicy: Throw Exception
 */
new ThreadPoolExecutor.AbortPolicy()

行为: 直接抛出RejectedExecutionException异常

CallerRunsPolicy(调用者运行策略)

/**
 * CallerRunsPolicy:调用者线程执行
 * CallerRunsPolicy: Caller Runs
 */
new ThreadPoolExecutor.CallerRunsPolicy()

行为: 在调用者线程中执行任务(会阻塞调用者)

DiscardPolicy(丢弃策略)

/**
 * DiscardPolicy:直接丢弃任务
 * DiscardPolicy: Discard Task
 */
new ThreadPoolExecutor.DiscardPolicy()

行为: 静默丢弃任务,不抛异常

DiscardOldestPolicy(丢弃最老任务)

/**
 * DiscardOldestPolicy:丢弃队列最老的任务
 * DiscardOldestPolicy: Discard Oldest Task
 */
new ThreadPoolExecutor.DiscardOldestPolicy()

行为: 丢弃队列头部的任务,然后尝试执行新任务

6.2 自定义拒绝策略

/**
 * 自定义拒绝策略:记录日志并保存到数据库
 * Custom Rejection Policy
 */
public class CustomRejectionPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录日志
        logger.warn("任务被拒绝: " + r.toString());

        // 保存到数据库(异步)
        saveToDatabase(r);

        // 或者发送告警
        sendAlert("线程池任务被拒绝");
    }
}

6.3 拒绝策略对比

策略 行为 适用场景
AbortPolicy 抛异常 需要知道任务被拒绝
CallerRunsPolicy 调用者执行 可以接受降级处理
DiscardPolicy 静默丢弃 可以容忍任务丢失
DiscardOldestPolicy 丢弃最老任务 优先处理新任务

7. 线程池监控 (Thread Pool Monitoring)

7.1 监控指标

/**
 * 线程池监控示例
 * Thread Pool Monitoring Example
 */
public class ThreadPoolMonitor {
    public void monitor(ThreadPoolExecutor executor) {
        // 核心线程数
        int corePoolSize = executor.getCorePoolSize();

        // 最大线程数
        int maximumPoolSize = executor.getMaximumPoolSize();

        // 当前线程数
        int poolSize = executor.getPoolSize();

        // 活跃线程数
        int activeCount = executor.getActiveCount();

        // 已完成任务数
        long completedTaskCount = executor.getCompletedTaskCount();

        // 总任务数
        long taskCount = executor.getTaskCount();

        // 队列大小
        int queueSize = executor.getQueue().size();

        // 打印监控信息
        System.out.println(String.format(
            "线程池监控 - 核心:%d, 最大:%d, 当前:%d, 活跃:%d, 已完成:%d, 总任务:%d, 队列:%d",
            corePoolSize, maximumPoolSize, poolSize, activeCount,
            completedTaskCount, taskCount, queueSize
        ));
    }
}

7.2 在算力平台中的应用

/**
 * 算力平台线程池监控
 * Computing Platform Thread Pool Monitoring
 */
@Component
public class PlatformThreadPoolMonitor {

    @Scheduled(fixedRate = 60000) // 每分钟监控一次
    public void monitor() {
        // 监控任务调度线程池
        monitorThreadPool(taskSchedulerPool, "任务调度线程池");

        // 监控结算处理线程池
        monitorThreadPool(billingPool, "结算处理线程池");
    }

    private void monitorThreadPool(ThreadPoolExecutor executor, String name) {
        int queueSize = executor.getQueue().size();
        int activeCount = executor.getActiveCount();

        // 告警:队列积压
        if (queueSize > 1000) {
            logger.warn("{}队列积压: {}", name, queueSize);
        }

        // 告警:线程池满载
        if (activeCount == executor.getMaximumPoolSize()) {
            logger.warn("{}线程池满载", name);
        }
    }
}

8. 最佳实践 (Best Practices)

8.1 线程池参数设置

/**
 * 线程池参数设置建议
 * Thread Pool Parameter Configuration
 */
public class ThreadPoolConfig {

    // CPU密集型任务
    public static ThreadPoolExecutor createCpuIntensivePool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            corePoolSize,
            corePoolSize,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("cpu-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    // IO密集型任务
    public static ThreadPoolExecutor createIoIntensivePool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        return new ThreadPoolExecutor(
            corePoolSize,
            corePoolSize * 2,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("io-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

8.2 正确关闭线程池

/**
 * 正确关闭线程池
 * Proper Thread Pool Shutdown
 */
public void shutdownThreadPool(ExecutorService executor) {
    executor.shutdown(); // 停止接收新任务

    try {
        // 等待已提交的任务完成
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            // 超时后强制关闭
            executor.shutdownNow();

            // 再次等待
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                logger.error("线程池未能正常关闭");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}

8.3 使用有界队列

// ❌ 不推荐:无界队列可能导致OOM
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>()  // 无界队列
);

// ✅ 推荐:使用有界队列
new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),  // 有界队列
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);

8.4 自定义线程工厂

/**
 * 自定义线程工厂:便于问题排查
 * Custom Thread Factory
 */
ThreadFactory factory = new ThreadFactoryBuilder()
    .setNameFormat("task-pool-%d")  // 线程名称格式
    .setDaemon(false)               // 非守护线程
    .setPriority(Thread.NORM_PRIORITY)  // 正常优先级
    .setUncaughtExceptionHandler((t, e) -> {
        logger.error("线程{}未捕获异常", t.getName(), e);
    })
    .build();

9. 面试高频问题 (Interview Questions)

Q1: 线程池的核心参数有哪些?

答案: corePoolSize、maximumPoolSize、keepAliveTime、workQueue、threadFactory、handler

Q2: 线程池的执行流程?

答案:5. 线程池执行流程

Q3: 如何合理设置线程池参数?

答案: - CPU密集型:核心线程数 = CPU核心数 - IO密集型:核心线程数 = CPU核心数 * 2 - 使用有界队列,设置合理的拒绝策略

Q4: 线程池的拒绝策略有哪些?

答案: AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy

Q5: 如何监控线程池?

答案: 监控核心线程数、最大线程数、当前线程数、活跃线程数、队列大小、已完成任务数等指标


📖 扩展阅读


返回: 07-Java并发编程
上一章: 07-04 - 并发工具类
下一章: 07-06 - 并发集合 →