31 包罗万象,风雨同舟:并发集合(上)
你好,我是鸟窝。
接下来的两节课程中,我们会详细学习并发编程常用的集合类型,如map、vec等标准库和第三库中的类型。之所以要单拎出来它们重点讲解,是因为我们大部分的并发程序中都会或多或少的使用到它们。
它们有的是几种同步原语的组合,有的是专门为并发场景单独实现,目的只有一个:为并发程序提供线程安全的集合类型。
这几节课中提到的库主要都是第三方库,主要还是为了开拓大家的视野,了解丰富的Rust生态圈千姿百态的crate,即使你现在用不到它们,也不妨先了解一下。
线程安全的map
写多线程代码的时候,经常会遇到需要让好几个线程都能访问甚至修改同一个数据集合的场景。比如,一个网络服务器可能需要一个全局的缓存,或者一个状态管理器,让不同的请求处理线程都能读写。这时候,HashMap
这种我们平时用得飞起的数据结构,就显得有点“脆弱”了。
为啥 std::collections::HashMap
在多线程里“不安全”?
很简单,Rust 标准库里的 HashMap
设计时就没考虑并发访问。想象一下:
- 线程 A 正在往 Map 里加一个新键值对,它可能需要重新分配内存、移动元素。
- 线程 B 同时想读取 Map 里的一个值。
- 线程 C 又想删除一个元素。
如果没有任何保护措施,这几个操作搅和在一起,内存状态就会乱七八糟,这就是所谓的数据竞争(Data Race)。在 Rust 里,这是未定义行为(Undefined Behavior),轻则数据错乱,重则程序崩溃,而且调试起来极其痛苦。Rust 的编译器虽然很强,能帮你避免很多内存安全问题,但它默认阻止你在线程间直接共享可变状态(比如一个裸的 &mut HashMap
),除非你用了特定的同步工具。
那么,想在多线程环境里安全地用 Map,有啥招呢?
简单粗暴 - Mutex
+ HashMap
这是最经典、最常见的解决方案。把你的 HashMap
包在一个 Mutex
(互斥锁)里。Mutex
就像一个房间的钥匙,同一时间只有一个线程能拿到钥匙,进入房间(访问 HashMap
),其他线程想进去?对不起,需要先在门口排队等着。
为了能在线程间共享这个被 Mutex
包裹的 HashMap
,我们还需要 Arc
(原子引用计数)。Arc
允许你在多个线程之间安全地共享数据的所有权。
下面就是一个Mutex
+HashMap
的例子:
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// 创建一个被 Arc 和 Mutex 包裹的 HashMap
let shared_map = Arc::new(Mutex::new(HashMap::<String, String>::new()));
let mut handles = vec![];
for i in 0..5 {
// 克隆 Arc,增加引用计数,这样每个线程都有一个指向 Mutex<HashMap> 的指针
let map_clone = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
// 获取锁,如果锁被其他线程持有,这里会阻塞
let mut map = map_clone.lock().unwrap(); // unwrap() 在这里是为了简化,实际项目中要处理可能的毒化错误
let key = format!("key_{}", i);
let value = format!("value_from_thread_{}", i);
println!("线程 {} 正在插入: {} -> {}", i, key, value);
map.insert(key, value);
// 锁会自动在 map 变量离开作用域时释放
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 在主线程中访问,同样需要获取锁
let map = shared_map.lock().unwrap();
println!("\n最终的 Map 内容:");
for (key, value) in map.iter() {
println!("{} -> {}", key, value);
}
println!("Map size: {}", map.len());
}
这样的实现简单直接,并且能够保证绝对的数据一致性,同一时间只有一个线程操作 Map。
但是缺点也是很明显的,一是锁的粒度太大,整个 Map 都被锁住了。即使两个线程想操作不同的 key,也得排队。在高并发、写操作频繁的场景下,线程会花大量时间等待锁,性能会急剧下降。而是如果你不小心在持有这个锁的同时,又去获取另一个锁,而其他线程又反过来操作,就可能造成死锁。
读多写少?试试 RwLock
+ HashMap
如果你的场景是读操作远远多于写操作,那么 Mutex
可能有点“一刀切”了。RwLock
(读写锁)或许是更好的选择。
RwLock
允许多个线程同时获取读锁(只要没有线程持有写锁),但只允许一个线程获取写锁(此时不能有任何其他读锁或写锁)。
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
fn main() {
let shared_map = Arc::new(RwLock::new(HashMap::<String, String>::new()));
// --- 启动一个写线程 ---
let map_clone_writer = Arc::clone(&shared_map);
let writer_handle = thread::spawn(move || {
for i in 0..3 {
// 获取写锁
let mut map = map_clone_writer.write().unwrap();
let key = format!("key_{}", i);
let value = format!("value_written_{}", i);
println!("写线程: 插入 {} -> {}", key, value);
map.insert(key.clone(), value);
// 显式 drop 锁,让读线程有机会运行 (或者等作用域结束)
drop(map);
thread::sleep(Duration::from_millis(10)); // 模拟写入间隔
}
});
// --- 启动多个读线程 ---
let mut reader_handles = vec![];
for i in 0..5 {
let map_clone_reader = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
for _ in 0..5 {
// 获取读锁
let map = map_clone_reader.read().unwrap();
if let Some(value) = map.get("key_1") {
println!("读线程 {}: 读取到 key_1 -> {}", i, value);
} else {
// println!("读线程 {}: 未找到 key_1", i);
}
// 读锁在 map 离开作用域时释放
drop(map); // 显式释放,便于观察
thread::sleep(Duration::from_millis(5)); // 模拟读取间隔
}
});
reader_handles.push(handle);
}
// 等待所有线程结束
writer_handle.join().unwrap();
for handle in reader_handles {
handle.join().unwrap();
}
println!("\n最终 Map (主线程读取):");
let map = shared_map.read().unwrap();
for (key, value) in map.iter() {
println!("{} -> {}", key, value);
}
}
在读多写少的场景下,性能比 Mutex
好很多,因为多个读线程可以并行。
但是这个实现依然还是存在一些问题。
- 写操作依然阻塞:写操作需要独占访问,会阻塞所有其他读写操作。
- 写者饥饿:如果读操作非常频繁,写线程可能一直抢不到锁,导致“写者饥饿”。
- 仍然是全局锁:锁的粒度还是整个 Map。
接下来我们就要学习几个专门优化的线程安全的map库。第一个推荐的就是dashmap
。
dashmap
dashmap
非常注重性能,目标是尽可能地快。
dashmap
旨在提供一个易于使用的 API,该 API 类似于 std::collections::HashMap
,但为处理并发性做了一些微小调整。
dashmap
力求简单易用,并旨在成为 RwLock<HashMap<K, V>>
的直接替代方案。为达成这些目标,它的所有方法都接收 &self
参数,而非让修改型方法接收 &mut self
参数。这样我们就可以将 DashMap
放入 Arc<T>
中,在线程间共享的同时仍能对它进行修改。
dashmap
的核心思想是分片(Sharding) 或 桶(Bucketing)。它内部其实是维护了多个小的、独立的 Map(或称为分片、桶),每个小 Map 都有自己的锁(通常是 RwLock
)。当你操作某个 key 时,dashmap
会根据 key 的哈希值计算出它属于哪个分片,然后只需要锁住那个特定的分片即可。
这样一来,不同线程操作不同分片里的 key 时,就不会互相阻塞了,大大提高了并发度。
// 需要在 Cargo.toml 中添加依赖: dashmap = "5.5" (请使用最新版本)
use dashmap::DashMap; // 注意是 DashMap,不是 HashMap
use std::sync::Arc;
use std::thread;
fn main() {
// DashMap 天然支持并发,内部处理了同步,通常直接用 Arc 包裹即可共享
let shared_map = Arc::new(DashMap::<String, String>::new());
let mut handles = vec![];
for i in 0..10 { // 增加线程数,更能体现优势
let map_clone = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
let key = format!("key_{}", i % 5); // 制造一些 key 的碰撞,测试并发写同一分片
let value = format!("value_from_thread_{}", i);
// DashMap 的 API 和 HashMap 很像,但不需要手动 .lock()
println!("线程 {} 正在操作 key: {}", i, key);
map_clone.insert(key.clone(), value.clone());
// 读取也类似
if let Some(entry) = map_clone.get(&key) {
// println!("线程 {} 读取到: {} -> {}", i, key, entry.value());
} else {
println!("线程 {} 未找到 {}", i, key); // 可能因为其他线程正在写
}
// DashMap 还提供了一些原子操作,比如 entry API,更灵活
map_clone.entry(format!("entry_key_{}", i)).or_insert_with(|| format!("default_val_{}", i));
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
println!("\n最终的 DashMap 内容 (部分展示):");
// 遍历 DashMap 没有 RwLock/Mutex 那么直接,但也很方便
let mut count = 0;
for entry in shared_map.iter() {
if count < 10 { // 只打印前 10 个,避免刷屏
println!("{} -> {}", entry.key(), entry.value());
count += 1;
} else {
break;
}
}
println!("DashMap size: {}", shared_map.len());
}
这三节课我们就不具体讲解库的每一个方法了,我们对这些库有一个初步印象即可。我捡重点的方法为你讲解。
除了 new
函数创建一个dashmap
实例外,你可以基于下面的组合,使用 with
相关的方法创建一个dashmap
的实例:
- capacity:map的初始容量。
- shard_amount:分片的数量,一定是2的指数。
- hasher:哈希算法。
因为这个库目的就是要和标准库的 hashmap
API保持兼容,所以读、写、遍历等操作和hashmap
一致。
同时,这个库还基于 DashMap
类型封装了 DashSet
,这是一个线程安全的集合,可以看做标准库HashSet
的线程安全版。
evmap
如果说 Mutex
是简单粗暴,RwLock
是读写侧重,dashmap
是精细化分片,那 evmap
(eventual map) 就是为了极致的读性能而生的一种有点“非主流”但非常有效的武器。它的核心理念是 “读写分离” 和 “最终一致性”。
evmap
内部维护着(至少)两份 Map 的拷贝。想象一下:
- 一份 Map(我们叫它“后台 Map”或“写 Map”):专门用来处理所有的写操作(插入、更新、删除)。写操作是互斥的,通常由一个或少数几个指定的写线程来执行。
- 另一份 Map(我们叫它“前台 Map”或“读 Map”):专门用来处理所有的读操作。关键在于,读线程访问的是这个相对静态的“读 Map”。
“魔法”发生在 refresh()
的时候。当写线程完成一批修改后,它会调用一个类似 refresh()
或 swap()
的方法。这时 evmap
会:
- 将“写 Map”中的更改(或者整个 Map)同步到“读 Map”。
- 这个过程完成后,所有的读线程就能看到最新的数据了。
最重要的特性:读操作是“几乎”无锁(Wait-Free on Read)的!
因为读线程访问的是那个相对固定的“读 Map”,它们不需要等待任何锁,也不需要担心 Map 在读取过程中被修改。这让得evmap
在读密集型场景下可以达到极高的并发读性能。
但是,天下没有免费的午餐,代价是“最终一致性”:
在写线程修改了数据、但还没调用 refresh()
之前,读线程看到的是旧的数据(上一次 refresh
时的快照)。数据最终会一致,但不是实时的。
use evmap::{ReadHandle, WriteHandle};
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个新的evmap,分别获取读句柄和写句柄
let (r_handle, mut w_handle): (ReadHandle<String, i32>, WriteHandle<String, i32>) = evmap::new();
// 在一个单独的线程中进行写操作
let write_thread = thread::spawn(move || {
// 添加一些键值对
w_handle.insert("key1".to_string(), 10);
w_handle.insert("key2".to_string(), 20);
w_handle.insert("key3".to_string(), 30);
// 必须调用refresh()使更改对读者可见
w_handle.refresh();
println!("Initial data written and refreshed");
// 等待一段时间
thread::sleep(Duration::from_millis(500));
// 更新一个值
w_handle.update("key1".to_string(), 15);
// 删除一个键
w_handle.empty("key3".to_string());
// 再次refresh使更改可见
w_handle.refresh();
println!("Updates applied and refreshed");
// 保持写句柄活着
thread::sleep(Duration::from_secs(2));
});
// 主线程作为读者
thread::sleep(Duration::from_millis(100)); // 等待写线程完成初始写入
// 获取一个读视图
let map = r_handle.clone();
// 读取并打印值
println!("First read:");
if let Some(values) = map.get("key1") {
println!("key1: {:?}", values);
}
if let Some(values) = map.get("key2") {
println!("key2: {:?}", values);
}
if let Some(values) = map.get("key3") {
println!("key3: {:?}", values);
}
// 等待更新
thread::sleep(Duration::from_secs(1));
// 重新获取读视图(会看到新的更改)
println!("\nSecond read after updates:");
if let Some(values) = map.get("key1") {
println!("key1: {:?}", values);
} else {
println!("key1: not found");
}
if let Some(value) = map.get_one("key2") {
println!("key2: {:?}", *value);
} else {
println!("key2: not found");
}
if let Some(values) = map.get("key3") {
println!("key3: {:?}", values);
} else {
println!("key3: not found");
}
// 等待写线程完成
write_thread.join().unwrap();
}
papaya
适用于读取密集型工作负载的、快速且实用的并发哈希表。
papaya
是为读取密集型工作负载设计的。因此,读取操作具有极高的吞吐量,并且随着并发量的增加并不会显著影响性能,特别适合读操作远远大于写操作的场景。在写入密集型工作负载中, papaya
尽管不是其主要用途,但仍能提供具有竞争力的性能。
papaya
的目标是在所有操作中提供可预测且一致的延迟。大多数操作是无锁的,只有在极少数且受限的情况下才会阻塞。 papaya
还具有增量调整map大小的功能。可预测的延迟是性能的重要组成部分,这在基准测试中往往不会体现出来,但在实际使用中具有重要意义。
// 使用papaya的HashMap
use papaya::HashMap;
fn main() {
// 从多个线程使用map
let map = HashMap::new();
std::thread::scope(|s| {
// 插入一些值
s.spawn(|| {
let map = map.pin();
for i in 'A'..='Z' {
map.insert(i, 1);
}
});
// 删除这些值
s.spawn(|| {
let map = map.pin();
for i in 'A'..='Z' {
map.remove(&i);
}
});
// 读取这些值
s.spawn(|| {
for (key, value) in map.pin().iter() {
println!("{key}: {value}");
}
});
});
}
线程安全的vec
Rust 的所有权和借用系统是其内存安全保证的基石,它规定了对于任何给定的数据,在同一时刻只能存在一个可变引用 (&mut T
) 或任意数量的不可变引用 (&T
)。对 Vec<T>
进行修改操作(如 push
, pop
, insert
等)需要获取其可变引用 &mut Vec<T>
。
若允许多个线程同时持有对同一个 Vec
的可变引用并尝试修改,就会发生数据竞争。例如,两个线程可能同时尝试在向量末尾扩展内存并写入新元素,这会破坏 Vec
内部状态的一致性(如长度、容量和指向数据的指针),导致内存损坏或程序崩溃。Rust 的编译器(借用检查器)会在编译时阻止明显违反这些规则的代码。即便通过 unsafe
代码绕过编译时检查,运行时的数据竞争仍然是未定义行为的根源。
虽然 Vec<T>
本身可能满足 Send
(允许所有权在线程间转移)和 Sync
(允许多个线程同时持有其不可变引用)的条件(取决于类型 T
是否满足),但这并不意味着 &mut Vec<T>
是线程安全的。并发修改的核心问题在于对可变状态的无序访问。因此,必须引入额外的同步机制。
我们同样可以使用 Mutex
或者 RwLock
包裹 vec
, 再使用 Arc
在多线程中使用。
当共享 Vec
的主要操作是读取,而写入操作相对较少时,使用 Mutex
可能会过于严格,因为它要求即使是读取操作也必须排队。在这种情况下,读写锁 std::sync::RwLock<T>
是一个更优的选择。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let shared_vec = Arc::new(RwLock::new(vec![1, 2, 3]));
// 读取线程
let vec_clone = Arc::clone(&shared_vec);
let handle1 = thread::spawn(move || match vec_clone.read() {
Ok(guard) => println!("Read access: {:?}", *guard),
Err(_) => eprintln!("RwLock read poisoned"),
});
// 写入线程
let vec_clone = Arc::clone(&shared_vec);
let handle2 = thread::spawn(move || match vec_clone.write() {
Ok(mut guard) => guard.push(4),
Err(_) => eprintln!("RwLock write poisoned"),
});
// 等待线程完成
handle1.join().unwrap();
handle2.join().unwrap();
}
总结
这节课主要介绍了线程安全的 map
和 vec
。
首先我们解释了为什么标准库的 HashMap
在多线程环境下不安全(存在数据竞争风险),并了解了两种基础的线程安全解决方案。
Arc<Mutex<HashMap>>
:简单直接,通过互斥锁保证同一时间只有一个线程能访问HashMap
,确保了数据一致性,但缺点是锁粒度大,可能在高并发写场景下成为性能瓶颈,且有死锁风险。Arc<RwLock<HashMap>>
:适用于读多写少的场景,允许多个线程同时读,但写操作仍需独占,性能优于Mutex
,但存在写者饥饿和全局锁的问题。
随后,重点介绍了几个专门为并发优化的第三方 map
库。
dashmap
:通过内部分片(Sharding/Bucketing)技术,将map
拆分为多个带独立锁的小map
,显著提高了并发性能,旨在成为RwLock<HashMap>
的高性能替代品,API 尽量与标准HashMap
兼容。evmap
:采用读写分离和最终一致性模型,维护独立的读写map
副本,通过refresh
操作同步数据。其核心优势是读操作几乎无锁(Wait-Free),极大提升读密集型场景的性能,但代价是读取的数据可能不是最新的。papaya
:同样针对读取密集型工作负载设计,提供高吞吐量、可预测的低延迟(大部分操作无锁)和增量调整大小功能。
最后我们还介绍了线程安全的 vec
,解释了标准 Vec
不适用于并发修改的原因(违反借用规则,导致数据竞争)。与 HashMap
类似,可以通过 Arc<Mutex<Vec>>
或 Arc<RwLock<Vec>>
实现线程安全,RwLock
同样适用于读多写少的场景。
思考题
请使用 Arc<Mutex<T>>
或者 Arc<RwLock<T>>
写一个线程安全的数据访问例子。
期待你的分享。如果今天的内容对你有所帮助,也期待你转发给你的同事或者朋友,大家一起学习,共同进步。我们下节课再见!