并发工具类 (Concurrent Utilities)¶
深入理解CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser等JUC工具类的使用场景和实现原理
目录¶
1. CountDownLatch (CountDownLatch)¶
1.1 核心特点¶
CountDownLatch(倒计时门闩):一个或多个线程等待其他线程完成操作。
| 特性 | 说明 |
|---|---|
| 一次性 | 计数器只能使用一次,不能重置 |
| 等待机制 | 一个或多个线程等待计数器归零 |
| 计数递减 | 其他线程调用countDown()递减计数 |
| 适用场景 | 等待多个任务完成 |
1.2 基本使用¶
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch基本使用
* Basic Usage of CountDownLatch
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
// 创建多个工作线程
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
// 执行任务
System.out.println("任务" + taskId + "执行中");
Thread.sleep(1000);
System.out.println("任务" + taskId + "完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数减1
}
}).start();
}
// 主线程等待所有任务完成
latch.await(); // 阻塞直到计数器归零
System.out.println("所有任务完成");
}
}
1.3 核心方法¶
// 创建:指定初始计数
CountDownLatch latch = new CountDownLatch(5);
// 等待:阻塞直到计数归零
latch.await();
// 超时等待:设置超时时间
boolean completed = latch.await(5, TimeUnit.SECONDS);
// 计数减1:不阻塞
latch.countDown();
// 获取当前计数
long count = latch.getCount();
1.4 在算力平台中的应用¶
/**
* 算力平台中的CountDownLatch应用
* CountDownLatch in Computing Platform
*/
public class PlatformCountDownLatch {
// 场景:等待所有节点信息采集完成
public void collectAllNodeInfo(List<Node> nodes) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(nodes.size());
// 并发采集所有节点信息
for (Node node : nodes) {
executorService.submit(() -> {
try {
collectNodeInfo(node); // 采集节点信息
} finally {
latch.countDown(); // 完成一个节点
}
});
}
// 等待所有节点采集完成
latch.await();
System.out.println("所有节点信息采集完成");
// 继续后续处理
processCollectedData();
}
// 场景:等待多个定时任务完成初始化
public void initializeTasks() throws InterruptedException {
CountDownLatch initLatch = new CountDownLatch(3);
// 初始化任务调度器
executorService.submit(() -> {
initTaskScheduler();
initLatch.countDown();
});
// 初始化节点监控
executorService.submit(() -> {
initNodeMonitor();
initLatch.countDown();
});
// 初始化结算系统
executorService.submit(() -> {
initBillingSystem();
initLatch.countDown();
});
// 等待所有初始化完成
initLatch.await();
System.out.println("系统初始化完成");
}
}
2. CyclicBarrier (CyclicBarrier)¶
2.1 核心特点¶
CyclicBarrier(循环屏障):多个线程相互等待,到达屏障点后一起继续执行。
| 特性 | 说明 |
|---|---|
| 可重用 | 计数器可以重置,可以多次使用 |
| 相互等待 | 多个线程相互等待到达屏障点 |
| 屏障动作 | 到达屏障点后可以执行指定动作 |
| 适用场景 | 分阶段任务,需要同步点 |
2.2 基本使用¶
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier基本使用
* Basic Usage of CyclicBarrier
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
// 屏障动作:所有线程到达后执行
System.out.println("所有线程到达屏障点");
});
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程" + threadId + "执行阶段1");
Thread.sleep(1000);
barrier.await(); // 等待其他线程
System.out.println("线程" + threadId + "执行阶段2");
Thread.sleep(1000);
barrier.await(); // 再次等待
System.out.println("线程" + threadId + "完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
2.3 CountDownLatch vs CyclicBarrier¶
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数方向 | 递减到0 | 递增到指定值 |
| 可重用 | 否(一次性) | 是(可循环使用) |
| 等待关系 | 一个/多个线程等待其他线程 | 多个线程相互等待 |
| 适用场景 | 等待任务完成 | 分阶段同步 |
2.4 在算力平台中的应用¶
/**
* 算力平台中的CyclicBarrier应用
* CyclicBarrier in Computing Platform
*/
public class PlatformCyclicBarrier {
// 场景:分阶段处理任务
public void processTasksInPhases(List<Task> tasks) {
int phaseCount = 3;
CyclicBarrier barrier = new CyclicBarrier(tasks.size(), () -> {
System.out.println("当前阶段完成,进入下一阶段");
});
for (Task task : tasks) {
executorService.submit(() -> {
try {
// 阶段1:数据准备
prepareData(task);
barrier.await();
// 阶段2:数据处理
processData(task);
barrier.await();
// 阶段3:结果输出
outputResult(task);
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
3. Semaphore (Semaphore)¶
3.1 核心特点¶
Semaphore(信号量):控制同时访问资源的线程数量。
| 特性 | 说明 |
|---|---|
| 许可机制 | 通过许可(permits)控制访问 |
| 可重用 | 许可可以释放,可以重复使用 |
| 公平性 | 支持公平和非公平模式 |
| 适用场景 | 限流、资源池管理 |
3.2 基本使用¶
import java.util.concurrent.Semaphore;
/**
* Semaphore基本使用
* Basic Usage of Semaphore
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 创建信号量:最多3个线程同时访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
final int threadId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程" + threadId + "获得许可,开始执行");
Thread.sleep(2000);
System.out.println("线程" + threadId + "执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
3.3 核心方法¶
// 创建:指定许可数量
Semaphore semaphore = new Semaphore(5);
// 公平模式
Semaphore fairSemaphore = new Semaphore(5, true);
// 获取许可(阻塞)
semaphore.acquire();
// 获取多个许可
semaphore.acquire(3);
// 尝试获取许可(非阻塞)
boolean acquired = semaphore.tryAcquire();
// 超时获取许可
boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);
// 释放许可
semaphore.release();
// 释放多个许可
semaphore.release(3);
// 获取可用许可数
int available = semaphore.availablePermits();
3.4 在算力平台中的应用¶
/**
* 算力平台中的Semaphore应用
* Semaphore in Computing Platform
*/
public class PlatformSemaphore {
// 场景1:限制并发请求Nomad API的数量
private Semaphore nomadApiSemaphore = new Semaphore(10); // 最多10个并发请求
public void callNomadApi(String endpoint) throws InterruptedException {
nomadApiSemaphore.acquire(); // 获取许可
try {
// 调用Nomad API
nomadClient.get(endpoint);
} finally {
nomaphore.release(); // 释放许可
}
}
// 场景2:限制数据库连接数
private Semaphore dbConnectionSemaphore = new Semaphore(20); // 最多20个连接
public void executeQuery(String sql) throws InterruptedException {
dbConnectionSemaphore.acquire();
try {
// 执行数据库查询
connection.execute(sql);
} finally {
dbConnectionSemaphore.release();
}
}
// 场景3:限流:限制任务提交速率
private Semaphore taskSubmissionSemaphore = new Semaphore(100); // 最多100个待处理任务
public void submitTask(Task task) throws InterruptedException {
if (taskSubmissionSemaphore.tryAcquire(5, TimeUnit.SECONDS)) {
try {
// 提交任务
taskQueue.put(task);
} finally {
taskSubmissionSemaphore.release();
}
} else {
throw new TooManyTasksException("任务提交过多,请稍后重试");
}
}
}
4. Exchanger (Exchanger)¶
4.1 核心特点¶
Exchanger(交换器):两个线程在同步点交换数据。
| 特性 | 说明 |
|---|---|
| 成对交换 | 只能两个线程交换数据 |
| 同步点 | 两个线程都到达交换点才交换 |
| 适用场景 | 生产者-消费者模式、数据交换 |
4.2 基本使用¶
import java.util.concurrent.Exchanger;
/**
* Exchanger基本使用
* Basic Usage of Exchanger
*/
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// 生产者线程
new Thread(() -> {
try {
String data = "生产的数据";
System.out.println("生产者发送: " + data);
String received = exchanger.exchange(data); // 交换数据
System.out.println("生产者收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
Thread.sleep(1000); // 模拟处理时间
String data = "处理后的数据";
System.out.println("消费者发送: " + data);
String received = exchanger.exchange(data); // 交换数据
System.out.println("消费者收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
4.3 在算力平台中的应用¶
/**
* 算力平台中的Exchanger应用
* Exchanger in Computing Platform
*/
public class PlatformExchanger {
// 场景:数据转换管道
public void dataPipeline() {
Exchanger<Data> exchanger = new Exchanger<>();
// 数据采集线程
executorService.submit(() -> {
try {
while (true) {
Data rawData = collectData(); // 采集原始数据
Data processedData = exchanger.exchange(rawData); // 交换处理后的数据
storeData(processedData); // 存储数据
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 数据处理线程
executorService.submit(() -> {
try {
while (true) {
Data rawData = exchanger.exchange(null); // 等待原始数据
Data processedData = processData(rawData); // 处理数据
exchanger.exchange(processedData); // 返回处理后的数据
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
5. Phaser (Phaser)¶
5.1 核心特点¶
Phaser(阶段器):JDK 1.7引入,可以动态调整参与线程数,支持多阶段同步。
| 特性 | 说明 |
|---|---|
| 动态调整 | 可以动态注册和注销线程 |
| 多阶段 | 支持多个阶段的同步 |
| 灵活 | 比CyclicBarrier和CountDownLatch更灵活 |
| 适用场景 | 复杂的分阶段任务 |
5.2 基本使用¶
import java.util.concurrent.Phaser;
/**
* Phaser基本使用
* Basic Usage of Phaser
*/
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 3个参与线程
for (int i = 0; i < 3; i++) {
final int threadId = i;
new Thread(() -> {
System.out.println("线程" + threadId + "执行阶段1");
phaser.arriveAndAwaitAdvance(); // 到达并等待
System.out.println("线程" + threadId + "执行阶段2");
phaser.arriveAndAwaitAdvance();
System.out.println("线程" + threadId + "完成");
phaser.arriveAndDeregister(); // 到达并注销
}).start();
}
}
}
5.3 核心方法¶
// 创建:指定初始参与线程数
Phaser phaser = new Phaser(5);
// 注册线程
phaser.register();
// 注销线程
phaser.arriveAndDeregister();
// 到达并等待其他线程
phaser.arriveAndAwaitAdvance();
// 到达但不等待
int phase = phaser.arrive();
// 获取当前阶段
int currentPhase = phaser.getPhase();
// 获取注册的线程数
int parties = phaser.getRegisteredParties();
5.4 在算力平台中的应用¶
/**
* 算力平台中的Phaser应用
* Phaser in Computing Platform
*/
public class PlatformPhaser {
// 场景:多阶段任务处理
public void multiPhaseTaskProcessing(List<Task> tasks) {
Phaser phaser = new Phaser(1); // 主线程也参与
for (Task task : tasks) {
phaser.register(); // 注册任务线程
executorService.submit(() -> {
try {
// 阶段1:准备
prepareTask(task);
phaser.arriveAndAwaitAdvance();
// 阶段2:执行
executeTask(task);
phaser.arriveAndAwaitAdvance();
// 阶段3:清理
cleanupTask(task);
phaser.arriveAndDeregister();
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 主线程等待所有阶段完成
phaser.arriveAndAwaitAdvance(); // 阶段1
phaser.arriveAndAwaitAdvance(); // 阶段2
phaser.arriveAndAwaitAdvance(); // 阶段3
}
}
6. 工具类对比 (Utilities Comparison)¶
6.1 功能对比表¶
| 工具类 | 主要功能 | 线程数 | 可重用 | 适用场景 |
|---|---|---|---|---|
| CountDownLatch | 等待任务完成 | 多个 | 否 | 等待多个任务完成 |
| CyclicBarrier | 分阶段同步 | 多个 | 是 | 分阶段任务同步 |
| Semaphore | 控制并发数 | 多个 | 是 | 限流、资源池 |
| Exchanger | 数据交换 | 2个 | 是 | 两个线程交换数据 |
| Phaser | 多阶段同步 | 多个 | 是 | 复杂分阶段任务 |
6.2 选择建议¶
使用CountDownLatch: - 等待多个任务完成 - 一次性使用
使用CyclicBarrier: - 分阶段任务需要同步点 - 需要重复使用
使用Semaphore: - 需要限制并发数 - 限流场景
使用Exchanger: - 两个线程需要交换数据 - 生产者-消费者模式
使用Phaser: - 复杂的分阶段任务 - 需要动态调整线程数
7. 最佳实践 (Best Practices)¶
7.1 CountDownLatch:确保在finally中countDown¶
// ✅ 正确
try {
// 执行任务
} finally {
latch.countDown(); // 确保计数减1
}
// ❌ 错误:如果异常,计数不会减1
latch.countDown();
// 执行任务(可能抛异常)
7.2 Semaphore:合理设置许可数¶
// 根据系统资源设置许可数
// CPU密集型:许可数 = CPU核心数
Semaphore cpuSemaphore = new Semaphore(Runtime.getRuntime().availableProcessors());
// IO密集型:许可数可以更大
Semaphore ioSemaphore = new Semaphore(50);
7.3 CyclicBarrier:处理异常¶
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 屏障动作
});
try {
barrier.await();
} catch (BrokenBarrierException e) {
// 处理屏障被破坏的情况
// 其他线程可能已经失败
}
8. 面试高频问题 (Interview Questions)¶
Q1: CountDownLatch和CyclicBarrier的区别?¶
答案: - CountDownLatch:一个/多个线程等待其他线程完成,一次性使用 - CyclicBarrier:多个线程相互等待到达屏障点,可重用
Q2: Semaphore的实现原理?¶
答案: 基于AQS(AbstractQueuedSynchronizer)实现,通过许可(permits)控制并发数。
Q3: 如何实现限流?¶
答案: 使用Semaphore,设置许可数限制并发请求数。
Q4: Exchanger的使用场景?¶
答案: 两个线程需要交换数据的场景,如生产者-消费者模式。
Q5: Phaser相比CyclicBarrier的优势?¶
答案: 可以动态调整参与线程数,支持更复杂的分阶段任务。
📖 扩展阅读¶
返回: 07-Java并发编程
上一章: 07-03 - 锁机制
下一章: 07-05 - 线程池 →