大数跨境

Rust 并发:无畏并发

Rust 并发:无畏并发 索引目录
2025-03-27
2
导读:并发程序是运行多个任务(或看起来如此)的程序,这意味着两个或多个任务在重叠的时间跨度内交替执行。

并发程序是运行多个任务(或看起来如此)的程序,这意味着两个或多个任务在重叠的时间跨度内交替执行。这些任务由线程(最小的处理单位)执行。在幕后,这不是真正的多任务处理(并行处理),而是以人类无法察觉的速度在线程之间快速切换上下文。许多现代应用程序都依赖于这种幻觉;例如,服务器可以在等待其他请求的同时处理一个请求。当线程共享数据时,可能会出现许多问题,最常见的是:竞争条件死锁

Rust 的所有权系统类型安全系统是解决内存安全和并发问题的强大工具。通过所有权和类型检查,大多数错误都是在编译时而不是运行时捕获的。这允许开发人员在开发期间而不是部署到生产环境后修复代码。一旦代码编译完成,您就可以相信它在多线程环境中安全运行,而不会出现其他语言中常见的难以追踪的错误。这就是 Rust 所说的无畏并发

多线程模型

多线程编程的风险

在大多数现代操作系统中,运行程序代码在操作系统管理的进程内执行。在程序中,还可以有多个独立执行的组件,称为线程

将程序计算拆分为多个线程可以提高性能,因为程序可以同时处理多个任务。但是,这也增加了复杂性。由于线程并发运行,因此无法保证不同线程中代码的执行顺序。这可能导致以下问题:

  • 竞争条件,即多个线程以不一致的顺序访问数据或资源

  • 死锁,两个线程互相等待对方释放各自持有的资源,从而阻止进一步的进展

  • 仅在特定情况下发生且难以重现或持续修复的错误

编程语言有多种实现线程的方法。许多操作系统都提供 API 来创建新线程。当一种语言使用 OS API 创建线程时,这通常被称为1:1 模型,其中一个 OS 线程对应一个语言级线程。

Rust 的标准库只提供了1:1 线程模型

使用以下方式创建新线程spawn

use std::thread;
use std::time::Duration;

fn main() {
let thread = thread::spawn(|| {
for i in 1..10 {
println!("this is thread {}", i);
thread::sleep(Duration::from_millis(1));
}
});

for k in 1..5 {
println!("this is main {}", k);
thread::sleep(Duration::from_millis(1));
}
}

输出:

this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5

我们看到,在主线程完成 5 次循环迭代并退出后,新创建的线程(尽管设计为 10 次迭代)只能执行 5 次迭代然后退出。当主线程结束时,新线程也会结束,无论它是否已完成。

如果我们希望新线程在主线程继续之前完成,我们可以使用JoinHandle

use std::thread;
use std::time::Duration;

fn main() {
let handler = thread::spawn(|| {
for i in 1..10 {
println!("this is thread {}", i);
thread::sleep(Duration::from_millis(1));
}
});

for k in 1..5 {
println!("this is main {}", k);
thread::sleep(Duration::from_millis(1));
}

handler.join().unwrap(); // Block the main thread until the new thread finishes
}

输出:

this is main 1
this is thread 1
this is main 2
this is thread 2
this is main 3
this is thread 3
this is main 4
this is thread 4
this is thread 5
this is thread 6
this is thread 7
this is thread 8
this is thread 9

的返回类型thread::spawnJoinHandleJoinHandle是一个拥有的值,并且调用其join方法需要等待线程完成。

调用join句柄会阻塞当前线程,直到句柄所代表的线程结束。阻塞线程意味着阻止其执行进一步的工作或退出。

线程和move闭包

我们可以使用move闭包将变量的所有权从主线程转移到闭包中:

use std::thread;

fn main() {
let v = vec![2, 4, 5];
// `move` transfers ownership of `v` into the closure
let thread = thread::spawn(move || {
println!("v is {:?}", v);
});
}

输出:

v is [2, 4, 5]

Rust 将变量的所有权v移至新线程。这确保变量在新线程中可以安全使用,同时也意味着主线程无法再使用v(例如,将其删除)。

如果move省略关键字,编译器将引发错误:

$ cargo run
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
--> src/main.rs:6:32
|
6 | let handle = thread::spawn(|| {
| ^^ may outlive borrowed value `v`
7 | println!("Here's a vector: {:?}", v);
| - `v` is borrowed here

Rust 的所有权规则再次帮助确保了内存安全!

消息传递

在 Rust 中,用于消息传递并发的一个主要工具是通道,这是标准库提供的概念。您可以将其想象成一条水道——一条河流或溪流。如果您在其中放置橡皮鸭或小船之类的东西,它会顺流而下流向接收者。

通道由两部分组成:发送器接收器。当发送器或接收器断开时,通道即被视为关闭

通道是通过标准库的 实现的std::sync::mpsc,代表多个生产者,单个消费者

注:根据读者和作者的数量,频道可分为:

  • SPSC——单一生产者,单一消费者(只能使用原子)

  • SPMC——单一生产者,多个消费者(需要在消费者端锁定)

  • MPSC——多个生产者,单个消费者(需要在生产者端锁定)

  • MPMC——多个生产者,多个消费者

在线程间传递消息

use std::thread;
use std::sync::mpsc;

fn main() {
let (tx, rx) = mpsc::channel();
// Move `tx` into the closure so the new thread owns it
thread::spawn(move || {
tx.send("hello").unwrap();
});

// `recv()` blocks the main thread until a value is received
let msg = rx.recv().unwrap();
println!("message is {}", msg);
}

输出:

message is hello

通道的接收端有两个有用的方法:recvtry_recv

这里我们使用了,即接收 的recv缩写,它会阻塞主线程直到收到值。一旦发送了一个值,就会在 中返回它。如果发送器已关闭,它会返回一个错误,表示不会再有值到达。recvResult<T, E>

try_recv不会阻塞;相反,如果有数据可用,它会立即返回: ,否则,它会立即返回。Result<T, E>OkErr

如果新线程尚未执行完成,使用try_recv可能会导致运行时错误:

use std::thread;
use std::sync::mpsc;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
tx.send("hello").unwrap();
});

// `try_recv` returns immediately, so it might not receive the message in time
let msg = rx.try_recv().unwrap();
println!("message is {}", msg);
}

错误:

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Empty', ...

发送多个值并观察接收方等待

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

// The `for` loop on `rx` implicitly waits for incoming values as an iterator
for received in rx {
println!("Got: {}", received);
}
}

示例输出(行间间隔 1 秒):

Got: hi
Got: from
Got: the
Got: thread

通过克隆发送者创建多个生产者

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
let (tx, rx) = mpsc::channel();
// Clone the sender `tx` to create a second producer
let tx1 = tx.clone();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx1.send(val).unwrap(); // Use the cloned sender
thread::sleep(Duration::from_secs(1));
}
});

thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];

for val in vals {
tx.send(val).unwrap(); // Use the original sender
thread::sleep(Duration::from_secs(1));
}
});

// Both tx and tx1 send values to the same receiver rx
for received in rx {
println!("Got: {}", received);
}
}

示例输出(由于调度不同,因系统而异):

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

共享状态

共享状态或数据意味着多个线程同时访问同一内存位置。Rust 使用互斥锁 (mutexes)来实现共享内存并发原语。

互斥锁每次只允许一个线程访问数据

Mutex是互斥的缩写,表示在任意给定时间只有一个线程可以访问特定数据。要访问互斥中的数据,线程必须先获取锁。锁是一种数据结构,用于跟踪当前谁拥有独占访问权限。

使用标准库的std::sync::Mutex

use std::sync::Mutex;

fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 10; // Mutate the value inside the Mutex
println!("num is {}", num);
}
println!("m is {:?}", m);
}

输出:

num is 10
m is Mutex { data: 10 }

我们使用此lock方法来访问互斥锁中的数据。此调用会阻塞当前线程,直到获取锁为止。

AMutex是一个智能指针。更具体地说,lock返回一个MutexGuard,这是一个实现 的智能指针,Deref用于指向底层数据。它还实现了Drop,因此当MutexGuard超出范围时,锁会自动释放。

在线程之间共享互斥锁

当在多个线程之间共享数据时,需要多个所有者,我们使用Arc智能指针来包装MutexArc是线程安全的;Rc不是并且不能在多线程上下文中安全使用。

下面是一个使用Arc包装的示例Mutex,允许跨线程共享所有权:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter); // Clone the Arc before moving into the thread
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}

for handle in handles {
handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
}

输出:

Result: 10

总结一下:

  • Rc<T>+RefCell<T>通常用于单线程内部可变性

  • Arc<T>+Mutex<T>用于多线程内部可变性

注意:Mutex仍然可能导致死锁。如果两个操作各自需要锁定两个资源,并且两个线程各自持有一个锁,等待另一个锁,则会发生这种情况 — — 从而导致循环等待。

基于Send和的线程安全Sync

在 Rust 中,与并发相关的工具是标准库的一部分,而不是语言本身。但是,语言中嵌入了两个并发概念:SendSync特征std::marker

目的SendSync

SendSync是 Rust 中安全并发的核心。从技术上讲,它们是标记特征(不定义任何方法的特征),用于标记并发行为的类型:

  • 实现的类型Send可以在线程之间安全地转移其所有权。

  • 实现的类型Sync可以通过引用在线程之间共享

由此,我们可以推断:&T如果SendTSync

实现的类型SendSync

在 Rust 中,几乎所有类型都默认实现SendSync。这意味着对于复合类型(如结构体),如果其所有成员都是SendSync,则复合类型会自动继承这些特征。

但是如果即使一个成员不是SendSync,那么整个类型就不是。

总结一下:

  • 实现的类型Send可以在线程之间安全地转移所有权

  • 实现的类型Sync可以在线程之间安全地共享(通过引用)。

  • Rust 中的绝大多数类型都是SendSync

实现这些特征的常见类型包括:

  • 原始指针

  • 细胞

  • 钢筋混凝土

可以您自己的类型手动实现SendSync,但是:

  • 必须使用unsafe代码

  • 必须手动确保线程安全

  • 很少有必要,并且应该非常谨慎地进行

笔记:

  • Cell并不是RefCellSync因为它们的核心实现(UnsafeCell)不是Sync

  • Rc都不是,Send也不是,Sync因为其内部的引用计数器不是线程安全的

  • 原始指针不实现任何特性,因为它们不提供任何安全保障

概括

Rust 提供了async/await多线程并发模型。要有效地使用多线程模型,必须了解 Rust 的线程基础知识,包括:

  • 线程创建

  • 线程同步

  • 线程安全

Rust 支持:

  • 消息传递并发,其中channel用于在线程之间传输数据

  • 共享状态并发,其中MutexArc用于跨线程共享和安全改变数据

类型系统借用检查器确保这些模式不存在数据竞争和悬垂引用。

一旦代码编译完成,您就可以确信它将在多线程环境中正确运行,而不会出现其他语言中难以捉摸、难以调试的错误。

和特征为线程间安全地传输或共享数据提供了保证SendSync

总之:

  • 线程模型:多线程程序必须处理竞争条件、死锁和难以重现的错误。

  • 消息传递:使用通道在线程之间传输数据。

  • 共享状态Mutex+Arc允许多个线程访问和改变相同的数据。

  • 线程安全Send特征Sync保证多线程上下文中数据传输和共享的安全性。


【声明】内容源于网络
0
0
索引目录
索引目录是一家专注于医疗、技术开发、物联网应用等领域的创新型公司。我们致力于为客户提供高质量的服务和解决方案,推动技术与行业发展。
内容 444
粉丝 0
索引目录 索引目录是一家专注于医疗、技术开发、物联网应用等领域的创新型公司。我们致力于为客户提供高质量的服务和解决方案,推动技术与行业发展。
总阅读544
粉丝0
内容444