赞
踩
安全且高效地处理并发编程是 Rust 的另一个主要目标。并发编程(Concurrent programming),代表程序的不同部分相互独立地执行,而并行编程(parallel programming)代表程序不同部分同时执行,这两个概念随着计算机越来越多的利用多处理器的优势而显得愈发重要。由于历史原因,在此类上下文中编程一直是困难且容易出错的:Rust 希望能改变这一点。
起初,Rust 团队认为确保内存安全和防止并发问题是两个分别需要不同方法应对的挑战。随着时间的推移,团队发现所有权和类型系统是一系列解决内存安全和并发问题的强有力的工具!通过利用所有权和类型检查,在 Rust 中很多并发错误都是编译时错误,而非运行时错误。因此,相比花费大量时间尝试重现运行时并发 bug 出现的特定情况,Rust 会拒绝编译不正确的代码并提供解释问题的错误信息。因此,你可以在开发时修复代码,而不是在部署到生产环境后修复代码。
在大部分现代操作系统中,已执行程序的代码在一个进程(process)中运行,操作系统则会负责管理多个进程。在程序内部,也可以拥有多个同时运行的独立部分。这些运行这些独立部分的功能被称为线程(threads)。
将程序中的计算拆分进多个线程可以改善性能,因为程序可以同时进行多个任务,不过这也会增加复杂性。因为线程是同时运行的,所以无法预先保证不同线程中的代码的执行顺序。这会导致诸如此类的问题:
Rust 尝试减轻使用线程的负面影响。不过在多线程上下文中编程仍需格外小心,同时其所要求的代码结构也不同于运行于单线程的程序。
编程语言有一些不同的方法来实现线程,而且很多操作系统提供了创建新线程的 API。Rust 标准库使用 1:1 线程实现,这代表程序的每一个语言级线程使用一个系统线程。
在 Rust 中,你可以使用 std::thread::spawn
来创建多个线程。以下是一个创建多个线程并让它们执行不同任务的例子:
use std::thread; use std::time::Duration; fn main() { // 创建一个向量来存储JoinHandle let mut handles: Vec<thread::JoinHandle<()>> = Vec::new(); // 创建并启动多个线程 for i in 0..5 { let handle = thread::spawn(move || { // 打印线程编号 println!("Thread number {} is running", i); // 模拟一些工作 thread::sleep(Duration::from_millis(500 * i)); // 打印线程完成的消息 println!("Thread number {} finished", i); }); // 将JoinHandle存储在向量中 handles.push(handle); } // 等待所有线程结束 for handle in handles { handle.join().unwrap(); } println!("All threads have finished execution."); }
在这个例子中,我们首先创建了一个 Vec<thread::JoinHandle<()>>
来存储每个线程的 JoinHandle
。然后,我们使用一个 for
循环来创建 5 个线程。每个线程都执行一个移动(move
)闭包,该闭包打印线程编号,休眠一段时间,然后打印完成消息。移动闭包通过 move
关键字捕获循环变量 i
的值,使得每个线程都有自己的 i
副本。
在所有线程创建之后,我们遍历 handles
向量,调用每个 JoinHandle
的 join
方法。这会阻塞主线程直到对应的线程结束。如果线程成功结束,join
方法会返回 ()
。如果线程由于 panic 而结束,join
方法会返回一个错误,这里我们使用 unwrap
来获取结果并忽略潜在的错误。最后,当所有线程都完成后,我们在主线程中打印一条消息。
请注意,在实际应用中,你可能需要更细致地处理 join
方法返回的结果,以确保能够适当地响应线程中的 panic。
一个日益流行的确保安全并发的方式是消息传递(message passing),这里线程或 actor 通过发送包含数据的消息来相互沟通。这个思想来源于 Go 编程语言文档中 的口号:“不要通过共享内存来通讯;而是通过通讯来共享内存”。
为了实现消息传递并发,Rust 标准库提供了一个信道(channel)实现。信道是一个通用编程概念,表示数据从一个线程发送到另一个线程。编程中的信息渠道(信道)有两部分组成,一个发送者(transmitter)和一个接收者(receiver)。代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为信道被关闭(closed)了。
mpsc::channel
函数创建一个新的信道;mpsc
是多个生产者,单个消费者(multiple producer, single consumer)的缩写。简而言之,Rust 标准库实现信道的方式意味着一个信道可以有多个产生值的发送(sending)端,但只能有一个消费这些值的接收(receiving)端。
mpsc::channel
函数返回一个元组:第一个元素是发送端,而第二个元素是接收端。由于历史原因,tx
和 rx
通常作为发送者(transmitter)和接收者(receiver)的缩写。
信道的发送端有一个 send
方法用来获取需要放入信道的值。send
方法返回一个 Result<T, E>
类型,所以如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误。
信道的接收者有两个有用的方法:recv
和 try_recv
。 recv
是 receive
的缩写。这个方法会阻塞线程执行直到从信道中接收一个值。一旦发送了一个值,recv
会在一个 Result<T, E>
中返回它。当信道发送端关闭,recv
会返回一个错误表明不会再有新的值到来了。
try_recv
不会阻塞,相反它立刻返回一个 Result<T, E>
:Ok
值包含可用的信息,而 Err
值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv
很有用:可以编写一个循环来频繁调用 try_recv
,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。
这里尝试在通过 tx.send
发送 data
到信道中之后将其打印出来。允许这么做是一个坏主意:一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); let sender_thread = thread::spawn(move || { let data = "Hello from sender!".to_string(); tx.send(data).unwrap(); println!("Sended: {}", data); }); let receiver_thread = thread::spawn(move || { let received = rx.recv().unwrap(); println!("Received: {}", received); }); sender_thread.join().unwrap(); receiver_thread.join().unwrap(); }
编译这一段代码会报错:
Compiling playground v0.0.1 (/playground)
error[E0382]: borrow of moved value: `data`
--> src/main.rs:10:32
|
8 | let data = "Hello from sender!".to_string();
| ---- move occurs because `data` has type `String`, which does not implement the `Copy` trait
9 | tx.send(data).unwrap();
| ---- value moved here
10 | println!("Sended: {}", data);
| ^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `playground` (bin "playground") due to 1 previous error
我们的并发错误会造成一个编译时错误。send
函数获取其参数的所有权并移动这个值归接收者所有。这可以防止在发送后再次意外地使用这个值;所有权系统检查一切是否合乎规则。
修正这段代码,就是把 tx.send
之后的打印语句去除,程序就可以正常运行了。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); let sender_thread = thread::spawn(move || { let data = "Hello from sender!".to_string(); tx.send(data).unwrap(); }); let receiver_thread = thread::spawn(move || { let received = rx.recv().unwrap(); println!("Received: {}", received); }); sender_thread.join().unwrap(); receiver_thread.join().unwrap(); }
运行结果
Received: Hello from sender!
在这个示例中,我们创建了一个信道 (tx, rx)
,tx
是发送端,rx
是接收端。我们创建了一个线程 sender_thread
来发送数据,并将数据的所有权通过 send
方法转移给接收端。接收端通过 recv
方法接收数据。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); // 创建一个线程发送多个值 let sender_thread = thread::spawn(move || { for i in 1..=5 { tx.send(i).unwrap(); println!("Sent {}", i); } }); // 创建一个线程接收值 let receiver_thread = thread::spawn(move || { for _ in 1..=5 { let received = rx.recv().unwrap(); println!("Received {}", received); } }); sender_thread.join().unwrap(); receiver_thread.join().unwrap(); }
运行结果
Sent 1
Sent 2
Sent 3
Sent 4
Sent 5
Received 1
Received 2
Received 3
Received 4
Received 5
在这个示例中,我们创建了一个线程发送一系列值(1 到 5),并在发送每个值后打印一条消息。接收端在接收到所有值之前会阻塞等待。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); // 克隆发送端以创建多个生产者 let tx_clone = tx.clone(); let sender_thread1 = thread::spawn(move || { for i in 1..=5 { tx.send(i).unwrap(); println!("Sender 1 sent {}", i); } }); let sender_thread2 = thread::spawn(move || { for i in 5..=10 { tx_clone.send(i).unwrap(); println!("Sender 2 sent {}", i); } }); // 接收端 let receiver_thread = thread::spawn(move || { let mut values = Vec::new(); for _ in 1..=10 { values.push(rx.recv().unwrap()); } println!("Received all values: {:?}", values); }); sender_thread1.join().unwrap(); sender_thread2.join().unwrap(); receiver_thread.join().unwrap(); }
运行结果
Sender 1 sent 1
Sender 1 sent 2
Sender 2 sent 5
Sender 2 sent 6
Sender 2 sent 7
Sender 2 sent 8
Sender 2 sent 9
Sender 2 sent 10
Sender 1 sent 3
Sender 1 sent 4
Sender 1 sent 5
Received all values: [1, 5, 2, 6, 7, 8, 9, 10, 3, 4]
在这个示例中,我们克隆了发送端 tx
来创建两个生产者 sender_thread1
和 sender_thread2
。每个生产者都发送一系列值,而接收端接收所有值并将它们存储在一个向量中。
请注意,在实际应用中,你可能需要考虑使用更高级的并发原语,如 std::sync::Arc
来共享状态,或者使用 std::sync::Barrier
来同步线程。此外,错误处理在多线程程序中也非常重要,这里为了示例的简洁性,我们使用了 unwrap
来处理可能的错误。在生产代码中,你应该更细致地处理这些情况。
在某种程度上,任何编程语言中的信道都类似于单所有权,因为一旦将一个值传送到信道中,将无法再使用这个值。共享内存类似于多所有权:多个线程可以同时访问相同的内存位置。
互斥锁(mutex)是 mutual exclusion 的缩写,也就是说,任意时刻,其只允许一个线程访问某些数据。为了访问互斥锁中的数据,线程首先需要通过获取互斥锁的锁(lock)来表明其希望访问数据。锁是一个作为互斥锁一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥锁为通过锁系统保护(guarding)其数据。
互斥锁以难以使用著称,因为你不得不记住:
正确的管理互斥锁异常复杂,这也是许多人之所以热衷于信道的原因。然而,在 Rust 中,得益于类型系统和所有权,我们不会在锁和解锁上出错。
Mutex<T>
是 std::sync
模块提供的一种同步原语,用于在多线程环境中保护共享数据。Mutex<T>
代表互斥锁(Mutex),它允许你以线程安全的方式对数据进行访问。以下是 Mutex<T>
的 API 详细介绍:
构造函数
Mutex::new(data: T)
: 创建一个新的 Mutex
,并将 data
作为内部数据。方法
lock()
: 获取互斥锁,返回一个可变引用 MutexGuard
。如果锁当前被其他线程持有,则当前线程将阻塞,直到锁被释放。try_lock()
: 尝试获取互斥锁,如果成功则返回 Some(MutexGuard)
,如果失败(锁已被其他线程持有)则返回 None
,不会阻塞当前线程。get_mut()
: 返回对内部可变数据的可变引用。这个方法需要 Mutex
已经获得锁,通常在使用 lock
或 try_lock
之后使用。into_inner()
: 消耗 Mutex
并返回内部数据。这个方法只有在 Mutex
没有被锁定的情况下才能成功使用,否则会 panic。实例方法
is_poisoned()
: 检查互斥锁是否处于“中毒”状态。如果一个线程在持有互斥锁时 panic,那么锁将进入中毒状态。其他线程在尝试获取这个锁时会收到一个 PoisonError
。unlock()
: 释放互斥锁。通常,互斥锁会在 MutexGuard
离开作用域时自动释放,但在某些情况下,你可能需要手动释放锁。类型
MutexGuard<'a, T>
: 表示对互斥锁的锁定访问。它是通过调用 lock
或 try_lock
获得的。当 MutexGuard
离开作用域时,互斥锁会自动释放。错误处理
PoisonError<T>
: 当互斥锁中毒时,lock
和 try_lock
方法会返回这个错误。它包含内部数据的一个可变引用,允许你安全地处理中毒情况。当然,这里有一个使用 Mutex<T>
的 Rust 程序示例,它演示了如何在多个线程之间共享和修改数据:
use std::sync::{Arc, Mutex}; use std::thread; fn main() { // 创建一个Arc包装的Mutex,以便跨多个线程安全共享 let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; // 创建并启动10个线程 for _ in 0..10 { // 克隆Arc来获取counter的一个新引用 let counter_clone = Arc::clone(&counter); let handle = thread::spawn(move || { // 通过lock获取互斥锁的访问权 let mut num = counter_clone.lock().unwrap(); // 修改数据 *num += 1; }); handles.push(handle); } // 等待所有线程完成 for handle in handles { handle.join().unwrap(); } // 打印最终的计数结果 println!("Result: {}", *counter.lock().unwrap()); }
运行结果
Result: 10
这个程序执行以下步骤:
Arc::new
和 Mutex::new
创建一个在多个线程间共享的计数器。handles
来存储所有线程的句柄。Arc
对象,以确保每个线程都有 counter
的独立引用。Mutex
的锁,递增计数器的值,然后释放锁。thread::spawn
启动每个线程。这个程序使用了 Arc
(Atomic Reference Counting) 来允许 Mutex
在多个线程间安全共享。Arc::clone
用于增加内部引用计数,而不是复制数据。每个线程通过调用 lock
方法来获取互斥锁的可变访问权,并通过 unwrap
处理可能的锁定错误(在实际应用中,你可能需要更优雅地处理这种错误)。当 MutexGuard
离开作用域时,互斥锁会自动释放。
之前讨论的几乎所有内容,都属于标准库,而不是语言本身的内容。由于不需要语言提供并发相关的基础设施,并发方案不受标准库或语言所限:我们可以编写自己的或使用别人编写的并发功能。
然而有两个并发概念是内嵌于语言中的:std::marker
中的 Sync
和 Send
trait。
Send
标记 trait 表明实现了 Send
的类型值的所有权可以在线程间传送。几乎所有的 Rust 类型都是 Send
的,不过有一些例外,包括 Rc<T>
:这是不能 Send
的,因为如果克隆了 Rc<T>
的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。为此,Rc<T>
被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。
因此,Rust 类型系统和 trait bound 确保永远也不会意外的将不安全的 Rc<T>
在线程间发送。而使用标记为 Send
的 Arc<T>
时,就没有问题了。
任何完全由 Send
的类型组成的类型也会自动被标记为 Send
。几乎所有基本类型都是 Send
的,除了裸指针(raw pointer)。
Sync
标记 trait 表明一个实现了 Sync
的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T
,如果 &T
(T
的不可变引用)是 Send
的话 T
就是 Sync
的,这意味着其引用就可以安全的发送到另一个线程。类似于 Send
的情况,基本类型是 Sync
的,完全由 Sync
的类型组成的类型也是 Sync
的。
智能指针 Rc<T>
也不是 Sync
的,出于其不是 Send
相同的原因。RefCell<T>
和 Cell<T>
系列类型不是 Sync
的。RefCell<T>
在运行时所进行的借用检查也不是线程安全的。Mutex<T>
是 Sync
的。
通常并不需要手动实现 Send
和 Sync
trait,因为由 Send
和 Sync
的类型组成的类型,自动就是 Send
和 Sync
的。因为它们是标记 trait,甚至都不需要实现任何方法。它们只是用来加强并发相关的不可变性的。
手动实现这些标记 trait 涉及到编写不安全的 Rust 代码,当前重要的是,在创建新的由不是 Send
和 Sync
的部分构成的并发类型时需要多加小心,以确保维持其安全保证。
以下是一个完整的示例,展示如何在一个多线程程序中手动实现 Send
和 Sync
,以及如何在多个线程之间共享数据。
首先,我们定义一个简单的结构体 SharedData
,它包含了一些数据,我们将为这个结构体手动实现 Send
和 Sync
特征。
use std::sync::{Arc, Mutex}; use std::thread; // 定义一个简单的结构体,包含一些数据 struct SharedData { count: i32, } // 手动实现Send特征,表示SharedData可以安全地在线程间发送 unsafe impl Send for SharedData {} // 手动实现Sync特征,表示SharedData可以被多个线程安全地访问 unsafe impl Sync for SharedData {} fn main() { // 使用Arc来包装SharedData,使其可以被多个线程共享 let shared_data = Arc::new(Mutex::new(SharedData { count: 0 })); // 创建多个线程,它们将共享并修改同一个SharedData实例 let mut handles = vec![]; for i in 0..10 { let data = Arc::clone(&shared_data); let handle = thread::spawn(move || { let mut data = data.lock().unwrap(); println!("i: {}", i); data.count += i; }); handles.push(handle); } // 等待所有线程完成 for handle in handles { handle.join().unwrap(); } // 打印最终的count值 println!("Final count is: {}", shared_data.lock().unwrap().count); }
运行结果
i: 1
i: 0
i: 2
i: 9
i: 8
i: 4
i: 3
i: 5
i: 7
i: 6
Final count is: 45
在这个例子中,我们首先定义了一个 SharedData
结构体,它包含一个 i32
类型的字段 count
。然后我们手动为 SharedData
实现了 Send
和 Sync
特征,使用 unsafe
关键字,因为我们需要确保 SharedData
在多线程环境中是安全的。
在 main
函数中,我们使用 Arc
(原子引用计数指针)和 Mutex
(互斥锁)来创建一个可以在多个线程之间安全共享的 SharedData
实例。Arc
允许多个线程拥有对数据的所有权,而 Mutex
确保在任何时刻只有一个线程可以修改数据。
我们创建了 10 个线程,每个线程都会增加 SharedData
中的 count
字段的值。由于我们为 SharedData
实现了 Send
和 Sync
,我们可以安全地将 Arc<Mutex<SharedData>>
的克隆传递给每个线程。每个线程完成其工作后,我们使用 join
方法等待所有线程完成,并打印最终的 count
值。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。