并发设计模式 (Concurrent Design Patterns)¶
掌握生产者-消费者模式、ThreadLocal、Future模式等并发设计模式,提升并发编程能力
目录¶
1. 生产者-消费者模式 (Producer-Consumer Pattern)¶
1.1 模式概述¶
生产者-消费者模式:生产者生产数据放入队列,消费者从队列中取出数据消费。
核心组件: - 生产者(Producer) - 生产数据 - 消费者(Consumer) - 消费数据 - 缓冲区(Buffer) - 使用BlockingQueue作为缓冲区
1.2 基本实现¶
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 生产者-消费者模式基本实现
* Basic Producer-Consumer Pattern
*/
public class ProducerConsumerDemo {
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 生产者
class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
String item = "item-" + i;
queue.put(item); // 阻塞直到有空间
System.out.println("生产: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
String item = queue.take(); // 阻塞直到有元素
System.out.println("消费: " + item);
processItem(item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processItem(String item) {
// 处理数据
}
}
public void start() {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
1.3 多生产者多消费者¶
/**
* 多生产者多消费者模式
* Multiple Producers and Consumers
*/
public class MultiProducerConsumer {
private BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);
private ExecutorService producerPool = Executors.newFixedThreadPool(5);
private ExecutorService consumerPool = Executors.newFixedThreadPool(10);
public void start() {
// 启动多个生产者
for (int i = 0; i < 5; i++) {
producerPool.submit(() -> {
while (true) {
Task task = generateTask();
taskQueue.put(task);
}
});
}
// 启动多个消费者
for (int i = 0; i < 10; i++) {
consumerPool.submit(() -> {
while (true) {
Task task = taskQueue.take();
processTask(task);
}
});
}
}
}
1.4 在算力平台中的应用¶
/**
* 算力平台中的生产者-消费者模式
* Producer-Consumer Pattern in Computing Platform
*/
public class PlatformProducerConsumer {
// 任务状态队列:生产者-消费者模式
private BlockingQueue<TaskStatus> statusQueue = new LinkedBlockingQueue<>(1000);
// 生产者:定时任务采集Nomad任务状态
@Scheduled(fixedRate = 5000)
public void produceTaskStatus() {
List<TaskStatus> statuses = nomadClient.getTaskStatuses();
for (TaskStatus status : statuses) {
try {
statusQueue.put(status); // 生产数据
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者:批量更新数据库
@PostConstruct
public void consumeTaskStatus() {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
List<TaskStatus> batch = new ArrayList<>();
while (true) {
try {
// 批量消费
statusQueue.drainTo(batch, 100);
if (!batch.isEmpty()) {
batchUpdateDatabase(batch);
batch.clear();
} else {
TaskStatus status = statusQueue.take(); // 阻塞等待
batch.add(status);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
2. ThreadLocal模式 (ThreadLocal Pattern)¶
2.1 核心特点¶
ThreadLocal(线程本地变量):为每个线程提供独立的变量副本,线程间互不干扰。
| 特性 | 说明 |
|---|---|
| 线程隔离 | 每个线程有独立的变量副本 |
| 避免同步 | 不需要同步,性能好 |
| 内存泄漏 | 需要注意内存泄漏问题 |
| 适用场景 | 线程上下文、用户信息传递 |
2.2 基本使用¶
import java.util.concurrent.ThreadLocal;
/**
* ThreadLocal基本使用
* Basic Usage of ThreadLocal
*/
public class ThreadLocalDemo {
// 方式1:使用ThreadLocal类
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
// 方式2:使用ThreadLocalRandom(JDK 1.7+)
private ThreadLocalRandom random = ThreadLocalRandom.current();
public void setValue(String value) {
threadLocal.set(value); // 设置当前线程的值
}
public String getValue() {
return threadLocal.get(); // 获取当前线程的值
}
public void remove() {
threadLocal.remove(); // 移除当前线程的值(防止内存泄漏)
}
public static void main(String[] args) {
ThreadLocalDemo demo = new ThreadLocalDemo();
// 线程1
new Thread(() -> {
demo.setValue("线程1的值");
System.out.println("线程1: " + demo.getValue()); // 线程1的值
}).start();
// 线程2
new Thread(() -> {
demo.setValue("线程2的值");
System.out.println("线程2: " + demo.getValue()); // 线程2的值
}).start();
}
}
2.3 ThreadLocal实现原理¶
/**
* ThreadLocal实现原理(简化版)
* ThreadLocal Implementation Principle
*/
public class ThreadLocal<T> {
// ThreadLocalMap:Thread的成员变量
// ThreadLocalMap.Entry[] table
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); // 获取当前线程的ThreadLocalMap
if (map != null) {
map.set(this, value); // 以ThreadLocal为key存储值
} else {
createMap(t, value);
}
}
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
return (T) e.value;
}
}
return setInitialValue();
}
}
关键点: - 每个Thread有一个ThreadLocalMap - ThreadLocal作为key,值作为value - 线程隔离,互不干扰
2.4 内存泄漏问题¶
/**
* ThreadLocal内存泄漏问题
* ThreadLocal Memory Leak Problem
*/
public class ThreadLocalMemoryLeak {
private static ThreadLocal<LargeObject> threadLocal = new ThreadLocal<>();
public void problem() {
threadLocal.set(new LargeObject()); // 设置大对象
// 忘记remove(),导致内存泄漏
// ThreadLocalMap.Entry的key是弱引用,但value是强引用
}
public void solution() {
try {
threadLocal.set(new LargeObject());
// 使用ThreadLocal
useThreadLocal();
} finally {
threadLocal.remove(); // ✅ 必须remove,防止内存泄漏
}
}
}
内存泄漏原因: - ThreadLocalMap.Entry的key是弱引用(ThreadLocal) - 但value是强引用 - 如果ThreadLocal被回收,但value还在,导致内存泄漏
解决方案:
- 使用完ThreadLocal后调用remove()
- 使用线程池时,线程复用,必须remove
2.5 在算力平台中的应用¶
/**
* 算力平台中的ThreadLocal应用
* ThreadLocal in Computing Platform
*/
public class PlatformThreadLocal {
// 用户上下文:存储当前请求的用户信息
private static ThreadLocal<UserContext> userContext = new ThreadLocal<>();
// 请求ID:追踪请求链路
private static ThreadLocal<String> requestId = new ThreadLocal<>();
// 拦截器:设置用户上下文
@Component
public class UserContextInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) {
// 从请求头获取用户信息
String userId = request.getHeader("X-User-Id");
userContext.set(new UserContext(userId));
// 生成请求ID
requestId.set(UUID.randomUUID().toString());
return true;
}
@Override
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler,
Exception ex) {
// 清理ThreadLocal,防止内存泄漏
userContext.remove();
requestId.remove();
}
}
// 业务代码:获取当前用户
public void businessMethod() {
UserContext context = userContext.get(); // 获取当前线程的用户上下文
String currentUserId = context.getUserId();
// 记录日志:包含请求ID
logger.info("请求ID: {}, 用户ID: {}", requestId.get(), currentUserId);
}
}
3. Future模式 (Future Pattern)¶
3.1 核心特点¶
Future模式:异步执行任务,通过Future对象获取结果。
| 特性 | 说明 |
|---|---|
| 异步执行 | 任务异步执行,不阻塞主线程 |
| 结果获取 | 通过Future获取结果 |
| 超时控制 | 可以设置超时时间 |
| 适用场景 | 耗时操作、并行计算 |
3.2 基本使用¶
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Future模式基本使用
* Basic Usage of Future Pattern
*/
public class FutureDemo {
private ExecutorService executor = Executors.newFixedThreadPool(5);
public void basicUsage() throws Exception {
// 提交任务,返回Future
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "任务完成";
});
// 方式1:阻塞获取结果
String result = future.get();
// 方式2:超时获取结果
String result2 = future.get(5, TimeUnit.SECONDS);
// 方式3:检查是否完成
if (future.isDone()) {
String result3 = future.get();
}
// 方式4:取消任务
boolean cancelled = future.cancel(true);
}
// 并行执行多个任务
public void parallelExecution() throws Exception {
List<Future<String>> futures = new ArrayList<>();
// 提交多个任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
Future<String> future = executor.submit(() -> {
return "任务" + taskId + "完成";
});
futures.add(future);
}
// 收集结果
List<String> results = new ArrayList<>();
for (Future<String> future : futures) {
results.add(future.get());
}
}
}
3.3 CompletableFuture(JDK 1.8+)¶
import java.util.concurrent.CompletableFuture;
/**
* CompletableFuture使用示例
* CompletableFuture Usage Example
*/
public class CompletableFutureDemo {
// 异步执行
public void asyncExecution() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "异步任务结果";
});
// 链式调用
future.thenApply(s -> s + "处理")
.thenAccept(System.out::println)
.thenRun(() -> System.out.println("完成"));
}
// 组合多个Future
public void combineFutures() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");
// 等待两个Future都完成
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
return s1 + " + " + s2;
});
}
// 异常处理
public void exceptionHandling() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("异常");
});
future.exceptionally(ex -> {
System.out.println("处理异常: " + ex.getMessage());
return "默认值";
});
}
}
3.4 在算力平台中的应用¶
/**
* 算力平台中的Future模式应用
* Future Pattern in Computing Platform
*/
public class PlatformFuture {
// 场景:并行采集多个节点信息
public Map<String, NodeInfo> collectNodeInfoParallel(List<String> nodeIds) {
List<CompletableFuture<NodeInfo>> futures = nodeIds.stream()
.map(nodeId -> CompletableFuture.supplyAsync(() -> {
return nomadClient.getNodeInfo(nodeId);
}, executorService))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
// 收集结果
Map<String, NodeInfo> result = new HashMap<>();
for (int i = 0; i < nodeIds.size(); i++) {
result.put(nodeIds.get(i), futures.get(i).join());
}
return result;
}
// 场景:异步处理任务状态更新
public void updateTaskStatusAsync(String taskId, TaskStatus status) {
CompletableFuture.runAsync(() -> {
// 更新数据库
taskRepository.updateStatus(taskId, status);
// 更新ES
esClient.updateTaskStatus(taskId, status);
}, executorService).exceptionally(ex -> {
logger.error("更新任务状态失败", ex);
return null;
});
}
}
4. 不变模式 (Immutable Pattern)¶
4.1 核心特点¶
不变模式(Immutable Pattern):对象创建后状态不可变,天然线程安全。
实现方式:
- 所有字段final
- 不提供setter方法
- 类声明为final(防止继承)
- 返回新对象而不是修改原对象
4.2 基本实现¶
/**
* 不变模式示例
* Immutable Pattern Example
*/
public final class ImmutablePoint {
private final int x;
private final int y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() {
return x;
}
public int getY() {
return y;
}
// 返回新对象,而不是修改原对象
public ImmutablePoint move(int deltaX, int deltaY) {
return new ImmutablePoint(x + deltaX, y + deltaY);
}
}
4.3 String的不变性¶
/**
* String不变性示例
* String Immutability Example
*/
public class StringImmutability {
public void demonstrate() {
String str = "Hello";
str.concat(" World"); // 返回新String,原String不变
System.out.println(str); // 仍然是"Hello"
String newStr = str.concat(" World"); // 需要接收新对象
System.out.println(newStr); // "Hello World"
}
}
5. 工作窃取模式 (Work-Stealing Pattern)¶
5.1 核心特点¶
工作窃取模式(Work-Stealing):ForkJoinPool使用的模式,空闲线程从其他线程的队列中窃取任务。
5.2 ForkJoinPool¶
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* ForkJoinPool工作窃取示例
* ForkJoinPool Work-Stealing Example
*/
public class ForkJoinDemo {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
// 提交任务
Long result = pool.invoke(new SumTask(1, 1000000));
System.out.println("结果: " + result);
}
static class SumTask extends RecursiveTask<Long> {
private final int start;
private final int end;
private static final int THRESHOLD = 10000;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 直接计算
long sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 分割任务
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
left.fork(); // 异步执行
Long rightResult = right.compute(); // 同步执行
Long leftResult = left.join(); // 等待结果
return leftResult + rightResult;
}
}
}
}
6. 最佳实践 (Best Practices)¶
6.1 ThreadLocal必须remove¶
// ✅ 正确:使用完remove
try {
threadLocal.set(value);
// 使用ThreadLocal
} finally {
threadLocal.remove(); // 必须remove
}
// ❌ 错误:忘记remove,导致内存泄漏
threadLocal.set(value);
// 使用ThreadLocal
// 忘记remove
6.2 Future设置超时¶
// ✅ 正确:设置超时
Future<String> future = executor.submit(task);
try {
String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 取消任务
}
// ❌ 错误:无限等待
String result = future.get(); // 可能永远等待
6.3 使用不变对象¶
// ✅ 推荐:使用不变对象
public final class Config {
private final String key;
private final String value;
// ...
}
// ❌ 不推荐:可变对象需要同步
public class Config {
private String key;
private String value;
// 需要synchronized保护
}
7. 面试高频问题 (Interview Questions)¶
Q1: ThreadLocal的实现原理?¶
答案: 每个Thread有一个ThreadLocalMap,ThreadLocal作为key存储值,实现线程隔离。
Q2: ThreadLocal的内存泄漏问题?¶
答案: ThreadLocalMap.Entry的value是强引用,如果ThreadLocal被回收但value还在,导致内存泄漏。使用完必须remove()。
Q3: Future和CompletableFuture的区别?¶
答案: CompletableFuture支持链式调用、组合多个Future、更好的异常处理。
Q4: 什么是工作窃取模式?¶
答案: ForkJoinPool使用的模式,空闲线程从其他线程的队列中窃取任务,提高CPU利用率。
Q5: 不变模式的优势?¶
答案: 天然线程安全,不需要同步,性能好,易于理解。
📖 扩展阅读¶
返回: 07-Java并发编程
上一章: 07-07 - 原子类与CAS
下一章: 07-09 - 性能优化与最佳实践 →