跳转至

并发工具类 (Concurrent Utilities)

深入理解CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser等JUC工具类的使用场景和实现原理

目录


1. CountDownLatch (CountDownLatch)

1.1 核心特点

CountDownLatch(倒计时门闩):一个或多个线程等待其他线程完成操作。

特性 说明
一次性 计数器只能使用一次,不能重置
等待机制 一个或多个线程等待计数器归零
计数递减 其他线程调用countDown()递减计数
适用场景 等待多个任务完成

1.2 基本使用

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch基本使用
 * Basic Usage of CountDownLatch
 */
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount);

        // 创建多个工作线程
        for (int i = 0; i < threadCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    // 执行任务
                    System.out.println("任务" + taskId + "执行中");
                    Thread.sleep(1000);
                    System.out.println("任务" + taskId + "完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 计数减1
                }
            }).start();
        }

        // 主线程等待所有任务完成
        latch.await(); // 阻塞直到计数器归零
        System.out.println("所有任务完成");
    }
}

1.3 核心方法

// 创建:指定初始计数
CountDownLatch latch = new CountDownLatch(5);

// 等待:阻塞直到计数归零
latch.await();

// 超时等待:设置超时时间
boolean completed = latch.await(5, TimeUnit.SECONDS);

// 计数减1:不阻塞
latch.countDown();

// 获取当前计数
long count = latch.getCount();

1.4 在算力平台中的应用

/**
 * 算力平台中的CountDownLatch应用
 * CountDownLatch in Computing Platform
 */
public class PlatformCountDownLatch {

    // 场景:等待所有节点信息采集完成
    public void collectAllNodeInfo(List<Node> nodes) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(nodes.size());

        // 并发采集所有节点信息
        for (Node node : nodes) {
            executorService.submit(() -> {
                try {
                    collectNodeInfo(node); // 采集节点信息
                } finally {
                    latch.countDown(); // 完成一个节点
                }
            });
        }

        // 等待所有节点采集完成
        latch.await();
        System.out.println("所有节点信息采集完成");

        // 继续后续处理
        processCollectedData();
    }

    // 场景:等待多个定时任务完成初始化
    public void initializeTasks() throws InterruptedException {
        CountDownLatch initLatch = new CountDownLatch(3);

        // 初始化任务调度器
        executorService.submit(() -> {
            initTaskScheduler();
            initLatch.countDown();
        });

        // 初始化节点监控
        executorService.submit(() -> {
            initNodeMonitor();
            initLatch.countDown();
        });

        // 初始化结算系统
        executorService.submit(() -> {
            initBillingSystem();
            initLatch.countDown();
        });

        // 等待所有初始化完成
        initLatch.await();
        System.out.println("系统初始化完成");
    }
}

2. CyclicBarrier (CyclicBarrier)

2.1 核心特点

CyclicBarrier(循环屏障):多个线程相互等待,到达屏障点后一起继续执行。

特性 说明
可重用 计数器可以重置,可以多次使用
相互等待 多个线程相互等待到达屏障点
屏障动作 到达屏障点后可以执行指定动作
适用场景 分阶段任务,需要同步点

2.2 基本使用

import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier基本使用
 * Basic Usage of CyclicBarrier
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            // 屏障动作:所有线程到达后执行
            System.out.println("所有线程到达屏障点");
        });

        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程" + threadId + "执行阶段1");
                    Thread.sleep(1000);
                    barrier.await(); // 等待其他线程

                    System.out.println("线程" + threadId + "执行阶段2");
                    Thread.sleep(1000);
                    barrier.await(); // 再次等待

                    System.out.println("线程" + threadId + "完成");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

2.3 CountDownLatch vs CyclicBarrier

特性 CountDownLatch CyclicBarrier
计数方向 递减到0 递增到指定值
可重用 否(一次性) 是(可循环使用)
等待关系 一个/多个线程等待其他线程 多个线程相互等待
适用场景 等待任务完成 分阶段同步

2.4 在算力平台中的应用

/**
 * 算力平台中的CyclicBarrier应用
 * CyclicBarrier in Computing Platform
 */
public class PlatformCyclicBarrier {

    // 场景:分阶段处理任务
    public void processTasksInPhases(List<Task> tasks) {
        int phaseCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(tasks.size(), () -> {
            System.out.println("当前阶段完成,进入下一阶段");
        });

        for (Task task : tasks) {
            executorService.submit(() -> {
                try {
                    // 阶段1:数据准备
                    prepareData(task);
                    barrier.await();

                    // 阶段2:数据处理
                    processData(task);
                    barrier.await();

                    // 阶段3:结果输出
                    outputResult(task);
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

3. Semaphore (Semaphore)

3.1 核心特点

Semaphore(信号量):控制同时访问资源的线程数量。

特性 说明
许可机制 通过许可(permits)控制访问
可重用 许可可以释放,可以重复使用
公平性 支持公平和非公平模式
适用场景 限流、资源池管理

3.2 基本使用

import java.util.concurrent.Semaphore;

/**
 * Semaphore基本使用
 * Basic Usage of Semaphore
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 创建信号量:最多3个线程同时访问
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println("线程" + threadId + "获得许可,开始执行");
                    Thread.sleep(2000);
                    System.out.println("线程" + threadId + "执行完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

3.3 核心方法

// 创建:指定许可数量
Semaphore semaphore = new Semaphore(5);

// 公平模式
Semaphore fairSemaphore = new Semaphore(5, true);

// 获取许可(阻塞)
semaphore.acquire();

// 获取多个许可
semaphore.acquire(3);

// 尝试获取许可(非阻塞)
boolean acquired = semaphore.tryAcquire();

// 超时获取许可
boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);

// 释放许可
semaphore.release();

// 释放多个许可
semaphore.release(3);

// 获取可用许可数
int available = semaphore.availablePermits();

3.4 在算力平台中的应用

/**
 * 算力平台中的Semaphore应用
 * Semaphore in Computing Platform
 */
public class PlatformSemaphore {

    // 场景1:限制并发请求Nomad API的数量
    private Semaphore nomadApiSemaphore = new Semaphore(10); // 最多10个并发请求

    public void callNomadApi(String endpoint) throws InterruptedException {
        nomadApiSemaphore.acquire(); // 获取许可
        try {
            // 调用Nomad API
            nomadClient.get(endpoint);
        } finally {
            nomaphore.release(); // 释放许可
        }
    }

    // 场景2:限制数据库连接数
    private Semaphore dbConnectionSemaphore = new Semaphore(20); // 最多20个连接

    public void executeQuery(String sql) throws InterruptedException {
        dbConnectionSemaphore.acquire();
        try {
            // 执行数据库查询
            connection.execute(sql);
        } finally {
            dbConnectionSemaphore.release();
        }
    }

    // 场景3:限流:限制任务提交速率
    private Semaphore taskSubmissionSemaphore = new Semaphore(100); // 最多100个待处理任务

    public void submitTask(Task task) throws InterruptedException {
        if (taskSubmissionSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
            try {
                // 提交任务
                taskQueue.put(task);
            } finally {
                taskSubmissionSemaphore.release();
            }
        } else {
            throw new TooManyTasksException("任务提交过多,请稍后重试");
        }
    }
}

4. Exchanger (Exchanger)

4.1 核心特点

Exchanger(交换器):两个线程在同步点交换数据。

特性 说明
成对交换 只能两个线程交换数据
同步点 两个线程都到达交换点才交换
适用场景 生产者-消费者模式、数据交换

4.2 基本使用

import java.util.concurrent.Exchanger;

/**
 * Exchanger基本使用
 * Basic Usage of Exchanger
 */
public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        // 生产者线程
        new Thread(() -> {
            try {
                String data = "生产的数据";
                System.out.println("生产者发送: " + data);
                String received = exchanger.exchange(data); // 交换数据
                System.out.println("生产者收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // 消费者线程
        new Thread(() -> {
            try {
                Thread.sleep(1000); // 模拟处理时间
                String data = "处理后的数据";
                System.out.println("消费者发送: " + data);
                String received = exchanger.exchange(data); // 交换数据
                System.out.println("消费者收到: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

4.3 在算力平台中的应用

/**
 * 算力平台中的Exchanger应用
 * Exchanger in Computing Platform
 */
public class PlatformExchanger {

    // 场景:数据转换管道
    public void dataPipeline() {
        Exchanger<Data> exchanger = new Exchanger<>();

        // 数据采集线程
        executorService.submit(() -> {
            try {
                while (true) {
                    Data rawData = collectData(); // 采集原始数据
                    Data processedData = exchanger.exchange(rawData); // 交换处理后的数据
                    storeData(processedData); // 存储数据
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 数据处理线程
        executorService.submit(() -> {
            try {
                while (true) {
                    Data rawData = exchanger.exchange(null); // 等待原始数据
                    Data processedData = processData(rawData); // 处理数据
                    exchanger.exchange(processedData); // 返回处理后的数据
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}

5. Phaser (Phaser)

5.1 核心特点

Phaser(阶段器):JDK 1.7引入,可以动态调整参与线程数,支持多阶段同步。

特性 说明
动态调整 可以动态注册和注销线程
多阶段 支持多个阶段的同步
灵活 比CyclicBarrier和CountDownLatch更灵活
适用场景 复杂的分阶段任务

5.2 基本使用

import java.util.concurrent.Phaser;

/**
 * Phaser基本使用
 * Basic Usage of Phaser
 */
public class PhaserDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 3个参与线程

        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            new Thread(() -> {
                System.out.println("线程" + threadId + "执行阶段1");
                phaser.arriveAndAwaitAdvance(); // 到达并等待

                System.out.println("线程" + threadId + "执行阶段2");
                phaser.arriveAndAwaitAdvance();

                System.out.println("线程" + threadId + "完成");
                phaser.arriveAndDeregister(); // 到达并注销
            }).start();
        }
    }
}

5.3 核心方法

// 创建:指定初始参与线程数
Phaser phaser = new Phaser(5);

// 注册线程
phaser.register();

// 注销线程
phaser.arriveAndDeregister();

// 到达并等待其他线程
phaser.arriveAndAwaitAdvance();

// 到达但不等待
int phase = phaser.arrive();

// 获取当前阶段
int currentPhase = phaser.getPhase();

// 获取注册的线程数
int parties = phaser.getRegisteredParties();

5.4 在算力平台中的应用

/**
 * 算力平台中的Phaser应用
 * Phaser in Computing Platform
 */
public class PlatformPhaser {

    // 场景:多阶段任务处理
    public void multiPhaseTaskProcessing(List<Task> tasks) {
        Phaser phaser = new Phaser(1); // 主线程也参与

        for (Task task : tasks) {
            phaser.register(); // 注册任务线程
            executorService.submit(() -> {
                try {
                    // 阶段1:准备
                    prepareTask(task);
                    phaser.arriveAndAwaitAdvance();

                    // 阶段2:执行
                    executeTask(task);
                    phaser.arriveAndAwaitAdvance();

                    // 阶段3:清理
                    cleanupTask(task);
                    phaser.arriveAndDeregister();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        // 主线程等待所有阶段完成
        phaser.arriveAndAwaitAdvance(); // 阶段1
        phaser.arriveAndAwaitAdvance(); // 阶段2
        phaser.arriveAndAwaitAdvance(); // 阶段3
    }
}

6. 工具类对比 (Utilities Comparison)

6.1 功能对比表

工具类 主要功能 线程数 可重用 适用场景
CountDownLatch 等待任务完成 多个 等待多个任务完成
CyclicBarrier 分阶段同步 多个 分阶段任务同步
Semaphore 控制并发数 多个 限流、资源池
Exchanger 数据交换 2个 两个线程交换数据
Phaser 多阶段同步 多个 复杂分阶段任务

6.2 选择建议

使用CountDownLatch: - 等待多个任务完成 - 一次性使用

使用CyclicBarrier: - 分阶段任务需要同步点 - 需要重复使用

使用Semaphore: - 需要限制并发数 - 限流场景

使用Exchanger: - 两个线程需要交换数据 - 生产者-消费者模式

使用Phaser: - 复杂的分阶段任务 - 需要动态调整线程数


7. 最佳实践 (Best Practices)

7.1 CountDownLatch:确保在finally中countDown

// ✅ 正确
try {
    // 执行任务
} finally {
    latch.countDown(); // 确保计数减1
}

// ❌ 错误:如果异常,计数不会减1
latch.countDown();
// 执行任务(可能抛异常)

7.2 Semaphore:合理设置许可数

// 根据系统资源设置许可数
// CPU密集型:许可数 = CPU核心数
Semaphore cpuSemaphore = new Semaphore(Runtime.getRuntime().availableProcessors());

// IO密集型:许可数可以更大
Semaphore ioSemaphore = new Semaphore(50);

7.3 CyclicBarrier:处理异常

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    // 屏障动作
});

try {
    barrier.await();
} catch (BrokenBarrierException e) {
    // 处理屏障被破坏的情况
    // 其他线程可能已经失败
}

8. 面试高频问题 (Interview Questions)

Q1: CountDownLatch和CyclicBarrier的区别?

答案: - CountDownLatch:一个/多个线程等待其他线程完成,一次性使用 - CyclicBarrier:多个线程相互等待到达屏障点,可重用

Q2: Semaphore的实现原理?

答案: 基于AQS(AbstractQueuedSynchronizer)实现,通过许可(permits)控制并发数。

Q3: 如何实现限流?

答案: 使用Semaphore,设置许可数限制并发请求数。

Q4: Exchanger的使用场景?

答案: 两个线程需要交换数据的场景,如生产者-消费者模式。

Q5: Phaser相比CyclicBarrier的优势?

答案: 可以动态调整参与线程数,支持更复杂的分阶段任务。


📖 扩展阅读


返回: 07-Java并发编程
上一章: 07-03 - 锁机制
下一章: 07-05 - 线程池 →