PingCap的Rust训练课程4:并发与并行
前言
任务:使用自定义协议,创建一个具有同步网络的多线程、持久化键/值存储的服务端和客户端。
目标:
- 写一个简单的线程池
- 使用通道进行跨线程通信
- 利用锁共享数据结构
- 无锁执行读操作
- 对单线程与多线程版本进行基准测试
关键词:线程池、通道、锁、无锁数据结构、原子化、参数化基准测试。
介绍
在这个项目中,您将创建一个使用自定义协议进行通信的简单键/值服务端和客户端。服务端将使用同步网络,并将使用较为复杂的并发实现来响应多个请求。内存索引将改为并发数据结构,由所有线程共享,而压缩操作将在专用的线程上完成,以减少单个请求的延迟。
项目需求规格
cargo项目kvs
建立了一个名为kvs-client
的命令行键值存储客户端,和一个名为kvs-server
的键值存储服务端,二者又都调用了一个名为kvs
的库。客户端通过一个自定义协议与服务端通信。
命令行规格与前一个项目相同。本项目与前一个项目的的不同之处在于并发实现,在我们实现时会对其进行描述。
库的规格几乎相同,除了以下两点:一是这一次所有的KvsEngine
、KvStore
等类型的方法都使用&self
而不是&mut self
,我们将实现Clone
trait,这在并发数据结构中很常见。但为什么呢?其实并不是说我们将不再编写不可变代码,尽管它们将会在线程之间共享。那为什么要在方法签名中避免使用&mut self
?也许您现在还不清楚,但在本项目结束时,它会变得显而易见。
二是本项目中的库包含一个新的trait:ThreadPool
。它包含以下方法:
ThreadPool::new(threads: u32) -> Result<ThreadPool>
创建一个新的线程池,立即生成指定数量的线程。
如果未能生成线程,则返回错误。所有先前产生的线程都被结束。ThreadPool::spawn<F>(&self, job: F) where F: FnOnce() + Send + 'static
在线程池中运行一个函数。
运行操作总是成功的,但如果函数发生panic,线程池仍将继续以相同数量的线程运行 — 线程数不会减少,线程池也不会被析构、损坏或失效。
到这个项目结束时,该特性将有几个实现,您将再次执行基准测试来比较它们。
本项目完全不需要对客户端代码进行任何修改。
项目设置
继续上一个项目,删除之前的测试目录,并复制本项目的测试目录。这个项目应该包含一个名为kvs
的库,以及两个可执行文件,kvs-server
和kvs-client
。
Cargo.toml
中需要以下开发依赖项:
1 | [dev-dependencies] |
与以前的项目一样,添加足够的定义以使测试套件通过编译。
添加新模块thread_pool
:
1 | . |
在lib.rs
中添加thread_pool
模块:
1 | pub mod thread_pool; |
在mod.rs
中声明三种线程池:
1 | //! This module provides various thread pools. All thread pools should implement |
三种线程池都用最简单的标准线程做实现,以通过编译。这里以naive.rs
为例,其他两个也使用这样的实现即可:
1 | use std::thread; |
知识背景:阻塞和多线程
到目前为止,您已经在单线程上处理了所有请求,包括读取和写入(例如”get”和”set”)。换句话说,数据库中的所有请求都是串行的。使用我们将在这个项目中重复的图表,时间流向如下所示:
1 | thread |
读取和写入操作都可能需要阻塞。阻塞是指线程在等待访问资源时停止执行(例如等待访问文件中的数据,或等待访问受锁保护的变量)。当一个线程在一个任务上被阻塞时,便可不能执行另一个任务。因此,在I/O密集型系统中,任何请求可能都在花费大量时间等待操作系统和内存从磁盘读取数据或将数据写入磁盘:
1 | +---------+----------------------------+---------+ |
The simplest way to put the CPU back to work while one request is blocked is to service requests on multiple threads, so that ideally our requests are all processed concurrently, and — if we have enough CPUs — in parallel:
在一个请求被阻塞时,让CPU能够继续工作的最简单方法,是创建多个线程一起提供服务。因此理想情况下,我们的请求都是并发处理的 — 如果我们有足够的 CPU — 即并行处理:
1 | thread |
所以这将是本项目的重点 — 并行处理请求。
第1部分:多线程
鉴于我们首次引入并发概念,就用最简单的方法:为每个传入的连接创建一个新线程,并响应该连接上的请求,然后让线程退出。如此,将工作分发到多个线程将提供怎样的性能收益?您预计延迟会受到怎样的影响?吞吐量呢?
第一步是为这种简单方法编写一个ThreadPool
实现,其中ThreadPool::spawn
将为每个生成的作业创建一个新线程。我们称之为NaiveThreadPool
(实际上它甚至不能算是一个真正的线程池,因为这个实现不会在各作业间重用线程,但仍需要符合我们的trait规范以供之后的基准测试比较)。
我们现在不关注更复杂的实现,因为即使将这个简单方法集成到我们现有的设计中仍需要不少努力。请注意,ThreadPool::new
构造函数接受一个threads
参数,用于指定池中的线程数。在此实现中,该参数将处于未使用状态。
现在来实现这个版本的ThreadPool
,然后我们将它集成到新的KvStore
中。
需要通过的测试用例:
thread_pool::naive_thread_pool_*
经过“项目设置”一节的修改,现在的代码已经能够通过cargo test --test thread_pool
的所有测试(实现时再修改)。
1 | ... |
第2部分:创建可共享KvsEngine
在我们将NaiveThreadPool
集成到KvsServer
之前,我们必须创建KvsEngine
trait并实现KvStore
(现在您可以忽略上一个项目中的 SledKvsEngine
,当然,您可以重新实现它作为这个项目的附加作业)。
回想一下项目需求规格规范,在本项目中,KvsEngine
会使用&self
作为self
,而不是上一个项目中的&mut self
。此外,还需要为每个实现,显式编写Clone
,并为它们隐式添加Send + 'static
。具体定义如下:
1 | pub trait KvsEngine: Clone + Send + 'static { |
这个trait签名提供了很代码实现相关的信息。首先想想当我们要使用多线程实现时,为什么需要引擎实现Clone
。参考Rust中其他并发数据类型的设计,例如Arc
。再想想为什么需要使用&self
,而不是&mut self
。您对共享可变状态了解多少?在这个项目结束时,请确保你理解这里的含义 — 这就是我们使用Rust的意义。
在这个模型中,KvsEngine
的行为类似于另一个对象的句柄,并且由于该对象需要在线程之间共享,它可能需要保持在堆上,并且因为该共享状态不能是可变的,它需要由一些同步原语保护。
因此,使用线程安全的共享指针类型将KvsEngine
、KvStore
中的数据移动到堆上,并将其保护在您选择的锁之后。
由于SledKvsEngine
实现了KvsEngine
,它可能也需要更改。
此时,您的单线程kvs-server
应该可以再次工作,但现在有了一个可以在以后跨线程共享的KvsEngine
。
需要通过的测试用例:
kv_store::concurrent_*
Arc
会将内容物放在堆上,并可以使用clone
创建指向同一个地址的引用;- 之所以修改trait定义之后可以不使用
mut
引用,是因为实现时将以类似self.writer.lock().unwrap().set(key, value)
的方式使用KvsEngine
的写操作,而读操作封装在RefCell
中即可共享; - 将
KvStore
中的内存索引改为可并发的Arc<SkipMap<...>>
方式,并将写对象封装在Arc<Mutex<...>>
中,以便在多个线程中执行写操作; - 创建一个单线程的
KvStoreReader
,将日志文件句柄封装在RefCell
中以便共享读操作。继续负责从gen
日志文件中读取CommandPos
处的json数据并解析为Command
返回; - 创建一个跨线程的
KvStoreWriter
接管原KvStore
中的写日志操作,包括set
、remove
、compact
;
第3部分:向KvsServer
添加多线程
让我们在这里快速回顾一下我们的架构:KvsServer
设置一个TCP套接字并开始监听;当收到一个请求时,将反序列化并调用KvsEngine
trait的一些实现来存储或检索数据;最后返回响应。KvsEngine
如何工作的细节与KvsServer
无关。
因此,在上一个项目中,您可能模糊地创建了一个循环,例如:
1 | let listener = TcpListener::bind(addr)?; |
现在您只需要做类似的事情,然后把循环内的工作都放在NaiveThreadPool
中执行。这会将数据库查询和响应工作放在与TCP监听不同的线程中,从而将大部分繁重的工作转移到其他线程,以允许接收线程处理更多请求。如此便能够增加吞吐量,至少在多核机器上是这样。
于是,您现在仍然有一对有效的客户端/服务端键值存储,只不过现在是多线程的。
- 在循环中
clone
KvsEngine
,然后将KvsEngine
和ThreadPool
一起传给线程执行函数serve
; - 此时
serve
由于有了KvsEngine
的clone
引用,也不需要通过self
调用引擎了,因此可以不再当做类函数使用; - 在
bin/kvs-server.rs
中创建服务时需要加上ThreadPool
参数,可以在Cargo.toml
中引入num_cpus
库,以在创建线程池时自动获取主机cpu个数。
第4部分:创建真正的线程池
所以现在你已经有了你的多线程架构,是时候编写一个真正的线程池了。您可能不会在实践中编写自己的线程池,因为可以使用已经过充分测试的线程池crate,不过通过自己编写线程池可以更有效的学习并发相关经验。在本项目接下来的部分中,您将像我们在上一个项目中对引擎所做的那样,抽象线程池,并将您的实现与现有线程池进行性能比较。
那么,什么是线程池?
其实也没什么复杂的。为了不给每个要执行的多线程作业都创建一个新线程,我们选择来维护一“池”的线程,并不停重用这些线程以避免不停的创建新线程。
那么,为什么?
因为可以提高性能。重用线程可以节省出少量性能,而在编写高性能应用程序时,每一点性能都很重要。想象一下创建一个新线程需要什么:
您必须有一个调用栈才能运行该线程,从而必须为该调用栈分配空间。虽然分配空间已然相当简单,但仍不如不分配来的简单。调用栈的分配方式取决于操作系统和运行时的细节,但可能涉及锁和系统调用。同样的,虽然系统调用也很简单,但是当我们处理Rust级别的性能时它们也就算不上简单了 — 减少系统调用是简单且常见的优化方式。然后必须仔细初始化该栈,以便第一个栈帧包含适当的基指针值以及栈的初始化函数序言中所需的任何其他值。在Rust中,栈需要配置一个保护页来防止栈溢出,从而保护内存安全。这需要另外两个系统调用,mmap
和mprotect
(尽管Linux上避免了这两个系统调用)。
而这只是设置调用栈。创建新线程至少也需要一个系统调用,而内核必须在内部对新线程进行计算。
在Rust中,C库libpthread库负责处理大多数这种复杂工作。
然后在某个时刻,操作系统在新栈上执行上下文切换,线程运行。当线程终止时,所有工作都需要再次撤消。
当使用线程池时,仅有池里的线程需要初始化开销,后续作业只是简单的上下文切换到池中的线程。
那么如何构建线程池?
有许多策略和权衡,但对于练习项目,您只需将使用一个共享队列将工作分配给空闲线程即可。这意味着您的“生产者”,即接受网络连接的线程,将作业发送到这个队列(或通道);而“消费者”,即池中的每个空闲线程,从该队列(通道)读取等待作业执行。这是最简单的工作调度策略,而且可能非常高效。那么,这种方法有什么缺点?
这里有三个重要因素需要考虑:
- 使用哪种数据结构来分配工作 — 应是一个队列,且应有一个发送者(“生产者”)负责监听TCP连接,同时应用许多接收者(“消费者”)即池中的线程。
- 如何处理panic的作业 — 线程池将运行任意作业。如果一个线程发生panic,线程池需要以某种方式恢复。
- 如何应对退出 — 即当
ThreadPool
对象超出作用域时,它需要关闭每个线程,一定不能不管这些线程。
这些问题都是相互交织的,因为每一个问题都可能涉及线程间的通信和同步。有些解决方案会很简单,每个问题的解决方案都可以优雅地协同工作;有些解决方案会很复杂,这些问题的解决方案可能相互独立独立且相互交织。仔细选择您的数据结构,聪明的运用这些数据结构的特点。
您将通过在某些并发队列类型上发送消息来分发工作(Rust中的并发队列通常是具有两种连接类型的数据结构:发送者类和接收者类;任何实现了Send
+ 'static
的类型都可以在这两个类之间传递)。
Rust中的消息通常表示为枚举类型,每个可能发送的消息都有相应变量,例如:
1 | enum ThreadPoolMessage { |
这往往是一种更简单、更有效的解决方案,而不是为了不同目的而试图“兼顾”多个通道。当然,如果只有一种类型的消息,则不需要枚举。现在,上面的示例并不一定是管理线程池所需消息集的全集,这具体取决于设计。比如,如果您的队列返回值表明发送方已被销毁,则通常可以隐式shutdown。
现在有许多种的多线程队列。在Rust中最常见的是mpsc
通道,就包含在Rust标准库中。这是一个多生产者、单消费者队列,因此将其用于单队列线程池将需要某种锁。在这里使用锁有什么缺点?Rust中还有许多其他并发队列类型,每种都有优缺点。如果您愿意同时锁定生产者和消费者,那么您甚至可以使用Mutex<VecDeque>
,不过如果存在更好的解决方案,就没有理由在生产中这么做了。
历史趣事:Rust标准库中包含通道这件事有些奇怪,并且被一些人认为是一个错误,因为它背离了Rust的理念 — 保持最小化标准库、专注抽象操作系统,并让crate生态提供高级数据结构。它们的存在是Rust开发史中刻意为之的,可能来源于Go这样消息传递语言。其他库(例如crossbeam
)提供了更复杂的替代方案,为不同场景提供更合适的选项😉。
您的线程池将需要处理作业函数产生panic的情况 — 放任panic销毁线程可能会使池中的线程快速耗尽。因此,如果池中的某个线程panic,您需要确保线程总数不会减少。那该怎么办?您至少有两个选择:当有线程销毁时立刻新建另一个线程,或者捕获panic并保持现有线程运行。这需要您做出权衡并选择其中一种方式,在您的代码中注释您的选择。
您可能会用到的工具有:thread::spawn
、thread::panicking
、catch_unwind
、mpsc
通道、Mutex
、crossbeam的MPMC通道、thread
的JoinHandle
。按照您的需求选择合适的工具。
创建SharedQueueThreadPool
类,实现ThreadPool
。
需要完成的测试用例:
shared_queue_thread_pool_*
将KvsServer
中用到的NaiveThreadPool
替换为SharedQueueThreadPool
。同样,您的kvs-server
应该仍然像以前一样工作,只不过这次的多线程模型更高效一些。您应当使用恰当的数量构造线程池,这里推荐使用num_cpus
crate,以为每个CPU创建一个线程。稍后我们将再次讨论线程个数。
- 利用
crossbeam::channel
创建一对tx
/rx
(发送者/接收者),tx
用于在线程池的spawn
中向channel
发送作业,rx
负责接收并执行这些作业; - 我们目前的策略为放任线程panic,并立刻创建新线程。要实现这一策略需要将
rx
封装在新类TaskReceiver
中,然后控制该类的Drop
行为,当发生thread::panicking()
时,克隆本TaskReceiver
实例并放入新建的线程中以继续接收新的作业; - 线程的执行函数,只需使用
loop
持续等待作业并执行作业中的函数即可。
1 | use std::thread; |
第5部分:抽象线程池
与在之前的项目中抽象出KvsEngine
来比较不同的实现一样,现在您应抽象出ThreadPool
来做类似的事。
如果您还没有这么做,请向KvsServer
添加第二个类型参数以表示ThreadPool
实现,构造函数以线程池作为第二个参数,并使用该线程池来分发作业。
最后,使用rayon
crate中的ThreadPool
创建另一个ThreadPool
,以实现RayonThreadPool
。
Rayon
的线程池使用一种称为“工作窃取”的更复杂的调度策略,预计它的性能会比我们的实现更好,不过在我们尝试之前谁知道呢!
1 | use super::ThreadPool; |
第6部分:评估您的线程池
现在您将编写6个基准测试,一个写入繁重的工作负载比较SharedQueueThreadPool
在不同线程数下的性能,一个读取繁重的工作负载比较SharedQueueThreadPool
在不同线程数下的性能;再写两个测试,类似前面两个只不过用来测试RayonThreadPool
;最后还有两个将RayonThreadPool
与SledKvsEngine
结合使用。
看上去要写6个其实并没有那么多 — 因为后四个基本上是复制前两个。
注意:接下来的两节描述了一组相当复杂的基准测试。它们确实可以实现出来(可能……还没有人写过),但有效理解和高效编写都比较难。两节确实介绍了一些有用的criterion
特性,但如果对你来说内容过于庞杂,可以选择跳过(也可以告诉我们有哪些内容不适合你)。不过,这里的困难可能是一个很好的学习机会。最后,实现这些基准测试需要实现以编程方式关闭KvsServer
的方法(即不发送SIGKILL
并让操作系统执行退出),我们之前还没有讨论过。
其中一部分工作就是让将前一个项目中的SledKvsEngine
在本项目的多线程环境中再次工作。这应该不难,因为sled
可以被克隆并在线程间发送,就像您编写的引擎一样。
希望结果会很有趣。
您将再次使用criterion
。
这些将是参数化基准测试,即使用不同参数多次运行的单个测试,而criterion
支持使用参数作为基准测试的输入。这里的基准测试参数将是线程池中的线程数。
您将尝试测试服务器在各种条件下的吞吐量。您将同时发送许多请求,等待响应,然后结束。您可能会好奇,服务器CPU数量与线程总数的关系,是如何影响吞吐量的;您的线程池与rayon
相比性能如何;以及在多线程环境下您的KvStore
与SledKvsEngine
的比较。
由于您的KvsClient
(可能会)被阻塞,即请求后等待响应,这会使测试变的复杂。如果是非阻塞的,那么您可以发送许多请求而无需等待响应,然后再收集响应。而使用阻塞的KvsClient
,您将需要在独立的线程中发送每个请求,以使服务饱和。
在进行基准测试时,一定要清楚您希望评估哪些代码,并尽可能只去测试那一部分代码。像criterion
这样的基准测试库在一个循环中多次运行一段代码,测量它通过每个循环所花费的时间。因此,应当只将您想要评估的代码放入循环中,并将无关代码尽可能的留在循环外。
因此,以这个带有输入的简单criterion
为例:
1 | let c = Criterion::default(); |
iter
多次调用您的闭包,测量每次迭代。但是如此就需要事先设置大量线程,您并不希望这么做。如果只需要设置一次,就能够在多次迭代中使用,则应该将设置过程放在闭包之外,例如:
1 | let c = Criterion::default(); |
只评估b.iter
闭包中的代码,其余的环境设置代码都放在前面。
如果设置无法放在循环之前,那么另一种策略是使设置占用的工作量小于想要评估的代码的工作量,例如添加循环。也要考虑基准测试中的“析构”部分,通常指运行drop
的成本。
如果您有一个阻塞客户端,则客户端将需要许多线程,而在执行循环之前,您只有一次机会创建这些线程。因此,在基准测试执行迭代前,您需要设置一堆可复用的线程。幸运的是,SharedQueueThreadPool
就是一个很好的工具。为每个请求设置一个线程,并将其与某个通道配对,以报告收到响应,这就成了一个合适的基准测试工具。
现在开始编写前两个基准测试
前面提到这是一个参数化基准测试,参数就是服务器线程池中要使用的CPU核数。我们想看看只有1个、2个、4个等每个偶数一直到CPU核数的2倍时,吞吐量都是什么表现。至于为什么是2倍,也许拥有比内核更多的线程可能会有好处,您将通过实验来发现。
对于密集写入的作业,在环境设置期间(即调用b.iter(...)
之前)先创建KvsServer<KvStore, SharedQueueThreadPool>
,线程池使用参数化的线程数。然后编写一个作业,为1000个等长的键设置相同的值。请注意,尽管键不同,但为了测试结果的一致性,应在每个循环中使用同一套键。
然后在每次线程写入键和值后,也应该assert!
调用成功(以确保作业执行中没有出错),从而表明作业成功完成。当所有线程都完成后,基准测试线程继续运行并结束迭代。实现这种信号式结束的最直观方法是让每个作业线程将消息发送回基准测试线程,但请记住,这些信号代码是与您希望评估的代码毫无关系的开销,因此它的工作量应尽可能少。您可以只使用一条消息,或者使用其他并发类型,仅向基准测试线程发出一次信号吗?
将此基准测试命名为write_queued_kvstore
(或其他)。
对于密集读取作业,在环境设置期间先创建KvServer<KvStore, SharedQueueThreadPool>
,线程池使用参数化的线程数。然后编写保护1000个线程的客户端线程池。仍然在环境设置阶段,创建另一个客户端并初始化1000个不同的等长键,并全部使用相同的值。
然后,在基准测试循环中,为客户端生成1000个检索相同键/值解析的作业,然后assert!
结果是正确的。最后,像以前一样,向基准测试线程发送一条消息,表示读取已完成。
将此基准测试命名为read_queued_kvstore
(或其他)。
的确有不少工作要做。
您可以像往常一样使用cargo bench
运行这组criterion
基准测试。
只不过这次您有更多工作要完成。由于您将要在多个参数上执行相同的基准测试,即以线程池中的线程数为参数,如果能将这些结果体现在一个漂亮的图表中,以看到不同线程数的影响,将会使测试结果更加直观。
恰好criterion
就有这个功能!
请再次并阅读有关使用参数作为基准测试的输入的内容。文章解释了如何制作输入的基准测试图标。您注意到了什么?当您的线程数接近服务器CPU核数时会发生什么?当线程数超过服务器的线程数时会发生什么?您认为是什么导致了测试结果中的趋势?结果取决于许多因素,因此您的结果可能与其他任何人都不同。
这是始终进行基准测试而不非推测性能的一个很好的理由。我们可以做出有根据的猜测,但直到我们测试才会知道结果。
第7部分:评估其他线程池和引擎
到这里,您已经解决了基准测试练习中最困难的部分。现在您只需在之前的基础上做更多配置性工作即可。
拿您之前写的那两个基准,然后复制粘贴三遍。将这些副本中的SharedQueueThreadPool
更改为RayonThreadPool
。
将第三个和第四个命名为read/write_rayon_kvstore
(或其他)。这两个将与前两个SharedQueueThreadPool
实现进行比较,以了解您的实现与RayonThreadPool
之间的区别。
第五个和第六个,命名为read/write_rayon_sledkvengine
,将引擎改成SledKvsEngine
。这些您将与前两个进行比较,以了解您的KvsEngine
与多线程环境中的sled
有什么区别。
和以前一样,运行并绘制所有这些基准测试。如上所述将它们相互比较,在各种线程数下,您的调度与rayon
相比如何?在各种线程数下,您的存储引擎与sled
相比如何?结果令人惊讶吗?你能想象为什么存在差异吗?
扩展1:比较函数
现在,您为三个不同的线程池执行了相同的基准测试,您运行了也比较了它们的性能。criterion
内置支持比较多个实现。查看Criterion用户手册中的“比较函数”并修改您的基准测试,以便让criterion
自己进行比较,看看那些华丽的图表。
背景:锁的极限
在本项目前期,我们建议通过将KvsEngine
内部数据放在堆上并保护在锁之后来保证其线程安全。您可能立即意识到这不会提高吞吐量,因为它只是将一种阻塞换成了另一种阻塞 — 将原来的阻塞磁盘访问换成了现在的阻塞互斥访问。
所以到目前为止,我们所取得的成就是:
1 | thread |
在上一节中,您对您的引擎与SledKvsEngine
的多线程吞吐量进行了基准测试。希望您已经发现,您的多线程实现比sled
的性能要差得多(如果不是,要么是您的实现非常棒,要么是sled
出了什么问题)。到目前为止,添加多线程导致的性能比单线程实现更差 — 现在您的实现还需要执行线程间上下文切换的额外工作,以及为了保证互斥锁而强加的阻塞。
因此,对于项目的这一部分将变得更加复杂。用锁保护整个状态很容易 — 整个状态总是以原子方式读写,于是一次只有一个客户端可以访问整个状态。但这也意味着想要访问共享状态的两个线程必须互相等待。换句话说,当KvsEngine
受互斥锁保护时,尽管是多线程的,但服务器中的实际并发量非常少。
高性能、可扩展、并行的软件倾向于尽可能避免锁和锁争用。与大多数语言相比,Rust使复杂且高性能的并发模式变的更容易(因为您无需担心数据竞争和程序崩溃),但它并不能阻止您写出可能导致错误程序行为的逻辑bug。
所以你仍然需要对并发进行一些认真的思考。幸运的是,Rust crate生态中有许多复杂的并行编程工具,因此您的任务通常只是了解它们是什么以及如何将它们组合在一起,而不是了解如何编写自己的复杂无锁数据结构。
让我们看一些更复杂的例子。我们将以单线程KvStore
为例,并考虑如何将其改为线程安全的。
这是一个单线程KvStore
示例,就像您在早期项目中创编写的那样(这是课程示例项目中的简化版本):
1 | pub struct KvStore { |
而这是简单的多线程版本,用锁保护一切。希望您的实现看起来已经类似这样:
1 |
|
如此Arc<Mutex<T>>
的解决方案,简单、正确且常见:
对于许多情况,这是一个完全合理的解决方案。但在本项目中,互斥锁将成为锁争用的源头:互斥锁不仅会串行化SharedKvStore
的写访问,还会串行化读访问。任何想要使用KvStore
的线程都需要等待Mutex
被另一个线程解锁。所有请求都会阻塞任何其他并发请求。
我们真正想要的是不必使用锁,或者 — 如果确实需要锁的 — 它们尽量少的与其他线程竞争。
互斥锁的进阶是RwLock
,即“读写锁”。这是每个并行软件开发者都必须知道的另一种常见锁。读写锁对互斥体的改进是它允许任意数量的读取,或单个写入。即用Rust术语,RwLock
可以同时支持任意数量的&
引用,或单个&mut
引用。读会被写阻塞,写会阻塞其他所有读和写。
在我们的数据库中,这意味着可以同时满足所有读取请求,但是当单个写入请求进入时,系统中的所有其他活动都会停止并等待该写操作完成。实现这一点很简单,基本上就是将Mutex
换成RwLock
。
现在,再次考虑我们的多线程示意图,最终的处理流程如下所示:
1 | thread |
第8部分:无锁读
对于本项目,我们想要尝试去创建永远不会锁定的读取器,即发生并发写入也依然不会阻塞读取。无论写入请求如何,都可以始终为读取请求提供服务。(写如现在仍然会阻塞其他写操作 — 除了成为一项具有挑战性的并行编程问题,并行写入本身是否有意义仍是一个难以回答的问题)。
我们最终的期望是:
1 | thread |
如果能做到这一点,那么我们将实现无锁的读操作:即使有读操作在等待来自文件系统的数据(被阻塞),所有类型的其他操作(读和写)都可以继续进行。不过,这仍然不足以保证系统始终可以为读请求提供服务。想想如果在大小为N
的线程池上存在N
个阻塞的写请求会发生什么。稍后你需要考虑这个问题,但现在,您的重点是从读操作中移除锁。
与Mutex
和RwLock
不同,并没有一种封装类型可以应用于整个任意共享状态,以实现同时读取和写入的目标(至少不能同时还具有高性能)。
这意味着我们需要考虑如何使用SharedKvStore
的每个字段,选择正确的同步方案以允许执行尽可能多的线程,并能够继续保持数据的逻辑一致性。
这就是多线程真正困难的地方。如果你移除那个大锁,Rust仍然会保护您免受数据竞争的影响,但它并不能帮助您保持数据的逻辑一致性。
所以在考虑解决方案之前,让我们考虑一下我们的需求。我们要:
- 同时在多个线程上读取内存索引和磁盘日志;
- 将命令写入磁盘,同时维护内存索引;
- 读与写并行,因此:
- 一般来说,为了保证读与写并发时总是能够读到一致的状态,这意味着:
- 维护一个不变量,使其实在指向日志中的一个有效命令;
- 维护一些适当的不变量以供其他的记录行为使用,比如为了下例
uncompacted
的记录;
- 定期压缩磁盘数据,同时为读操作维护不变量;
本节的其余部分是对有助于实现上述目标的各种背景知识的介绍,也这是项目的最终目标:修改KvStore
以同时执行读取和写入。
示例数据结构的解释
为了更具体的讨论,我们将需要一个示例,以展示应受保护的数据和应被维护的不变量。下面是一个KvStore
实现及其字段的示例:
1 | pub struct KvStore { |
这是本项目示例的简化版本。
各字段的作用非常明确:
path
:PathBuf
只是日志的目录路径,永远不会改变 — 它是不可变的,不可变的类型在Rust中是同步的,所以它甚至根本不需要任何保护。它可以被任何线程通过共享引用同时读取。
reader
:HashMap<u64, BufReaderWithPos<File>>
是当前日志文件的读句柄。它是可变的,即压缩后会指向新的日志文件。
writer
:BufWriterWithPos<File>
是当前日志文件的写句柄。任何写入都需要对writer
的可变访问,并且压缩过程需要修改writer
和current_gen
。
index
:BTreeMap<String, CommandPos>
是数据库中每个键到其所在日志文件中具体位置的内存索引。它从可以从每个读线程读取,从每个写线程写入,即使在压缩期间也能正常访问。
uncompacted
:u64
仅用于计算日志中已被后续写入命令取代的“陈旧”命令的总长度,用以触发日志压缩操作。
在以前的项目中,我们不必担心写、读和压缩之间的交叉会产生不一致的结果,因为它们都发生在同一个线程上。现在,如果您对数据结构的选择和使用不够小心,很容易破坏数据库的状态。
消除锁的策略
高级并行编程的关键是了解可用的工具以及使用它们的时机。以下是我们在实施此项目时发现的一些有用的技术,其中一些您也将用到。这些技术将以上面的数据结构为例进行讲解。
理解并维护顺序一致性
(请注意,“顺序一致性”一词有其准确含义,但在这里我们只是概括性地讨论如何确保以特定顺序执行作业)。
理解并行编程的关键在于理解代码各部分的“执行顺序”间的关系。在这个线程中,要让本线程早于其他线程看到数据,共享数据结构需要怎样改动?要让本线程早于其他线程暴露内部数据,共享数据结构需要怎样改动?我如何保证执行结果?
在单线程代码中,推断出任意行代码之前发生了什么很简单 — 如果代码写在前面,则会在前面执行,反之则会在后面执行。但这实际上并不是这样,即使在单线程代码中:为了使代码更高效的运行,CPU和编译器都会重新组织代码执行的顺序,只不过CPU使用器机码,编译器使用用以生成机器码的内部编码。实际上的代码执行顺序跟您编写代码的顺序并不相同,代码只是看起来在按您编写的顺序运行,因为CPU和编译器都会跟踪数据依赖,它们并不会打乱依赖顺序重新排列操作。
在多线程代码中,CPU和编译器仍使用与单线程相同的条件重新排序代码,而且您的代码块会被打碎重排,除非您使用同步类型和操作告诉编译器不允许重新排序。
任何必须在特定操作之前或之后发生的操作,都必须显式使用同步类型或操作,如使用锁、原子类型等等。
在上面的例子中,文件写入和内存索引写入显然应以特定的顺序发生 — 如果索引在文件更新前更新会发生什么?另外,例子中包含另一个状态,即未压缩命令的总长度uncompacted
。错误的计算未压缩命令的长度将产生什么影响?如果在将数据写入文件之前就可以看到uncompacted
发生了变化可能还好,但是必须为每个这样的独立同步值制定策略。
确定不可变值
你可能已经了解了很多关于Rust中的不可变性,以及不可变值如何轻松的在线程间共享(它们具有Sync
trait)。不可变值最适合并发 — 只需将它们放在Arc
后面即可。
在本例中,PathBuf
是不可变的。
复制值而不是复制共享
在Rust中我们可能不太喜欢克隆,特别是克隆大小不确定的类型,如String
和Vec
。但是克隆的存在是完全合理的:在某些情况下避免克隆可能非常困难,而且CPU也非常擅长复制内存缓冲区。此外,在本例中,服务端所需的状态副本的数量,实际上受线程池中线程数量的限制。
在本例中,PathBuf
也很容易克隆。
考虑一个潜在问题,如何跨线程共享对文件的访问。File
类型对读和写操作均需要可变访问。因此,其跨线程共享需要使用锁以授权可变访问。那么,什么是文件?它并不是一份实际上的文件 — 而只是磁盘上物理资源的句柄,一个文件同时存在多个打开句柄是可以的。注意File
的API — 并没有实现Clone
,虽然它确实有个诱人的try_clone
方法,它的语义对多线程代码会产生复杂的影响,如seek原文件的位置会同步到try_clone
创建的另一个文件。请考虑File::open
和try_clone
中文件之间的区别。使用try_clone
还是File::open
,将是您的选择。查看pread
可能会有所帮助。
按角色分解数据结构
在例子中,我们有两个明确的角色:读和写(也许还有第三个用于压缩)。将读逻辑和写逻辑分离并各封装为一个并发类型在Rust中很常见。读逻辑有自己的数据集,写逻辑也一样,这就为封装提供了很好的条件,所有读操作划为一种类型,而所有写操作划为另一种类型。
这种划分将进一步使两者都访问哪些资源变得非常明显,因为读类型和写类型都将包含这些资源的共享句柄。
使用专门的并发数据结构
知道哪些工具可用以及该在哪些场景中使用可能是并行编程中最困难的部分。除了学校里教的基础锁类型,同步数据类型也变得越来越专业化。
在本项目中,由于内存索引是某种类型的关联数据结构(也称为“映射”),如树或哈希表,那么我们自然会想到是否存在并发关联数据结构。
确实存在,且正确的使用这些数据结构是完成本项目的关键。
但是怎么才能找到这些类型呢?第一步是确定是否存在并发映射。您可以阅读Rust Discord上的#beginners
部分,但对于本项目,在网络上搜”concurrent map”即可。
这是容易的部分,在Rust中找到正确的并发映射类型则更困难。比较好的起手式是访问libs.rs。libs.rs与crates.io类似,但crates.io包含所有已发布的库,而libs.rs仅包含受到……好吧,某些人好评的库。因此,如果一个库在libs.rs上,就是库可用的一个指示,另一个是crates.io上的下载计数 — 通常,下载越多的crates被测试的越细。下载计数可以粗略的看作是“担保”库的人数。最后,在聊天中提问也是一个好主意。
将清理延后
像克隆一样,垃圾收集在Rust中经常遭到反对 — 避免GC几乎就是Rust存的意义。但实际上垃圾收集是不可避免的,“垃圾收集”和“内存回收”实际上是同义词,各种语言都复合使用了多种垃圾收集策略。GC策略轴的一端,在没有自动内存管理的语言(如C)中,垃圾收集完全由程序员决定,如使用malloc
和free
。另一端则是垃圾收集语言,如 Java,所有内存都由一个通用垃圾收集器管理。
但实际上,在C中并不是所有内存的管理和回收都使用malloc
/free
,在Java也并不是所有内存管理都通过GC完成。举个简单的例子,两者中的高性能应用程序通常都依赖于专门的内存区域,在这些区域既可以重用内存也可以大量解除分配,以优化其内存访问模式。
同样在Rust中,并非所有内存都被显式地释放。比如在Rc
和Arc
类型中实现了资源计数,也算算算一种简单的GC。
全局垃圾收集器的最大好处之一,就是使许多无锁数据结构成为可能。学术文献中描述的许多无锁数据结构都依赖于GC执行。crossbeam
库及其epoch
类型的出现,就是在不依赖GC的情况下实现无锁算法。
也就是说垃圾收集有多种形式,其“将资源清理延迟到未来某个时间”的基本策略在许多场景中都很强大。
当您现在不知道如何执行一些并发工作时,可以试着考虑:“可以稍后再做吗?”。
与原子类型共享标志和计数器
在底层,大多数并发数据结构都是使用原子操作或“原子类型”实现的。原子类型在单个内存单元上运行,一般在8到128个字节之间,通常是字长(与指针的字节数相同,并且与Rustusize
类型大小相同)。如果两个线程正确使用原子类型,则一个线程中的写入结果将立即对另一个线程中的读取可见。除了使读取或写入立即可见之外,在Rust中,原子操作还通过Ordering
标志限制编译器和CPU重新排序指令的方式。
当从粗粒度并行中的锁,转向更细粒度的并行时,通常需要使用原子类型来增强现成的并发数据结构。
实现无锁读
以上就是相关的背景知识。希望这些知识已经开始引导您考虑了很多东西,并朝着正确的方向前进了。现在轮到实现了:
修改KvStore
以同时执行读取和写入。
1 | use std::cell::RefCell; |
干得漂亮,朋友。休息一下吧。