将我们的单线程服务器转变为多线程服务器


现在,服务器将依次处理每个请求,这意味着在第一个连接完成处理之前,它不会处理第二个连接。如果服务器收到越来越多的请求,则此串行执行将越来越不最佳。如果服务器收到处理时间较长的请求,则后续请求将不得不等待长请求完成,即使新请求可以快速处理。我们需要解决这个问题,但首先,我们要看看实际问题。


在当前 Server 实现中模拟慢速请求


我们将了解处理缓慢的请求如何影响对当前 server 实现发出的其他请求。示例 20-10 实现了对 /sleep 的请求,并模拟了慢响应,这将导致服务器在响应前休眠 5 秒。


文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}


示例 20-10:通过休眠 5 秒来模拟慢速请求


我们现在从 if 切换到 match,因为我们有三个 case。我们需要在 request_line 的切片上显式匹配,以模式匹配字符串文本值;match 不像 Equality 方法那样执行自动引用和取消引用。


第一个分支与示例 20-9 中的 if 块相同。第二个分支将请求匹配到 /sleep。收到该请求后,服务器将休眠 5 秒钟,然后呈现成功的 HTML 页面。第三个分支与示例 20-9 中的 else 块相同。


你可以看到我们的服务器是多么原始:真正的库会以一种不那么冗长的方式处理多个请求的识别!


使用 cargo run 启动服务器。然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/http://127.0.0.1:7878/sleep。如果您像以前一样多次输入 / URI,您将看到它快速响应。但是,如果你输入 /sleep,然后加载 /,你会看到 / 会一直等到 sleep 在加载前已经休眠了整整 5 秒。


我们可以使用多种技术来避免请求在慢速请求之后备份;我们将实现的是线程池。


使用线程池提高吞吐量


线程池是一组正在等待并准备好处理任务的生成线程。当程序收到新任务时,它会将池中的一个线程分配给该任务,该线程将处理该任务。池中的其余线程可用于处理第一个线程正在处理时传入的任何其他任务。当第一个线程处理完其任务后,它将返回到空闲线程池中,准备处理新任务。线程池允许您并发处理连接,从而提高服务器的吞吐量。


我们将池中的线程数限制为较小的数量,以保护我们免受拒绝服务 (DoS) 攻击;如果我们让我们的程序为每个传入的请求创建一个新线程,那么有人向我们的服务器发出 1000 万个请求可能会耗尽我们服务器的所有资源并停止请求的处理,从而造成严重破坏。


因此,我们将有固定数量的线程在池中等待,而不是生成无限线程。传入的请求将发送到池中进行处理。池将维护传入请求的队列。池中的每个线程都将从此队列中弹出一个请求,处理该请求,然后向队列请求另一个请求。通过这种设计,我们可以同时处理多达 N 个请求,其中 N 是线程数。如果每个线程都在响应长时间运行的请求,则后续请求仍可以在队列中备份,但我们增加了在到达该点之前可以处理的长时间运行请求的数量。


此技术只是提高 Web 服务器吞吐量的众多方法之一。您可以探索的其他选项包括 fork/join 模型单线程异步 I/O 模型多线程异步 I/O 模型。如果您对此主题感兴趣,可以阅读有关其他解决方案的更多信息并尝试实施它们;对于像 Rust 这样的低级语言,所有这些选项都是可能的。


在开始实现线程池之前,让我们先谈谈使用线程池应该是什么样子的。当您尝试设计代码时,首先编写客户端界面有助于指导您的设计。编写代码的 API,使其按照您想要的调用方式进行结构;然后在该结构中实现功能,而不是实现功能,然后设计公共 API。


与我们在第 12 章的项目中使用测试驱动开发的方式类似,我们将在这里使用编译器驱动开发。我们将编写调用所需函数的代码,然后查看编译器中的错误,以确定下一步应该更改哪些内容才能使代码正常工作。但是,在执行此作之前,我们将探索不打算用作起点的技术。


为每个请求生成一个线程


首先,让我们探索一下如果代码确实为每个连接创建了一个新线程,它可能会是什么样子。如前所述,由于可能生成无限数量的线程的问题,这不是我们的最终计划,但这是首先获得正常工作的多线程服务器的起点。然后,我们将添加线程池作为改进,比较这两种解决方案会更容易。示例 20-11 显示了对 main 的更改,以生成一个新线程来处理 for 循环中的每个流。


文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}


示例 20-11:为每个流生成一个新线程


正如你在第 16 章中学到的,thread::spawn 将创建一个新线程,然后 在新线程中运行 Closure 中的代码。如果运行此代码并加载 /sleep 在浏览器中,然后在另外两个浏览器选项卡中 / 中,您确实会看到对 / 的请求不必等待 /sleep 完成。但是,正如我们所提到的,这最终将使系统不堪重负,因为您将无限制地创建新线程。


创建有限数量的线程


我们希望线程池以类似、熟悉的方式工作,因此从线程切换到线程池不需要对使用我们的 API 的代码进行大量更改。示例 20-12 显示了 ThreadPool 的假设接口 struct 而不是 thread::spawn


文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}


示例 20-12:我们理想的 ThreadPool 接口


我们使用 ThreadPool::new 创建一个具有可配置线程数的新线程池,在本例中为 4 个线程。然后,在 for 循环中,pool.execute 具有与 thread::spawn 类似的接口,因为它需要池应该为每个流运行的闭包。我们需要实现 pool.execute,以便它获取闭包并将其提供给池中的线程来运行。这段代码还不能编译,但我们会尝试让编译器指导我们如何修复它。


使用 Compiler Driven Development 构建 ThreadPool


将示例 20-12 中的修改改成 src/main.rs,然后让我们使用 cargo check 中的编译器错误来驱动我们的开发。这是我们得到的第一个错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error


伟大!此错误告诉我们需要一个 ThreadPool 类型或模块,因此我们现在将构建一个。我们的 ThreadPool 实现将独立于我们的 Web 服务器正在执行的工作类型。因此,让我们将 hello crate 从二进制 crate 切换到库 crate 来保存我们的 ThreadPool 实现。在我们切换到库 crate 之后,我们还可以将单独的线程池库用于我们想使用线程池做的任何工作,而不仅仅是用于服务 Web 请求。


创建一个包含以下内容的 src/lib.rs,这是我们目前可以拥有的 ThreadPool 结构体的最简单定义:


文件名: src/lib.rs


pub 结构 ThreadPool;


然后编辑 main.rs 文件,通过将以下代码添加到 src/main.rs 的顶部,将 ThreadPool 从库 crate 引入范围:


文件名: src/main.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}


这段代码仍然不起作用,但让我们再次检查它以获取我们需要解决的下一个错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error


此错误表示接下来我们需要创建一个名为 ThreadPool 的新增功能我们还知道 new 需要有一个可以接受 4 作为参数的参数,并且应该返回一个 ThreadPool 实例。让我们实现具有这些特征的最简单的 new 函数:


文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}


我们选择 usize 作为 size 参数的类型,因为我们知道 负数线程没有任何意义。我们也知道我们将使用它 4 作为线程集合中的元素数,即 usize 类型用于,如第 3 章的 “整数类型” 部分所述。


让我们再次检查代码:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error


现在出现错误是因为我们在 ThreadPool 上没有 execute 方法。回想一下“创建有限数量的 Threads“部分,我们决定我们的线程池应该有一个类似于 thread::spawn 的接口。此外,我们将实现 execute 函数,以便它接受给定的闭包并将其提供给池中的空闲线程来运行。


我们将在 ThreadPool 上定义 execute 方法,将闭包作为参数。回想一下 “Moving Captured Values out of the Closure 和 FnTraits“部分,我们可以将闭包作为具有三种不同特征的参数:FnFnMutFnOnce 的 Once。我们需要决定在这里使用哪种 closure 类型。我们知道我们最终会做一些类似于标准库 thread::spawn 的事情 实现,这样我们就可以看看 thread::spawn 的签名是什么 has 的参数。该文档向我们展示了以下内容:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,


F type 参数是我们在这里关注的参数;T 类型参数与返回值相关,我们不关心这一点。我们可以看到 spawn 使用 FnOnce 作为 F 上绑定的 trait。这可能是 我们想要的,因为我们最终会传递我们得到的参数 execute生成。我们可以进一步确信 FnOnce 是我们想要使用的 trait,因为运行请求的线程只会执行该请求的闭包一次,这与 FnOnce 中的 Once 匹配。


F 类型参数还具有 trait 绑定 Send 和 lifetime 绑定 'static,这在我们的情况下很有用:我们需要 Send 将闭包从一个线程转移到另一个线程,而 'static 因为我们不知道线程需要多长时间来执行。让我们 ThreadPool 将采用具有以下边界的 F 类型的泛型参数:


文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}


我们仍然在 FnOnce 后面使用 (),因为这个 FnOnce 表示一个不带参数并返回单元类型 () 的闭包。就像函数定义一样,签名中可以省略返回类型,但即使我们没有参数,我们仍然需要括号。


同样,这是 execute 方法的最简单实现:它什么都不做,但我们只尝试让我们的代码编译。让我们再检查一次:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s


它编译!但请注意,如果你尝试 cargo run 并在浏览器中发出请求,你会在浏览器中看到我们在本章开头看到的错误。我们的库实际上并没有调用传递给 execute 的闭包 还!


注意:关于具有严格编译器的语言(如 Haskell 和 Rust),您可能听说过一句话是“如果代码编译,它就会工作”。但这种说法并非普遍正确。我们的项目可以编译,但它绝对不做任何事情!如果我们正在构建一个真实、完整的项目,那么现在是开始编写单元测试以检查代码是否可编译具有我们想要的行为的好时机。


验证 new 中的线程数


我们没有对 newexecute 参数执行任何作。让我们用我们想要的行为来实现这些函数的主体。首先,让我们考虑一下 new。之前,我们为 size 选择了 unsigned 类型 参数,因为线程数为负数的池没有意义。 但是,线程为零的池也没有意义,但 Zero 是完美的 有效的 usize。在返回 ThreadPool 实例之前,我们将添加代码来检查大小是否大于零,并使用 assert!宏让程序在收到零时出现 panic,如示例 20-13 所示。


文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}


示例 20-13:实现 ThreadPool::new 以 panic if 大小为零


我们还为 ThreadPool 添加了一些带有文档注释的文档。请注意,我们遵循了良好的文档实践,添加了一节来指出我们的函数在 14 章中可能会 panic 的情况。尝试运行 cargo doc --open 并单击 ThreadPool 结构体,查看为 new 生成的文档是什么样子的!


我们不必像这里那样添加 assert!宏,而是可以将 new 放入 build 中并返回一个 Result,就像我们在示例 12-9 中的 I/O 项目中对 Config::build 所做的那样。但是我们已经决定,在这种情况下,尝试创建没有任何线程的线程池应该是一个不可恢复的错误。如果你雄心勃勃,可以尝试编写一个名为 build 的函数,并使用以下签名来与函数进行比较:

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {


创建空间以存储线程


现在,我们有了一种方法来知道我们在池中存储了有效数量的线程,我们可以创建这些线程,并在返回结构之前将它们存储在 ThreadPool 结构中。但是我们如何 “存储” 线程呢?让我们再看一下 thread::spawn 签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,


spawn 函数返回一个 JoinHandle<T>,其中 T 是闭包返回的类型。让我们也尝试使用 JoinHandle,看看会发生什么。在我们的例子中,我们传递给线程池的闭包将处理连接并且不会返回任何内容,因此 T 将是单元类型 ()。


示例 20-14 中的代码可以编译,但还没有创建任何线程。我们更改了 ThreadPool 的定义,以保存 thread::JoinHandle<()> 实例,将向量初始化为容量 size 中,设置一个 for 循环,该循环将运行一些代码来创建线程,并返回一个包含线程的 ThreadPool 实例。


文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}


示例 20-14:为 ThreadPool 创建一个 vector 来保存线程


我们已将 std::thread 引入库 crate 的范围,因为我们使用 thread::JoinHandle 作为 vector 中 ThreadPool 的 ThreadPool 中。


收到有效大小后,我们的 ThreadPool 会创建一个可以保存大小项的新 vector。with_capacity 函数执行与 Vec::new 但有一个重要的区别:它在 vector 中预先分配空间。因为我们知道我们需要在 vector 中存储 size 元素,所以提前进行这种分配比使用 Vec::new 效率略高,后者在插入元素时调整自身大小。


当您再次运行 cargo check 时,它应该会成功。


负责将代码从 ThreadPool 发送到线程的 Worker 结构体


我们在示例 20-14 的 for 循环中留下了一条关于线程创建的注释。在这里,我们将看看我们实际上是如何创建线程的。标准库提供 thread::spawn 作为创建线程的一种方式,并且 thread::spawn 期望获得一些代码,线程应在线程创建后立即运行。但是,在我们的示例中,我们希望创建线程并让它们等待我们稍后发送的代码。标准库的线程实现不包括任何执行此作的方法;我们必须手动实现它。


我们将通过在 ThreadPool 和将管理此新行为的线程。我们将此数据结构称为 Worker,这是池化实现中的常用术语。Worker 选取需要运行的代码,并在 Worker 的线程中运行代码。想想在餐厅厨房工作的人:员工等到客户收到订单,然后他们负责接受这些订单并完成订单。


我们将存储 Worker 结构体的实例,而不是在线程池中存储 JoinHandle<()> 实例的向量。每个 Worker 将存储一个 JoinHandle<()> 实例。然后,我们将在 Worker 上实现一个方法,该方法将需要代码的闭包来运行,并将其发送到已经运行的线程执行。我们还将为每个 worker 提供一个 id,以便我们可以在日志记录或调试时区分池中的不同 worker。


下面是创建 ThreadPool 时将发生的新过程。我们将在拥有 Worker 后实现将闭包发送到线程的代码 按以下方式设置:


  1. 定义一个 Worker 结构,该结构包含一个 id 和一个 JoinHandle<()>

  2. 更改 ThreadPool 以保存 Worker 实例的向量。

  3. 定义一个 Worker::new 函数,该函数接受 ID 号并返回一个 保存 id 的 worker 实例和用空闭包生成的线程。

  4. ThreadPool::new 中,使用 for 循环计数器生成一个 id,创建一个具有该 id 的新 Worker,并将该 worker 存储在 vector 中。


如果你准备好迎接挑战,请先尝试自己实现这些更改,然后再查看示例 20-15 中的代码。


准备?这是示例 20-15,其中一种方法可以进行上述修改。


文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}


示例 20-15:修改 ThreadPool 以保存 Worker 实例,而不是直接持有线程


我们已将 ThreadPool 上的字段名称从 threads 更改为 worker ,因为它现在保存的是 Worker 实例,而不是 JoinHandle<()> 实例。我们使用 for 循环中的 counter 作为参数 Worker::new,我们将每个新的 Worker 存储在名为 workers 的 vector 中。


外部代码(如我们在 src/main.rs 中的服务器)不需要知道有关在 ThreadPool 中使用 Worker 结构体的实现细节,因此我们将 Worker 结构体及其函数设为私有。这 Worker::new 函数使用我们给它的 id,并存储一个 JoinHandle<()> 实例,该实例是通过使用空闭包生成新线程创建的。


注意: 如果作系统因为没有足够的系统资源而无法创建线程,thread::spawn 将 panic。这将导致我们的 整个服务器都会导致 panic,即使创建某些线程可能会 成功。为了简单起见,这种行为很好,但在 Production 中 thread pool 实现,您可能希望使用 std::thread::Builder 及其 spawn 方法,该方法返回 Result


此代码将编译并存储我们指定为 ThreadPool::new 参数的 Worker 实例数。但是我们仍然没有处理我们在 execute 中获得的 closure。让我们看看接下来如何做到这一点。


通过通道向线程发送请求


我们要解决的下一个问题是,给 thread::spawn 的闭包确实 绝对没有。目前,我们得到了我们想要在 execute 方法。但是我们需要给 thread::spawn 一个闭包,以便在创建 ThreadPool 期间创建每个 Worker 时运行。


我们希望刚刚创建的 Worker 结构体从 ThreadPool 中保存的队列中获取要运行的代码,并将该代码发送到其线程运行。


我们在第 16 章中学到的通道 — 一种在两个线程之间通信的简单方法 — 将非常适合此用例。我们将使用一个通道作为作业队列,execute 会将作业从 ThreadPool 发送到 Worker 实例,后者会将作业发送到其线程。这是计划:


  1. ThreadPool 将创建一个通道并保留 sender。

  2. 每个 Worker 都将保留接收器。

  3. 我们将创建一个新的 Job 结构体,它将保存我们想要发送到通道的闭包。

  4. execute 方法将通过发送方发送它想要执行的作业。

  5. 在其线程中,Worker 将遍历其接收器并执行它接收到的任何作业的闭包。


让我们首先在 ThreadPool::new 中创建一个通道,并将发送者保存在 ThreadPool 实例中,如示例 20-16 所示。Job 结构体目前不包含任何内容,但将是我们在通道中发送的项类型。


文件名: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}


示例 20-16:修改 ThreadPool 以存储传输 Job 实例的通道的发送者


ThreadPool::new 中,我们创建新通道,并让池保存发送方。这将成功编译。


让我们尝试在线程池创建通道时将通道的接收器传递给每个 worker。我们知道我们想在 worker 生成的线程中使用 receiver,因此我们将在 closure 中引用 receiver 参数。示例 20-17 中的代码还不能完全编译。


文件名: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}


示例 20-17:将接收器传递给 worker


我们做了一些小而直接的更改:我们将接收器传递给 Worker::new,然后在闭包中使用它。


当我们尝试检查此代码时,我们收到以下错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error


该代码尝试将 receiver 传递给多个 Worker 实例。这是行不通的,因为你会记得第 16 章:Rust 提供的通道实现是多个生产者,单个消费者。这意味着我们不能只克隆 channel 的消费端来修复此代码。我们也不想向多个使用者多次发送消息;我们希望一个包含多个 worker 的消息列表,以便每条消息都处理一次。


此外,从通道队列中移除作业涉及更改 receiver,因此线程需要一种安全的方式来共享和修改 receiver;否则,我们可能会得到 race conditions (如 Chapter 16 所述)。


回想一下第 16 章中讨论的线程安全智能指针:要在多个线程之间共享所有权并允许线程更改值,我们需要使用 Arc<Mutex<T>>。Arc 类型将允许多个 worker 拥有接收器,而 Mutex 将确保一次只有一个 worker 从接收器获取作业。示例 20-18 显示了我们需要做的更改。


文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}


示例 20-18:使用 ArcMutex 在 worker 之间共享接收器


ThreadPool::new 中,我们将接收器放在 ArcMutex 中。对于每个新工作程序,我们克隆 Arc 以增加引用计数,以便工作程序可以共享接收方的所有权。


通过这些更改,代码将编译!我们快到了!


实现 execute 方法


最后,让我们在 ThreadPool 上实现 execute 方法。我们还将更改 Job 从一个结构体到一个 trait 对象的类型别名,该对象保存 execute 接收的闭包类型。如“创建类型同义词 with Type Aliases” 第 19 章 类型别名 部分允许我们缩短 易于使用。请看示例 20-19。


文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}


示例 20-19:为 Box 创建 Job 类型别名 保存每个闭包,然后将作业发送到通道


使用我们在 execute 中获得的闭包创建一个新的 Job 实例后,我们将该 Job 发送到通道的发送端。我们在 unwrap 上调用 send 表示发送失败。例如,如果我们停止执行所有线程,这意味着接收端已停止接收新消息,则可能会发生这种情况。目前,我们无法阻止线程执行:只要池存在,我们的线程就会继续执行。我们使用 unwrap 的原因是我们知道失败情况不会发生,但编译器不知道这一点。


但我们还没有完全完成!在 worker 中,我们的 closure 被传递给 thread::spawn 仍然只引用通道的接收端。相反,我们需要 closure 永远循环,向 channel 的接收端请求一个 job,并在得到一个 job 时运行 job。让我们将示例 20-20 中所示的更改更改为 Worker::new


文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}


示例 20-20:在 worker 线程中接收和执行作业


在这里,我们首先在接收器上调用 lock 来获取互斥锁,然后我们调用 unwrap 来对任何错误造成恐慌。如果互斥锁处于中毒状态,则获取锁可能会失败,如果其他线程在 持有锁而不是释放锁。在这种情况下,调用 unwrap 让这个线程 panic 是正确的作。请随意将此 unwrap 更改为 expect,并显示对您有意义的错误消息。


如果我们在互斥锁上获得锁,则调用 recv 从通道接收 Job。最后的 unwrap 也会跳过此处的任何错误,如果包含发送方的线程已关闭,则可能会发生这种情况,类似于 send method 如果接收器关闭,则返回 Err


recv 的调用会阻塞,因此如果还没有作业,则当前线程将等待,直到作业可用。Mutex<T> 确保只有一个 Worker 线程一次正在尝试请求作业。


我们的线程池现在处于工作状态!给它一个 cargo run 并提出一些请求:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: `hello` (lib) generated 3 warnings
    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.


成功!现在,我们有一个异步执行连接的线程池。 创建的线程永远不会超过四个,因此我们的系统不会获得 如果服务器收到大量请求,则为 overloaded。如果我们向 /sleep,服务器将能够通过让另一个线程运行其他请求来为其他请求提供服务。


注: 如果同时在多个浏览器窗口中打开 /sleep,它们可能会以 5 秒的间隔一次加载一个窗口。出于缓存原因,某些 Web 浏览器会按顺序执行同一请求的多个实例。此限制不是由我们的 Web 服务器引起的。


在第 18 章中了解了 while let 循环后,你可能想知道为什么我们没有编写示例 20-21 所示的 worker 线程代码。


文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}


示例 20-21:的另一种实现 Worker::new 使用 while let


此代码会编译并运行,但不会导致所需的线程行为:慢速请求仍会导致其他请求等待处理。原因有点微妙:Mutex 结构没有公共 unlock 方法,因为锁的所有权基于 MutexGuard<T> 的生命周期,在 LockResult<MutexGuard<T>> 中, method 返回。在编译时,借用检查器可以强制执行该规则 除非我们持有 锁。但是,此实现也可能导致锁被持有 如果我们不注意 互斥守卫<T>.


示例 20-20 中使用 let job = receiver.lock().unwrap().recv().unwrap(); 的代码之所以有效,是因为使用 let 时,当 let 语句结束时,等号右侧的表达式中使用的任何临时值都会立即被删除。但是,while let (以及 if letmatch) 在关联块结束之前不会删除临时值。在示例 20-21 中,在调用 job() 的持续时间内,锁保持持有状态,这意味着其他 worker 无法接收 job。