跳转至

并发集合 (Concurrent Collections)

深入理解ConcurrentHashMap、BlockingQueue、CopyOnWriteArrayList等并发集合的实现原理和使用场景

目录


1. 为什么需要并发集合 (Why Concurrent Collections?)

1.1 传统集合的线程安全问题

/**
 * HashMap线程不安全示例
 * HashMap Thread Safety Problem
 */
public class HashMapUnsafe {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> map = new HashMap<>();

        // 多线程并发put
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    map.put("key" + j, j); // 可能丢失数据或死循环
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        // 结果可能小于10000,甚至可能死循环
        System.out.println("Map大小: " + map.size());
    }
}

问题: 1. 数据丢失 - 并发put可能导致数据覆盖 2. 死循环 - JDK 1.7及之前版本,并发扩容可能导致死循环 3. 数据不一致 - 读取到中间状态的数据

1.2 解决方案对比

方案 实现 性能 适用场景
synchronized包装 Collections.synchronizedMap() 低并发场景
Hashtable 所有方法synchronized 已过时,不推荐
ConcurrentHashMap CAS + synchronized 高并发场景(推荐)

1.3 在算力平台中的应用

在算力平台的结算系统中,需要线程安全的Map存储用户钱包信息:

/**
 * 算力平台中的并发集合应用
 * Concurrent Collections in Computing Platform
 */
public class PlatformConcurrentCollections {

    // 用户钱包余额:使用ConcurrentHashMap
    private ConcurrentHashMap<Long, AtomicLong> walletMap = new ConcurrentHashMap<>();

    // 任务状态队列:使用BlockingQueue
    private BlockingQueue<TaskStatus> taskStatusQueue = new LinkedBlockingQueue<>();

    // 节点信息缓存:使用CopyOnWriteArrayList(读多写少)
    private CopyOnWriteArrayList<NodeInfo> nodeCache = new CopyOnWriteArrayList<>();

    // 线程安全的扣费操作
    public boolean deduct(Long userId, Long amount) {
        AtomicLong balance = walletMap.computeIfAbsent(
            userId, k -> new AtomicLong(0)
        );

        return balance.updateAndGet(current -> {
            if (current >= amount) {
                return current - amount;
            }
            throw new InsufficientBalanceException();
        }) >= 0;
    }
}

2. ConcurrentHashMap (ConcurrentHashMap)

2.1 核心特点

特性 JDK 1.7 JDK 1.8+
数据结构 Segment + HashEntry数组 Node数组 + 链表/红黑树
锁机制 分段锁(Segment) CAS + synchronized
并发度 Segment数量(默认16) 更高(接近无锁)
性能 较好 更好

2.2 JDK 1.8实现原理

数据结构

/**
 * ConcurrentHashMap核心数据结构(JDK 1.8)
 * Core Data Structure (JDK 1.8)
 */
public class ConcurrentHashMap<K,V> {
    // Node数组(类似HashMap的table)
    transient volatile Node<K,V>[] table;

    // 扩容时的临时数组
    private transient volatile Node<K,V>[] nextTable;

    // 控制标识符
    private transient volatile int sizeCtl;

    // Node节点(链表节点)
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }

    // TreeNode(红黑树节点)
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;
        TreeNode<K,V> left;
        TreeNode<K,V> right;
    }
}

核心方法:put()

/**
 * ConcurrentHashMap.put()方法核心逻辑
 * Core Logic of put() Method
 */
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 1. 计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;

    // 2. 自旋插入
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;

        // 2.1 如果table未初始化,先初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        // 2.2 如果桶为空,CAS插入
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;
        }

        // 2.3 如果正在扩容,帮助扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);

        // 2.4 桶不为空,synchronized锁住链表头节点
        else {
            V oldVal = null;
            synchronized (f) {
                // 再次检查(双重检查)
                if (tabAt(tab, i) == f) {
                    // 链表插入
                    if (fh >= 0) {
                        // ... 链表插入逻辑
                    }
                    // 红黑树插入
                    else if (f instanceof TreeBin) {
                        // ... 红黑树插入逻辑
                    }
                }
            }
        }
    }

    // 3. 检查是否需要扩容
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
    }
    return null;
}

关键点: 1. CAS插入 - 桶为空时,使用CAS无锁插入 2. synchronized锁 - 桶不为空时,锁住链表头节点(细粒度锁) 3. 双重检查 - synchronized内部再次检查,防止并发问题

核心方法:get()

/**
 * ConcurrentHashMap.get()方法
 * get() Method
 */
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());

    // 1. table不为空且桶不为空
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {

        // 2. 检查第一个节点
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }

        // 3. hash值为负,说明是红黑树或正在扩容
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;

        // 4. 遍历链表
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

特点: get()操作**无锁**,性能高!

2.3 ConcurrentHashMap vs HashMap vs Hashtable

特性 HashMap Hashtable ConcurrentHashMap
线程安全
锁机制 synchronized(方法级) CAS + synchronized(细粒度)
性能
null键值 允许 不允许 不允许
迭代器 fail-fast fail-fast 弱一致性
适用场景 单线程 已过时 高并发(推荐)

2.4 使用示例

/**
 * ConcurrentHashMap使用示例
 * ConcurrentHashMap Usage Example
 */
public class ConcurrentHashMapDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // 1. 基本操作
        map.put("key1", 1);
        map.get("key1");
        map.remove("key1");

        // 2. 线程安全的putIfAbsent
        map.putIfAbsent("key2", 2); // 如果不存在才put

        // 3. 原子操作:computeIfAbsent
        map.computeIfAbsent("key3", k -> {
            // 如果key3不存在,计算并put
            return 3;
        });

        // 4. 原子操作:computeIfPresent
        map.computeIfPresent("key3", (k, v) -> {
            // 如果key3存在,更新值
            return v + 1;
        });

        // 5. 遍历(弱一致性)
        map.forEach((k, v) -> {
            System.out.println(k + " = " + v);
        });
    }
}

3. BlockingQueue (BlockingQueue)

3.1 核心特点

**BlockingQueue(阻塞队列)**是一个支持阻塞操作的队列接口,常用于生产者-消费者模式。

核心方法:

操作 抛出异常 返回特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() - -

3.2 常用实现类

ArrayBlockingQueue(数组阻塞队列)

/**
 * ArrayBlockingQueue:有界阻塞队列
 * ArrayBlockingQueue: Bounded Blocking Queue
 */
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 生产者
queue.put("item"); // 阻塞直到有空间

// 消费者
String item = queue.take(); // 阻塞直到有元素

特点: - 有界队列(需要指定容量) - 基于数组实现 - 使用ReentrantLock实现阻塞

LinkedBlockingQueue(链表阻塞队列)

/**
 * LinkedBlockingQueue:可选有界阻塞队列
 * LinkedBlockingQueue: Optional Bounded Blocking Queue
 */
// 无界队列
BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();

// 有界队列
BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);

特点: - 可以是有界或无界 - 基于链表实现 - 吞吐量通常高于ArrayBlockingQueue

SynchronousQueue(同步队列)

/**
 * SynchronousQueue:同步队列(不存储元素)
 * SynchronousQueue: Synchronous Queue
 */
BlockingQueue<String> queue = new SynchronousQueue<>();

// 生产者:必须等待消费者取走
new Thread(() -> {
    try {
        queue.put("item"); // 阻塞直到有消费者
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

// 消费者:必须等待生产者放入
new Thread(() -> {
    try {
        String item = queue.take(); // 阻塞直到有生产者
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

特点: - 不存储元素 - 每个put必须等待take - 用于直接传递任务

PriorityBlockingQueue(优先级阻塞队列)

/**
 * PriorityBlockingQueue:优先级阻塞队列
 * PriorityBlockingQueue: Priority Blocking Queue
 */
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();

queue.put(5);
queue.put(1);
queue.put(3);

// 按优先级取出:1, 3, 5
while (!queue.isEmpty()) {
    System.out.println(queue.take());
}

特点: - 无界队列 - 基于堆实现 - 元素必须实现Comparable接口

3.3 生产者-消费者模式

/**
 * 生产者-消费者模式示例
 * Producer-Consumer Pattern Example
 */
public class ProducerConsumerDemo {
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

    // 生产者
    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    String item = "item-" + i;
                    queue.put(item); // 阻塞直到有空间
                    System.out.println("生产: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 消费者
    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    String item = queue.take(); // 阻塞直到有元素
                    System.out.println("消费: " + item);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void start() {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }
}

3.4 在算力平台中的应用

/**
 * 算力平台中的BlockingQueue应用
 * BlockingQueue in Computing Platform
 */
public class PlatformBlockingQueue {

    // 任务状态队列:生产者-消费者模式
    private BlockingQueue<TaskStatus> taskStatusQueue = new LinkedBlockingQueue<>(1000);

    // 生产者:定时任务采集Nomad任务状态
    @Scheduled(fixedRate = 5000)
    public void produceTaskStatus() {
        List<TaskStatus> statuses = nomadClient.getTaskStatuses();
        for (TaskStatus status : statuses) {
            try {
                taskStatusQueue.put(status); // 阻塞直到有空间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // 消费者:批量更新数据库
    @PostConstruct
    public void consumeTaskStatus() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(() -> {
            List<TaskStatus> batch = new ArrayList<>();
            while (true) {
                try {
                    // 批量获取(最多100个)
                    taskStatusQueue.drainTo(batch, 100);
                    if (!batch.isEmpty()) {
                        batchUpdateDatabase(batch);
                        batch.clear();
                    } else {
                        // 队列为空,等待
                        TaskStatus status = taskStatusQueue.take();
                        batch.add(status);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }
}

4. CopyOnWriteArrayList (CopyOnWriteArrayList)

4.1 核心特点

CopyOnWriteArrayList(写时复制列表):读操作无锁,写操作时复制整个数组。

特性 说明
读操作 无锁,高性能
写操作 加锁,复制整个数组
适用场景 读多写少
内存消耗 写操作时内存消耗大

4.2 实现原理

/**
 * CopyOnWriteArrayList核心实现
 * Core Implementation
 */
public class CopyOnWriteArrayList<E> {
    private transient volatile Object[] array;
    final transient ReentrantLock lock = new ReentrantLock();

    // 读操作:无锁
    public E get(int index) {
        return get(getArray(), index);
    }

    final Object[] getArray() {
        return array; // 直接返回数组引用
    }

    // 写操作:加锁 + 复制
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 复制数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            // 替换数组引用
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
}

关键点: 1. volatile数组 - 保证可见性 2. 写时复制 - 写操作时创建新数组 3. 原子替换 - 使用volatile保证数组引用的原子替换

4.3 使用示例

/**
 * CopyOnWriteArrayList使用示例
 * CopyOnWriteArrayList Usage Example
 */
public class CopyOnWriteArrayListDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

        // 添加元素(写操作:复制数组)
        list.add("item1");
        list.add("item2");

        // 读取元素(读操作:无锁,高性能)
        String item = list.get(0);

        // 遍历(弱一致性:可能读到旧数据)
        for (String s : list) {
            System.out.println(s);
        }
    }
}

4.4 注意事项

⚠️ 弱一致性: 读操作可能读到旧数据

CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");

// 线程1:遍历
new Thread(() -> {
    for (String s : list) {
        System.out.println(s); // 可能只打印"A"
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}).start();

// 线程2:添加元素
new Thread(() -> {
    try {
        Thread.sleep(50);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    list.add("B"); // 创建新数组,但线程1仍在使用旧数组
}).start();

5. ConcurrentLinkedQueue (ConcurrentLinkedQueue)

5.1 核心特点

ConcurrentLinkedQueue(并发链表队列):基于CAS实现的无锁队列。

特性 说明
线程安全 CAS无锁实现
性能 高(无锁)
有界性 无界队列
适用场景 高并发队列操作

5.2 实现原理

/**
 * ConcurrentLinkedQueue核心实现
 * Core Implementation
 */
public class ConcurrentLinkedQueue<E> {
    // 头节点和尾节点
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;

    // 入队:CAS操作
    public boolean offer(E e) {
        Node<E> newNode = new Node<E>(e);
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // CAS设置next
                if (p.casNext(null, newNode)) {
                    // 更新tail
                    if (p != t)
                        casTail(t, newNode);
                    return true;
                }
            }
            // ... 其他逻辑
        }
    }

    // 出队:CAS操作
    public E poll() {
        // ... CAS操作逻辑
    }
}

关键点: 使用CAS保证线程安全,无锁高性能。

5.3 使用示例

/**
 * ConcurrentLinkedQueue使用示例
 * ConcurrentLinkedQueue Usage Example
 */
public class ConcurrentLinkedQueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

        // 入队
        queue.offer("item1");
        queue.offer("item2");

        // 出队
        String item = queue.poll();

        // 检查
        String peek = queue.peek();
    }
}

6. 并发集合对比 (Concurrent Collections Comparison)

6.1 对比表

集合类 线程安全机制 性能特点 适用场景
ConcurrentHashMap CAS + synchronized 高并发性能好 高并发Map场景
BlockingQueue ReentrantLock + Condition 阻塞机制 生产者-消费者
CopyOnWriteArrayList 写时复制 读快写慢 读多写少
ConcurrentLinkedQueue CAS 无锁队列 高并发队列

6.2 选择建议

使用ConcurrentHashMap: - 需要线程安全的Map - 高并发读写场景

使用BlockingQueue: - 生产者-消费者模式 - 需要阻塞操作

使用CopyOnWriteArrayList: - 读多写少场景 - 可以容忍弱一致性

使用ConcurrentLinkedQueue: - 高并发队列操作 - 不需要阻塞操作


7. 最佳实践 (Best Practices)

7.1 选择合适的并发集合

// ✅ 高并发Map:使用ConcurrentHashMap
ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

// ✅ 生产者-消费者:使用BlockingQueue
BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(1000);

// ✅ 读多写少:使用CopyOnWriteArrayList
CopyOnWriteArrayList<String> configList = new CopyOnWriteArrayList<>();

// ✅ 高并发队列:使用ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();

7.2 注意CopyOnWriteArrayList的内存消耗

// ⚠️ 注意:CopyOnWriteArrayList写操作会复制整个数组
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

// 如果list很大,频繁写操作会导致内存消耗大
for (int i = 0; i < 10000; i++) {
    list.add("item" + i); // 每次add都复制整个数组!
}

// ✅ 建议:批量操作
List<String> batch = new ArrayList<>();
// ... 添加元素到batch
list.addAll(batch); // 只复制一次

7.3 使用ConcurrentHashMap的原子操作

ConcurrentHashMap<String, AtomicLong> map = new ConcurrentHashMap<>();

// ✅ 使用computeIfAbsent避免重复计算
map.computeIfAbsent("key", k -> {
    // 只在key不存在时执行
    return new AtomicLong(expensiveComputation());
});

// ✅ 使用computeIfPresent原子更新
map.computeIfPresent("key", (k, v) -> {
    return new AtomicLong(v.get() + 1);
});

8. 面试高频问题 (Interview Questions)

Q1: ConcurrentHashMap的实现原理?

答案: JDK 1.8使用CAS + synchronized实现,桶为空时CAS插入,桶不为空时锁住链表头节点。get操作无锁。

Q2: ConcurrentHashMap和HashMap的区别?

答案: ConcurrentHashMap线程安全,使用CAS + synchronized;HashMap非线程安全,性能更高。

Q3: BlockingQueue的实现原理?

答案: 基于ReentrantLock和Condition实现阻塞,put操作在队列满时阻塞,take操作在队列空时阻塞。

Q4: CopyOnWriteArrayList的优缺点?

答案: - 优点:读操作无锁,性能高 - 缺点:写操作需要复制整个数组,内存消耗大,弱一致性

Q5: 如何选择合适的并发集合?

答案: 根据使用场景选择:ConcurrentHashMap用于Map,BlockingQueue用于生产者-消费者,CopyOnWriteArrayList用于读多写少。


📖 扩展阅读


返回: 07-Java并发编程
上一章: 07-05 - 线程池
下一章: 07-07 - 原子类与CAS →