使用Rust编写操作系统 - 4.1 - Async/Await
本文将探讨协作式多任务处理以及Rust的async/await特性。我们将详细研究async/await在Rust中的工作方式,包括Future
trait的设计,状态机转换和pinning(译注:内存固定)。然后,我们通过创建异步键盘任务和基本执行器,使得内核具备对async/await的基本支持。
这个博客是在GitHub上公开开发的。如果你有任何问题或疑问,请在那里开一个issue。你也可以在底部留言。这篇文章的完整源代码可以在post-12分支中找到。
多任务
多任务处理是大多数操作系统的基本特征之一,这是一种能够同时执行多个任务的功能。例如,在看这篇文章时,你可能会打开其他程序,例如文本编辑器或终端窗口。即使只打开一个浏览器窗口,也可能会有各种后台任务来管理桌面窗口,检查更新或为文件建立索引。
尽管所有任务看上去都是并行运行的,但实际在CPU内核上一次只能执行一个任务。为了产生任务可以并行运行的错觉,操作系统会在活动任务之间快速切换,以使得每个任务都可以有一些进展。由于计算机速度很快,因此大多数时候我们不会注意到这些切换。
单核CPU一次只能执行一个任务,而多核CPU可以真正并行地运行多个任务。例如,具有8个内核的CPU可以同时运行8个任务。我们将在以后的文章中解释如何设置多核CPU。在本文中,为简单起见,我们将重点介绍单核CPU。(值得注意的是,所有多核CPU都仅从单个活动核启动,因此我们目前仍然可以将它们视为单核CPU。)
多任务处理有两种形式:协作式多任务要求任务定期放弃对CPU的控制,以便其他任务可以继续执行。抢占式多任务使用操作系统功能通过强行暂停线程,以在任意时间点切换线程。在下文中,我们将更详细地探讨多任务的两种形式,并讨论它们各自的优缺点。
抢占式多任务
抢占式多任务处理的思路是使用操作系统控制何时切换任务,思路利用了操作系统在每个中断上可以重获CPU控制权这一机制。这样,只要系统有新输入可用,就可以切换任务。例如,当鼠标移动或网络数据包到达时,就可以执行任务切换。操作系统还可以通过配置硬件计时器在到达特定时间时发送中断,来确定允许任务运行的确切时间。
下图说明了硬件中断上的任务切换过程:
在第一行中,CPU正在执行程序A
的任务A1
,所有其他任务均被暂停。在第二行中,硬件中断到达CPU。就像硬件中断一文中介绍的那样,CPU立即停止执行任务A1
,并跳转到中断描述符表(IDT)中定义的中断处理程序。通过中断处理程序,操作系统现在可以再次控制CPU,从而使它可以切换到任务B1
,而不是继续执行任务A1
。
状态保存
鉴于任务会在任意时间点中断,这些任务可能正处于某些计算中。为了能够在以后恢复这些任务,操作系统必须备份任务的整个状态,包括其调用栈和所有CPU寄存器的值。此过程称为上下文切换。
由于调用栈可能会非常大,操作系统通常会为每个任务设置单独的调用栈,而不是在每次任务切换时备份调用栈的内容。具有单独调用栈的此类任务称为执行线程或简称线程。通过为每个任务设置单独的栈,上下文切换时仅需要保存寄存器的内容即可(包括程序计数器和栈指针)。这种方法可以最大程度地减少上下文切换的性能开销,这非常重要,因为上下文切换通常能够到达每秒100次。
讨论
抢占式多任务处理的主要优点是操作系统可以完全控制任务的执行时间。这样即使不要求任务间的相互协作,也能够确保公平分配各任务的CPU时间。在运行第三方任务时或在多个用户共享系统时,这一点尤其重要。
抢占式的缺点是每个任务都需要自己的调用栈。与共享调用栈相比,这会导致每个任务的内存使用量增加,并且通常会限制系统中的任务的总量。抢占式的另一个缺点是,即使任务仅使用了一小部分寄存器,操作系统在每个任务切换时仍必须保存全部的CPU寄存器状态。
抢占式多任务处理和线程是操作系统的基本要素,因为它们能够使操作系统能够运行不受信任的用户空间程序,我们将在以后的文章中详细讨论这些概念。在本文中,我们将专注于协作式多任务处理,这种方式也能够为内核提供有用的多任务功能。
协作式多任务
协作多任务处理并不是在任意时间点强行暂停正在运行的任务,而是让每个任务运行到自愿放弃对CPU的控制为止。这使任务可以在方便的时间点暂停自己,比如当该任务需要等待I/O操作时。
合作多任务通常使用在语言层,例如以协程或async/await的形式。该思路是由程序员或编译器都将yield操作插入到程序中,从而使该任务放弃对CPU的控制以允许其他任务运行。在复杂循环的每次迭代之后都可以插入一个yield。
协作式多任务通常与异步操作一起使用。与抢占式的一直等待直到操作完成并在期间阻止其他任务的执行,在协作式中若操作尚未完成,则异步操作将返回一个“未就绪”状态。在这种情况下,等待的任务可以执行一个yield操作以允许其他任务运行。
状态保存
由于任务会为自己定义暂停点,因此它们不需要操作系统来保存其状态。与抢占式相比,协作式可以在暂停之前只保存用以在继续时恢复操作的必要的寄存器状态,这通常可以提高性能。例如,刚完成复杂计算的任务可能只需要备份计算的最终结果,因为此时中间结果已经不再需要了。
由语言支持实现的协作任务甚至可以在暂停之前只备份调用堆栈的所需部分。例如,Rust的async/await实现会将所有需要的局部变量存储在自动生成的结构体中(见下文)。通过在暂停之前备份调用堆栈的相关部分,所有任务可以共享一个调用堆栈,从而减少了每个任务的内存消耗。这样就可以创建几乎无限的协作任务,而不用担心内存耗尽。
讨论
协作式多任务处理的缺点是,一个非协作的任务可能会无限期地运行下去。那么,恶意程序或有bug的任务可能会阻止其他任务的运行,并拖慢甚至阻塞整个系统。因此,当且仅当所有任务都支持协作时才能使用协作式多任务处理。而让操作系统依赖于任意用户级程序的协作并不是一个好主意。
但是,协作多任务的强大性能和内存优势,使其成为在程序内部使用的好方法,特别是与异步操作结合使用时。由于操作系统内核需要与异步硬件交互,性能至关重要,因此协作多任务似乎是一个实现并发的好方法。
Rust的async/await
Rust语言以async/await
的形式为协作式多任务提供了原生支持。在探讨什么是async/await及其工作原理之前,我们需要了解Rust中的futures和异步编程是如何工作的。
Futures
Future表示一个可能尚不可用的值。例如,这个值可能是由另一个任务计算的整数,也可能是从网络下载的文件。Future并不需要一直要等待到该值可用,它可以继续执行其他代码直到需要用到该值为止。
示例
最好用一个小例子说明futures的概念:
此序列图展示了一个main
函数,它从文件系统读取文件,然后调用foo
函数。图中以不同的方式执行两次此过程:一次是调用同步函数read_file
,一次是调用异步函数async_read_file
。
在同步调用中,main
函数需要一直等待,直到该文件从文件系统中加载完毕。只有这样,它才能继续调用foo
函数,并再次等待其执行结果。
通过异步async_read_file
调用,文件系统直接返回一个future并在后台异步加载文件。这使main
函数可以更早地调用foo
函数,然后foo
函数与文件加载并行运行。在此示例中,文件加载甚至在foo
返回之前就完成了,因此main
无需继续等待foo
返回,可以直接处理文件。
Rust中的Futures
在Rust中,futures由Future
trait表示,其定义如下:
1 | pub trait Future { |
关联类型Output
指定异步返回值的类型。例如,上图中的async_read_file
函数将返回一个Output
设置为File
的Future
实例。
调用poll
方法可以检查该值是否已经可用。它将返回一个枚举Poll
,如下所示:
1 | pub enum Poll<T> { |
如果该值已经可用(例如,已从磁盘中读取完整的文件),则将其封装在Ready
变量中返回。否则,将返回Pending
变量,以通知调用方该值尚不可用。
poll
方法采用两个参数:self: Pin<&mut Self>
和cx: &mut Context
上下文。前者的行为类似于普通的&mut self
引用,不同之处在于,Self
值pinned在一个固定的内存位置上。如果不先了解async/await的工作原理,就很难理解Pin
以及为什么需要Pin
。因此,我们将在后文中进行详细解释。
cx: &mut Context
参数用于将一个Waker
实例传递给异步任务,例如加载文件系统。这个Waker
允许异步任务发信号通知任务(或部分任务)已完成,例如该文件已从磁盘加载。由于主任务知道将在Future
就绪时将会收到通知,因此就不需要一遍又一遍地调用poll
了。我们会在后文中实现自己的waker类型,届时将更加详细地说明此过程。
使用Futures
现在,我们知道了如何定义future,了解了poll
方法的基本思路。但是,我们仍然不知道如何有效地使用futures。由于futures代表异步任务的结果,可能尚不可用。但实际上,我们经常立即需要这些值以进行进一步的计算。所以问题是:在需要时如何有效地检索一个future的值?
等待Futures
一种可能的答案是一直等待到future就绪为止。该过程看起来像这样:
1 | let future = async_read_file("foo.txt"); |
在上面的代码中,我们通过循环中积极地调用poll
来等待future完成。在这里poll
的参数并不重要,因此已被省略。尽管此方法可行,但是效率很低,因为包含poll的循环会一直用占用CPU直到该值可用为止。
一种较为高效的方法是阻塞当前线程,直到future可用为止。当然,这只有在有线程支持的情况下才可行,因此并不适用于我们的内核,至少目前还不行。即使在支持阻塞的系统上,通常也不会这么做,因为阻塞会将异步任务再次变为同步任务,从而失去了并行任务的性能优势。
Future组合器
另一种等待方式是使用future组合器。Future组合器是类似于map
的方法,它允许将futures链接并组合在一起,就像在Iterator
上做的那样。这些组合器不等待future,而是自己返回future,即为poll
应用了map操作。
下面举个简单的例子,一个用于将Future<Output = String>
转换为Future<Output = usize>
的string_len
组合器可能看起来像这样:
1 | struct StringLen<F> { |
该代码无法正常工作,因为尚未处理pinning,不过作为例子已经足够了。基本思路是string_len
函数将给定的实现了Future
trait的实例封装到新的StringLen
结构体中,而该结构体也实现了Future
trait。当poll封装的future时,即是poll其内部的future。如果该值尚未就绪,封装的future也将返回Poll::Pending
。如果该值已就绪,则从Poll::Ready
变量中获取字符串,并计算其长度。最后再将其封装在Poll::Ready
中返回。
通过string_len
函数,我们不需要等待一个异步字符串,就可以计算其长度。由于该函数也返回Future
,因此调用者无法直接操作返回的值,而需要再次使用组合器函数。这样,整个调用过程就变为异步的了,我们可以高效地在某个时刻一次等待多个future,例如 在main
函数上。
手动编写组合器函数比较困难,所以它们通常由库直接提供。尽管Rust标准库本身还没有提供官方组合器方法,但半官方(兼容no_std
)的future
crate可以。其FutureExt
特性提供了诸如map
或then
之类的高级组合器方法,可用于任意闭合操作结果。
优点
Future组合器最大的优点是能够使操作保持异步。这种方法和异步I/O
接口结合使用时性能非常高。实际上future组合器将被实现为具有trait的普通结构体,以使编译器能够对其做出进一步优化。有关更多详细信息,请参阅Rust的零成本future一文,就是这篇文章宣布了在Rust生态系统中添加future。
缺点
尽管future组合器可以编写出非常高效的代码,但由于类型系统和基于闭包的接口的限制,组合器可能会在某些情况下变得难以使用。例如,考虑下面的代码:
1 | fn example(min_len: usize) -> impl Future<Output = String> { |
代码先读取文件foo.txt
,然后使用then
组合器根据文件内容链接第二个future。如果内容长度小于给定的min_len
,我们将读取另一个bar.txt
文件,然后使用map
组合器将其附加到content
中。否则,只返回foo.txt
的content
。
我们需要在传递给then
的闭包上使用move
关键字,否则min_len
变量会产生生命周期错误。使用Either
封装的原因是让if
块和else
块始终具有相同的类型。由于我们在块中返回了不同的future类型,因此必须使用封装类型将它们统一为一个类型。ready
函数将立刻就绪的值封装到future中。这里需要使用该函数是因为Either
封装要求值实现Future
trait。
您可以想象,这种用法很快就会导致大型项目的代码变得非常复杂。尤其是再涉及借用和生命周期,就会变得更加复杂。因此,为了使异步代码从根本上更易于编写,我们投入了大量工作来为Rust添加对async/await的支持。
Async/Await模式
Async/Await的思路是让程序员以编写看起来像同步代码的方式编写异步代码,只不最后是由编译器将同步代码转换为异步代码。它基于两个关键字async
和await
。在函数签名中使用async
关键字,就可以将同步函数转换为一个返回future的异步函数:
1 | async fn foo() -> u32 { |
仅使用此关键字并没有那么有用。但是,在异步函数内部,可以使用await
关键字来取回future的异步值:
1 | async fn example(min_len: usize) -> String { |
将上面使用组合器实现的example
函数直接转换为async/await模式:使用.await
运算符就可以取回future的值,无需使用闭包或Either
类型。如此,我们就可以像编写普通的同步代码一样编写异步代码。
状态机转换
在这种场景中,编译器的作用就是将async
函数体转换为一个状态机,每个.await
调用代表一个不同的状态。对于上面的example
函数,编译器创建具有以下四个状态的状态机:
不同状态代表该函数的不同暂停点。”Start“和”End“状态代表函数在其执行的开始和结束时的状态。”Waiting on foo.txt“状态表示该函数目前正在等待第一个async_read_file
的结果。同样的,”Waiting on bar.txt“状态表示函数在等待第二个async_read_file
的结果的暂停点。
状态机通过将每个poll
调用都变为一个可能的状态转换来实现Future
trait:
图中使用箭头表示状态开关,并使用菱形表示条件路径。例如,如果foo.txt
文件尚未准备好,则采用标记为”no“的路径,并达到”Waiting on foo.txt“的状态。否则,就采用标记为“yes”路径。没有字的红色小菱形代表example
函数中if content.len() < 100
的条件分支。
我们看到第一个poll
调用启动了该函数并使它运行,直到遇到一个尚未就绪的future。如果路径上的所有future都已就绪,则该函数可以一直运行到”End“状态,并返回封装在Poll::Ready
中的结果。否则,状态机将进入等待状态并返回Poll::Pending
。然后在下一个poll
调用中,状态机从上一个等待状态开始重试其最后一次操作。
状态保存
为了能够从上一个等待状态中恢复,状态机必须在内部跟踪当前状态。此外,它还必须保存在下一个poll
调用中恢复执行所需的变量。这就是编译器真正发挥作用的地方:由于编译器知道在何时要使用哪些变量,因此它可以自动生成具有所需变量的结构体。
作为示例,编译器为上面的example
函数生成类似于下面这样的结构体:
1 | // 这是async版的`example`函数 |
在”Start“和”Waiting on foo.txt“状态下,需要存储min_len
参数,因为稍后与content.len()
做比较时需要使用该参数。”Waiting on foo.txt“状态还存储了一个foo_txt_future
,用来表示async_read_file
调用返回的future。状态机继续运行时会再次poll该future,因此需要将其保存。
“Waiting on bar.txt“状态包含content
变量,是因为在bar.txt
就绪后需要使用该变量进行字符串连接。该状态还存储了一个bar_txt_future
,用来表示正在加载中的bar.txt
。该结构体不包含min_len
变量,因为在content.len()
比较之后就不再需要该变量了。在”End“状态下,没有存储任何变量,因为此时函数已经运行完毕。
请记住,这只是编译器可能生成的代码的一个示例。结构体名称和字段布局是实现细节,可能会有所不同。
全状态机类型
尽管编译器生成的确切代码是实现细节,但这个示例还是有助于我们理解并想象example
函数生成的状态机可能的样子。我们已经定义了代表不同状态的结构体,并给出了其中包含的所需变量。为了基于这些结构体创建一个状态机,我们可以将它们组合成一个枚举:
1 | enum ExampleStateMachine { |
我们为每个状态定义一个单独的枚举变量,并将对应状态的结构体作为字段添加到每个变量。为了实现状态转换,编译器根据example
函数实现Future
trait:
1 | impl Future for ExampleStateMachine { |
该Future
的Output
类型为String
,即example
函数的返回类型。为了实现poll
函数,我们在loop
内的当前状态上使用match
语句。思路是我们尽可能长时间地切换到下一个状态,并在无法继续时显式的使用return Poll::Pending
。
为简单起见,这里仅给出简化的代码,且暂不处理pinning、所有权、生命周期等内容。因此,这里的代码和下面的代码应被看做伪代码,不能直接使用。当然,真正的编译器生成的代码可以正确处理所有内容,尽管可能使用了与我们不同的方式。
为了使示意的代码更简洁,我们将分别显示每个匹配分支的代码。从”Start”状态开始:
1 | ExampleStateMachine::Start(state) => { |
当状态机处于Start
状态时,其对应位置正是函数体的最开始。在这种情况下,我们将执行example
函数体中的所有代码,直到遇到第一个.await
。为了处理.await
操作,我们将self
状态机的状态修改为WaitingOnFooTxt
,并令状态中包含WaitingOnFooTxtState
结构体。
由于match self {…}
语句是在循环中执行的,因此该执行将跳至下一个分支WaitingOnFooTxt
:
1 | ExampleStateMachine::WaitingOnFooTxt(state) => { |
在这一匹配分支中,我们首先调用foo_txt_future
的poll
函数。如果尚未就绪,则退出循环并返回Poll::Pending
。由于在这种情况下self
仍位于WaitingOnFooTxt
状态,因此状态机的下一次poll
调用也将进入相同的匹配分支并重试foo_txt_future
。
当foo_txt_future
就绪时,我们将结果赋给content
变量,然后继续执行example
函数的代码:如果content.len()
小于状态结构体中保存的min_len
,则异步读取bar.txt
文件。我们再次将.await
操作转换为状态更改,而这次应转换为WaitingOnBarTxt
状态。由于我们是在循环内执行匹配,因此下一轮循环将直接跳转到新状态的匹配分支,然后在该状态下pollbar_txt_future
。
如果我们进入else
分支,则不会进行进一步的.await
操作。此时已到达函数的结尾,并将content
封装在Poll::Ready
中返回。我们还需要将当前状态更改为End
状态。
WaitingOnBarTxt
状态的代码如下所示:
1 | ExampleStateMachine::WaitingOnBarTxt(state) => { |
与WaitingOnFooTxt
状态类似,我们从pollbar_txt_future
开始。如果仍未就绪,则退出循环并返回Poll::Pending
。否则,我们就执行example
函数的最后一个操作:用content
变量与future的结果做字符串连接。我们将状态机更新为End
状态,然后将结果封装在Poll::Ready
中返回。
最后,End
状态的代码如下所示:
1 | ExampleStateMachine::End(_) => { |
Future返回Poll::Ready
后就不应再被poll了,因此,当我们已经处于End
状态时,如果再次调用poll
,就产生一个panic。
现在我们知道了编译器可能会生成怎样的状态机,以及怎样去给状态机实现Future
trait。但实际上,编译器会以不同的方式生成代码。(如果您感兴趣的话,该实现目前基于生成器,不过这只是实现细节。)
最后一步是为example
函数本身生成代码。记住,函数签名是这样定义的:
1 | async fn example(min_len: usize) -> String |
由于现在整个函数体是由状态机实现的,因此该函数唯一需要做的就是初始化状态机并将其返回。为此生成的代码如下所示:
1 | fn example(min_len: usize) -> ExampleStateMachine { |
该函数不再使用async
修饰符,因为它现在显式返回一个实现了Future
trait的ExampleStateMachine
类型。如预期的那样,状态机被初始化为Start
状态,并且使用min_len
参数初始化了对应的状态结构体。
请注意,此函数并不会直接启动状态机。这是Rust中future的一个基本设计决策:在第一次被poll之前什么也不做。
Pinning
在这篇文章中,我们已经遇到pinning很多次了。现在终于是时候看看究竟什么是pinning以及为什么需要pinning了。
自引用结构体
如上所述,状态机转换将每个暂停点的局部变量存储在结构体中。对于像example
函数这样的小例子就很简单,并不会导致任何问题。但是,当变量相互引用时,事情就会变得困难。例如,考虑以下函数:
1 | async fn pin_example() -> i32 { |
该函数创建一个包含1
、2
、3
的小数组。然后再创建对最后一个数组元素的引用,存储在element
变量中。接下来,该函数将转换为字符串的数字异步写入到foo.txt
文件中。最后返回由element
引用的数字。
由于该函数使用了一个await
操作,因此结果状态机具有三种状态:”Start”、”End”、”Waiting on write”。该函数没有参数,因此开始状态对应的结构体为空。像以前一样,结束状态对应的结构体也为空,因为该函数此时已完成。而”Waiting on write”状态对应的结构体就很有趣:
1 | struct WaitingOnWriteState { |
我们需要存储array
和element
变量,因为返回值需要element
,而element
又引用了array
。由于element
是一个引用,因此它存储指向所引用元素的指针(即内存地址)。我们在这里使用0x1001c
作为示例存储地址。而实际上,element
字段必须是array
字段最后一个元素的地址,所以这与该结构体在内存中的位置有关。由于此类结构体对自身的某些字段做了引用,因此这种具有内部指针的结构体也叫做自引用结构体。
自引用结构体的问题
自引用结构的内部指针将导致一个很基本的问题,尤其是在查看其内存布局时会更加明显:
array
字段从地址0x10014
开始,element
字段从地址0x10020
开始。它指向地址0x1001c
,即最后一个数组元素的地址。至此仍没什么问题。但是,当我们将此结构体移动到其他内存地址时,就会发生问题:
稍微移动该结构体,现在使其从地址0x10024
开始。这是有可能的,例如当我们将该结构体作为函数参数传递时,或是将其分配给其他栈变量时。问题在于,尽管最后一个数组元素现已位于地址0x1002c
,但element
字段仍指向地址0x1001c
。于是指针悬空,结果就是在下一个poll
调用中发生未定义的行为。
可行的解决方案
有三种解决指针悬空问题的基本方法:
- 在移动时更新指针:思路是每当结构体在内存中移动时都更新其内部指针,以使该指针在移动后仍然有效。不幸的是,这种方法需要对Rust进行大量更改,并可能导致巨大的性能损失。因为如果实现这种方法,就需要某种运行时持续跟踪结构体中各种类型的字段,并在每次移动发生时检查是否需要更新指针。
- 存储偏移量而不是自引用:为了不去更新指针,编译器可以尝试将自引用存储为从结构体开始地址算起的偏移量。例如,上面的
WaitingOnWriteState
结构的element
字段可以以element_offset
字段的形式存储,其值为8,因为该引用指向的数组元素位于该结构体的起始地址后的第8个字节处。由于在移动结构时偏移量保持不变,因此不需要字段更新。
问题是,如果实现这种方法,就必须让编译器检测所有自引用。这在编译时是不可能的,因为引用的值可能取决于用户输入,因此这又需要使用运行时系统来分析引用,从而正确地创建状态结构体。这不仅会导致运行时成本增加,而且还会阻止某些编译器优化,从而又会导致较大的性能损失。 - 禁止移动结构体:如上所示,仅当我们在内存中移动该结构体时,才会出现悬空指针。那么,通过完全禁止对自引用结构的移动操作,就可以避免该问题。这种方法的最大优点是可以实现在类型系统级别,并不会增加运行时成本。不过它的缺点是需要程序员自己处理在可能的自引用结构体上发生的移动操作。
为了遵守提供零成本抽象的原则(这意味着抽象不应产生额外的运行时成本),Rust选择了第三种解决方案。为此,在RFC 2349中提出了pinning API。在下文中,我们将简要概述此API,并说明它将如何与async/await和futures一起使用。
堆上的值
首先,很明显,堆分配的值在大多数情况下已经具有固定的内存地址。这些值是由allocate
调用创建,并由如类似Box<T>
的指针类型进行引用的。尽管这种指针类型可以移动,但指针所指向的堆值将始终位于相同的内存地址中,除非调用deallocate
将其释放否则地址将一直不变。
使用堆分配,我们可以尝试创建一个自引用结构体:
1 | fn main() { |
我们创建一个名为SelfReferential
的简单结构体,其中包含一个指针字段。首先,我们使用空指针初始化结构体,然后使用Box::new
将其分配到堆上。接下来,我们确定分配给堆的结构体的内存地址,并将其存储在ptr
变量中。最后,通过将ptr
变量分配给self_ptr
字段,将结构体变为自引用结构体。
在play rust上执行此代码时,会看到堆值的地址及其内部指针地址相同,这意味着self_ptr
字段是有效的自引用。由于heap_value
变量仅是一个指针,因此移动它(例如,通过将其传递给函数)不会更改结构体本身的地址,因此,即使移动了指针,self_ptr
也依然有效。
但是,还是有一种方法可以破坏这个例子:我们可以移出Box<T>
或替换其内容:
1 | let stack_value = mem::replace(&mut *heap_value, SelfReferential { |
这里我们使用mem::replace
函数,将堆分配内存中的原值替换为一个新结构体实例。这使我们可以将堆分配中的原值heap_value
移动到栈中,而现在该结构体的self_ptr
字段就是一个悬空指针了,它仍然指向旧的堆地址。当你尝试在play rust上运行该代码时,就会看到“value at:”行和“internal reference:”行确实打印了不同的指针地址。因此,堆分配的值并不能完全保证自引用的安全性。
导致上述现象的根本问题是Box<T>
允许我们获取对堆分配值的&mut T
引用。通过&mut
引用,就可以使用诸如mem::replace
或mem::swap
一类的方法来使堆分配的自引用值无效。要解决此问题,我们必须禁止创建对自引用结构的&mut
引用。
Pin<Box<T>>
与Unpin
Pinning API用Pin
封装类型和Unpin
标记trait的方式为&mut T
问题提供了解决方案。其思路是为Pin
类型中所有可用于从Unpin
trait上获取其封装值&mut
可变引用方法(例如get_mut
或deref_mut
)设置关卡。Unpin
trait是自动trait,即对于所有类型(显式声明退出的类型除外)都默认自动实现。而如果让自引用结构体显式声明退出Unpin
,就再也没有(安全的)方法能够从Pin<Box<T>>
类型中获取&mut T
可变引用了。如此便能够保证它们的内部自引用始终有效。
让我们更新上面的SelfReferential
类型以选择取消Unpin
:
1 | use core::marker::PhantomPinned; |
通过为结构体添加类型为PhantomPinned
的第二个字段_pin
来退出trait。这是一个零大小的标记类型,唯一目的就是退出Unpin
trait。鉴于自动trait的工作方式,只需一个非Unpin
的字段就可以使整个结构体退出Unpin
。
第二步是将示例中的Box<SelfReferential>
类型更改为Pin<Box<SelfReferential>>
类型。最简单的方法是使用Box::pin
函数代替Box::new
来创建堆分配的值:
1 | let mut heap_value = Box::pin(SelfReferential { |
除了将Box::new
改为Box::pin
,还需要在结构体初始字段中添加新的_pin
字段。由于PhantomPinned
是零大小的类型,因此我们只需要使用其类型名称即可进行初始化。
现在,当我们尝试在play rust中运行修改后的示例代码,会发现代码有编译错误:
1 | error[E0594]: cannot assign to data in a dereference of `std::pin::Pin<std::boxed::Box<SelfReferential>>` |
发生这两个错误都是由Pin<Box<SelfReferential>>
类型不再实现DerefMut
trait引起的。这正是我们期望的,因为DerefMut
trait会返回一个&mut
引用,而我们希望禁止该引用。这两个错误就是因为我们既退出了Unpin
,又将Box::new
更改为Box::pin
。
现在的问题是,编译器不仅禁止在第16行中移动该类型,而且还禁止在第10行中初始化self_ptr
字段。之所以发生这种情况,是因为编译器无法区分&mut
引用的有效使用与无效使用。为了修复初始化一行的报错,我们必须使用非安全的get_unchecked_mut
方法:
1 | // 此处操作是安全的,因为仅修改了结构体字段,而未进行整个结构体的移动 |
get_unchecked_mut
函数的参数应为Pin<&mut T>
而不是Pin<Box<T>>
,因此我们必须先使用Pin::as_mut
做转换。然后,我们就可以使用get_unchecked_mut
返回的&mut
引用来设置self_ptr
字段了。
现在剩下的错误就是我们期望的mem::replace
一行的错误。请记住,此操作尝试将堆分配的值移动到栈上,这会破坏存储在self_ptr
字段中的自引用。通过退出Unpin
并使用Pin<Box<T>>
,就可以在编译时禁止执行此类操作,从而可以安全地使用自引用结构体。如我们所见,编译器(目前)还不能检测创建的自引用结构体是否安全,因此我们需要使用非安全块并自己确保其正确性。
栈上的pinning与Pin<&mut T>
在上一节中,我们学习了如何使用Pin<Box<T>>
安全地创建堆分配上的自引用结构体。尽管这种方法可以很好地工作并且相对安全(除了非安全的初始化),但方法所要求的堆分配会带来一些性能损失。由于Rust始终希望尽可能的提供零成本抽象,因此pinning API还允许创建指向栈分配值的Pin<&mut T>
实例。
与拥有封装值所有权的Pin<Box<T>>
实例不同,Pin<&mut T>
实例仅临时借用其封装的值。这使事情变得更加复杂,因为它需要程序员自己确保引用带来的附加的条件。最重要的是,在被引用T
的整个生命周期中,Pin<&mut T>
必须保持在pin的状态,这对于基于栈的变量来说会更加难以检查。为了处理这种问题,的确存在像pin-utils
这样的crate,但是除非你真的知道自己在做什么,一般都不建议使用栈上的pinning。
要进一步阅读,请查看pin
模块和Pin::new_unchecked
方法的文档。
Pinning和Future
正如我们在本文中已经看到的那样,Future::poll
方法使用的pinning参数为Pin<&mut Self>
形式:
1 | fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> |
该方法采用self: Pin<&mut Self>
而不是常规&mut self
作参数的原因如前文所述,从async/await创建的future实例通常是自引用的。通过将Self
封装为Pin
,并使编译器令async/await生成的自引用future退出Unpin
,可以确保future在两次poll
调用之间不会在内存中移动。如此便可以确保所有内部引用仍然有效。
值得注意的是,在第一个poll
调用之前,移动future是可行的。这是因为future是惰性的,它在第一次poll之前什么也不做。所以生成状态机的”Start”状态仅包含函数参数,而没有内部引用。为了调用poll
,调用者必须先将future封装到Pin
中,以确保future不会在内存中移动。由于正确的使用栈pinning会更加困难,因此建议始终结合使用Box::pin
与Pin::as_mut
。
如果你有兴趣了解如何使用栈pinning来安全地实现future组合器函数,可以选择查看future
crate中map
组合器方法源码(该方法的源码相对简单),以及阅读有关pin文档中投影和结构的pinning部分。
执行器和唤醒器
使用async/await,可以最大限度的以完全异步的方式使用future。但是,正如我们从上面了解到的那样,future在被poll之前什么都不做。这意味着我们必须在某些时候发起对它们的poll
,否则异步代码将永远不会执行。
对于单个future,我们总是可以使用上述循环手动等待每个future。但是,这种方式效率很低,而且对于那些创建大量future的程序实际上并不可行。针对此问题的最常见解决方案是定义一个全局执行器,以负责轮询系统中的所有future,直到它们全部完成。
执行器
执行器的作用是将future生成为任务,通常是使用一些spawn
方法。然后,执行器负责轮询所有future,直到它们全部完成。集中管理所有future的最大好处是,只要future返回Poll::Pending
,执行器就可以切换到另一个future。所以异步操作可以并行运行,CPU也因此持续作业。
许多执行器的实现也能够利用多核CPU系统的优点。这些实现会创建一个线程池,如果提供足够多的任务,线程池就能利用所有内核,并使用像work stealing之类的技术来平衡核心间的负载。嵌入式系统还会有一些特殊的执行器实现,专门为降低延迟和减少内存开销做了优化。
为了避免重复轮询future的开销,执行器通常还利用Rust的future支持的唤醒器API。
唤醒器
唤醒器API的思路是,将特殊的Waker
类型封装在Context
类型中,传递给每一次poll
调用。这个Waker
类型是由执行器创建的,异步任务可以用该类型表示任务已(部分)完成。于是,除非相应的唤醒器通知执行器,否则执行器都不需要在先前返回Poll::Pending
的future上再进行poll
调用了。
举个小例子可以很好地说明这一点:
1 | async fn write_file() { |
此函数将字符串”Hello”异步写入foo.txt
文件。由于硬盘写入需要一些时间,因此该future上的第一次poll
调用可能会返回Poll::Pending
。硬盘驱动将在内部存储传递给poll
调用的Waker
,并在完成文件写入时使用它通知执行器。如此,执行器在收到唤醒器的通知前,都不需要再在这个future上尝试poll
了。
我们将在本文的实现一节创建具有唤醒器支持的执行器,届时将详细的看到Waker
类型的工作原理。
协作式多任务?
在本文的开头,我们讨论了抢先式多任务和协作式多任务。抢占式多任务依靠操作系统在运行中的任务间做强制切换,而协作式多任务则要求任务定期通过yield操作主动放弃对CPU的控制。协作式的最大优点是任务可以自己保存状态,从而更高效地进行上下文切换,并可以在任务间共享相同的调用栈。
这可能不够直观,不过future和async/await就是协作式多任务模式的实现:
- 添加到执行器的每个future本质上都是一个协作任务。
- Future不使用显式的yield操作,而是通过返回
Poll::Pending
(或完成时的Poll::Ready
)来放弃对CPU核心的控制。- 没有什么可以强制future放弃CPU。如果它们愿意,就能够永不从
poll
中返回,如在一个无限循环中打转。 - 由于每个future都可以阻止执行器中其他future的执行,因此我们需要信任它们不是恶意的。
- 没有什么可以强制future放弃CPU。如果它们愿意,就能够永不从
- Future在内部存储它在下一次
poll
调用时恢复执行所需的所有状态。使用async/await,编译器会自动检测所需的所有变量,并将其存储在生成的状态机中。- 仅保存恢复执行所需的最少状态。
- 由于
poll
方法在返回时会放弃调用栈,因此该栈可用于poll其他的future。
我们看到future和async/await完美契合协作式多任务,区别就是使用了一些不同的术语。因此,在下文中,我们将交替使用术语“任务”和“future”。
实现
现在,我们了解了Rust中基于future和async/await的协作式多任务的工作原理,是时候向我们的内核中添加对多任务的支持了。由于Future
trait是core
库的一部分,而async/await是Rust语言本身的功能,因此在#![no_std]
内核中使用无需其他操作就可以使用它们。唯一的要求是我们至少应使用Rust在2020-03-25之后的nightly版本,因为之前的async/await是不兼容no_std
的。
只要使用较近的nightly版本,我们就可以在main.rs
中使用async/await:
1 | async fn async_number() -> u32 { |
async_number
函数是一个async fn
,因此编译器将其转换为实现了Future
trait的状态机。由于该函数仅返回42
,因此生成的future将在首次poll
调用时直接返回Poll::Ready(42)
。像async_number
一样,example_task
函数也是一个async fn
。 它等待async_number
返回的数字,然后使用println宏将其打印出来。
要运行example_task
返回的future,我们需要一直对其调用poll
,到它通过返回Poll::Ready
告知其已完成为止。为此,我们需要创建一个简单的执行器类型。
任务
在实现执行器之前,我们要创建一个具有Task
类型的新模块task
:
1 | pub mod task; |
1 | use core::{future::Future, pin::Pin}; |
Task
结构体是一个新的类型封装器,它封装了一个内存固定的、堆分配的、以空类型()
作为输出的动态分发的future。让我们详细研究一下:
- 我们要求与任务关联的future返回
()
。这意味着任务不会返回任何结果,只是为了执行其中的副作用。例如,我们上面定义的example_task
函数也没有返回值,但是作为副作用,函数将一些信息打印在了屏幕上。 dyn
关键字告诉我们存放在Box
中的是一个trait对象。这意味着future上的方法是动态分发的,因此我们才可以在Task
类型中存储不同类型的future。这很重要,因为每个async fn
都有自己的future类型,而我们希望能够创建多个不同的任务。- 正如我们在pinning一节了解到的那样,
Pin<Box>
类型通过将值放在堆上并防止对值创建&mut引用来确保该值不会在内存中移动。这很重要,因为async/await生成的future可能是自引用的,即包含指向自身的指针,而该指针将在future移动时失效。
为了能够适应future创建Task
结构体,我们编写一个new
函数:
1 | impl Task { |
该函数接受输出类型为()
的任意future,然后通过Box::pin
函数将其固定在内存中。然后,函数将future封装在Task结构体中并返回。这里需要使用'static
生命周期,因为返回的Task
生存时间不确定,因此future也必须在该时间内保持有效。
我们还添加了一个poll
方法,使执行器可以轮询存储的future:
1 | use core::task::{Context, Poll}; |
由于Future
trait的poll
方法需要在Pin<&mut T>
类型上调用,因此我们先要使用Pin::as_mut
方法转换类型为Pin<Box<T>>
的self.future
字段。然后,在转换后的self.future
字段上调用poll
并返回结果。由于Task::poll
方法仅应由我们稍后创建的执行器调用,因此保持其为task
模块的私有函数。
简单执行器
由于执行器可能非常复杂,因此我们有意在开始仅创建一个非常基本的执行器,然后再渐进的实现更多功能。为此,我们先创建一个新的task::simple_executor
子模块:
1 | pub mod simple_executor; |
1 | use super::Task; |
该结构体包含一个VecDeque
类型的字段task_queue
,这是一个在两端都能执行push和pop操作的向量。使用这种类型的思路是,通过spawn
在队尾方法插入新任务,并从队首弹出要执行的下一个任务。这样,我们就得到了一个简单的FIFO队列(“先进先出”)。
假唤醒器
为了调用poll
方法,我们需要创建一个Context
类型,用来封装Waker
类型。简单起见,我们将首先创建一个不执行任何操作的假唤醒器。为此,我们创建了RawWaker
实例,用以定义不同Waker
方法的实现,然后使用Waker::from_raw
函数将其转换为Waker
:
1 | use core::task::{Waker, RawWaker}; |
from_raw
函数是非安全的,因为如果程序员不按照RawWaker
文档的要求使用,就可能导致未定义的行为。在查看dummy_raw_waker
函数的实现之前,我们首先尝试了解RawWaker
类型的工作方式。
RawWaker
RawWaker
类型要求程序员显式定义一个虚拟方法表(vtable),用以指定在克隆,唤醒、删除RawWaker
时应调用的函数。vtable的布局由RawWakerVTable
类型定义。每个函数都接收一个*const ()
参数,该参数本质上是一个指向某结构体的擦除类型的&self
指针,例如在堆上的分配。使用*const ()
指针而不是某适当引用的原因是RawWaker
类型应该是非泛型的,但仍需支持任意类型。作为参数传递给函数的指针值为给RawWaker::new
的data
指针。
通常,RawWaker
结构体是为封装在Box
或Arc
类型中的某些堆分配而创建的。对于这些类型,可以使用Box::into_raw
之类的方法将Box<T>
转换为*const T
指针。然后可以将该指针转换为匿名*const ()
指针,再传递给RawWaker::new
。由于每个vtable函数都使用相同的*const ()
作为参数,因此这些函数可以安全地将指针转换回Box<T>
或&T
以执行操作。可以想象,此过程非常危险,很容易出错导致未定义行为。因此,若非必要,则并不建议手动创建RawWaker
。
一个假的RawWaker
虽然不建议手动创建RawWaker
,但目前还没有其他方法可以创建不执行任何操作的假Waker
。幸运的是,不执行任何操作也会使得实现dummy_raw_waker
函数变得相对安全:
1 | use core::task::RawWakerVTable; |
首先,我们定义两个名为no_op
和clone
的内部函数。no_op
函数接受* const()
指针,不执行任何操作。clone
函数也接受* const()
指针,并通过再次调用dummy_raw_waker
返回一个新的RawWaker
。我们使用这两个函数来创建最小化的RawWakerVTable
:clone
函数用于克隆操作,no_op
函数用于其他所有操作。由于RawWaker
不执行任何操作,因此从clone
返回的是一个新RawWaker
还是一个真正的克隆其实并不重要。
创建vtable
之后,我们使用RawWaker::new
函数创建RawWaker
。传递的* const()
无关紧要,因为并没有任何vtable的函数会使用它。于是我们只传递了一个空指针。
一个run
方法
现在,我们有了创建Waker
实例的方法,就可以使用它在执行器上实现run
方法。最简单的run
方法是在循环中不停的poll所有队列中的任务,直到所有任务均已完成。这并不高效,因为它没有利用Waker
类型的通知功能,但是它是使示例能够运行起来的的最简单方法:
1 | use core::task::{Context, Poll}; |
该函数使用while let
循环来处理task_queue
中的所有任务。对于每个任务,我们首先把由dummy_waker
函数返回的Waker
实例封装为Context
类型。然后调用Task::poll
方法并使用该context
做参数。如果poll
方法返回Poll :: Ready
,则说明任务已完成,就可以继续执行下一个任务。如果任务仍然处于Poll::Pending
,则将其再次添加到队尾,以便在后续的循环中再次轮询它。
尝试运行
使用我们的SimpleExecutor
类型,现在就可以尝试在main.rs
中运行example_task
函数返回的任务了:
1 | use blog_os::task::{Task, simple_executor::SimpleExecutor}; |
运行代码,将看到预期的消息”async number: 42“打印到屏幕上:
我们来总结一下示例中执行的各个步骤:
- 首先,我们创建一个带有空
task_queue
的SimpleExecutor
类型实例。 - 接下来,我们调用异步
example_task
函数,该函数返回的是future。将这个future封装在Task
类型中,以将其移动到堆中并固定在内存中,然后通过spawn
方法将该任务添加到执行器的task_queue
中。 - 然后,我们调用
run
方法以开始执行队列中的单个任务。这涉及:
在task_queue
为空后,run
方法返回。之后,我们的kernel_main
函数将继续执行,并打印”It did not crash!“信息。
异步键盘输入
这个简单执行器并不能利用Waker
通知,只会循环遍历所有任务直到它们完成。对于我们的示例,这并不是问题,因为我们的example_task
在首次执行poll
时就可以直接完成。要能观察到一个正确的Waker
实现所带来的性能优势,我们首先需要创建一个真正异步的任务,即可能会在首次poll
调用中返回Poll::Pending
的任务。
其实我们的系统中已经存在了一些异步特性:硬件中断。正如我们在硬件中断一文中了解到的那样,硬件中断可以在任意时刻发生,这是由某些外部设备所决定的。例如,在经过一段预定义的时间后,硬件计时器会将中断发送到CPU。当CPU接收到中断时,它会立即将控制权转移给在中断描述符表(IDT)中定义的相应处理函数。
下面,我们将基于键盘中断创建一个异步任务。键盘中断是一个很好的选择,因为它既具有不确定性又对延迟有很高的要求。不确定性意味着无法预测下一次按键的发生时间,因为这完全取决于用户。低延迟是指我们要及时处理键盘输入,否则用户会感到滞后。为了以一种更高效的方式支持此类任务,执行器就需要对Waker
通知提供适当的支持。
键盘扫描码队列
我们当前仍是直接在中断处理程序中处理键盘输入。长远考虑这并不是一个好主意,因为中断处理程序应保持尽可能短,以免长时间中断重要工作。因此,中断处理程序应仅执行必要的最少量的工作(如键盘扫描码的读取),而将其余工作(如键盘扫描码的解释)留给后台任务。
将工作委派给后台任务的常见模式是创建某种队列。中断处理程序将工作单元推送到队列,而后台任务处理程序从队列中取出工作以进行处理。对于我们的键盘中断,就是中断处理程序仅从键盘读取扫描代码,并将其推送到队列,然后直接返回。键盘任务从队列的另一端取出扫描码,并对每个扫描码进行解释和处理:
可以用一个由互斥锁保护的VecDeque
来简单的实现该队列。但是,在中断处理程序中使用互斥锁并不是一个好主意,因为很容易导致死锁。例如,当用户在键盘任务锁定队列时又按下了某个键,中断处理程序将再次尝试获取该锁,这会导致死锁。这种方法的另一个问题是,当VecDeque
已满时,就会通过执行新的堆分配来自动增加队列容量。这可能又会导致死锁,因为我们的分配器还在内部使用了互斥锁。还有更深层次的问题,当堆碎片化时,堆分配可能会花费大量时间甚至会分配失败。
为避免这些问题,我们需要一个push
操作不涉及互斥量或堆分配的队列实现。这种队列可以通过使用无锁原子操作来实现元素的push和pop。如此,就可以创建只需要&self
引用即可使用的push
和pop
操作,因此也无需互斥锁。为了避免在push
时执行堆分配,可以通过预先分配的固定大小的缓冲区来支持队列空间扩展。尽管这会让队列有界(即有最大长度限制),但在实际操作中通常能够为队列长度定义一个合理的上限,这并不是一个大问题。
crossbeam
crate
以正确且高效的方式实现这样的队列是一件非常困难的事,因此建议使用经过良好测试的现有实现。crossbeam
是一个流行的Rust项目,它实现了多种用于并发编程的无互斥类型。该crate提供的ArrayQueue
正是我们需要的类型。更加幸运的是,该类型恰好与具有堆分配支持的no_std
crate完全兼容。
要使用该类型,我们需要添加crossbeam-queue
crate依赖:
1 | [dependencies.crossbeam-queue] |
该crate默认依赖标准库。为了使其与no_std
兼容,我们需要禁用其默认特性并启用alloc
特性。(请注意,在这里选择主crossbeam
crate作为依赖将不能工作,因为它缺少用于no_std
的queue
模块导出。我们提交了一个pull请求来解决此问题,但该修改尚未在crates.io上发布。)
队列实现
有了ArrayQueue
类型,我们就可以在新的task::keyboard
模块中创建全局的扫描码队列了:
1 | pub mod keyboard; |
1 | use conquer_once::spin::OnceCell; |
由于ArrayQueue::new
需要执行堆分配,而这在编译时是不可行的(到目前为止),所以我们不能直接初始化该静态变量。为此,我们使用了conquer_once
crate的OnceCell
类型,以能够安全的执行一次性静态变量的初始化。为了使用该crate,我们需要将其作为依赖项添加到Cargo.toml
中:
1 | [dependencies.conquer-once] |
我们确实也可以在这里使用lazy_static
宏来代替OnceCell
。不过OnceCell
类型的优点是能够确保初始化不会在中断处理程序中发生,从而阻止中断处理程序执行堆分配。
填充队列
为了填充扫描代队列,我们创建了一个新的add_scancode
函数,以从中断处理程序中调用:
1 | use crate::println; |
使用OnceCell::try_get
函数来获取对初始化队列的引用。如果队列尚未初始化,就忽略键盘扫描码并直接打印警告。要注意的是,不能在此函数中初始化队列,因为它将被中断处理程序调用,而我们不应该在中断处理的过程中执行堆分配。由于不应从main.rs
中调用此函数,因此我们使用pub(crate)
使该函数仅可在lib.rs
中使用。
ArrayQueue::push
方法只需&self
引用即可执行,这使得我们能够非常简单的从静态队列上调用该方法。ArrayQueue
类型本身执行所有必要的同步,因此这里不需要使用互斥类封装。如果队列已满,我们也会打印警告。
要在键盘中断上调用add_scancode
函数,就需要在interrupts
模块中更新keyboard_interrupt_handler
函数:
1 | extern "x86-interrupt" fn keyboard_interrupt_handler( |
我们从该函数中删除了所有键盘事件处理的代码,并添加了对add_scancode
函数的调用,其余代码保持不变。
正如预期,现在使用cargo run
运行内核时,每次按键不再显示在屏幕上,取而代之的是,每次按键时都会看到队列未初始化的警告。
扫描码流
为了初始化SCANCODE_QUEUE
并以异步方式从队列中读取扫描码,我们创建了一个新的ScancodeStream
类型:
1 | pub struct ScancodeStream { |
_private
字段用于阻止从本模块外构造结构体的行为。这使得new
函数成为构造该类的唯一方法。在函数中,我们首先尝试初始化SCANCODE_QUEUE
静态变量。如果它已初始化,我们就产生一个panic,如此确保只会存在一个ScancodeStream
实例。
为了使扫描码可用于异步任务,下一步是实现一个类似poll
的方法,以尝试从队列中弹出下一个扫描码。虽然这听上去似乎是我们应该为该类型实现Future
trait,但在这里实际上并不是。问题在于,Future
trait仅对单个异步值进行抽象,并且期望在它返回Poll::Ready
之后不会再次调用poll
方法。但是,我们的扫描码队列包含多个异步值,因此可以持续对其进行轮询。
Stream
trait
由于产生多个异步值的类型很常见,因此future
crate为此类提供了有用的抽象:Stream
trait。其定义如下:
1 | pub trait Stream { |
此定义与Future
trait非常相似,但有以下区别:
- 关联的类型叫做
Item
而不是Output
。 - 与返回
Poll<Self::Item>
的poll
方法不同,Stream
trait定义了一个返回Poll<Option<Self::Item>>
的poll_next
方法(请注意附加的Option
)。
此外还有一个语义上的区别:poll_next
可以被重复调用,直到返回Poll::Ready(None)
来表示流已完成。从这方面来看,该方法类似于Iterator::next
方法,即也会在返回最后一个值之后就只返回None
。
实现Stream
让我们为ScancodeStream
实现Stream
trait,以便使用异步方式提供SCANCODE_QUEUE
中的值。为此,我们首先需要添加对futures-util
crate的依赖,其中就包含Stream
类型:
1 | [dependencies.futures-util] |
我们禁用默认特性以使crate兼容no_std
,并启用alloc
特性以使其基于堆分配的类型可用(稍后将需要它)。(请注意,我们确实可以添加对主futures
crate的依赖,从而重新导出futures-util
crate,但这将产生更多的依赖和更长的编译时间。)
现在我们可以导入并实现Stream
trait:
1 | use core::{pin::Pin, task::{Poll, Context}}; |
我们首先使用OnceCell::try_get
方法来获取初始化的扫描码队列的引用。此操作应该不会失败,因为我们已在new
函数中执行了队列初始化,所以可以放心的使用expect
,以在未初始化队列时直接panic。接下来,我们使用ArrayQueue::pop
方法尝试从队列中获取下一个元素。如果成功,我们将返回封装在Poll::Ready(Some(…))
中的扫描代码。如果失败,则意味着队列已空。在这种情况下,我们返回Poll::Pending
。
唤醒器支持
与Futures::poll
方法类似,Stream::poll_next
方法要求异步任务在返回Poll::Pending
之后才就绪时通知执行器。如此,执行器就不需要再次轮询相同的任务,任务完成会自动通知执行器,这大大降低了等待任务时的性能开销。
要发送此类通知,任务应从参数Context
引用中获取Waker
并将其储存在某处。当任务就绪时,它应该在储存的Waker
上调用wake
方法,以通知执行者应该再次轮询该任务了。
原子化的唤醒器
要为我们的ScancodeStream
实现Waker
通知功能,就需要一个可以在两次轮询调用之间储存Waker
的地方。不能将其存储为ScancodeStream
的字段中,因为我们需要从add_scancode
函数进行访问。解决方案是使用由Futures-util
crate提供的AtomicWaker
类型创建静态变量。与ArrayQueue
类型类似,此类型基于原子化的指令,可以被安全地存放在静态变量中,并支持并发修改。
定义一个AtomicWaker
类型的静态变量WAKER
:
1 | use futures_util::task::AtomicWaker; |
思路是poll_next
的实现将当前的唤醒器存储在此静态变量中,当将新的扫描码添加到队列时,add_scancode
函数将在其上调用wake
函数。
储存唤醒器
poll
/poll_next
的协议要求任务在返回Poll::Pending
时为传递的Waker
注册唤醒。让我们修改poll_next
的实现以满足此要求:
1 | impl Stream for ScancodeStream { |
像以前一样,我们首先使用OnceCell::try_get
函数来获取对初始化的扫描码队列的引用。然后,我们乐观地尝试从队列中pop
任务并在成功时返回Poll::Ready
。如此,就可以避免在队列非空时注册唤醒其的性能开销。
如果对queue.pop()
的第一次调用并未成功,则该队列有可能为空。仅仅有可能为空,是因为中断处理程序也许是在轮询检查后立即异步填充了队列。由于此竞争条件可能在下一次轮询检查时再次发生,因此我们需要在第二次检查之前在WAKER
静态变量中注册Waker
。这样,尽管在Poll::Pending
返回之前也可能发起唤醒,但是可以保证我们能够收到所有在轮询检查后推送的扫描码。
使用AtomicWaker::register
函数注册Context
参数中包含的Waker
之后,我们再次尝试从队列中弹出任务。如果这次能成功,则返回Poll::Ready
。此时还需要使用AtomicWaker::take
删除这个注册的唤醒器,因为任务已完成就不再需要唤醒器通知了。如果queue.pop()
又失败了,我们仍将像以前一样返回Poll::Pending
,区别是这次注册了唤醒器。
请注意,对于没有(或尚未)返回Poll::Pending
的任务,可以通过两种方式进行唤醒。一种方法是在唤醒立刻发生于返回Poll::Pending
之前,那么就使用上面提到的竞争条件。另一种方法是由于注册了唤醒器使得队列不再为空时,返回Poll::Ready
。由于这些假唤醒是无法避免的,因此执行器必须能够正确处理它们。
唤醒储存的唤醒器
为了唤醒存储的Waker
,我们在add_scancode
函数中添加对WAKER.wake()
的调用:
1 | pub(crate) fn add_scancode(scancode: u8) { |
此处的唯一修改就是,如果成功推送到扫描码队列,就调用WAKER.wake()
。如果静态变量WAKER
中已经注册了一个唤醒器,则此方法将在其上调用同名的wake
方法,以通知执行器。否则,该操作将为空操作,即没有任何效果。
重要的是,我们必须在推送到队列后才能调用wake
,否则在队列仍然为空时可能会过早唤醒任务。一个可能的例子,就是一个多线程执行器在CPU的其他核心中并发的唤醒任务时。虽然现在内核还不支持线程,但也会尽快添加,我们当然不希望那时再出问题。
键盘任务
我们为ScancodeStream
实现了Stream
trait,现在就可以创建异步键盘任务了:
1 | use futures_util::stream::StreamExt; |
上面的代码与本文修改之前在键盘中断处理程序中使用的代码非常相似。唯一的区别是,我们不是从I/O端口读取扫描码,而是从ScancodeStream
中获取。为此,我们首先创建一个新的Scancode
流,然后重复使用StreamExt
trait所提供的next
方法来获取Future
,即为该流中的下一个元素。通过在其上使用await
运算符,我们可以异步等待future的结果。
使用while let
循环,直到流返回None
表示结束为止。由于我们的poll_next
方法从不返回None
,因此这实际上是一个无休止的循环,即print_keypresses
任务永远不会完成。
让我们在main.rs
中的执行器中添加print_keypresses
任务,以再次获得有效的键盘输入:
1 | use blog_os::task::keyboard; // new |
现在执行cargo run
,将看到键盘输入再次起作用:
如果你密切注意计算机的CPU使用率,就会发现QEMU
进程会令CPU持续繁忙。发生这种情况是因为我们的SimpleExecutor
在一个循环中不停地轮询任务。因此,即使我们没有按键盘上的任何键,执行器也会在print_keypresses
任务上不停的调用poll
,即使该任务并未取得任何进展而每次都会返回Poll::Pending
。
带有唤醒器支持的执行器
要解决性能问题,我们需要创建一个能够正确利用Waker
通知的执行器。这样,当下一个键盘中断发生时,将通知执行器,因此就不需要再不停的轮询print_keypresses
任务了。
任务ID
要创建能够正确支持唤醒器通知的执行器,第一步就是为每个任务分配唯一的ID。这是必需的,因为我们需要一种方法来指定希望唤醒的任务。首先,创建一个新的TaskId
封装类型:
1 |
|
TaskId
结构体其实是对u64
的简单封装。我们为它派生了许多trait,使其可打印、复制、比较、排序。其中可排序很重要,因为我们接下来将使用TaskId
作为BTreeMap
的键。
要生成一个新唯一ID,需要创建一个TaskID::new
函数:
1 | use core::sync::atomic::{AtomicU64, Ordering}; |
该函数使用AtomicU64
类型的静态变量NEXT_ID
来确保每个ID仅分配一次。fetch_add
方法以原子化的方式自增,并在同一个原子操作中返回上一个值。这意味着即使并行调用TaskId::new
方法,每个ID也会返回一次。Ordering
参数定义是否允许编译器在指令流中对fetch_add
操作重新排序。因为我们只要求ID是唯一的,所以在这种情况下,使用最弱的Relaxed
排序就足够了。
现在,我们可以使用附加的id
字段来扩展Task
类型:
1 | pub struct Task { |
新的id
字段使唯一地命名任务成为可能,这是唤醒指定任务所必需的。
Executor
类型
我们在task::executor
模块中创建新的Executor
类型:
1 | pub mod executor; |
1 | use super::{Task, TaskId}; |
我们没有像在SimpleExecutor
中那样将任务存储在VecDeque
中,而是存放在一个包含任务ID的task_queue
字段和包含实际Task
实例的BTreeMap
类型的tasks
字段中。该映射由TaskId
索引,以允许有效地恢复执行特定任务。
task_queue
字段是有任务ID组成的ArrayQueue
,并封装为实现了引用计数的Arc
类型。引用计数使得在多个所有者之间共享变量所有权成为可能。它在堆上分配变量,并计算对变量的活动引用数。当活动引用数达到零时,即不再需要该值,因此会将其释放。
我们将这种Arc<ArrayQueue>
类型用于task_queue
,以使其能够被执行者和唤醒者共享。这个思路是唤醒者将已唤醒任务的ID推送到队列中。而执行器位于队列的接收端,从task
映射中按其ID取回已唤醒的任务,并运行任务。使用固定大小的队列而不是无限制队列(如SegQueue
)的原因是,不应在中断处理程序中进行堆分配,将推送到此队列。
除了task_queue
和tasks
映射外,Executor
类型还具有一个waker_cache
字段,它也是一个映射。在创建任务后,此映射将缓存任务的Waker
。首先,可以使用同一个唤醒器对同一个任务做多次唤醒操作,这相较于每次唤醒均创建一个唤醒器而言更高效。其次,它确保唤醒器的引用计数不会在中断处理程序内被释放,而这可能会导致死锁(有关此问题的更多信息,请参见下文)。
要创建Executor
,我们可以编写一个简单的new
函数。我们为task_queue
选择100的容量,这对可预见的未来而言已经足够了。如果我们的系统在某个时刻真的保有超过100个并发任务,也可以轻松地修改这个值。
生成任务
与SimpleExecutor
类似,我们也在Executor
类型上编写一个spawn
方法,该方法将给定任务添加到tasks
映射,并通过将其ID推送到task_queue
中来立刻执行唤醒:
1 | impl Executor { |
如果映射中已经存在具有相同ID的任务,则BTreeMap::insert
方法将直接返回它。由于每个任务都有唯一的ID,理论上不应发生这种情况,因此若出现这种情况我们就产生panic,表明代码中存在错误。同样,当task_queue
填满时也会产生panic,因为如果设置的队列大小合适,就不应该发生这种情况。
运行任务
要执行task_queue
中的所有任务,我们创建一个私有的run_ready_tasks
方法:
1 | use core::task::{Context, Poll}; |
该函数的基本思路与SimpleExecutor
类似:遍历task_queue
中的任务,为每个任务创建唤醒器,并轮询该任务。但我们并不直接将待处理的任务添加到task_queue
的队尾,而是让TaskWaker
负责将唤醒的任务添加回队列。稍后将显示该唤醒器类型的实现。
让我们研究一下run_ready_tasks
方法的一些实现细节:
- 我们将
self
解构为三个字段,以避免某些借用检查器错误。即我们的实现需要从闭包内部访问self.task_queue
,而目前的闭包会尝试直接借用整个self
。这本质上是借阅检查器自己的问题,且将在实现RFC 2229时解决。 - 对于每个弹出的任务ID,我们从
tasks
映射中取出其对应任务的可变引用。由于我们的ScancodeStream
会在检查任务是否需要休眠之前就注册唤醒器,因此可能会发生为已经不存在的任务执行唤醒的情况。在这种情况下,我们只需忽略唤醒并继续执行队列中的下一个ID即可。 - 为了避免在每个轮询中新建唤醒器所带来的性能开销,我们在每个任务创建后将其唤醒器缓存到
waker_cache
映射中。为此,我们结合使用BTreeMap::entry
方法和Entry::or_insert_with
,若唤醒器尚不存在则新建一个,再获得其的可变引用。为了创建新的唤醒器,我们克隆了task_queue
并将其与任务ID一起传给TaskWaker::new
函数(其实现见下文)。由于task_queue
封装在Arc
中,因此clone
只会增加其引用计数,而仍指向同一堆分配的队列。请注意,并非所有唤醒器的实现都可以进行这种重用,但是我们的TaskWaker
实现是允许重复的。
任务完成时将返回Poll::Ready
。在这种情况下,我们使用BTreeMap::remove
方法从tasks
映射中删除该任务。如果该任务还有缓存的唤醒器,我们也会将其移除。
唤醒器设计
唤醒器用于将已唤醒的任务ID推送到执行器的task_queue
。我们创建一个新的TaskWaker
结构体,以存储任务ID和一个task_queue
引用:
1 | struct TaskWaker { |
由于task_queue
的所有权由执行器和唤醒器共享,因此我们使用Arc
封装类型来实现共享所有权的引用计数。
唤醒操作的实现非常简单:
1 | impl TaskWaker { |
我们将task_id
推送到引用的task_queue
中。由于对ArrayQueue
类型的修改仅需要共享的引用,因此我们都不需要&mut self
,仅用&self
即可实现。
Wake
trait
为了能够使用TaskWaker
类型轮询future,我们首先要将其转为Waker
实例。这是必需的,因为Future::poll
方法使用Context
的实例作参数,而该实例只能基于Waker
类型构造。尽管确实可以通过提供一个RawWaker
实现来做到这一点,但是使用基于Arc
的Wake
trait,然后再用标准库提供的From
trait的实现来构造Waker
,将会更简单且安全。
Wake
Trait实现如下所示:
1 | use alloc::task::Wake; |
由于唤醒器通常在执行器和异步任务之间共享,因此trait
方法要求将Self
的实例封装在Arc
类型中,以实现所有权的引用计数。这意味着我们必须将TaskWaker
放在Arc
中才能进行调用。
wake
和wake_by_ref
方法之间的区别在于,后者仅需要Arc
的引用,而前者需要Arc
的所有权,因此通常需要增加引用计数。并非所有类型都支持通过引用调用唤醒,因此方法wake_by_ref
的实现是可选的,不过由于该方法可以避免不必要的引用计数修改,所以性能较高。对我们来说,可以在两个trait方法中都简单地继续调用wake_task
函数,该函数仅需要一个共享的&self
引用。
创建唤醒器
由于所有Arc
封装的实现了Waker
trait的类型都支持使用From
转换,因此我们现在可以实现Executor::run_ready_tasks
方法所需的TaskWaker::new
函数了:
1 | impl TaskWaker { |
我们使用task_id
和task_queue
作为参数创建TaskWaker
。然后,我们将TaskWaker
封装在Arc
中,并使用Waker::from
将其转换为Waker
。此from方法负责为TaskWaker类型构造RawWakerVTable和RawWaker实例。这个from
方法用于TaskWaker
类型构造一个RawWakerVTable
和一个RawWaker
实例。如果你对它的详细工作方式感兴趣,请查看alloc
crate中的实现。
run
方法
在实现了唤醒器之后,我们终于可以为执行器添加一个run
方法了:
1 | impl Executor { |
此方法仅循环调用run_ready_tasks
函数。理论上,当tasks
映射为空时我们确实可以从令函数返回,不过这并不会发生因为keyboard_task
永远不会完成,因此一个简单的loop
就足够了。鉴于该函数永不返回,因此我们使用返回类型!
以告诉编译器此为发散函数。
现在,我们可以更改kernel_main
,以使用新的Executor
来代替SimpleExecutor
:
1 | use blog_os::task::executor::Executor; // new |
我们只需要更改导入类型和类型名称。由于run
是发散函数,编译器知道它永不返回,因此我们不再需要在kernel_main
函数末尾调用hlt_loop
。
当我们现在使用cargo run
运行内核时,将看到键盘输入依然有效:
不过,QEMU的CPU使用率并没有降低。这是因为我们仍然使CPU始终保持忙碌状态。现在虽然在任务唤通知醒前都不再执行轮询,但仍在循环中检查task_queue
。要解决此问题,我们需要在CPU没有其他工作时进入睡眠状态。
空闲时睡眠
基本思路就是在task_queue
为空时执行hlt
指令。该指令使CPU进入睡眠状态,直到下一个中断到达。CPU在中断后立即会再次激活,这确保了当中断处理程序向task_queue
推送任务时,CPU仍然可以直接做出反应。
为了实现这一点,我们在执行器中创建一个新的sleep_if_idle
方法,并从run
方法中调用它:
1 | impl Executor { |
由于我们在run_ready_tasks
之后直接调用sleep_if_idle
,这将一直循环直到task_queue
为空,因此似乎无需再次检查队列。但是,硬件中断可能紧接着在run_ready_tasks
返回之后发生,因此在调用sleep_if_idle
函数时队列中可能还有一个新任务。仅当队列仍然为空时,我们才通过x86_64
crate提供的instructions::hlt
函数执行hlt
指令,使CPU进入睡眠状态。
不幸的是,在此实现中仍然存在微妙的竞争条件。由于中断是异步的且可以随时发生,因此有可能在is_empty
检查和hlt
调用之间发生中断:
1 | if self.task_queue.is_empty() { |
万一此中断推送到task_queue
,即使现在确实有一个就绪的任务,CPU也会进入睡眠状态。在最坏的情况下,这可能会将键盘中断的处理,延迟到下一次按键或下一个定时器中断。那么我们应该如何预防呢?
答案是在检查之前禁用CPU上的中断,并与hlt
指令一起以原子化操作再启用中断。那么,此间发生的所有中断都将延迟到hlt
指令之后,从而不会丢失任何唤醒。为了实现该方法,我们可以使用x86_64
crate提供的interrupts::enable_and_hlt
函数。
修改后的sleep_if_idle
函数如下所示:
1 | impl Executor { |
为了避免竞争条件,我们在检查task_queue
是否为空之前禁用中断。如果队列确实为空,我们将通过一个原子化操作,使用enable_and_hlt
函数启用中断,同时让CPU进入睡眠。如果队列不再为空,则意味着在返回run_ready_tasks
之后,中断会唤醒任务。在这种情况下,我们将再次启用中断,且不再hlt
而直接继续让程序执行。
现在,执行器在没有任务的情况下能够正确地让CPU进入睡眠状态。我们可以看到,当再次使用cargo run
运行内核时,QEMU进程的CPU利用率要低得多。
可能的扩展
我们的执行器现在可以高效地运行任务。它利用唤醒器通知来避免持续轮询等待的任务,并在无任何工作时使CPU进入睡眠状态。但是,我们的执行器仍然非常基础,而且可以通过许多方式来扩展功能:
- 调度:目前,我们使用
VecDeque
类型为task_queue
实现先进先出(FIFO)策略,这通常也称为循环调度。此策略对于其上的所有任务负载可能并不是最高效的。例如,优先考虑延迟敏感的任务或执行大量I/O的任务可能更加高效。有关更多信息,请参见Operating Systems: Three Easy Pieces一书中的scheduling chapter章节,或Wikipedia上关于调度的文章。 - 任务生成:我们的
Executor::spawn
方法当前需要&mut self
引用,因此在启动run
方法之后将不再可用。为了解决这个问题,我们可以再创建一个额外的Spawner
类型,该类型与执行器共享某种队列,并允许从任务自己创建新任务。例如,队列可以直接使用task_queue
,也可以是一个在执行器中循环检查的单独的队列。 - 使用线程:我们尚不支持线程,但会在下一篇文章中添加。这将使我们能够在不同的线程中启动多个执行器实例。这种方法的优点是可以减少长时长任务带来的延迟,因为其他任务可以同时运行。这种方法还可以利用多个CPU内核。
- 负载均衡:添加线程支持时,如何在执行器间分配任务,以确保所有CPU核心都能够被用到将变得很重要。常见的技术是work stealing。
小结
在这篇文章的开始,我们介绍了多任务,并区分了抢占式多任务(强制性地定期中断正在运行的任务)和协作式多任务(使任务一直运行到自愿放弃对CPU的控制)之间的区别。
然后,我们探讨了Rust的async/await如何提供协作式多任务的语言级实现。Rust的实现基于基于轮询的Future
trait,该特质抽象了异步任务。使用async/await,就可以像编写普通同步代码一样处理future。不同之处在于异步函数会再次返回Future
,为了运行它,需要在某个时刻将其添加到执行器中。
在后台,编译器将async/await代码转换为状态机,每个.await
操作对应一个可能的暂停点。通过利用程序对自身执行步骤的了解,编译器就能够为每个暂停点保存最小化的状态信息,从而使每个任务的内存消耗极小。而其中的一个挑战是,生成的状态机可能包含自引用结构体,例如,当异步函数的局部变量相互引用时。为了防止指针失效,Rust使用Pin
类型来确保future在被第一次轮询后就不再能够在内存中移动了。
接下来介绍了如何实现,我们首先创建了一个非常简单的执行程序,它会在一个循环中持续轮询所有生成的任务,且完全不使用Waker
类型。然后,我们通过实现异步键盘任务来展示了唤醒通知的优点。该任务使用crossbeam
crate提供的无互斥的ArrayQueue
类型定义了一个静态变量SCANCODE_QUEUE
。现在,键盘中断处理程序不会再直接处理按键,而会将所有接收到的扫描代码放入队列中,然后唤醒已注册的Waker
,以发送信号表示有新的输入可用。在接收端,我们创建了一个ScancodeStream
类型,以提供一个用于获取队列中下一个scancode
的Future
。这样就可以创建一个异步print_keypresses
任务,以使用async/await
来解释并打印队列中的扫描码。
为了利用键盘任务的唤醒通知,我们创建了一个新的Executor
类型,该类型使用Arc
封装task_queue
,以使得就绪任务可共享。我们实现了一个TaskWaker
类型,以将唤醒的任务的ID直接推送到task_queue
,然后使用执行器进行轮询。为了在空闲时时节省CPU,我们利用hlt
指令添加了对CPU在空闲时睡眠的支持。最后,我们讨论了执行器的一些潜在扩展,例如,提供多核支持。
下期预告
我们利用async/await使内核具备了对协作多任务的基本支持。尽管协作式多任务处理非常有效,但是当单个任务运行太长时间并导致其他任务无法运行时,系统就会出现延迟。因此,在我们的内核中添加对抢占式多任务的支持也是合理的。
在下一篇文章中,我们将介绍抢占式多任务处理的最常见形式,线程。除了解决长时长任务的问题,线程还能够利用多个CPU核心,并支持运行不受信任的用户程序。
支持本项目
创建和维护这个博客和相关库是一项繁重的工作,但我真的很喜欢。通过支持我,您可以让我在新内容、新功能和持续维护上投入更多时间。
支持我的最好方式是在GitHub上赞助我,因为他们不收取任何中间费用。如果你喜欢其他平台,我也有Patreon和Donorbox账户。后者是最灵活的,因为它支持多种货币和一次性捐款。
感谢您的支持!