使用消息传递在线程之间传输数据


确保安全并发的一种越来越流行的方法是消息 传递,其中线程或 Actor 通过相互发送包含数据的消息进行通信。这是 Go 语言中的口号中的想法 documentation: “不要通过共享内存进行通信;相反,通过通信来共享内存。


为了实现消息发送并发,Rust 的标准库提供了通道的实现。通道是一个通用的编程概念,通过它将数据从一个线程发送到另一个线程。


您可以将编程中的通道想象成一个定向水通道,例如溪流或河流。如果你把橡皮鸭之类的东西放进河里,它会顺流而下到水道的尽头。


通道有两半:发射器和接收器。发射器的一半是将橡皮鸭放入河中的上游位置,而接收器的另一半是橡皮鸭最终进入下游的位置。代码的一部分使用要发送的数据调用发送器上的方法,另一部分检查接收端是否有到达的消息。如果发射机或接收机的一半被丢弃,则称频道关闭。


在这里,我们将创建一个程序,该程序有一个线程生成值并通过通道发送它们,另一个线程将接收值并将其打印出来。我们将使用通道在线程之间发送简单值来说明该功能。熟悉该技术后,您可以将通道用于需要相互通信的任何线程,例如聊天系统或许多线程执行计算部分并将这些部分发送到聚合结果的一个线程的系统。


首先,在示例 16-6 中,我们将创建一个 channel,但不对它做任何事情。请注意,这还不会编译,因为 Rust 无法判断我们想通过 channel 发送什么类型的值。


文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}


示例 16-6:创建一个通道并将两半分配给 txrx


我们使用 mpsc::channel 函数创建一个新通道;mpsc 代表 多个生产者、单个使用者。简而言之,Rust 的标准库实现 channels 的方式意味着一个 channel 可以有多个产生值的发送端,但只有一个接收端使用这些值。想象一下多条溪流汇成一条大河:顺着任何一条溪流流下的一切都会在最后汇入一条河。我们现在从单个 producer 开始,但当我们开始此示例时,我们将添加多个 producer。


mpsc::channel 函数返回一个元组,其第一个元素是发送端 (发送器),第二个元素是接收端 (接收器)。传统上,缩写 txrx 在许多字段中分别用于发射器接收器,因此我们以这种方式命名变量以表示每一端。我们使用的 let 语句具有解构元组的模式;我们将在第 18 章讨论 let 语句中模式的使用和解构。现在,要知道以这种方式使用 let 语句是提取 mpsc::channel 返回的元组片段的便捷方法。


让我们将发送端移动到一个 spawned thread 中,让它发送一个字符串,这样 spawned thread 就会与主线程通信,如示例 16-7 所示。这就像在上游的河里放一只橡皮鸭子,或者从一个线程向另一个线程发送聊天消息。


文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}


示例 16-7:将 tx 移动到一个生成的线程并发送 “hi”


同样,我们使用 thread::spawn 创建一个新线程,然后使用 movetx 移动到闭包中,以便生成的线程拥有 tx。生成的线程需要拥有 transmitter 才能通过 Channel 发送消息。发送器有一个 send 方法,它接受我们想要发送的值。send 方法返回 Result<T, E> 类型,因此,如果接收方已被删除,并且没有位置可以发送值,则 send作将返回错误。在此示例中,我们调用 unwrap 以在出现错误时出现 panic。但是在实际应用程序中,我们会正确处理它:返回第 9 章,回顾正确处理错误的策略。


在示例 16-8 中,我们将从主线程中的接收器获取值。这就像从河尽头的水中捞出橡皮鸭或接收聊天消息。


文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}


示例 16-8:在主线程中接收值 “hi” 并打印它


接收器有两种有用的方法: recvtry_recv。我们使用 recv,即 receive 的缩写,它将阻止主线程的执行并等待值通过通道发送。发送值后,recv 将在 Result<T, E> 中返回该值。当发射器关闭时,recv 将返回一个错误,表示不会有更多值。


try_recv 方法不会阻止,而是返回 Result<T, E> immediately:一个 Ok 值,如果消息可用,则保存一条消息,以及一个 Err 值(如果这次没有任何消息)。如果此线程在等待消息时有其他工作要做,则使用 try_recv 很有用:我们可以编写一个循环,该循环每隔一段时间调用 try_recv,如果消息可用,则处理消息,否则会执行其他工作一段时间,直到再次检查。


为简单起见,我们在此示例中使用了 recv;除了等待消息之外,我们没有其他工作让主线程做,因此阻塞主线程是合适的。


当我们运行示例 16-8 中的代码时,我们将看到从主线程打印的值:


得到: 你好


完善!


渠道和所有权转移


所有权规则在消息发送中起着至关重要的作用,因为它们可以帮助您编写安全的并发代码。防止并发编程中的错误是在整个 Rust 程序中考虑所有权的优势。让我们做一个实验来展示 channel 和 ownership 如何协同工作以防止出现问题:在将 val 值发送到 channel ,我们将尝试在生成的线程中使用 val 值。尝试编译示例 16-9 中的代码,看看为什么不允许这段代码:


文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}


示例 16-9:在我们向通道发送 val 后尝试使用 val


在这里,我们尝试在通过 tx.send 将 val 发送到通道后打印 val。允许这样做将是一个坏主意:一旦该值被发送到另一个线程,该线程就可以在我们再次尝试使用该值之前修改或删除它。由于数据不一致或不存在,其他线程的修改可能会导致错误或意外结果。但是,如果我们尝试编译示例 16-9 中的代码,Rust 会给我们一个错误:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error


我们的并发错误导致了编译时错误。send 函数获取其参数的所有权,当值被移动时,接收方获得它的所有权。这可以防止我们在发送该值后再次意外使用该值;所有权系统检查一切正常。


发送多个值并看到接收方正在等待


示例 16-8 中的代码编译并运行,但它没有清楚地告诉我们两个独立的线程正在通过通道相互通信。在示例 16-10 中,我们做了一些修改,以证明示例 16-8 中的代码是并发运行的:生成的线程现在将发送多条消息,并在每条消息之间暂停一秒钟。


文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}


示例 16-10:发送多条消息并在每条消息之间暂停


这一次,生成的线程具有我们要发送到主线程的字符串向量。我们迭代它们,单独发送它们,并通过调用 Duration 值为 1 秒的 thread::sleep 函数在每个函数之间暂停。


在主线程中,我们不再显式调用 recv 函数:相反,我们将 rx 视为迭代器。对于收到的每个值,我们都会打印它。当通道关闭时,迭代将结束。


运行示例 16-10 中的代码时,您应该会看到以下输出,每行之间有 1 秒的停顿:

Got: hi
Got: from
Got: the
Got: thread


因为我们在主线程的 for 循环中没有任何暂停或延迟的代码,所以我们可以看出主线程正在等待从生成的线程接收值。


通过克隆发射机创建多个生产者


前面我们提到 mpsc多个生产者的首字母缩写词, 单一消费者。让我们使用 mpsc 并扩展示例 16-10 中的代码,以创建多个线程,这些线程都向同一个接收器发送值。我们可以通过克隆发射器来实现,如示例 16-11 所示:


文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}


示例 16-11:从多个生产者发送多条消息


这一次,在我们创建第一个生成的线程之前,我们在发射器上调用 clone。这将为我们提供一个新的发射器,我们可以传递给第一个生成的线程。我们将原始 transmitter 传递给第二个生成的线程。这为我们提供了两个线程,每个线程向一个接收器发送不同的消息。


当您运行代码时,您的输出应如下所示:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you


您可能会按其他顺序看到值,具体取决于您的系统。这是 是什么让并发既有趣又困难。如果你试验了 thread::sleep,在不同的线程中给它不同的值,每次运行都会更加不确定,每次都会产生不同的输出。


现在我们已经了解了通道的工作原理,让我们看看不同的并发方法。