PingCap的Rust训练课程5:异步编程
前言
任务:通过自定义协议,使用异步网络创建一个多线程、持久化键/值存储的服务端和客户端。
目标:
- 了解在Rust中编写future时使用基本模式
- 了解future的错误处理
- 学会调试类型系统
- 使用Tokio运行时执行异步网络
- 使用boxed future处理棘手的类型系统问题
- 使用
impl Trait
创建匿名Future
类型
关键词:异步、future、tokio、impl Trait
。
介绍
注意:本文仅仅是本项目的提纲,并没有写完。如果您已经读到这里了,请电子邮件至brian@pingcap.com以提醒我,我将尽快写写完本文。
在本项目中,您将创建一个使用自定义协议通信的简单键/值服务端和客户端。该服务端将基于Tokio运行时构建的异步网络。键/值引擎的日志文件读写部分依然使用同步模式,在基础线程池上进行调度工作,并提供一个异步接口。在此过程中,您将尝试使用多种方法多种定义和使用future类型。
因为学习使用Rust中的futures编程有较大的挑战性,而且相关文档目前仍然较少,因此该项目的范围相对有限,相关的解释也比之前的项目更加直接。
请务必阅读有关此项目的背景资料。而如果您在本项目中感觉力不从心,那就让自己放松,休息一下,然后再试试。对于每个人来说,在Rust使用异步编程都是比较困难的。
项目需求规格
cargo项目kvs
建立了一个名为kvs-client
的命令行键值存储客户端,和一个名为kvs-server
的键值存储服务端,二者又都调用了一个名为kvs
的库。客户端通过一个自定义协议与服务端通信。
CLI参数与上一个项目中的相同。引擎实现大致相同,即通过线程池分发同步的文件I/O作业。
本项目的不同之处在于,所有的网络操作都是异步执行的。
为了实现异步操作,KvsClient
将提供基于future的API,而KvsEngine
trait也将提供基于future的API,尽管它是通过线程池使用同步(阻塞)I/O实现的。
您的KvsServer
将基于tokio运行时,tokio将负责把异步作业分发给自己的多个线程(tokio自带的线程池)。这意味着您的架构中实际包含了两层线程池,第一层用来异步的处理网络请求,每个线程占用一个核心;第二层用来同步的处理文件I/O,使用充足的线程以保持负责处理网络的线程持续忙碌。
这种架构带来的变化,就是作业将从多个线程中生成到您的线程池,您的ThreadPool
trait及其实现将变为实现了Clone + Send + 'sync
的共享类型,就像KvsEngine
那样。
因为您将实验多种不同的future返回方式,这里不再一一列出定义,而是在需要时定义。
更具体地说,您将使用以下所有函数签名:
Client::get(&mut self, key: String) -> Box<Future<Item = Option<String>, Error = Error>
Client::get(&mut self, key: String) -> future::SomeExplicitCombinator<...>
Client::get(&mut self, key: String) -> impl Future<Item = Option<String>, Error = Error>
Client::get(&mut self, key: String) -> ClientGetFuture
项目设置
继续上一个项目,删除之前的测试目录,并复制本项目的测试目录。这个项目应该包含一个名为kvs
的库,以及两个可执行文件,kvs-server
和kvs-client
。
Cargo.toml
中需要以下dev-dependencies:
1 | [dev-dependencies] |
与之前的项目不同,不必费心编写足够的类型定义来编译测试套件。因为这样做将会一次向前跳过多个步骤。文本将明确指出何时使用测试套件。
知识背景:深入思考Rust中的future
- 为什么使用future?网络 vs 文件/io,阻塞 vs 非阻塞,同步 vs 异步
- 从用户角度看future(不是以poll为核心的实现)
- 关于执行器和运行时的技术细节,不需要思考过多
- 考虑函数调用链及其如何转换为future类型
- 如何调试Rust类型
Result
vsFuture
vsFutureResult
- future的错误处理
- 确定的future vs 智能指针future vs 匿名future
- 关于future 0.1和future 0.3 的注意事项(我们将使用future 0.1)
- 关于async/await的注意事项
第1部分:将tokio引入客户端
最终我们要将客户端和服务器都转换为future,由于客户端非常简单,我们就从这里入手。我们将首先在现有的同步KvsClient上,介绍tokio运行时。
这
对于客户端,我们将在保持同步KvsClient
的同时引入异步运行时,然后再改造KvsClient
。
KvsClient
的connect
方法。
请注意,作为一个库,KvsClient
可以提供基于futures的最高效率,但是我们的kvs-client
可执行文件并没有利用它,所以这个可执行文件运行单个future就立刻退出的行为看起来有点傻。
TODO @sticnarf - 看看您是否可以编写与具体未来类型无关的测试用例,以便它们与以下所有策略一起使用。
第2部分:将KvsClient
转换为智能指针future
转为future类型时最省事的方式。
第3部分:具有显式future类型的KvsClient
只是为了体验一下它是多么不靠谱。
第4部分:具有匿名future类型的KvsClient
最终的解决方案
第5部分:使ThreadPool
可共享
第6部分:将KvsEngine
转换为future
对于服务器,我们将做在客户端中相反的事,为KvsEngine
编写一个异步接口。这将表明future和底层运行时是相互独立的,并提供了一系列经验。
第7部分:使用tokio驱动KvsEngine
请注意,即使我们自己编写的异步代码很少,tokio本身也在num_cpus
个线程间分发异步作业。权衡将CPU密集型作业直接放在网络线程或文件线程上的利弊,例如,将序列化操作放在哪里?
扩展1:使用tokio-fs替换同步文件I/O
干得漂亮,朋友。休息一下吧。
1 Tokio概述
Tokio是Rust的异步运行时,提供了编写网络应用程序所需的模块。Tokio可灵活部署在大多数系统中,从具有数十个内核的大型服务器到小型嵌入式设备。
概括的说,Tokio提供了几个主要组件:
- 用于执行异步代码的多线程运行时。
- 标准库的异步版本。
- 一个庞大的相关库生态系统。
Tokio 在您的项目中的角色
当您以异步方式编写应用程序时,您可以通过降低同时做许多事情的成本来提升性能。但是,异步Rust代码不会自行运行,因此您必须选择一个运行时来执行它们。Tokio库是应用最广泛的运行时,其使用量超过了所有其他运行时的总和。
此外,Tokio提供了许多有用的实用工具。编写异步代码时,不能使用Rust标准库提供的普通阻塞API,而必须使用它们的异步版本。这些替代版本由Tokio提供,在有意义时这些API将于Rust标准库API保持一致。
Tokio的优势
本章将概述Tokio的一些优点。
快速
Tokio很快,基于Rust,而Rust本身也很快。遵循Rust精神,即目标是用户不应该通过手动编写等效代码来提高性能。
Tokio可扩展,基于async/await语言特性,本身就是可扩展的。在处理网络时,由于延迟,处理连接的速度受到限制,因此扩展的唯一方法是一次处理多个连接。使用async/await语言特性,可以非常方便的增加并发操作的数量,允许您扩展到大规模并发任务。
可靠
Tokio基于Rust编写,而Rust是一种可以让开发者编写可靠且高效软件的语言。大量研究表明,大约70%的高危安全漏洞是由内存安全引起的。使用Rust可以消除应用程序中的这一类错误。
Tokio还非常注重提供一致的行为。Tokio的主要目标是允许用户部署行为可预测的软件,这些软件将日复一日地执行,具有可靠的响应时间,杜绝不可预测的延迟峰值。
简单
借助Rust的async/await特性,编写异步应用程序的复杂性已大大降低。结合Tokio的实用工具和充满活力的生态系统,编写应用程序变得轻而易举。
Tokio在有意义的情况下遵循标准库的命名约定,以使用户可以轻松地将使用标准库编写的代码转换为使用Tokio编写的代码。借助Rust的强大类型系统,轻松交付正确代码的能力是无与伦比的。
灵活
Tokio提供了多种运行时架构。从多线程的工作窃取运行时,到轻量级的单线程运行时,应有尽有。这些运行时中的每一个都带有许多可配置参数,以允许用户根据自己的需要调整它们。
不适用Tokio的场景
尽管Tokio对于许多需要同时做很多事情的项目很有用,但也有一些用例不适合Tokio。
- 通过在多个线程上并发执行以提速的CPU密集型计算。Tokio专为IO密集型应用程序而设计,其中每个单独的任务大部分时间都在等待IO。如果您的应用程序唯一要做的就是并发计算,那么您应该使用rayon。不过,如果您两种任务都做,仍然可以混合使用Tokio。
- 读取大量文件。尽管Tokio看上去对于只需要读取大量文件的项目很有用,但与普通线程池相比,Tokio在这里没有任何优势。因为操作系统通常不提供异步文件API。
- 发送单个Web请求。Tokio的优势是在需要同时做很多事情时。如果你需要使用一个用于异步Rust的库,例如reqwest,但你不需要一次做很多事情,你应该更喜欢那个库的阻塞版本,因为它会让你的项目更简单。当然,使用Tokio仍然有效,但与阻塞API相比并没有真正的优势。如果库不提供阻塞API,请参阅桥接同步代码章节。
获得帮助
在任何时候,如果您遇到困难,您总是可以在Discord或GitHub讨论中获得帮助。请不要担心问“初学者”问题。我们都是从某个地方开始,并乐于提供帮助。
2 环境设置
本教程将带您逐步完成构建Redis客户端和服务器的过程。我们将从使用Rust进行异步编程的基础知识开始,并从那里开始构建。我们将实现Redis命令的一个子集,以全面了解Tokio。
Mini-Redis
您将在本教程中构建的项目在GitHub上以Mini-Redis 的形式提供。Mini-Redis的主要目标是学习Tokio,也得到了很好的评价,但这也意味着Mini-Redis缺少一些您在真正的Redis库中需要的功能。您可以在crates.io上找到可部署于生产环境的Redis库。
我们将在教程中直接使用Mini-Redis,也就是在教程后面部分实现Mini-Redis之前就提前使用。
先修课
我们假设读者熟悉Rust。《Rust Book》就是很好的启蒙读物。
虽然不是必须的,但使用Rust标准库或其他语言编写网络代码的经验将会有所帮助。
我们不需要读者熟悉Redis。
Rust
在教程开始之前,您应该确保安装了Rust工具链。如果没有,最简单的安装方法是使用rustup。
教程需要不低于1.45.0的Rust版本,建议使用最新的Rust版本。
检查计算机上安装的Rust版本,执行:
1 | rustc --version |
您将会看到类似rustc 1.46.0 (04488afe3 2020-08-24)
的输出。
Mini-Redis server
接下来,安装Mini-Redis server,以在我们完成客户端编写时进行测试。
1 | cargo install mini-redis |
启动服务器以确定安装成功:
1 | mini-redis-server |
然后,在另一个终端窗口中,尝试使用mini-redis-cli
获取键foo
的值。
1 | mini-redis-cli get foo |
你应该会看到输出为(nil)
。
万事俱备
如此,一切就绪。阅读下一章以编写您的第一个异步Rust应用程序。
3 Hello Tokio
作为开胃菜,我们将编写一个非常基础的Rust应用。它将连接到Mini-Redis server,将键Hello
的值设置为World
,然后再读回键。这将使用Mini-Redis client库完成。
代码
初始化新crate
让我们从生成一个新的Rust App开始:
1 | cargo new my-redis |
添加依赖
然后,向Cargo.toml
的[dependencies]
中添加以下依赖:
1 | tokio = { version = "1", features = ["full"] } |
编写代码
在main.rs
编写如下代码:
1 | use mini_redis::{client, Result}; |
确保Mini-Redis server正在运行,在单独的终端中执行:
1 | mini-redis-server |
如果你还没有安装mini-redis,执行:
1 | cargo install mini-redis |
现在,运行my-redis:
1 | cargo run |
执行成功!
你可以在这里找到全部代码。
代码详解
让我们花一些时间来理解刚刚做的事情。虽然没有太多代码,但执行了很多动作。
1 | let mut client = client::connect("127.0.0.1:6379").await?; |
client::connect
函数由mini-Redis
Crate提供。它将异步的与指定的远端地址建立TCP连接,并在链接成功建立时返回client
句柄。虽然这里执行了一个异步操作,但我们编写的代码看起来跟同步代码没什么区别。只有.await
动作表明这个操作是异步的。
什么是异步编程?
大多数计算机程序会以与代码编写相同的顺序执行。先执行第一行,再执行下一行,依此类推。在同步编程中,当程序遇到无法立即完成的操作时,它将被阻塞直到操作完成。例如,建立TCP连接需要与网络上的其他节点交换数据,这可能需要大量时间,而在此期间,线程将被阻塞。
而在异步编程中,无法立即完成的操作在后台挂起。该线程没有被阻止,并且可以继续执行其他任务。一旦操作完成,挂起的任务就会恢复,并从挂起的位置继续执行。我们之前的示例中只有一个任务,因此在挂起时什么都没有发生,但是真正的异步程序通常具有大量此类任务。
尽管异步编程可以大幅提升应用程序的执行速度,但通常也会提升程序的复杂度。要求程序员在异步操作完成后跟踪所有可能的状态以恢复工作。以程序员的经验来说,这是一项繁琐且容易出错的任务。
编译时绿色线程
Rust使用称为async/await
的特性来实现异步编程。执行异步操作的函数将用async
关键字标记。在我们的示例中,connect
函数是这样定义的:
1 | use mini_redis::Result; |
async fn
定义看起来像是常规的同步函数,但其实执行的是异步操作。Rust在编译时将async fn
转换为一个异步运行的例程。在async fn
中的任何.await
动作都会将控制权转回线程,以使线程再异步操作挂起在后台的时间里继续执行其他工作。
如果这个解释不够明确,请不要担心。在整个指南中,我们将探索更多关于async/await
的内容。
使用async/await
在Rust中调用异步函数与调用普通函数没什么区别。但是,仅调用这些函数并不会执行函数体,而是会返回一个值用来表示async fn
的操作。这在概念上类似于一个无参闭包。要实际运行该操作,您应该在返回值上使用.await
运算符。
以下面的代码为例:
1 | async fn say_world() { |
输出为:
1 | hello |
async fn
的返回值是一个实现了Future
trait的匿名类型。
异步main
函数
用于启动异步应用程序的main
函数,与大多数Rust crate中的常用main
函数不同。
- 这是一个
async fn
- 带有
#[tokio::main]
标记
当我们想要进入异步上下文时,需要使用async fn
。但是,异步函数必须由一个运行时执行。该运行时包含异步任务调度器,提供事件I/O、定时器等特性。运行时不会自动启动,所以需要main函数启动它。
#[tokio::main]
函数是一个宏。它将async fn main()
转换为同步fn main()
,初始化一个运行时实例并执行异步main函数。
例如,下面的代码:
1 |
|
将转换为:
1 | fn main() { |
后面将会介绍关于Tokio运行时的详细信息。
Cargo特性
本教程依赖于Tokio,并启用了full
特性:
1 | tokio = { version = "1", features = ["full"] } |
Tokio有很多功能(TCP、UDP、Unix sockets、计时器、同步实用程序、多种调度程序等)。并非所有应用程序都需要完整的Tokio功能。在尝试优化编译时间或最终程序的大小时,用户可以只选择需要使用的功能。
在本教程中,选择Tokio依赖时将使用full
功能。
4 生成并发
我们将更换目标,开始研究Redis server端。
首先,将上一章客户端的SET
/GET
代码移至example文件。这样,就可以在我们的服务端上进行测试了。
1 | mkdir -p examples |
然后创建一个新的空src/main.rs
文件。
接受sockets
我们的Redis服务端需要做的第一件事是接受入站TCP sockets,即使用tokio::net::TcpListener
。
TcpListener
绑定到6379
端口,并在循环中接受套sockets。每个socket都将被处理然后被关闭。现在,我们将读取命令,将其打印到标准输出并响应为一个错误。
1 | use tokio::net::{TcpListener, TcpStream}; |
现在运行这个循环:
1 | cargo run |
在另一个终端里运行hello-redis
的示例(即上一章那个SET
/GET
命令的例子):
1 | cargo run --example hello-redis |
输出应为:
1 | Error: "unimplemented" |
在服务端的终端窗口输出应为:
1 | GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")]) |
并发
我们的服务端仍有个小问题(除了只响应错误)。它一次仅处理一个入站请求,当一个连接被接受时,服务器将停留在接受循环代码块内,直到响应完全写入socket。
我们希望Redis服务端能够并发处理大量请求,为此,我们需要添加一些并发性。
为了并发处理连接,我们将为每个入站连接生成一个新任务,并在此任务上处理连接。
接受连接的循环变为:
1 | use tokio::net::TcpListener; |
任务
Tokio任务是一个异步绿色线程,通过将async
块传给tokio::spawn
创建。tokio::spawn
函数返回一个JoinHandle
,调用者可以使用它与生成的任务进行交互。async
块可能会有返回值。调用者可以在JoinHandle
上使用.await
获取返回值。
例如:
1 |
|
等待JoinHandle
返回一个Result
。当任务在执行过程中遇到错误时,JoinHandle
会返回一个Err
,这将在任务panic、或任务因运行时关闭被强制取消时发生。
调度程序以任务为单位管理执行,生成任务将提交给Tokio调度程序,调度程序将确保在有工作要做时执行该任务。生成的任务可以在其生成的线程上执行,也可以在运行时其他线程上执行,该任务也可以在产生后在线程间移动。
Tokio中的任务非常轻量。实际上在底层,任务只需要一次64字节的内存分配。应用程序大可放心生成数千个乃至数百万个任务。
'static
限制
当您在Tokio运行时生成任务时,其类型的生命周期必须是'static
。这意味着生成的任务不得引用任务作用域外的任何数据。
比如下面的代码将无法通过编译:
1 | use tokio::task; |
尝试编译将会产生如下错误:
1 | error[E0373]: async block may outlive the current function, but |
发生这种情况是因为默认情况下,变量所有权不会移动到异步代码块中。v
向量仍然归主函数所有,而println!
一行却借用v
。rust编译器向我们解释这一点,甚至建议将第7行更改为task::spawn(async move {
,以告诉编译器将v
移动到生成的任务中。如此,该任务拥有其需要的所有数据,使其成为'static
。
如果必须从多个任务同时访问单个数据,则该数据需要通过诸如Arc
之类的同步原语来共享。
注意到错误消息说,参数类型超过了'static
生命周期。这个术语可能会相当迷惑,因为'static
生命周期一直持续到程序结束,所以如果超过它,不就是有内存泄漏吗?实际上这个消息说的是参数的类型,而不是其值必须超过'static
生命周期,并且值可能在其类型不再有效之前就被销毁。
当我们说一个值是'static
时,意味着永远保持该值不会是错的。这很重要,因为编译器无法推断新生成的任务会保留多长时间。我们必须确保允许任务永远存在,以便Tokio可以在需要时让任务运行起来。
上面信息框中链接的文章使用术语“受'static
限制”,而不是“它的类型比'static
寿命长”或“值是'static
”来指代T: 'static
。这些都意味着同样的事情,但与&'static T
中的“用'static'
标识”不同。
Send
限制
tokio::spawn
生成的任务必须实现Send
。这允许Tokio运行时在任务被.await
挂起时,在线程之间移动它们。
当.await
调用保存的所有数据都是Send
时,任务也是Send
的,这有点微妙。当调用.await
时,任务将控制权转交给调度程序,在下次执行任务时,它会从上次转交的点恢复。为了确保这个策略生效,在.await
之后使用的所有状态都必须由任务保存。如果这些状态为Send
的,即可以跨线程移动,则任务本身就可以跨线程移动。相反,如果状态不是Send
的,那么任务也不是。
这里的代码是有效的:
1 | use tokio::task::yield_now; |
这里则是无效的:
1 | use tokio::task::yield_now; |
尝试编译会得到如下错误:
1 | error: future cannot be sent between threads safely |
我们将在下一章更深入地讨论这个错误的一个特例。
保存值
我们现在将实现process
函数来处理传入的命令,使用HashMap
来存储值。SET
命令将值插入到HashMap
中,而GET
值将读取这些值。此外,我们将使用循环从而让每个连接能够接受多个命令。
1 | use tokio::net::TcpStream; |
现在,启动服务:
1 | cargo run |
在另一个终端里运行hello-redis
示例:
1 | cargo run --example hello-redis |
现在的输出应为:
1 | got value from server; result=Some(b"world") |
我们现在可以查询和设置键值了,但是有一个问题:这些键值并不能在连接之间共享。如果另一个套接字连接并尝试GET
键hello
,它将查询不到任何东西。
你可以在这里找到完整的代码。
在下一章中,我们将为所有套接字实现数据持久化。
5 共享状态
到目前为止,我们有了键值服务端,但却存在一个缺陷:状态并未在连接之间共享。本文将解决这个问题。
策略
在Tokio中共享状态通常有以下几种方式。
- 使用互斥锁封装共享状态。
- 再生成一个任务,专门用于管理状态,并使用消息传递操作状态。
第一种方法通常用于简单数据,而第二种方法用于需要异步工作(例如I/O原语)的场景。在本章中,共享状态是一个hashmap
,并且操作是insert
和get
。这些操作都不是异步的,因此我们将使用互斥锁Mutex
。
下一章将介绍后一种方法。
添加bytes
依赖
Mini-Redis crate将使用bytes
crate的Bytes
代替Vec<u8>
。Bytes
专门用于为网络编程提供强大的字节阵列结构。相较于Vec<u8>
,它添加了强大的浅拷贝功能。换句话说,在Bytes
实例上调用clone()
将不会复制底层数据,只会让实例的引用计数器加一,即Byte
实例仅是某个底层数据的引用计数器句柄,类似于添加了新功能的Arc<Vec<u8>>
。
要使用bytes
,请在Cargo.toml
的[dependencies]
中添加以下内容:
1 | bytes = "1" |
初始化HashMap
为了让hashmap
能够在许多任务和许多线程中共享,需要将其封装在Arc<Mutex<_>>
中。
首先,为方便使用,在use
之后给该类型起别名。
1 | use bytes::Bytes; |
然后,更新main
函数以初始化Hashmap
并将Arc
的句柄传给process
函数。使用Arc
允许从许多任务中同时引用Hashmap
,并可能在许多线程上运行。在整个Tokio中,术语句柄指的是那些可以访问某些共享状态的值。
1 | use tokio::net::TcpListener; |
关于使用std::sync::Mutex
注意这里使用了std::sync::mutex
而不是tokio::sync::Mutex
来封装Hashmap
。一个常见的错误是在异步代码中无条件使用tokio::sync::mutex
。异步互斥锁用于锁住所有.await
操作。
同步互斥锁在等待获取锁定时会阻塞当前线程,于是,将阻止其他任务处理。而如果使用tokio::sync::Mutex
通常没有作用,因为异步互斥锁的在内部也使用同步互斥锁。
根据经验,只要竞争不高,并且不会在调用.await
时保持锁,则使用异步代码内的同步互斥锁也是可以的。此外,也可以考虑使用parking_lot::mutex
作为std::sync::Mutex
的更快替代方案。
修改process()
process()
函数不再需要初始化Hashmap
,现在,它将接受一个共享的句柄作为参数,并且需要在使用前锁定该Hashmap
。要注意的是Hashmap
的值现在是Bytes
类型(可以高效的复制),因此也需要修改。
1 | use tokio::net::TcpStream; |
任务、线程与竞争
当竞争较少时,使用互斥锁阻保护少量关键变量是可行的策略。当出现锁竞争时,执行任务的线程将会阻塞并等待互斥锁释放。这不仅会阻塞当前任务,而且还将阻止在当前线程上调度任何其他任务。
默认情况下,Tokio运行时使用多线程调度程序。任务被安排在由运行时管理的任意数量的线程上。如果需要调度执行大量任务,并且他们都需要访问互斥锁,则将产生竞争。不过,如果启用了current_thread
运行时,则永远不会产生互斥锁竞争。
即使同步互斥锁竞争成为问题,也很少使用Tokio的互斥锁,而是应当考虑:
- 使用一个专用任务,以管理状态并使用消息传递。
- 互斥锁分片。
- 重组代码以避免使用互斥锁。
在我们的例子中,由于各个键相互独立,因此可以选择将互斥锁分片。因此,我们将用N
个不同的实例代替例子中的单个Mutex<HashMap<_, _>>
。
1 | type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>; |
如此,找到任何给定键就需要两个步骤:先是要确定该键在哪个分片中,再在这个HashMap
中查找该键。
1 | let shard = db[hash(key) % db.len()].lock().unwrap(); |
上面的这段简单实现要求确定的分片数量,而且一旦创建了分片映射分片数量就不能更改。dashmap
crate提供了一个更复杂的分片哈希映射的实现。
在.await
上持有MutexGuard
你可能会编写这样的代码:
1 | use std::sync::{Mutex, MutexGuard}; |
当你尝试并发调用那些调用此函数的代码时,将遇到以下错误消息:
1 | error: future cannot be sent between threads safely |
这是因为std::sync::MutexGuard
类型不可Send
,即你不能将互斥锁发送到另一个线程,于是这将导致错误,因为Tokio运行时可以在每个.await
时在线程间移动任务。为避免这种情况,你应该重组代码,使互斥锁在.await
之前析构:
1 | // This works! |
请注意这样也不行:
1 | use std::sync::{Mutex, MutexGuard}; |
目前,编译器仅根据作用域推断future是否为Send
。将来,编译器可能会支持显式析构,但现在,您只能使用显式作用域。
请注意,这里讨论的错误也在并发一章的Send
限制小节中讨论过。
如果您想尝试某种方式,通过生成不需要Send
的并发任务来绕过此问题,是不可行的。因为如果Tokio在任务持有锁时将任务挂起在.await
处,则可能会在同一线程上调度运行其他任务,而这里的其他任务也可能尝试获取该互斥锁,这将导致死锁,即等待获取互斥锁的任务会阻塞持有互斥锁的任务释放锁。
下面我们将讨论一些上述错误消息的方法:
重构你的代码,不跨.await
持锁
我们已经在上面的代码片段中看到了一个例子,但是还有一些更健壮的代码实现。例如,你可以将互斥锁包装在一个结构体中,并只在非异步方法中锁定互斥锁:
1 | use std::sync::Mutex; |
这种模式保证你不会遇到Send
错误,因为MutexGuard
不会出现在异步函数中。
生成一个任务来管理状态并使用消息传递对其进行操作
这是本章开头提到的第二种方法,通常在共享的资源是I/O资源时使用。有关详细信息,请参阅下一章。
使用Tokio的异步互斥锁
也可以使用Tokio提供的tokio::sync::Mutex
类型。Tokio互斥锁的主要特性是它可以跨.await
保存而不会出现任何问题。也就是说,异步互斥锁比普通互斥锁开销更大,因此一般建议从前面两种方法中选一种实现。
1 | use tokio::sync::Mutex; // note! This uses the Tokio mutex |
6 消息通道
现在我们已经了解了一些关于Tokio的并发性,现在让我们在客户端上应用它。把我们之前写的服务端代码放到一个显式的二进制文件中:
1 | mkdir src/bin |
创建一个包含客户端代码的新二进制文件:
1 | touch src/bin/client.rs |
在此文件中,您将编写本章的代码。要记住在运行客户端前,必须首先在单独的终端中启动服务端:
1 | cargo run --bin server |
再用单独的终端里运行客户端:
1 | cargo run --bin client |
让我们开始编码!
假设我们要并发执行两个Redis命令。我们可以为每个命令生成一个任务,然后这两个命令将同时发生。
起初,我们可能会进行如下尝试:
1 | use mini_redis::client; |
上面的代码无法通过编译,因为这两个任务都需要以某种方式访问client
。由于Client
没有实现Copy
,所以如果没有一些代码来实现共享,就无法编译。此外,Client::set
采用&mut self
,这意味着需要互斥存取才能调用。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用std::sync::Mutex
因为.await
需要在持有锁的情况下调用。我们可以使用tokio::sync::Mutex
,但这只会允许一个运行中的请求。如果客户端实现了流水线,异步互斥锁还将导致连接不能被充分利用。
消息传递
解决方案是使用消息传递。该模式需要生成一个专门的任务来管理client
资源。任何希望发出请求的任务,都会向client
任务发送消息。client
任务代理发送者发出请求,并将响应返回给发送者。
使用此策略,可以只建立一个连接。client
管理任务能够获得互斥存取权限,以便调用get
和set
。此外,通道也起到了缓冲区的作用。即使client
任务忙时,操作也可能会被发送到client
任务。当client
任务可用于处理新请求时,它就会从通道中获取下一个请求。这可以带来更好的吞吐量,并可扩展以支持连接池。
Tokio的通道原语
Tokio提供了许多通道,每个通道都有不同的用途。
- mpsc:多生产者,单消费者通道。可以发送许多值。
- oneshot:单生产者,单消费者渠道。可以发送单个值。
- broadcast:多生产者,多消费者。可以发送许多值,每个消费者都能看到每个值。
- watch:单生产者,多消费者。可以发送许多值,但不保留任何历史记录。消费者只能看到最新的值。
如果您需要一个多生产者多消费者通道,其中只有一个消费者可以看到每条消息,您可以使用async-channel
crate。在异步Rust之外还有一些通道可以使用,例如std::sync::mpsc
和crossbeam::channel
。这些通道在等待消息时将阻塞线程,这在异步代码中是不允许的。
在本节中,我们将使用mpsc和oneshot,其他类型的消息传递通道将在后面的章节中探讨。你可在这里找到本节的完整代码。
定义消息类型
在大多数情况下,当使用消息传递时,接收消息的任务会响应多个命令。在我们的例子中,任务将响应GET
和SET
命令。为了对此建模,我们首先定义一个Command
枚举并为每个命令指定一个成员变量。
1 | use bytes::Bytes; |
创建通道
在main
函数中新建一个mpsc
通道:
1 | use tokio::sync::mpsc; |
mpsc
通道用于向管理redis连接的任务发送命令,多生产者特性允许从多个任务发送消息。创建通道返回两个值,一个发送端和一个接收端,两个句柄各有用处。他们可能会被转移到不同的任务中。
创建通道的容量为32,如果消息的发送速度快于接收速度,通道将存储它们。一旦有32条消息存储在通道中,调用send(...).await
就将进入睡眠状态,直到接收方取走了一条消息为止。
从多个任务发送是通过克隆Sender
来完成的。例如:
1 | use tokio::sync::mpsc; |
两条消息都发送到同一个Receiver
句柄。mpsc
通道的接收端无法被克隆。
当所有Sender
超出作用域或是被析构时,就无法再向通道发送更多消息。此时,在Receiver
上调用recv
会返回None
,表示所有发送端都不在了,通道已关闭。
在我们管理Redis连接的任务中,当通道关闭时该任务就将关闭Redis连接,因为该连接将不再被使用。
生成管理任务
接下来,生成一个专门处理通道的消息的任务。首先,客户端建立到Redis服务端的连接;然后,通过Redis连接发送收到的命令。
1 | use mini_redis::client; |
现在可以修改任务,使用通道发送命令,以代替直接使用Redis连接发送命令。
1 | // The `Sender` handles are moved into the tasks. As there are two |
在main
函数最后,在join句柄上使用.await
,以确保命令在进程退出前能够执行完毕。
接收响应
最后一步是从管理任务接收响应。GET
命令需要获取值,而SET
命令需要知道操作是否成功完成。
我们使用oneshot
通道,以传递响应。oneShot
通道是一种单次生产、单次消费的通道,专门针对对发送单个值的场景进行了优化。在我们的例子中,这单个值就是响应。
和MPSC
类似,oneShot::Channel()
返回一对发送端、接收端句柄。
1 | use tokio::sync::oneshot; |
与mpsc
不同的是,它无需指定通道数量,因为始终是一,此外,这两个句柄都不能克隆。
要从管理任务接收响应,在发送命令之前,就应创建oneshot
通道。通道的发送端包含在命令中传给管理任务,接收端则用于接收响应结果。
首先,修改Command
以包含Sender
。为了方便起见,使用类型别名来指代Sender
。
1 | use tokio::sync::oneshot; |
然后,修改管理任务以能够发送包含oneshot::Sender
的命令。
1 | let t1 = tokio::spawn(async move { |
最后,修改管理任务,以使用oneshot
通道返回响应。
1 | while let Some(cmd) = rx.recv().await { |
调用oneshot::Sender
的send
会立即完成,不需要使用.await
。这是因为在oneshot
通道上send
总是立刻失败或成功,不会有任何等待。
在oneshot
通道上发送值时,如果接收端被析构则将返回Err
,以表明接收端不再关注响应。在我们的例子中,接收端可以不接收响应,因此无需处理resp.send(...)
返回的Err
。
你可以在这里找到完整代码。
背压调节和通道限制
当引入并发或队列时,一定要确保队列是有限的,以使系统能优雅地处理负载。无限的队列最终将耗尽所有可用的内存,并导致系统以不可预测的方式宕机。
Tokio能够避免定义不明确的队列,主要也是因为Tokio中的异步操作是惰性求值。考虑以下代码:
1 | loop { |
如果异步操作立即求值,则此循环会不停的将新的async_op
排入队列,且并不关心之前的操作是否完成,如此则会隐式的产生无限制队列。基于回调和立即求future的系统尤其容易受到上述影响。
但是,使用Tokio和异步Rust,上述代码片段的async_op
根本不会运行,因为.await
从未被调用。如果修改上述代码,加入.await
,则循环会等待操作完成才进入下一次循环。
1 | loop { |
使用并发和队列时必须明确引入。这样做的方法包括:
tokio::Spawn
select!
join!
mpsc::channel
这样做时,请注意确保并发的总数有上限。例如,在编写接受TCP连接的循环时,请确保打开的sockets的总数有上限。使用mpsc::channel
时,选择合适的通道容量。针对特定的应用程序选择特定的约束限制。
编写可靠的Tokio应用程序的一个关键,就是留意并挑选良好的限制值。
7 I/O
Tokio中的I/O以与std
中的运行方式几乎一样,只不过是异步的。有一个读trait(AsyncRead
)和一个写trait(AsyncWrite
)。一些类型已经良好的地实现了这些trait(TCPStream
、File
,Stdout
)。许多数据结构也实现了AsyncRead
和AsyncWrite
(如Vec<u8>
和&[u8]
),这使得我们可以在需要用到reader和writer的地方使用字节数组。
本章将通过一些例子演示使用Tokio时的基本I/O读写操作。下一章江更加深入的介绍高级的I/O示例。
AsyncRead
和AsyncWrite
这两个trait为异步读写字节流提供了基础工具。我们通常不直接调用这些trait上的方法,类似于你不会手动调用future
trait的poll
方法,而是使用AsyncReadExt
和AsyncWriteExt
提供的工具方法。
让我们简要看一下其中的一些方法,所有这些函数都是异步的,必须与配合.await
使用。
async fn read()
AsyncReadExt::read
提供了一种将数据读取到缓冲区的异步方法,返回值为读到的字节数。
注意:当read()
返回Ok(0)
时,意味着该流已关闭。任何调用read()
的future江立即完成并返回Ok(0)
。比如对TcpStream
的实例,这表示socket的读端已关闭。
1 | use tokio::fs::File; |
async fn read_to_end()
AsyncReadExt::read_to_end
将读取流中的所有字节,直到遇到EOF。
1 | use tokio::io::{self, AsyncReadExt}; |
async fn write()
AsyncWriteExt::write
将一个缓冲区写入writer,返回值为写入的字节数。
1 | use tokio::io::{self, AsyncWriteExt}; |
async fn write_all()
AsyncWriteExt::write_all
将整个缓冲区写入writer。
1 | use tokio::io::{self, AsyncWriteExt}; |
这两个trait都包括许多其他有用的方法,详情请参阅API文档。
帮助函数
此外,就像std
,tokio::io
模块也包含许多实用的工具函数,以及用于标准输入、标准输出和标准错误的API。例如,使用tokio::io::copy
异步将读端的所有内容复制到写端。
1 | use tokio::fs::File; |
留意到上例实际上利用了实现了AsyncRead
的字节数组。
回音服务端
让我们上手试一下异步I/O,来编写一个简单的回音服务端。
回音服务端绑定一个TcpListener
,使用一个循环接收入站连接。对于每个入站连接,从socket读取数据,然后立即将读到的数据写回socket。客户端将数据发送到服务器,然后接收完全相同的数据。
我们将使用两种稍微不同的策略分别实现回音服务端。
使用io::copy()
首先,我们将使用io:copy
实现回音逻辑。
您可以在新的二进制文件中写入此代码:
1 | touch src/bin/echo-server-copy.rs |
您可以通过以下方式启动(或仅检查编译):
1 | cargo run --bin echo-server-copy |
您将可以直接使用标准命令行工具(例如telnet
)测试服务端,或使用tokio::net::TcpStream
的文档中示例的简单客户测试服务端。
TCP服务端使用循环接受请求,每接受一个socket就生成一个新任务。
1 | use tokio::io; |
之前提到过,该工具函数将接受一个读端和一个写端,并将数据从一端拷贝到另一端,但此处我们只有一个Tcpstream
,而这一个值同时实现了AsyncRead
和AsyncWrite
。由于io::copy
要求读端和写端均为&mut
,如果这两个参数都写成socket的话就不能通过编译。
1 | // This fails to compile |
拆分读端和写端
要解决问题,我们需要将socket拆分为读句柄和写句柄,特定的类型有特定的最佳读/写端拆分方式。
任何读端+写端的类型,均可用io::split
工具函数拆分,函数接受一个参数,返回一对读句柄/写句柄。这两个句柄可以独立使用,比如用在不同的任务中。
例如,回音客户端可以按如下的方式处理的并发读取:
1 | use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; |
io::split
能够接受任何实现了AsyncRead + AsyncWrite
的类型并返回独立的句柄,是因为io::split
内部使用一个Arc
和一个Mutex
。我们可以使用TcpStream
避免使用此开销。TcpStream
提供了两个专用的拆分函数。
TcpStream::split
接受一个流的引用,返回一对读/写句柄。由于使用了引用,因此两个句柄都必须呆在这个调用split()
的任务中。这种专用拆分为零成本,不需要Arc
或Mutex
。TcpStream
还提供了into_split
函数,以仅使用一个Arc
为代价,让句柄能够在任务间移动。
由于调用io::copy()
的任务就拥有TcpStream
的所有权,因此我们可以使用TcpStream::split
。服务端中处理回音逻辑的任务变成:
1 | tokio::spawn(async move { |
你可以在这里找到完整的代码。
手动拷贝
现在,让我们来了解一下如何通过手动复制数据的方式编写回音服务端。这里,我们需要使用AsyncReadExt::read
和AsyncWriteExt::write_all
。
完整的回音服务端如下:
1 | use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; |
(您可以将此代码放入src/bin/echo-server.rs
中,并使用cargo run --bin echo-server
运行)。
我们来逐行理解代码。首先,由于使用AsyncRead
和AsyncWrite
实用工具,因此必须引入这些扩展trait。
1 | use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; |
分配缓冲区
我们的策略是将一些从socket中读取的数据放入缓冲区,然后将缓冲区的内容写回到socket中。
1 | let mut buf = vec![0; 1024]; |
这里显式禁止使用栈缓冲区。回想前面,我们注意到,跨.await
调用存活的所有任务数据都必须保存在任务中。上面的代码中,buf
跨.await
调用使用。所有任务数据都存储在这单个分配中。您可以将其视为一个enum
,其中每个成员变量都是需要保存在任务中以便特定的.await
调用的数据。
如果缓冲区是一个栈数组,则为每个接受的socket生成的任务内部结构可能看起来像:
1 | struct Task { |
如果将栈数组当做缓冲区,则它将内联存储在任务结构体中。这将使任务结构体变得非常大。另外,缓冲的大小通常是页大小,于是,这将使任务大小变得很尴尬:$页大小 + 一些额外字节
。
当然,编译器将异步代码块进行了优化,性能远超基本的enmu
。实际上,变量并不会像enum
可能需要的那样,在成员间移动。但是,任务结构体大小至少与最大的变量一样大。
因此,为缓冲区使用专用的分配通常能更加高效。
处理EOF
当关闭了TCP流的读端后,再调用read()
将返回Ok(0)
,此时必须让读循环终止。忘记在已经EOF的内容上继续循环读取,是一个常见的错误。
1 | loop { |
忘记停止读循环,通常会导致CPU使用率100%的无限循环。当socket关闭时,socket.read()
将立即返回,然后不停的重复循环。
你可以在这里找到完整的代码。
8 组帧
现在,我们将使用我们刚刚学到的I/O相关知识,实现Mini-Redis组帧层。组帧是获取字节流并将其转换为帧流的过程,帧是两节点间传输数据的单位。Redis协议帧定义如下:
1 | use bytes::Bytes; |
请留意该帧定义是如何使用没有语义的数据组成的,命令解析和实现位于更上层。
对于HTTP,帧可能看起来像:
1 | enum HttpFrame { |
为了实现Mini-Redis的帧,我们将实现Connection
结构体,里面封装了一个TcpStream
用以读/写mini_redis::Frame
。
1 | use tokio::net::TcpStream; |
你可以在这里找到Redis Wire协议的详细信息,可以在这里找到完整的代码。
缓冲读
read_frame
方法会等待收到整个帧才返回,而调用一次TcpStream::read()
返回的数据长度不确定,可能包含整个帧,或是部分帧,也可能包含了多个帧。如果收到的是部分帧,则应缓冲数据并从socket中读取更多数据。如果收到的是多个帧,则应返回第一帧,缓冲其余数据,以等待下一次read_frame
调用。
为了实现上述功能,需要给Connection
添加一个读缓冲区字段,以数据将从socket读取到读缓冲区。解析帧时,再从缓冲区中返回并删除相应的数据即可。
我们选用BytesMut
作为缓冲区类型,即Bytes
的可变版本。
1 | use bytes::BytesMut; |
接下来实现read_frame()
方法:
1 | use tokio::io::AsyncReadExt; |
让我们逐行分析这段代码。read_frame
方法内是一个循环:首先调用self.parse_frame()
,以将尝试从self.buffer
中解析出一个完整的redis帧,如果有足够的数据来解析出一个完整帧,则将该帧返回给read_frame()
的调用者;反之,则尝试将从socket读更多的数据取到缓冲区中。读取更多数据后将再次调用parse_frame()
,此时,如果收到足够的数据,解析可能会成功。
从流中读取数据时,返回值为0
表示将不能从对端收到更多数据,此时,如果读缓冲区仍有数据,则表明收到了部分帧,但该连接突然终止。这是一个错误,应返回Err
。
Buf
trait
从流中读取数据时调用了read_buf
。此读函数接受的参数,需要实现bytes
crate的BufMut
trait。
这里先思考如果使用read()
实现相同的读循环,该怎么写。(可以先用Vec<u8>
代替上面的BytesMut
)。
1 | use tokio::net::TcpStream; |
再给Conneciont
添加read_frame()
函数:
1 | use mini_redis::{Frame, Result}; |
使用字节数组read
时,需要手动维护一个游标用来跟踪缓冲了多少数据,还必须确保将缓冲区仍为空的部分传给read()
;否则,将覆盖掉缓冲的数据。而当缓冲区用完时,还要手动扩大缓冲区以能够继续读到缓冲区。在parse_frame()
(尚未提到)中,还需要指定解析位于self.buffer[..self.cursor]
中的数据。
因为于将字节数组与游标搭配使用的场景很常见,bytes
crate提供了表示字节数组和游标的抽象。Buf
trait由那些可以从中读取数据的源头类型来实现。BufMut
由那些可以进将数据写入的目的类型来实现。当把T: BufMut
的参数传给read_buf()
时,缓冲区内部的游标将被read_buf()
自动更新。因此,在我们的read_frame
,不再需要自己手动管理游标。
另外,使用vec<u8>
时,必须初始化缓冲区。vec![0; 4096]
将分配4096字节的数组,并将每个字节都置为零。调整缓冲区大小时,新增部分还必须使用零来初始化。而初始化操作是有代价的。使用BytesMut
和BufMut
时,缓冲区是未初始化的。BytesMut
抽象会防止我们从未初始化的内存中读取数据,这就避免了初始化操作。
解析帧
现在,让我们看一下parse_frame()
,解析分两个步骤进行。
- 确保全帧被缓冲,并索引到到帧的末尾。
- 解析帧。
mini-redis
crate为我们提供了这两个步骤所需的函数:
这里又需要使用到Buf
抽象。一个实现了Buf
的对象会被传递给Frame::check
,而在check
函数检查传入的缓冲对象时,内部的游标会随着移动。当check
返回时,缓冲区内部的游标就自动指向帧的末端了。
我们将使用std::io::Cursor<&[u8]>
类型实现Buf
。
1 | use mini_redis::{Frame, Result}; |
完整的Frame::check
函数可以在这里找到,此处就不贴上它的完整代码了。
值得注意的是Buf
使用了“字节迭代器”风格的API,以读取数据并移动内部游标。例如,解析帧时,检查第一个字节以确定帧的类型,使用的函数是Buf::get_u8
,可以获取游标当前位置的字节,并将游标后移一位。
Buf
trait还有更多有用的方,可以查看API文档获取更多信息。
缓冲写
帧相关API的另一半是write_frame(frame)
函数,它会将一个完整的帧写入socket。为了最大程度地减少write
系统调用,写将被缓冲。我们维护一个写缓冲区,并在写入socket之前将帧编码到该缓冲区。不过,与read_frame()
不同,完整的帧并不总是在写入socket前缓冲到字节数组。
比如对一个大块流帧,要写入的值为Frame::Bulk(Bytes)
。大块帧的的物理格式是一个帧头,即$
符后接以字节计算的数据长度。帧的主体是内容的Bytes
。如果数据量很大,则将其复制到中间缓冲区将非常耗资源。
为了实现缓冲写,我们将使用BufWriter
结构体,它用T: AsyncWrite
初始化并自己实现AsyncWrite
。当在BufWriter
上调用write
时,并非直接调用内部的写操作,而是提交给缓冲区。当缓冲区满时,内容才一次性交给内部写操作,然后清理缓冲区。当然,在某些情况下,还有一些可以绕过缓冲区的优化操作。
这里的教程不会贴上完整的write_frame()
实现了,你可以在这里找到完整代码。
首先,修改Connection
结构体:
1 | use tokio::io::BufWriter; |
然后实现write_frame()
:
1 | use tokio::io::{self, AsyncWriteExt}; |
这里使用的诸多函数均由AsyncWriteExt
trait提供,该trait也被TcpStream
实现,但是不建议在没有中间缓冲区的情况下发送写单个字节的命令(译注:即直接在TcpStream
上调用这些write_*
函数会产生大量系统调用,建议的做法是在有缓冲包装的写对象上调用,如文中的stream
是一个BufWriter<TcpStream>
对象)。
write_u8
将单个字节送入写端。write_all
将整个切片送入写端。write_decimal
由mini-redis实现。
该函数结束时将调用self.stream.flush().await
。由于BufWriter
在中间缓冲区保存写内容,因此调用write
不能保证数据立即写入socket。在返回之前,我们希望将帧写入socket,手动调用flush()
以将缓冲区中等待的所有数据写入socket。
另一种实现是在Connection
上提供一个flush()
方法,来代替在write_frame()
中调用flush()
。这将允许调用者将多个小帧排队写入写缓冲区,然后只用一个write
系统调用将它们全部写入socket。不过这样会使Connection
的API复杂化。简单也是Mini-Redis的目标,因此我们决定在fn write_frame()
中调用flush().await
。
9 深入异步
至此,我们已经较为全面的了解了关于异步Rust和Tokio的知识。现在,让我们更深入了研究Rust的异步运行时模型。在本教程的最开始,我们暗示异步Rust采用了一种独特的实现方法。本章,我们就解释一下这意味着什么。
Futures
我们用一个非常基本的异步函数,来快速回顾一下了解到的异步相关知识,在这个例子中并没有超出前几章的知识点。
1 | use tokio::net::TcpStream; |
我们调用该函数,它返回一些值。我们又在该值上调用.await
。
1 |
|
my_async_fn()
的返回值是一个future。实现了标准库std::future::Future
trait的值就是future。这些值里包含了正在执行的异步计算。
std::future::Future
trait的定义为:
1 | use std::pin::Pin; |
关联类型Output
是future执行完毕后返回的类型。Pin
类型使Rust能够支持异步函数内的借用。有关更多详细信息,请参阅标准库文档。
与用其他语言实现future的方式不同,Rust的future并不是正在后台执行的计算过程,而是计算本身。future的所有者负责通过轮询未来来推进计算。这是通过调用Future::poll
来实现的。
(译注,本教程因主要关注tokio的应用,所以本章关于深入Rust异步机制的介绍篇幅并不大,更深入的讨论可以参考 使用Rust编写操作系统 - 4.1 - Async/Await)
实现Future
让我们实现一个非常简单的future。 这个future将:
- 等待,直到某个特定的时刻才执行。
- 将一些文本输出到标准输出。
- 生成一个字符串。
1 | use std::future::Future; |
将异步函数当做Future
在main
函数中,我们实例化future并在其上调用.await
。从异步函数中,我们可以在任何实现了Future
的值上调用.await
。反过来,调用async
函数会返回一个实现了Future
的匿名类型。在async fn main()
的情况下,生成的future大致为:
1 | use std::future::Future; |
Rust future实际上是状态机。这里用一个包含了三种future可能状态的enum
来表示MainFuture
。future状态机始于State0
状态。当调用poll
时,future会尽可能地推进其内部状态。如果future能够完成,则返回包含了异步计算输出的Poll::Ready
。
如果future无法完成,通常是由于它正在等待的资源尚未准备好,则返回Poll::Pending
。若调用者收到Poll::Pending
,则表明future将在稍后完成,调用者应该稍后再次调用poll
。
我们还看到future也包含了其他future。调用外部future的poll
会导致调用内部future的poll
函数。
执行器
异步Rust函数返回future,而future必须依靠不断的poll
调用来推进器状态。Future包含了其他future,那么问题来了,是谁在最外层的future调用poll
呢?
回想一下,要运行异步函数,要么必须传递给tokio::spawn
,要么使用带有#[tokio::main]
标识的main函数。这实际上这是将生成的外层future提交给了Tokio执行器。执行器负责在外部future
上调用Future::poll
,以驱动异步计算完成执行。
迷你Tokio
为了更好地理解这一切是如何组合在一起的,让我们实现我们自己的最小版本的Tokio!你可以在这里找到完整的代码。
1 | use std::collections::VecDeque; |
这里执行了一个异步代码块。使用给定的延迟创建了一个Delay
实例,并处于等待状态。但是现在的实现存在一个问题,即执行器不会休眠。这个执行器会在生成的所有future上不断调用轮询。大多数时间里,future都不会就绪,只会继续返回Poll::Pending
。这一过程将吃满CPU周期,所以通常效率不高。
理想情况下,我们希望mini-tokio只在future就绪时才去调用轮询。这种情况发生在任务所需的被阻塞资源转为可用,从而能够执任务所求情的操作时,例如,若任务想从TCP socket读取数据,则我们只希望在TCP socket收到数据时才进行轮询。在mini-tokio中,任务在给定的Instant
时间内被阻塞,所以理想情况下,mini-tokio应只在任务给定的Instance
时间过去后,再执行一次轮询。
为了实现这一目标,当轮询了某资源,且尚未就绪时,该资源应在过度到就绪状态后立即发送通知。
唤醒器
现在我们还缺少唤醒器,即用于在资源就绪时,通知等待的任务资源已就绪,可以继续后面的操作。
再次观察Future:poll
的函数签名:
1 | fn poll(self: Pin<&mut Self>, cx: &mut Context) |
poll
的Context
参数有一个waker()
方法,它返回一个绑定当前任务的Waker
。Waker
有一个wake()
方法,调用此方法将通知执行器以安排执行相关任务。当资源过度到就绪状态时就会调用wake()
,通知执行器轮询该任务以推进作业的执行。
修改Delay
给Delay
代码加上唤醒器:
1 | use std::future::Future; |
现在,一旦等待到了指定的延时,就会通知调用的任务,执行器就可以再次安排轮询任务了。接下来我们修改mini-tokio的代码,从而能够接受唤醒通知。
这里的Delay
实现仍存在一些问题,我们会稍后修复。
回忆我们的第一版Delay
,是这样实现future的:
1 | impl Future for Delay { |
在返回Poll:Pending
之前,我们调用了cx.waker().wake_by_ref()
。这是为了符合future协议。由于返回了Poll::Pending
,我们得负责通知唤醒器。由于此时我们还没实现计时器线程,所以我们在内部通知了唤醒器。这将导致该future被立刻安排轮询,从而再次执行,并且此刻的future依然没有就绪。
这里可以注意到,你确实可以比非必要不通知的理想状态,更频繁的通知唤醒器。比如上面的代码中,我们确实在未就绪的情况下通知了唤醒器。这也没什么大问题,只是浪费了点CPU周期,不过这种实现确实使得循环更加繁忙。
修改Mini Tokio
接下来是修改Mini Tokio的代码,以接收唤醒器通知。我们希望执行器仅在任务唤醒时运行它们,为此,需要给Mini Tokio实现唤醒器。调用唤醒器时,其关联的任务将被排入执行队列。Mini-Tokio在轮询future时会将该唤醒器传给该future。
新的Mini Tokio将使用通道存储排入计划的任务。通道能够让排入队列的任务在任意线程上执行。唤醒器必须是Send
且Sync
的,所以我们使用crossbeam提供的通道,因为标准库的通道不是Sync
的。
将以下依赖项添加到您的Cargo.toml
,引入通道。
1 | crossbeam = "0.8" |
再更新MiniTikio
结构体:
1 | use crossbeam::channel; |
Wakers
既Sync
又可被克隆。调用wake
时,任务必须被安排执行。为了实现这一点,我们建立一个通道。当唤醒器调用wake()
时,任务被推送到通道的发送端,我们的Task
结构体将实现这个唤醒逻辑。为此,它需要同时包含生成的future和通道的发送端。
1 | use std::sync::{Arc, Mutex}; |
为了安排任务,将Arc
克隆并使用通道发送。现在,我们需要将schedule
函数与std::task::Waker
挂钩。标准库提供了一个低级API,让我们能够手动构造vtable以执行此操作。这种策略为实现者提供了最大的灵活性,但需要大量非安全的样板代码。这里,我们将使用futures
crate提供的ArcWake
实用程序,以替代直接用 RawWakerVTable
。这允许我们通过实现一个简单的trait来将Task
结构体呈现为一个唤醒器。
将以下依赖项添加到Cargo.toml
中以引入futures
。
1 | futures = "0.3" |
然后,实现futures::task::ArcWake
。
1 | use futures::task::{self, ArcWake}; |
当上面的定时器线程调用waker.wake()
时,任务被推送到通道。接下来,我们在MiniTokio::run()
函数中实现任务的接收和执行。
1 | impl MiniTokio { |
以上代码做了很多事,首先,首先,实现MiniTokio::run()
,该函数执行一个从通道接收计划任务的循环。由于任务在被唤醒后推送到通道,因此这些任务在执行时能够取得进展。
此外,MiniTokio::new()
和MiniTokio::spawn()
函数现在改为使用通道,而不是之前的VecDeque
。当生成新任务时,它们会得到一份通道发送端的克隆,从而让任务在在运行时自行使用。
Task::poll()
函数使用futures
crate中的ArcWake
实用程序创建唤醒器。唤醒器用于创建task::Context
,task::Context
将传递给poll
。
小结
我们现在已经看到了异步Rust如何工作的端到端示例。Rust的async/await
特性通过多个trait共同实现。这允许第三方crate使用自定义的执行细节,如Tokio。
- 异步Rust操作是惰性的,需要被调用者轮询。
- 唤醒器被传递给future,使得future能够找到到调用它的任务。
- 当资源尚未就绪时,返回
Poll::Pending
并记录任务的唤醒器。 - 当资源准备就绪时,通知任务的唤醒器。
- 执行器收到通知并安排任务执行。
- 再次轮询任务,这次资源就绪,任务取得进展。
一些未解决的问题
回忆实现Delay
future时,我们提到还有一些事情需要解决。Rust的异步模型允许单个future跨任务执行。考虑以下情形:
1 | use futures::future::poll_fn; |
poll_fn
函数使用闭包创建一个Future
实例。上面的代码片段创建了一个Delay
实例,轮询它一次,然后将Delay
实例发送到新任务并执行.await
。在这个例子中,Delay::poll
被不同的Waker
实例调用了不止一次。发生这种情况时,您必须确保在最近一次调用poll
的Waker
上调用wake
。
在实现future时,应当假设每次调用poll
都可能返回不同的Waker
实例,轮询函数必须使用新的唤醒器代替旧的唤醒器。
我们早期的Delay
实现在每次轮询时都会生成一个新线程。这没什么问题,但如果轮询过于频繁,效率可能会非常低(例如,如果您select!
该future和其他future,只要其中之一有事件发生,则两者都会被轮询)。一种方法是记住你是否已经生成了一个线程,并只有在未生成线程时才执行生成。但是如果这样做,则必须确保在之后调用轮询时更新线程的唤醒器,否则你执行唤醒时将可能使用旧的唤醒器。
为了修复之前的实现,我们可以这样做:
1 | use std::future::Future; |
这有点复杂,不过基本思路是,在每次调用poll
时,future检查提供的唤醒器是否与先前保存的唤醒器一致,若不一致,则必须保存新的唤醒器。
Notify
实用程序
我们演示了如何使用唤醒器手动实现Delay
future。唤醒器是是异步Rust工作的基础。通常,没有必要深入到那个程度。例如,在Delay
中,我们完全可以通过使用async/await
配合tokio::sync::Notify
实用程序来实现。该实用程序提供了一个基础的任务通知机制。它负责处理唤醒器的相关细节,包括确保保存的唤醒器与当前任务的唤醒器一致。
使用Notify
,我们可以像这样使用async/await
实现delay
函数:
1 | use tokio::sync::Notify; |
10 Select
现在,当想要向系统添加并发时,我们就生成一个新任务。下面,我们将介绍一些使用Tokio并发执行异步代码的其他方法。
tokio::select!
tokio::select!
宏允许等待多个异步计算,并在某个计算完成时返回。
例如:
1 | use tokio::sync::oneshot; |
对于两个oneshot通道,任一通道都可能先完成。select!
语句在两个通道上等待并将val
绑定到任务返回的值。当tx1
或tx2
完成时,执行相关的代码块。
未完成的分支将被丢弃。在示例中,程序等待每个通道的oneshot::Receiver
。未完成通道的oneshot::Receiver
会被丢弃。
取消
异步Rust的取消操作,是通过放弃future来执行的。回顾“深入异步”章节,异步Rust使用future实现,而future是惰性的,操作只有被轮询时才会进行。如果如果future被丢弃,由于所有相关状态都已删除,因此无法进行操作。
也就是说,有时异步操作会生成后台任务,或启动在后台运行的其他操作。例如,在上面的示例中,生成了一个任务以发送消息。通常,任务将执行一些计算以生成值。
Future或其他类型可以实现Drop
以清理后台资源。Tokio
的oneshot::Receiver
通过向Sender
端发送一条关闭消息来实现Drop
。发送端可以收到此消息并取消进行中的操作。
1 | use tokio::sync::oneshot; |
future
的实现
为了更好的理解select!
是如何工作的,让我们看看一个假想的Future
实现会是什么样子。这是一个简化版本,现实中的select!
还包含其他功能,例如随机选择首先轮询的分支。
1 | use tokio::sync::oneshot; |
MySelect
future包含两个分支的future。轮询MySelect
时,将轮询第一个分支。若就绪,则使用该值,MySelect
完成。在.await
收到future的输出后,future将被丢弃,这导致两个分支的futures均被删除。由于一个分支没有完成,操作实际上被取消了。
回忆上一节的内容:
MySelect
实现中并没用显式使用Context
参数,而是通过将cx
传给内部的future来满足唤醒器的要求。通过仅在从内部future收到Poll::Pending
时才返回Poll::Pending
,来使内部future也满足唤醒器的要求,MySelect
也满足唤醒器的要求。
语法
select!
宏可以处理多个分支,目前的限制为64个分支。每个分支的结构如下:
1 | <pattern> = <async expression> => <handler>, |
当select宏求值时,所有<async expression>
都会聚合并同时执行。当表达式完成时,结果与<pattern>
匹配,若结果匹配,则取消所有剩余的异步表达式并执行<handler>
。<handler>
表达式可以访问<pattern>
绑定的变量。
<pattern>
的基本用法是给一个变量名赋值,即异步表达式的结果绑定到该变量名,使<handler>
可以访问该变量。这就是为什么在上面的示例中,val
用于<pattern>
,<handler>
能够访问val
。
如果<pattern>
与异步计算的结果不匹配,则剩余的异步表达式将继续并发执行,直到下一个完成。这时,上面的逻辑继续应用作用于该结果。
select!
中可以使用任意异步表达式,因此可以定义更复杂的选择计算。
下面的例子中,我们在oneshot
通道的输出和TCP
连接上进行选择操作。
1 | use tokio::net::TcpStream; |
下面的例子中,我们在oneshot
通道和TcpListener
socket间进行选择操作。
1 | use tokio::net::TcpListener; |
循环将一直运行,直到遇到错误,或rx
收到值。_
模式表示我们对异步计算的返回值不感兴趣。
返回值
tokio::select!
宏返回<handler>
表达式求值的结果。
1 | async fn computation1() -> String { |
因此,每个分支的<handler>
表达式的求值结果应为相同的类型。如果某个select!
表达式不需要输出,最好将表达式的求值结果写为()
。
错误
使用?
运算符从表达式中传播错误,具体的行为取决于?
是用在异步表达式(<async expression>
)中,还是用在处理程序(<handelr>
)中。在异步表达式中使用?
会将错误传播到异步表达式之外,这使得异步表达式的输出为Result
类型;在处理程序中使用?
会立刻将错误传播到select!
表达式之外。让我们再看一遍上面接受socket循环的例子:
1 | use tokio::net::TcpListener; |
注意listener.accept().await?
,?
运算符将错误从该表达式传播到res
绑定变量。出现错误时,res
将被设置为Err(_)
。处理程序中再次使用了?
运算符,res?
语句将错误传播到main
函数之外。
模式匹配
回忆select!
宏的分支定义语法:
1 | <pattern> = <async expression> => <handler>, |
到目前为止,我们只在<pattern>
处用了变量绑定。实际上,我们可以使用任何Rust模式匹配。例如,假设我们从多个MPSC通道接收数据,就可以这样写:
1 | use tokio::sync::mpsc; |
在这个例子中,select!
表达式等待从rx1
和rx2
接收值。如果一个通道关闭,recv()
返回None
,则无法满足匹配,此分支将失效。于是select!
表达式继续等待剩余的分支。
注意这个select!
表达式包含一个else
分支,即select!
表达式必须返回一个值。使用模式匹配时,可能没有分支能够匹配其的关联模式,如果发生这种情况,则由else
分支求值。
借用
当生成任务时,产生异步表达式必持有其所需数据的所有权,而select!
宏并没有这种限制,每个分支的异步表达式可以借用数据且同时运行。遵循Rust的借用规则,多个异步表达式可以同时对某块数据进行不可变借用,或者,仅一个异步表达式可以对某块数据进行可变借用。
下面的例子中,我们同时将相同的数据发送到两个不同的TCP目的地址。
1 | use tokio::io::AsyncWriteExt; |
两个异步表达式都使用了data
变量这一不可变借用。当其中一个操作成功完成时,另一个操作将被丢弃。因为我们需要匹配Ok(_)
,如果一个表达式失败,另一个则将继续执行。
当执行到每个分支的<handler>
时,select!
保证只会有一个<handler>
执行。因此,每个<handler>
可以对同一块数据进行可变借用。
下面的例子中,两个处理程序都尝试对out
进行修改:
1 | use tokio::sync::oneshot; |
循环
select!
宏经常用在循环中,本节将介绍一些例子,以展示在循环中使用select!
宏的常见方法。第一个例子是在多个通道上进行选择:
1 | use tokio::sync::mpsc; |
上面的例子在三个通道的接收端上选择,当任一通道的接收端收到信息,就输出到STDOUT。当一个通道关闭时,recv()
返回None
,由于模式匹配,select!
宏将继续等待其余通道。当所有通道关闭时,else
分支将求值从而终止循环。
select!
宏以随机的方式决定首先检查哪个分支是否就绪,当多个通道的值均未就绪时,将随机选择一个通道接受信息。这是为了处理接收循环的处理速度,慢于消息推入通道的速度的情况,这意味着通道可能被填满。如果select!
首选检查的分支不是随机的,而是每次循环都首选检查rx1
,若rx1
始终都有一条新消息,则其他通道将永远不会被检查。
如果选择时! 进行评估,多个通道有未决消息,只有一个通道弹出一个值。 所有其他频道都没有受到影响,他们的消息一直在这些频道中,直到下一次循环迭代为止。 没有消息丢失。
恢复一个异步操作
下面的例子将展示如何在多个select!
调用间执行同一个异步操作。在此示例中,我们有一个类型为i32
的MPSC通道和一个异步函数。我们希望一直运行异步函数,直到任务完成,或在通道上接收到一个偶数。
1 | async fn action() { |
注意这里是如何再循环外,而不是在select!
宏内调用action()
的。aciont()
的返回值被赋给operation
,且不执行.await
。然后我们在operation
上调用tokio:pin!
。
在select!
循环内,我们传入&mut operation
,而不是operation
。operation
变量正在跟踪执行中的异步操作,循环的每次迭代都使用同一个变量,而不是调用一个新的action()
。
另一个select!
分支从通道接收数据,如果是一个偶数,则终止循环,否则再次select!
。
此处是我们第一次使用tokio::pin!
,这里我们并不介绍关于内存固定的细节,只需要注意,如果要.await
一个引用,被引用的值必须是被固定的,或是实现了Unpin
的。
如果我们删除tokio::pin!
一行并尝试编译,就会收到以下错误:
1 | error[E0599]: no method named `poll` found for struct |
虽然我们在上一章介绍了Future
,但仍然不能明白这个错误。如果你尝试在一个引用上调用.await
时,遇到了这种关于Future
未被实现的错误,那么你可能需要固定该future。
修改一个分支
再看一个稍微复杂一点的循环,其中有:
- 一个
i32
类型的通道。 - 在
i32
值上执行的异步操作。
我们想要实现的逻辑是:
- 在通道上等待一个偶数。
- 使用该偶数作为输入以启动异步操作。
- 等待操作完成,但同时监听通道上的其他偶数。
- 如果在现有操作完成之前收到新的偶数,则终止现有操作并使用新偶数重新开始。
1 | async fn action(input: Option<i32>) -> Option<String> { |
我们使用与上一个例子类似的策略,在循环外调用异步函数并将结果赋给operation
,再将operation
变量固定。循环中的选择操作同事在operation
上与通道接收端上进行。
留意action
是如何将Option<i32>
作为参数的。在我们收到第一个偶数之前,我们需要将operation
实例化,此处通过接收一个Option
并返回一个Option
来创建action
,而如果传入为None
则返回也为None
。第一次循环迭代,operation
返回None
后立即完成。
例子中还使用了一些新的语法,第一个分支中的, if !done
,即设置分支的前置条件。在解释它是如何工作之前,让我们看看如果删掉前置条件会发生什么。删除, if !done
再运行例子会产生以下输出:
1 | thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55 |
在operation
完成后再次尝试调用就会发生此错误。通常,在使用.await
时,被等待的值会被使用掉,而在这个例子中,我们等待的是一个引用。这意味着operation
在完成后仍然存在。
为避免这种panic,我们必须要在操作完成后禁用第一个分支,此处done
变量用于跟踪操作是否完成。一个select!
分支可能包含一个前置条件,select!
在等待分支之前会检查这个前提条件,如果前置条件求值为false
,则分支被禁用。done
变量初始化时为false
,当操作完成时,done
设置为true
,在下一个循环迭代中将禁用operation
分支。当从通道接收到的消息为偶数时,operation
将被重置,done
也将置为false
。
单任务并发
tokio::spawn
和select!
都可以运行异步并发操作,但是,它们运行并发操作时使用的策略并不相同。tokio::spawn
函数接受一个异步操作后生成一个新任务以运行该操作,即Tokio运行时调度的对象是任务。Tokio分别安排了两个不同的任务,它们可能运行在不同的系统线程上,因此,生成任务与生成线程具有相同的限制:禁止借用。
而select!
宏在同一任务上同时运行所有分支。由于select!
宏的所有分支在同一个任务上执行,它们永远不会同时运行。即slect!
宏在单个任务上复用了异步操作。
流
流,就是一系列异步值,是Rust的std::iter::Iterator
的异步版本,用Stream
trait表示。在异步函数中可以迭代遍历流,也可以使用适配器转换流。Tokio在StreamExt
trait上提供了许多通用适配器。
Tokio在一个独立的crate中提供流支持:tokio-stream
。
1 | tokio-stream = "0.1" |
迭代
目前,Rust语言还不支持异步for
循环,代替方案是使用while let
循环配合StreamExt::next()
在流上进行迭代。
1 | use tokio_stream::StreamExt; |
与迭代器类似,next()
方法返回Option<T>
,T
为流内值的类型,若收到None
则表示流迭代终止。
Mini-Redis广播
让我们来再看一个使用Mini-Redis客户端的稍微复杂的例子。
你可以在这里找到完整的代码。
1 | use tokio_stream::StreamExt; |
生成一个任务以在Mini-Redis服务器的“number”频道上发布消息。然后,在主任务中订阅“numbers”频道并打印收到的消息。
订阅后,在返回的订阅者上调用into_stream()
。如此将消耗掉该Subscriber
,并返回一个能够在消息到达时产生消息的流。请注意,在开始迭代消息之前,我们用tokio::pin!
将流固定在栈上,因为在流上调用next()
需要固定该流。into_stream()
函数返回一个未固定的流,我们必须显式固定后才能迭代它。
固定值的一个关键属性是可以将指针指向固定数据,并且调用者可以确信指针保持有效。 async/await 使用此功能来支持跨 .await 点借用数据。
如果我们忘记固定流,就会看到类似下面的错误:
1 | error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned |
如果你遇到类似上面这样的错误,请尝试固定该值。
在运行上面的代码之前,先启动Mini-Redis服务端:
1 | mini-redis-server |
再尝试运行代码,我们就会在STDOUT中看到如下输出:
1 | got = Ok(Message { channel: "numbers", content: b"1" }) |
由于订阅和发布之间存在竞争,一些早期消息可能会被丢弃。该程序永远不会退出,只要服务端保持活动状态,对Mini-Redis频道的订阅就会保持活动状态。
让我们看看如何使用流来扩展这个程序。
适配器
接受一个Stream
并返回另一个Stream
的函数通常称为“流适配器”,因为这属于“适配器模式”。常见的流适配器包括map
、take和filter。
更新Mini-Redis以使其可以退出。我们希望收到三个消息后就停止遍历。这可以使用take
完成,此适配器限制流最多只能产生n
条消息。
1 | let messages = subscriber |
再次运行程序,输出为:
1 | got = Ok(Message { channel: "numbers", content: b"1" }) |
而这一次程序执行完毕后退出。
现在,让我们将流中的信息限制为一位数,我们将通过检查消息长度来实现这一点。这次使用filter
适配器,以丢弃任何与此断言不匹配的消息。
1 | let messages = subscriber |
再次运行程序,输出为:
1 | got = Ok(Message { channel: "numbers", content: b"1" }) |
请注意,适配器的应用顺序很重要。先调用filter
再调用take
与先调用take
再调用filter
是不同的。
最后,我们将整理输出,即删除输出中的Ok(Message { ... })
部分,这次使用map
。因为这是在filter
之后应用的,我们知道消息是Ok
,因此可以使用unwrap()
。
1 | let messages = subscriber |
此时的输出为:
1 | got = b"1" |
另一种选择是使用filter_map
将filter
和map
合并在一个调用中。
还有更多可用的适配器,请查看此列表。
实现Stream
Stream
trait与Future
trait非常相似。
1 | use std::pin::Pin; |
Stream::poll_next()
函数很像Future::poll
,不同在于你可以重复调用它,以从流中接收许多更多值。正如我们在深入异步中看到的那样,当流的返回值尚未就绪时,将返回Poll::Pending
,此时任务的唤醒器已注册,一旦要再次轮询流,唤醒器就会收到通知。
size_hint()
方法的使用方式与其在迭代器上的使用方式相同。
通常,在手动实现Stream
时,是通过组合futures和其他流来完成的。作为一个例子,让我们再次以深入异步中实现的Delay
future为基础构建。这次实现为以10毫秒为间隔产生三次()
的流。
1 | use tokio_stream::Stream; |
async-stream
使用Stream
特性手动实现流可能很乏味。然而,Rust尚不支持使用async/await
语法定义流,该特性正在实现,只是尚未完成。
可将async-stream
crate当作临时解决方案。该crate提供了一个stream!
宏可以将输入转换为流。使用该crate,上面的interval可以这样实现:
1 | use async_stream::stream; |