并发集合 (Concurrent Collections)¶
深入理解ConcurrentHashMap、BlockingQueue、CopyOnWriteArrayList等并发集合的实现原理和使用场景
目录¶
- 1. 为什么需要并发集合
- 2. ConcurrentHashMap
- 3. BlockingQueue
- 4. CopyOnWriteArrayList
- 5. ConcurrentLinkedQueue
- 6. 并发集合对比
- 7. 最佳实践
- 8. 面试高频问题
1. 为什么需要并发集合 (Why Concurrent Collections?)¶
1.1 传统集合的线程安全问题¶
/**
* HashMap线程不安全示例
* HashMap Thread Safety Problem
*/
public class HashMapUnsafe {
public static void main(String[] args) throws InterruptedException {
Map<String, Integer> map = new HashMap<>();
// 多线程并发put
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
map.put("key" + j, j); // 可能丢失数据或死循环
}
});
threads[i].start();
}
for (Thread thread : threads) {
thread.join();
}
// 结果可能小于10000,甚至可能死循环
System.out.println("Map大小: " + map.size());
}
}
问题: 1. 数据丢失 - 并发put可能导致数据覆盖 2. 死循环 - JDK 1.7及之前版本,并发扩容可能导致死循环 3. 数据不一致 - 读取到中间状态的数据
1.2 解决方案对比¶
| 方案 | 实现 | 性能 | 适用场景 |
|---|---|---|---|
| synchronized包装 | Collections.synchronizedMap() |
低 | 低并发场景 |
| Hashtable | 所有方法synchronized | 低 | 已过时,不推荐 |
| ConcurrentHashMap | CAS + synchronized | 高 | 高并发场景(推荐) |
1.3 在算力平台中的应用¶
在算力平台的结算系统中,需要线程安全的Map存储用户钱包信息:
/**
* 算力平台中的并发集合应用
* Concurrent Collections in Computing Platform
*/
public class PlatformConcurrentCollections {
// 用户钱包余额:使用ConcurrentHashMap
private ConcurrentHashMap<Long, AtomicLong> walletMap = new ConcurrentHashMap<>();
// 任务状态队列:使用BlockingQueue
private BlockingQueue<TaskStatus> taskStatusQueue = new LinkedBlockingQueue<>();
// 节点信息缓存:使用CopyOnWriteArrayList(读多写少)
private CopyOnWriteArrayList<NodeInfo> nodeCache = new CopyOnWriteArrayList<>();
// 线程安全的扣费操作
public boolean deduct(Long userId, Long amount) {
AtomicLong balance = walletMap.computeIfAbsent(
userId, k -> new AtomicLong(0)
);
return balance.updateAndGet(current -> {
if (current >= amount) {
return current - amount;
}
throw new InsufficientBalanceException();
}) >= 0;
}
}
2. ConcurrentHashMap (ConcurrentHashMap)¶
2.1 核心特点¶
| 特性 | JDK 1.7 | JDK 1.8+ |
|---|---|---|
| 数据结构 | Segment + HashEntry数组 | Node数组 + 链表/红黑树 |
| 锁机制 | 分段锁(Segment) | CAS + synchronized |
| 并发度 | Segment数量(默认16) | 更高(接近无锁) |
| 性能 | 较好 | 更好 |
2.2 JDK 1.8实现原理¶
数据结构¶
/**
* ConcurrentHashMap核心数据结构(JDK 1.8)
* Core Data Structure (JDK 1.8)
*/
public class ConcurrentHashMap<K,V> {
// Node数组(类似HashMap的table)
transient volatile Node<K,V>[] table;
// 扩容时的临时数组
private transient volatile Node<K,V>[] nextTable;
// 控制标识符
private transient volatile int sizeCtl;
// Node节点(链表节点)
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
// TreeNode(红黑树节点)
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent;
TreeNode<K,V> left;
TreeNode<K,V> right;
}
}
核心方法:put()¶
/**
* ConcurrentHashMap.put()方法核心逻辑
* Core Logic of put() Method
*/
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. 计算hash值
int hash = spread(key.hashCode());
int binCount = 0;
// 2. 自旋插入
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 2.1 如果table未初始化,先初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 2.2 如果桶为空,CAS插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
}
// 2.3 如果正在扩容,帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 2.4 桶不为空,synchronized锁住链表头节点
else {
V oldVal = null;
synchronized (f) {
// 再次检查(双重检查)
if (tabAt(tab, i) == f) {
// 链表插入
if (fh >= 0) {
// ... 链表插入逻辑
}
// 红黑树插入
else if (f instanceof TreeBin) {
// ... 红黑树插入逻辑
}
}
}
}
}
// 3. 检查是否需要扩容
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
}
return null;
}
关键点: 1. CAS插入 - 桶为空时,使用CAS无锁插入 2. synchronized锁 - 桶不为空时,锁住链表头节点(细粒度锁) 3. 双重检查 - synchronized内部再次检查,防止并发问题
核心方法:get()¶
/**
* ConcurrentHashMap.get()方法
* get() Method
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 1. table不为空且桶不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 2. 检查第一个节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 3. hash值为负,说明是红黑树或正在扩容
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 4. 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
特点: get()操作**无锁**,性能高!
2.3 ConcurrentHashMap vs HashMap vs Hashtable¶
| 特性 | HashMap | Hashtable | ConcurrentHashMap |
|---|---|---|---|
| 线程安全 | ❌ | ✅ | ✅ |
| 锁机制 | 无 | synchronized(方法级) | CAS + synchronized(细粒度) |
| 性能 | 高 | 低 | 高 |
| null键值 | 允许 | 不允许 | 不允许 |
| 迭代器 | fail-fast | fail-fast | 弱一致性 |
| 适用场景 | 单线程 | 已过时 | 高并发(推荐) |
2.4 使用示例¶
/**
* ConcurrentHashMap使用示例
* ConcurrentHashMap Usage Example
*/
public class ConcurrentHashMapDemo {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 1. 基本操作
map.put("key1", 1);
map.get("key1");
map.remove("key1");
// 2. 线程安全的putIfAbsent
map.putIfAbsent("key2", 2); // 如果不存在才put
// 3. 原子操作:computeIfAbsent
map.computeIfAbsent("key3", k -> {
// 如果key3不存在,计算并put
return 3;
});
// 4. 原子操作:computeIfPresent
map.computeIfPresent("key3", (k, v) -> {
// 如果key3存在,更新值
return v + 1;
});
// 5. 遍历(弱一致性)
map.forEach((k, v) -> {
System.out.println(k + " = " + v);
});
}
}
3. BlockingQueue (BlockingQueue)¶
3.1 核心特点¶
**BlockingQueue(阻塞队列)**是一个支持阻塞操作的队列接口,常用于生产者-消费者模式。
核心方法:
| 操作 | 抛出异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | - | - |
3.2 常用实现类¶
ArrayBlockingQueue(数组阻塞队列)¶
/**
* ArrayBlockingQueue:有界阻塞队列
* ArrayBlockingQueue: Bounded Blocking Queue
*/
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者
queue.put("item"); // 阻塞直到有空间
// 消费者
String item = queue.take(); // 阻塞直到有元素
特点: - 有界队列(需要指定容量) - 基于数组实现 - 使用ReentrantLock实现阻塞
LinkedBlockingQueue(链表阻塞队列)¶
/**
* LinkedBlockingQueue:可选有界阻塞队列
* LinkedBlockingQueue: Optional Bounded Blocking Queue
*/
// 无界队列
BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();
// 有界队列
BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);
特点: - 可以是有界或无界 - 基于链表实现 - 吞吐量通常高于ArrayBlockingQueue
SynchronousQueue(同步队列)¶
/**
* SynchronousQueue:同步队列(不存储元素)
* SynchronousQueue: Synchronous Queue
*/
BlockingQueue<String> queue = new SynchronousQueue<>();
// 生产者:必须等待消费者取走
new Thread(() -> {
try {
queue.put("item"); // 阻塞直到有消费者
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 消费者:必须等待生产者放入
new Thread(() -> {
try {
String item = queue.take(); // 阻塞直到有生产者
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
特点: - 不存储元素 - 每个put必须等待take - 用于直接传递任务
PriorityBlockingQueue(优先级阻塞队列)¶
/**
* PriorityBlockingQueue:优先级阻塞队列
* PriorityBlockingQueue: Priority Blocking Queue
*/
BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.put(5);
queue.put(1);
queue.put(3);
// 按优先级取出:1, 3, 5
while (!queue.isEmpty()) {
System.out.println(queue.take());
}
特点: - 无界队列 - 基于堆实现 - 元素必须实现Comparable接口
3.3 生产者-消费者模式¶
/**
* 生产者-消费者模式示例
* Producer-Consumer Pattern Example
*/
public class ProducerConsumerDemo {
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 生产者
class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
String item = "item-" + i;
queue.put(item); // 阻塞直到有空间
System.out.println("生产: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
String item = queue.take(); // 阻塞直到有元素
System.out.println("消费: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void start() {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}
3.4 在算力平台中的应用¶
/**
* 算力平台中的BlockingQueue应用
* BlockingQueue in Computing Platform
*/
public class PlatformBlockingQueue {
// 任务状态队列:生产者-消费者模式
private BlockingQueue<TaskStatus> taskStatusQueue = new LinkedBlockingQueue<>(1000);
// 生产者:定时任务采集Nomad任务状态
@Scheduled(fixedRate = 5000)
public void produceTaskStatus() {
List<TaskStatus> statuses = nomadClient.getTaskStatuses();
for (TaskStatus status : statuses) {
try {
taskStatusQueue.put(status); // 阻塞直到有空间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者:批量更新数据库
@PostConstruct
public void consumeTaskStatus() {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
List<TaskStatus> batch = new ArrayList<>();
while (true) {
try {
// 批量获取(最多100个)
taskStatusQueue.drainTo(batch, 100);
if (!batch.isEmpty()) {
batchUpdateDatabase(batch);
batch.clear();
} else {
// 队列为空,等待
TaskStatus status = taskStatusQueue.take();
batch.add(status);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
}
4. CopyOnWriteArrayList (CopyOnWriteArrayList)¶
4.1 核心特点¶
CopyOnWriteArrayList(写时复制列表):读操作无锁,写操作时复制整个数组。
| 特性 | 说明 |
|---|---|
| 读操作 | 无锁,高性能 |
| 写操作 | 加锁,复制整个数组 |
| 适用场景 | 读多写少 |
| 内存消耗 | 写操作时内存消耗大 |
4.2 实现原理¶
/**
* CopyOnWriteArrayList核心实现
* Core Implementation
*/
public class CopyOnWriteArrayList<E> {
private transient volatile Object[] array;
final transient ReentrantLock lock = new ReentrantLock();
// 读操作:无锁
public E get(int index) {
return get(getArray(), index);
}
final Object[] getArray() {
return array; // 直接返回数组引用
}
// 写操作:加锁 + 复制
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 复制数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
// 替换数组引用
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
}
关键点: 1. volatile数组 - 保证可见性 2. 写时复制 - 写操作时创建新数组 3. 原子替换 - 使用volatile保证数组引用的原子替换
4.3 使用示例¶
/**
* CopyOnWriteArrayList使用示例
* CopyOnWriteArrayList Usage Example
*/
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 添加元素(写操作:复制数组)
list.add("item1");
list.add("item2");
// 读取元素(读操作:无锁,高性能)
String item = list.get(0);
// 遍历(弱一致性:可能读到旧数据)
for (String s : list) {
System.out.println(s);
}
}
}
4.4 注意事项¶
⚠️ 弱一致性: 读操作可能读到旧数据
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("A");
// 线程1:遍历
new Thread(() -> {
for (String s : list) {
System.out.println(s); // 可能只打印"A"
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 线程2:添加元素
new Thread(() -> {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
list.add("B"); // 创建新数组,但线程1仍在使用旧数组
}).start();
5. ConcurrentLinkedQueue (ConcurrentLinkedQueue)¶
5.1 核心特点¶
ConcurrentLinkedQueue(并发链表队列):基于CAS实现的无锁队列。
| 特性 | 说明 |
|---|---|
| 线程安全 | CAS无锁实现 |
| 性能 | 高(无锁) |
| 有界性 | 无界队列 |
| 适用场景 | 高并发队列操作 |
5.2 实现原理¶
/**
* ConcurrentLinkedQueue核心实现
* Core Implementation
*/
public class ConcurrentLinkedQueue<E> {
// 头节点和尾节点
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
// 入队:CAS操作
public boolean offer(E e) {
Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// CAS设置next
if (p.casNext(null, newNode)) {
// 更新tail
if (p != t)
casTail(t, newNode);
return true;
}
}
// ... 其他逻辑
}
}
// 出队:CAS操作
public E poll() {
// ... CAS操作逻辑
}
}
关键点: 使用CAS保证线程安全,无锁高性能。
5.3 使用示例¶
/**
* ConcurrentLinkedQueue使用示例
* ConcurrentLinkedQueue Usage Example
*/
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 入队
queue.offer("item1");
queue.offer("item2");
// 出队
String item = queue.poll();
// 检查
String peek = queue.peek();
}
}
6. 并发集合对比 (Concurrent Collections Comparison)¶
6.1 对比表¶
| 集合类 | 线程安全机制 | 性能特点 | 适用场景 |
|---|---|---|---|
| ConcurrentHashMap | CAS + synchronized | 高并发性能好 | 高并发Map场景 |
| BlockingQueue | ReentrantLock + Condition | 阻塞机制 | 生产者-消费者 |
| CopyOnWriteArrayList | 写时复制 | 读快写慢 | 读多写少 |
| ConcurrentLinkedQueue | CAS | 无锁队列 | 高并发队列 |
6.2 选择建议¶
使用ConcurrentHashMap: - 需要线程安全的Map - 高并发读写场景
使用BlockingQueue: - 生产者-消费者模式 - 需要阻塞操作
使用CopyOnWriteArrayList: - 读多写少场景 - 可以容忍弱一致性
使用ConcurrentLinkedQueue: - 高并发队列操作 - 不需要阻塞操作
7. 最佳实践 (Best Practices)¶
7.1 选择合适的并发集合¶
// ✅ 高并发Map:使用ConcurrentHashMap
ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();
// ✅ 生产者-消费者:使用BlockingQueue
BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(1000);
// ✅ 读多写少:使用CopyOnWriteArrayList
CopyOnWriteArrayList<String> configList = new CopyOnWriteArrayList<>();
// ✅ 高并发队列:使用ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
7.2 注意CopyOnWriteArrayList的内存消耗¶
// ⚠️ 注意:CopyOnWriteArrayList写操作会复制整个数组
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 如果list很大,频繁写操作会导致内存消耗大
for (int i = 0; i < 10000; i++) {
list.add("item" + i); // 每次add都复制整个数组!
}
// ✅ 建议:批量操作
List<String> batch = new ArrayList<>();
// ... 添加元素到batch
list.addAll(batch); // 只复制一次
7.3 使用ConcurrentHashMap的原子操作¶
ConcurrentHashMap<String, AtomicLong> map = new ConcurrentHashMap<>();
// ✅ 使用computeIfAbsent避免重复计算
map.computeIfAbsent("key", k -> {
// 只在key不存在时执行
return new AtomicLong(expensiveComputation());
});
// ✅ 使用computeIfPresent原子更新
map.computeIfPresent("key", (k, v) -> {
return new AtomicLong(v.get() + 1);
});
8. 面试高频问题 (Interview Questions)¶
Q1: ConcurrentHashMap的实现原理?¶
答案: JDK 1.8使用CAS + synchronized实现,桶为空时CAS插入,桶不为空时锁住链表头节点。get操作无锁。
Q2: ConcurrentHashMap和HashMap的区别?¶
答案: ConcurrentHashMap线程安全,使用CAS + synchronized;HashMap非线程安全,性能更高。
Q3: BlockingQueue的实现原理?¶
答案: 基于ReentrantLock和Condition实现阻塞,put操作在队列满时阻塞,take操作在队列空时阻塞。
Q4: CopyOnWriteArrayList的优缺点?¶
答案: - 优点:读操作无锁,性能高 - 缺点:写操作需要复制整个数组,内存消耗大,弱一致性
Q5: 如何选择合适的并发集合?¶
答案: 根据使用场景选择:ConcurrentHashMap用于Map,BlockingQueue用于生产者-消费者,CopyOnWriteArrayList用于读多写少。
📖 扩展阅读¶
返回: 07-Java并发编程
上一章: 07-05 - 线程池
下一章: 07-07 - 原子类与CAS →