跳转至

并发设计模式 (Concurrent Design Patterns)

掌握生产者-消费者模式、ThreadLocal、Future模式等并发设计模式,提升并发编程能力

目录


1. 生产者-消费者模式 (Producer-Consumer Pattern)

1.1 模式概述

生产者-消费者模式:生产者生产数据放入队列,消费者从队列中取出数据消费。

核心组件: - 生产者(Producer) - 生产数据 - 消费者(Consumer) - 消费数据 - 缓冲区(Buffer) - 使用BlockingQueue作为缓冲区

1.2 基本实现

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 生产者-消费者模式基本实现
 * Basic Producer-Consumer Pattern
 */
public class ProducerConsumerDemo {
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

    // 生产者
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    String item = "item-" + i;
                    queue.put(item); // 阻塞直到有空间
                    System.out.println("生产: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 消费者
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    String item = queue.take(); // 阻塞直到有元素
                    System.out.println("消费: " + item);
                    processItem(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void processItem(String item) {
            // 处理数据
        }
    }

    public void start() {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}

1.3 多生产者多消费者

/**
 * 多生产者多消费者模式
 * Multiple Producers and Consumers
 */
public class MultiProducerConsumer {
    private BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);
    private ExecutorService producerPool = Executors.newFixedThreadPool(5);
    private ExecutorService consumerPool = Executors.newFixedThreadPool(10);

    public void start() {
        // 启动多个生产者
        for (int i = 0; i < 5; i++) {
            producerPool.submit(() -> {
                while (true) {
                    Task task = generateTask();
                    taskQueue.put(task);
                }
            });
        }

        // 启动多个消费者
        for (int i = 0; i < 10; i++) {
            consumerPool.submit(() -> {
                while (true) {
                    Task task = taskQueue.take();
                    processTask(task);
                }
            });
        }
    }
}

1.4 在算力平台中的应用

/**
 * 算力平台中的生产者-消费者模式
 * Producer-Consumer Pattern in Computing Platform
 */
public class PlatformProducerConsumer {

    // 任务状态队列:生产者-消费者模式
    private BlockingQueue<TaskStatus> statusQueue = new LinkedBlockingQueue<>(1000);

    // 生产者:定时任务采集Nomad任务状态
    @Scheduled(fixedRate = 5000)
    public void produceTaskStatus() {
        List<TaskStatus> statuses = nomadClient.getTaskStatuses();
        for (TaskStatus status : statuses) {
            try {
                statusQueue.put(status); // 生产数据
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 消费者:批量更新数据库
    @PostConstruct
    public void consumeTaskStatus() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> {
            List<TaskStatus> batch = new ArrayList<>();
            while (true) {
                try {
                    // 批量消费
                    statusQueue.drainTo(batch, 100);
                    if (!batch.isEmpty()) {
                        batchUpdateDatabase(batch);
                        batch.clear();
                    } else {
                        TaskStatus status = statusQueue.take(); // 阻塞等待
                        batch.add(status);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
}

2. ThreadLocal模式 (ThreadLocal Pattern)

2.1 核心特点

ThreadLocal(线程本地变量):为每个线程提供独立的变量副本,线程间互不干扰。

特性 说明
线程隔离 每个线程有独立的变量副本
避免同步 不需要同步,性能好
内存泄漏 需要注意内存泄漏问题
适用场景 线程上下文、用户信息传递

2.2 基本使用

import java.util.concurrent.ThreadLocal;

/**
 * ThreadLocal基本使用
 * Basic Usage of ThreadLocal
 */
public class ThreadLocalDemo {
    // 方式1:使用ThreadLocal类
    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    // 方式2:使用ThreadLocalRandom(JDK 1.7+)
    private ThreadLocalRandom random = ThreadLocalRandom.current();

    public void setValue(String value) {
        threadLocal.set(value); // 设置当前线程的值
    }

    public String getValue() {
        return threadLocal.get(); // 获取当前线程的值
    }

    public void remove() {
        threadLocal.remove(); // 移除当前线程的值(防止内存泄漏)
    }

    public static void main(String[] args) {
        ThreadLocalDemo demo = new ThreadLocalDemo();

        // 线程1
        new Thread(() -> {
            demo.setValue("线程1的值");
            System.out.println("线程1: " + demo.getValue()); // 线程1的值
        }).start();

        // 线程2
        new Thread(() -> {
            demo.setValue("线程2的值");
            System.out.println("线程2: " + demo.getValue()); // 线程2的值
        }).start();
    }
}

2.3 ThreadLocal实现原理

/**
 * ThreadLocal实现原理(简化版)
 * ThreadLocal Implementation Principle
 */
public class ThreadLocal<T> {
    // ThreadLocalMap:Thread的成员变量
    // ThreadLocalMap.Entry[] table

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t); // 获取当前线程的ThreadLocalMap
        if (map != null) {
            map.set(this, value); // 以ThreadLocal为key存储值
        } else {
            createMap(t, value);
        }
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                return (T) e.value;
            }
        }
        return setInitialValue();
    }
}

关键点: - 每个Thread有一个ThreadLocalMap - ThreadLocal作为key,值作为value - 线程隔离,互不干扰

2.4 内存泄漏问题

/**
 * ThreadLocal内存泄漏问题
 * ThreadLocal Memory Leak Problem
 */
public class ThreadLocalMemoryLeak {
    private static ThreadLocal<LargeObject> threadLocal = new ThreadLocal<>();

    public void problem() {
        threadLocal.set(new LargeObject()); // 设置大对象
        // 忘记remove(),导致内存泄漏
        // ThreadLocalMap.Entry的key是弱引用,但value是强引用
    }

    public void solution() {
        try {
            threadLocal.set(new LargeObject());
            // 使用ThreadLocal
            useThreadLocal();
        } finally {
            threadLocal.remove(); // ✅ 必须remove,防止内存泄漏
        }
    }
}

内存泄漏原因: - ThreadLocalMap.Entry的key是弱引用(ThreadLocal) - 但value是强引用 - 如果ThreadLocal被回收,但value还在,导致内存泄漏

解决方案: - 使用完ThreadLocal后调用remove() - 使用线程池时,线程复用,必须remove

2.5 在算力平台中的应用

/**
 * 算力平台中的ThreadLocal应用
 * ThreadLocal in Computing Platform
 */
public class PlatformThreadLocal {

    // 用户上下文:存储当前请求的用户信息
    private static ThreadLocal<UserContext> userContext = new ThreadLocal<>();

    // 请求ID:追踪请求链路
    private static ThreadLocal<String> requestId = new ThreadLocal<>();

    // 拦截器:设置用户上下文
    @Component
    public class UserContextInterceptor implements HandlerInterceptor {
        @Override
        public boolean preHandle(HttpServletRequest request, 
                                HttpServletResponse response, 
                                Object handler) {
            // 从请求头获取用户信息
            String userId = request.getHeader("X-User-Id");
            userContext.set(new UserContext(userId));

            // 生成请求ID
            requestId.set(UUID.randomUUID().toString());
            return true;
        }

        @Override
        public void afterCompletion(HttpServletRequest request, 
                                   HttpServletResponse response, 
                                   Object handler, 
                                   Exception ex) {
            // 清理ThreadLocal,防止内存泄漏
            userContext.remove();
            requestId.remove();
        }
    }

    // 业务代码:获取当前用户
    public void businessMethod() {
        UserContext context = userContext.get(); // 获取当前线程的用户上下文
        String currentUserId = context.getUserId();

        // 记录日志:包含请求ID
        logger.info("请求ID: {}, 用户ID: {}", requestId.get(), currentUserId);
    }
}

3. Future模式 (Future Pattern)

3.1 核心特点

Future模式:异步执行任务,通过Future对象获取结果。

特性 说明
异步执行 任务异步执行,不阻塞主线程
结果获取 通过Future获取结果
超时控制 可以设置超时时间
适用场景 耗时操作、并行计算

3.2 基本使用

import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Future模式基本使用
 * Basic Usage of Future Pattern
 */
public class FutureDemo {
    private ExecutorService executor = Executors.newFixedThreadPool(5);

    public void basicUsage() throws Exception {
        // 提交任务,返回Future
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000);
            return "任务完成";
        });

        // 方式1:阻塞获取结果
        String result = future.get();

        // 方式2:超时获取结果
        String result2 = future.get(5, TimeUnit.SECONDS);

        // 方式3:检查是否完成
        if (future.isDone()) {
            String result3 = future.get();
        }

        // 方式4:取消任务
        boolean cancelled = future.cancel(true);
    }

    // 并行执行多个任务
    public void parallelExecution() throws Exception {
        List<Future<String>> futures = new ArrayList<>();

        // 提交多个任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            Future<String> future = executor.submit(() -> {
                return "任务" + taskId + "完成";
            });
            futures.add(future);
        }

        // 收集结果
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            results.add(future.get());
        }
    }
}

3.3 CompletableFuture(JDK 1.8+)

import java.util.concurrent.CompletableFuture;

/**
 * CompletableFuture使用示例
 * CompletableFuture Usage Example
 */
public class CompletableFutureDemo {

    // 异步执行
    public void asyncExecution() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "异步任务结果";
        });

        // 链式调用
        future.thenApply(s -> s + "处理")
              .thenAccept(System.out::println)
              .thenRun(() -> System.out.println("完成"));
    }

    // 组合多个Future
    public void combineFutures() {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "结果1");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "结果2");

        // 等待两个Future都完成
        CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
            return s1 + " + " + s2;
        });
    }

    // 异常处理
    public void exceptionHandling() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("异常");
        });

        future.exceptionally(ex -> {
            System.out.println("处理异常: " + ex.getMessage());
            return "默认值";
        });
    }
}

3.4 在算力平台中的应用

/**
 * 算力平台中的Future模式应用
 * Future Pattern in Computing Platform
 */
public class PlatformFuture {

    // 场景:并行采集多个节点信息
    public Map<String, NodeInfo> collectNodeInfoParallel(List<String> nodeIds) {
        List<CompletableFuture<NodeInfo>> futures = nodeIds.stream()
            .map(nodeId -> CompletableFuture.supplyAsync(() -> {
                return nomadClient.getNodeInfo(nodeId);
            }, executorService))
            .collect(Collectors.toList());

        // 等待所有任务完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();

        // 收集结果
        Map<String, NodeInfo> result = new HashMap<>();
        for (int i = 0; i < nodeIds.size(); i++) {
            result.put(nodeIds.get(i), futures.get(i).join());
        }
        return result;
    }

    // 场景:异步处理任务状态更新
    public void updateTaskStatusAsync(String taskId, TaskStatus status) {
        CompletableFuture.runAsync(() -> {
            // 更新数据库
            taskRepository.updateStatus(taskId, status);

            // 更新ES
            esClient.updateTaskStatus(taskId, status);
        }, executorService).exceptionally(ex -> {
            logger.error("更新任务状态失败", ex);
            return null;
        });
    }
}

4. 不变模式 (Immutable Pattern)

4.1 核心特点

不变模式(Immutable Pattern):对象创建后状态不可变,天然线程安全。

实现方式: - 所有字段final - 不提供setter方法 - 类声明为final(防止继承) - 返回新对象而不是修改原对象

4.2 基本实现

/**
 * 不变模式示例
 * Immutable Pattern Example
 */
public final class ImmutablePoint {
    private final int x;
    private final int y;

    public ImmutablePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int getX() {
        return x;
    }

    public int getY() {
        return y;
    }

    // 返回新对象,而不是修改原对象
    public ImmutablePoint move(int deltaX, int deltaY) {
        return new ImmutablePoint(x + deltaX, y + deltaY);
    }
}

4.3 String的不变性

/**
 * String不变性示例
 * String Immutability Example
 */
public class StringImmutability {
    public void demonstrate() {
        String str = "Hello";
        str.concat(" World"); // 返回新String,原String不变
        System.out.println(str); // 仍然是"Hello"

        String newStr = str.concat(" World"); // 需要接收新对象
        System.out.println(newStr); // "Hello World"
    }
}

5. 工作窃取模式 (Work-Stealing Pattern)

5.1 核心特点

工作窃取模式(Work-Stealing):ForkJoinPool使用的模式,空闲线程从其他线程的队列中窃取任务。

5.2 ForkJoinPool

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * ForkJoinPool工作窃取示例
 * ForkJoinPool Work-Stealing Example
 */
public class ForkJoinDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();

        // 提交任务
        Long result = pool.invoke(new SumTask(1, 1000000));
        System.out.println("结果: " + result);
    }

    static class SumTask extends RecursiveTask<Long> {
        private final int start;
        private final int end;
        private static final int THRESHOLD = 10000;

        public SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                // 直接计算
                long sum = 0;
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            } else {
                // 分割任务
                int mid = (start + end) / 2;
                SumTask left = new SumTask(start, mid);
                SumTask right = new SumTask(mid + 1, end);

                left.fork(); // 异步执行
                Long rightResult = right.compute(); // 同步执行
                Long leftResult = left.join(); // 等待结果

                return leftResult + rightResult;
            }
        }
    }
}

6. 最佳实践 (Best Practices)

6.1 ThreadLocal必须remove

// ✅ 正确:使用完remove
try {
    threadLocal.set(value);
    // 使用ThreadLocal
} finally {
    threadLocal.remove(); // 必须remove
}

// ❌ 错误:忘记remove,导致内存泄漏
threadLocal.set(value);
// 使用ThreadLocal
// 忘记remove

6.2 Future设置超时

// ✅ 正确:设置超时
Future<String> future = executor.submit(task);
try {
    String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true); // 取消任务
}

// ❌ 错误:无限等待
String result = future.get(); // 可能永远等待

6.3 使用不变对象

// ✅ 推荐:使用不变对象
public final class Config {
    private final String key;
    private final String value;
    // ...
}

// ❌ 不推荐:可变对象需要同步
public class Config {
    private String key;
    private String value;
    // 需要synchronized保护
}

7. 面试高频问题 (Interview Questions)

Q1: ThreadLocal的实现原理?

答案: 每个Thread有一个ThreadLocalMap,ThreadLocal作为key存储值,实现线程隔离。

Q2: ThreadLocal的内存泄漏问题?

答案: ThreadLocalMap.Entry的value是强引用,如果ThreadLocal被回收但value还在,导致内存泄漏。使用完必须remove()。

Q3: Future和CompletableFuture的区别?

答案: CompletableFuture支持链式调用、组合多个Future、更好的异常处理。

Q4: 什么是工作窃取模式?

答案: ForkJoinPool使用的模式,空闲线程从其他线程的队列中窃取任务,提高CPU利用率。

Q5: 不变模式的优势?

答案: 天然线程安全,不需要同步,性能好,易于理解。


📖 扩展阅读


返回: 07-Java并发编程
上一章: 07-07 - 原子类与CAS
下一章: 07-09 - 性能优化与最佳实践 →