并发程序是运行多个任务(或看起来如此)的程序,这意味着两个或多个任务在重叠的时间跨度内交替执行。这些任务由线程(最小的处理单位)执行。在幕后,这不是真正的多任务处理(并行处理),而是以人类无法察觉的速度在线程之间快速切换上下文。许多现代应用程序都依赖于这种幻觉;例如,服务器可以在等待其他请求的同时处理一个请求。当线程共享数据时,可能会出现许多问题,最常见的是:竞争条件和死锁。
Rust 的所有权系统和类型安全系统是解决内存安全和并发问题的强大工具。通过所有权和类型检查,大多数错误都是在编译时而不是运行时捕获的。这允许开发人员在开发期间而不是部署到生产环境后修复代码。一旦代码编译完成,您就可以相信它在多线程环境中安全运行,而不会出现其他语言中常见的难以追踪的错误。这就是 Rust 所说的无畏并发。
多线程模型
多线程编程的风险
在大多数现代操作系统中,运行程序代码在操作系统管理的进程内执行。在程序中,还可以有多个独立执行的组件,称为线程。
将程序计算拆分为多个线程可以提高性能,因为程序可以同时处理多个任务。但是,这也增加了复杂性。由于线程并发运行,因此无法保证不同线程中代码的执行顺序。这可能导致以下问题:
竞争条件,即多个线程以不一致的顺序访问数据或资源
死锁,两个线程互相等待对方释放各自持有的资源,从而阻止进一步的进展
仅在特定情况下发生且难以重现或持续修复的错误
编程语言有多种实现线程的方法。许多操作系统都提供 API 来创建新线程。当一种语言使用 OS API 创建线程时,这通常被称为1:1 模型,其中一个 OS 线程对应一个语言级线程。
Rust 的标准库只提供了1:1 线程模型。
使用以下方式创建新线程spawn
use std::thread;
use std::time::Duration;
fn main() {
let thread = thread::spawn(|| {
for i in 1..10 {
println!("this is thread {}", i);
thread::sleep(Duration::from_millis(1));
}
});
for k in 1..5 {
println!("this is main {}", k);
thread::sleep(Duration::from_millis(1));
}
}
输出:
this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
我们看到,在主线程完成 5 次循环迭代并退出后,新创建的线程(尽管设计为 10 次迭代)只能执行 5 次迭代然后退出。当主线程结束时,新线程也会结束,无论它是否已完成。
如果我们希望新线程在主线程继续之前完成,我们可以使用JoinHandle:
use std::thread;
use std::time::Duration;
fn main() {
let handler = thread::spawn(|| {
for i in 1..10 {
println!("this is thread {}", i);
thread::sleep(Duration::from_millis(1));
}
});
for k in 1..5 {
println!("this is main {}", k);
thread::sleep(Duration::from_millis(1));
}
handler.join().unwrap(); // Block the main thread until the new thread finishes
}
输出:
this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
this is thread 6
this is thread 7
this is thread 8
this is thread 9
的返回类型thread::spawn是JoinHandle。JoinHandle是一个拥有的值,并且调用其join方法需要等待线程完成。
调用join句柄会阻塞当前线程,直到句柄所代表的线程结束。阻塞线程意味着阻止其执行进一步的工作或退出。
线程和move闭包
我们可以使用move闭包将变量的所有权从主线程转移到闭包中:
use std::thread;
fn main() {
let v = vec![2, 4, 5];
// `move` transfers ownership of `v` into the closure
let thread = thread::spawn(move || {
println!("v is {:?}", v);
});
}
输出:
v is [2, 4, 5]
Rust 将变量的所有权v移至新线程。这确保变量在新线程中可以安全使用,同时也意味着主线程无法再使用v(例如,将其删除)。
如果move省略关键字,编译器将引发错误:
$ cargo run
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
--> src/main.rs:6:32
|
6 | let handle = thread::spawn(|| {
| ^^ may outlive borrowed value `v`
7 | println!("Here's a vector: {:?}", v);
| - `v` is borrowed here
Rust 的所有权规则再次帮助确保了内存安全!
消息传递
在 Rust 中,用于消息传递并发的一个主要工具是通道,这是标准库提供的概念。您可以将其想象成一条水道——一条河流或溪流。如果您在其中放置橡皮鸭或小船之类的东西,它会顺流而下流向接收者。
通道由两部分组成:发送器和接收器。当发送器或接收器断开时,通道即被视为关闭。
通道是通过标准库的 实现的std::sync::mpsc,代表多个生产者,单个消费者。
注:根据读者和作者的数量,频道可分为:
SPSC——单一生产者,单一消费者(只能使用原子)
SPMC——单一生产者,多个消费者(需要在消费者端锁定)
MPSC——多个生产者,单个消费者(需要在生产者端锁定)
MPMC——多个生产者,多个消费者
在线程间传递消息
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
// Move `tx` into the closure so the new thread owns it
thread::spawn(move || {
tx.send("hello").unwrap();
});
// `recv()` blocks the main thread until a value is received
let msg = rx.recv().unwrap();
println!("message is {}", msg);
}
输出:
message is hello
通道的接收端有两个有用的方法:recv和try_recv。
这里我们使用了,即接收 的recv缩写,它会阻塞主线程直到收到值。一旦发送了一个值,就会在 中返回它。如果发送器已关闭,它会返回一个错误,表示不会再有值到达。recvResult<T, E>
try_recv不会阻塞;相反,如果有数据可用,它会立即返回: ,否则,它会立即返回。Result<T, E>OkErr
如果新线程尚未执行完成,使用try_recv可能会导致运行时错误:
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send("hello").unwrap();
});
// `try_recv` returns immediately, so it might not receive the message in time
let msg = rx.try_recv().unwrap();
println!("message is {}", msg);
}
错误:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Empty', ...
发送多个值并观察接收方等待
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// The `for` loop on `rx` implicitly waits for incoming values as an iterator
for received in rx {
println!("Got: {}", received);
}
}
示例输出(行间间隔 1 秒):
Got: hi
Got: from
Got: the
Got: thread
通过克隆发送者创建多个生产者
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// Clone the sender `tx` to create a second producer
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap(); // Use the cloned sender
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap(); // Use the original sender
thread::sleep(Duration::from_secs(1));
}
});
// Both tx and tx1 send values to the same receiver rx
for received in rx {
println!("Got: {}", received);
}
}
示例输出(由于调度不同,因系统而异):
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
共享状态
共享状态或数据意味着多个线程同时访问同一内存位置。Rust 使用互斥锁 (mutexes)来实现共享内存并发原语。
互斥锁每次只允许一个线程访问数据
Mutex是互斥的缩写,表示在任意给定时间只有一个线程可以访问特定数据。要访问互斥中的数据,线程必须先获取锁。锁是一种数据结构,用于跟踪当前谁拥有独占访问权限。
使用标准库的std::sync::Mutex:
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 10; // Mutate the value inside the Mutex
println!("num is {}", num);
}
println!("m is {:?}", m);
}
输出:
num is 10
m is Mutex { data: 10 }
我们使用此lock方法来访问互斥锁中的数据。此调用会阻塞当前线程,直到获取锁为止。
AMutex是一个智能指针。更具体地说,lock返回一个MutexGuard,这是一个实现 的智能指针,Deref用于指向底层数据。它还实现了Drop,因此当MutexGuard超出范围时,锁会自动释放。
在线程之间共享互斥锁
当在多个线程之间共享数据时,需要多个所有者,我们使用Arc智能指针来包装Mutex。Arc是线程安全的;Rc不是并且不能在多线程上下文中安全使用。
下面是一个使用Arc包装的示例Mutex,允许跨线程共享所有权:
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter); // Clone the Arc before moving into the thread
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
输出:
Result: 10
总结一下:
Rc<T>+RefCell<T>通常用于单线程内部可变性Arc<T>+Mutex<T>用于多线程内部可变性
注意:Mutex仍然可能导致死锁。如果两个操作各自需要锁定两个资源,并且两个线程各自持有一个锁,等待另一个锁,则会发生这种情况 — — 从而导致循环等待。
基于Send和的线程安全Sync
在 Rust 中,与并发相关的工具是标准库的一部分,而不是语言本身。但是,语言中嵌入了两个并发概念:Send和Sync特征std::marker。
目的Send和Sync
Send和Sync是 Rust 中安全并发的核心。从技术上讲,它们是标记特征(不定义任何方法的特征),用于标记并发行为的类型:
实现的类型
Send可以在线程之间安全地转移其所有权。实现的类型
Sync可以通过引用在线程之间共享。
由此,我们可以推断:&T如果Send,T则Sync。
实现的类型Send和Sync
在 Rust 中,几乎所有类型都默认实现Send和Sync。这意味着对于复合类型(如结构体),如果其所有成员都是Send或Sync,则复合类型会自动继承这些特征。
但是如果即使一个成员不是Send或Sync,那么整个类型就不是。
总结一下:
实现的类型
Send可以在线程之间安全地转移所有权。实现的类型
Sync可以在线程之间安全地共享(通过引用)。Rust 中的绝大多数类型都是
Send和Sync。
未实现这些特征的常见类型包括:
原始指针
细胞
钢筋混凝土
可以为您自己的类型手动实现Send和Sync,但是:
您必须使用
unsafe代码必须手动确保线程安全
这很少有必要,并且应该非常谨慎地进行
笔记:
Cell并不是RefCell,Sync因为它们的核心实现(UnsafeCell)不是Sync。Rc都不是,Send也不是,Sync因为其内部的引用计数器不是线程安全的。原始指针不实现任何特性,因为它们不提供任何安全保障。
概括
Rust 提供了async/await和多线程并发模型。要有效地使用多线程模型,必须了解 Rust 的线程基础知识,包括:
线程创建
线程同步
线程安全
Rust 支持:
消息传递并发,其中
channel用于在线程之间传输数据共享状态并发,其中
Mutex和Arc用于跨线程共享和安全改变数据
类型系统和借用检查器确保这些模式不存在数据竞争和悬垂引用。
一旦代码编译完成,您就可以确信它将在多线程环境中正确运行,而不会出现其他语言中难以捉摸、难以调试的错误。
和特征为线程间安全地传输或共享数据提供了保证Send。Sync
总之:
线程模型:多线程程序必须处理竞争条件、死锁和难以重现的错误。
消息传递:使用通道在线程之间传输数据。
共享状态:
Mutex+Arc允许多个线程访问和改变相同的数据。线程安全:
Send特征Sync保证多线程上下文中数据传输和共享的安全性。

