共享状态并发
消息传递是处理并发的一种很好的方法,但它并不是唯一的方法。另一种方法是让多个线程访问相同的共享数据。再次考虑 Go 语言文档中的这一部分口号:“不要通过共享内存进行通信”。
通过共享内存进行通信会是什么样子?此外,为什么消息传递爱好者会警告不要使用内存共享呢?
在某种程度上,任何编程语言中的通道都类似于单一所有权,因为一旦你把一个值转移到一个通道上,你就不应该再使用这个值。共享内存并发类似于多个所有权:多个线程可以同时访问同一个内存位置。正如您在第 15 章中看到的那样,智能指针使多个所有权成为可能,而多个所有权可能会增加复杂性,因为需要管理这些不同的所有者。Rust 的类型系统和所有权规则极大地帮助了这种管理的正确性。例如,让我们看看互斥锁,这是共享内存更常见的并发基元之一。
使用互斥锁允许一次从一个线程访问数据
Mutex 是互斥的缩写,例如,Mutex 在任何给定时间只允许一个线程访问某些数据。要访问互斥锁中的数据,线程必须首先通过请求获取互斥锁来表示它想要访问。锁是一种数据结构,它是互斥锁的一部分,用于跟踪当前谁对数据具有独占访问权限。因此,互斥锁被描述为通过锁定系统保护它保存的数据。
互斥锁以难以使用而闻名,因为您必须记住两个规则:
在使用数据之前,必须尝试获取锁。
当您完成互斥锁保护的数据后,您必须解锁数据,以便其他线程可以获取锁。
对于互斥锁的真实比喻,请想象在只有一个麦克风的会议上进行小组讨论。在答疑者可以发言之前,他们必须询问或表示他们想要使用麦克风。当他们拿到麦克风时,他们可以随心所欲地说话,然后将麦克风交给下一位请求发言的嘉宾。如果答疑者在用完麦克风后忘记交出麦克风,则其他人将无法发言。如果共享麦克风的管理出错,面板将无法按计划工作!
正确管理互斥锁可能非常棘手,这就是为什么这么多人对频道充满热情的原因。但是,由于 Rust 的类型系统和所有权规则,您不会错误地进行锁定和解锁。
Mutex的 API<T>
作为如何使用互斥锁的示例,让我们从在单线程上下文中使用互斥锁开始,如示例 16-12 所示:
文件名: src/main.rs
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } println!("m = {m:?}"); }
示例 16-12:为简单起见,在单线程上下文中探索 Mutex<T>
的 API
与许多类型一样,我们使用关联的函数 new
创建一个 Mutex<T>
。要访问互斥锁内的数据,我们使用 lock
方法来获取锁。此调用将阻止当前线程,因此在轮到我们获得锁之前,它无法执行任何工作。
如果另一个持有该锁的线程出现紧急情况,则对 lock
的调用将失败。在
那样的话,没有人能够得到锁,所以我们选择了
unwrap
并让这个线程 panic 如果我们处于那种情况。
获取锁后,我们可以将返回值(在本例中名为 num
)视为对内部数据的可变引用。类型系统确保我们在使用 m
中的值之前获取一个锁。m
的类型是
Mutex<i32>
,而不是 i32
,因此我们必须调用 lock
才能使用 i32
价值。我们不能忘记;类型系统不允许我们访问内部的 i32
否则。
正如您可能怀疑的那样,Mutex<T>
是一个智能指针。更准确地说,对 lock
的调用返回一个名为 MutexGuard
的智能指针,该指针包装在
LockResult
,我们使用对 unwrap
的调用来处理。MutexGuard
智能指针实现 Deref
以指向我们的内部数据;智能指针还有一个 Drop
实现,当
MutexGuard
超出范围,这发生在内部范围的末尾。因此,我们不会冒着忘记释放锁并阻止互斥锁被其他线程使用的风险,因为锁释放是自动发生的。
删除锁后,我们可以打印互斥锁值,并看到我们能够将内部 i32
更改为 6。
在多个线程之间共享互斥锁<T>
现在,让我们尝试使用 Mutex<T>
在多个线程之间共享一个值。
我们将启动 10 个线程,并让每个线程将 counter 值增加 1,因此
计数器从 0 到 10。示例 16-13 中的下一个例子将具有
编译器错误,我们将使用该错误来了解有关使用
Mutex<T>
以及 Rust 如何帮助我们正确使用它。
文件名: src/main.rs
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
示例 16-13:10 个线程每个线程增加一个由 Mutex<T>
保护的计数器
我们创建一个 counter
变量来将 i32
保存在 Mutex<T>
中,就像我们在示例 16-12 中所做的那样。接下来,我们通过迭代一系列数字来创建 10 个线程。我们使用 thread::spawn
并给所有线程相同的闭包:一个将计数器移动到线程中的闭包,通过调用 lock
方法获取 Mutex<T>
上的锁,然后将 1 添加到互斥锁中的值。当一个线程完成运行其闭包时, num
将超出范围并释放锁,以便另一个线程可以获取它。
在主线程中,我们收集所有联接句柄。然后,就像我们在示例 16-2 中所做的那样,我们在每个 handle 上调用 join
以确保所有线程都完成。此时,主线程将获取锁并打印此程序的结果。
我们暗示此示例不会编译。现在让我们找出原因!
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
--> src/main.rs:21:29
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8 | for _ in 0..10 {
| -------------- inside of this loop
9 | let handle = thread::spawn(move || {
| ------- value moved into closure here, in previous iteration of loop
...
21 | println!("Result: {}", *counter.lock().unwrap());
| ^^^^^^^ value borrowed here after move
|
help: consider moving the expression out of the loop so it is only moved once
|
8 ~ let mut value = counter.lock();
9 ~ for _ in 0..10 {
10 | let handle = thread::spawn(move || {
11 ~ let mut num = value.unwrap();
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
错误消息指出 counter
值在循环的上一次迭代中被移动。Rust 告诉我们,我们不能将 counter
的所有权移动到多个线程中。让我们用我们在 Chapter 15 中讨论的 multiple-ownership 方法修复编译器错误。
具有多个线程的多个所有权
在第 15 章中,我们使用智能指针给出了一个 multiple owners 的值
Rc<T>
创建引用计数值。让我们在这里做同样的事情,看看会发生什么。在示例 16-14 中,我们将 Mutex<T>
包装在 Rc<T>
中,并在将所有权移动到线程之前克隆 Rc<T>
。
文件名: src/main.rs
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
示例 16-14:尝试使用 Rc<T>
允许多个线程拥有 Mutex<T>
再一次,我们编译并获取...不同的错误!编译器教会了我们很多东西。
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
| | |
| | required by a bound introduced by this call
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
|
= help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`, which is required by `{closure@src/main.rs:11:36: 11:43}: Send`
note: required because it's used within this closure
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> /rustc/eeb90cda1969383f56a2637cbd3037bdf598841c/library/std/src/thread/mod.rs:688:1
For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
哇,那个错误信息太冗长了!以下是需要关注的重要部分:
`Rc<Mutex<i32>>` cannot be sent between threads safely
。编译器还告诉我们原因: the trait `Send` is not implemented for `Rc<Mutex<i32>>`
。我们将在下一节中讨论 Send
:它是确保我们与线程一起使用的类型用于并发情况的 trait 之一。
遗憾的是,跨线程共享 Rc<T>
并不安全。当 Rc<T>
管理引用计数,它会增加每次对 clone
的调用的计数,并在删除每个克隆时从计数中减去计数。但它不使用任何并发基元来确保对 count 的更改不会被另一个线程中断。这可能会导致错误的计数 — 细微的错误反过来又可能导致内存泄漏或在完成之前删除值。我们需要的是一个与 Rc<T>
完全相同的类型,但以线程安全的方式更改引用计数。
使用 Arc<T>
进行原子引用计数
幸运的是,Arc<T>
是类似于 Rc<T>
的类型,可以在并发情况下安全使用。a 代表 atomic,意味着它在 atom
引用计数类型。原子是另一种并发原语,我们不会在这里详细介绍:有关更多详细信息,请参阅 std::sync::atomic
的标准库文档。此时,你只需要知道原子的工作方式与基元类型类似,但可以安全地跨线程共享。
然后,您可能想知道为什么所有基元类型都不是原子的,为什么默认情况下没有实现标准库类型以使用 Arc<T>
。原因是线程安全会带来性能损失,您只想在真正需要时支付。如果您只是对单个线程中的值执行作,则如果代码不必强制执行原子提供的保证,则代码可以运行得更快。
让我们回到我们的示例:Arc<T>
和 Rc<T>
具有相同的 API,因此我们通过将 use
行、调用更改为 new
和调用
clone
的 clone 中。示例 16-15 中的代码最终将编译并运行:
文件名: src/main.rs
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
示例 16-15:使用 Arc<T>
包装 Mutex<T>
能够在多个线程之间共享所有权
此代码将打印以下内容:
结果 : 10
我们成功了!我们从 0 数到 10,这看起来可能不是很令人印象深刻,但它确实教会了我们很多关于 Mutex<T>
和线程安全的知识。您还可以使用此程序的结构来执行更复杂的作,而不仅仅是增加计数器。使用此策略,您可以将计算划分为独立的部分,将这些部分拆分为多个线程,然后使用 Mutex<T>
让每个线程使用其部分更新最终结果。
请注意,如果你正在执行简单的数值运算,则有一些类型比 Mutex<T>
类型更简单,这些类型由
标准库。这些类型提供对原始类型的安全、并发、原子访问。在此示例中,我们选择将
Mutex<T>
与基元类型一起使用,以便我们可以专注于 Mutex<T>
的工作原理。
RefCell<T>
/Rc<T>
和 Mutex<T>
/Arc<T>
之间的相似之处
你可能已经注意到 counter
是不可变的,但我们可以获得对其中值的可变引用;这意味着 Mutex<T>
提供内部可变性,就像 Cell
系列一样。就像我们在第 15 章中使用 RefCell<T>
来允许我们改变 Rc<T>
中的内容一样,我们使用 Mutex<T>
更改 Arc<T>
内的内容。
另一个需要注意的细节是,当你使用 Mutex<T>
时,Rust 无法保护你免受各种逻辑错误的影响。回想一下第 15 章,使用 Rc<T>
会带来创建参考循环的风险,其中两个 Rc<T>
值相互引用,从而导致内存泄漏。同样,Mutex<T>
也存在造成死锁的风险。当作需要锁定两个资源,并且两个线程各自获取了其中一个锁,导致它们永远等待对方时,就会发生这种情况。如果您对死锁感兴趣,请尝试创建一个具有死锁的 Rust 程序;然后研究任何语言中互斥锁的死锁缓解策略,并尝试在 Rust 中实现它们。Mutex<T>
和 MutexGuard
的标准库 API 文档提供了有用的信息。
在本章中,我们将讨论 Send
和 Sync
特征,以及如何将它们与自定义类型一起使用。