[Rust] Fearless Concurrency
introduce the basic concurrency programming in std rust
Fearless Concurrency#
Fearless concurrency allows as to write code that is free of subtle bugs and is easy to refactor without introducing new bugs.
本篇将包括以下四个主要内容:
- How to create threads in Rust
- Message-passing concurrency
- Shared-state concurrency
Sync
andSend
Trait
Use Thread In Rust#
不同于 Java Go 等语言,在 Rust 中,在 Rust 的标准库中,其语言线程模型和 OS 是 1:1 的关系,==也就是说 Rust 标准库中一条线程,直接对应的就是 OS 本身的一条线程==。(当然在 Rust’s async system 以及其他的一些 crates 中,都对其他的线程模型做了对应的实现和更改)
spawn#
spawn
是标准库下 thread
模块中一个关联函数(associated function) 使用它可以创建一个新的线程,通过闭包传递实现。
pub fn main() -> Result<(), Error> {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {i} from the spawned thread!");
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..10 {
println!("hi number {i} from the parent thread!");
thread::sleep(Duration::from_millis(1));
}
Ok(())
}
rustJoinHandle#
使用 Join
等待所有线程执行完毕之后,该(主)线程才随之关闭。
fn main() -> Result<(), Error> {
let handle: JoinHandle<()> = thread::spawn(|| {
for i in 1..10 {
println!("hi number {i} from the spawned thread!");
thread::sleep(Duration::from_secs(1));
}
});
for i in 1..10 {
println!("hi number {i} from the parent thread!");
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap_or_default();
Ok(())
}
rustAlways try to use move in spawn closure#
pub fn main() -> Result<()> {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("{v:?}");
} )
handle.join().unwrap();
Ok(())
}
rust在创建线程的过程中,我们经常使用 move
关键字,将主线程中的对象的 ownership 转移到闭包中去。因为 Rust 并不知道子线程需要运行多久,所以向子线程中借用对象,使用对象的引用是不被允许的。
Channel ( Message-Passing )#
std::sync::mpsc::channel
标准库中的 channel
定义在 mpsc
模块之下。mpsc
意为 Mutible Producer Single Consumer,也就是说这样的 Channel 是拥有多个生产者和只有一个消费者。
use std::sync::mpsc::channel;
fn main() -> Result<(), Error> {
let (tx, rx) = channel::<i32>(); // 这里必须显式声明为传递的类型
thread::spawn(move || {
for i in 1..10 {
tx.send(i).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for i in rx {
println!("Got {i:?}")
}
Ok(())
}
rust使用 channel()
得到的是一个元组 (Sender<T>, Receiver<T>
)。
rx.recv()
阻塞当前线程,等待生产者发送消息。rx.try_recv()
直接返回一个Result<T, Error>
并不会直接阻塞当前线程。- 上述代码中直接将
rx
当作一个迭代器,使用for
循环进行遍历。
同时,也可以通过 clone
生产者,在不同的线程中使用同一个 channel:
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
rustMutex ( Shared-State Concurrency)#
Do not communicate by sharing memory.
#Arc #Rc
Rust 同样提供了较为安全的 Mutex 互斥锁机制。使用方法如下:
fn main() {
let m = std::sync::Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num += 1;
}
println!("{num:?}")
}
rust在多个线程中共享一个 Mutex。那么问题来了,在上面提到,在创建新线程时,我们一定需要将新线程所需要的对象的所有权 move 到新线程中去。那么多个线程如何能够共享一个 Mutex?[[智能指针]],就是答案。
在平常场景中,我们使用 Rc<T>
来达到将一个对象的所有权传递给多个拥有者的目的。但是在多线程情况下,Rc<T>
就不能使用了,取而代之的是 Arc<T>
这里的 A
代表着 Atomic
,显而易见,该智能指针就是为了多线程场景而创造的。他们俩的 API 完全一致,可以无缝切换。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() -> Result<(), Error> {
let counter = Arc::new(Mutex::new(0));
let mut handles: Vec<JoinHandle<()>> = vec![];
for _ in 0..10 {
let cnt = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = cnt.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for h in handles {
h.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
Ok(())
}
rustRefCell<T>
Rc<T>
Mutex<T>
Arc<T>
#
在上面的示例中,counter
并不是一个可变的对象,但是我们却可以改变它的值,这是因为 Mutex<T>
提供了 Interior mutability 和 Cell
类对象类似。
我们可以通过 Mutex<T>
来更改 Arc<T>
中的值,正如我们可以通过 RefCell<T>
更改 Rc<T>
中的值。
Sync & Send#
Sync
以及 Send
Trait 都定义在 std::marker
模块中,这两个 trait 是 Rust 中并发编程的基本.
Send
:所有实现了Send
的对象,他们的所有权都可以在不同的线程之间进行传递,几乎所有 Rust 中的内置类型都是Send
但也有例外,比如上面提到的Rc<T>
就并没有实现Send
Sync
:任何类型T
只要&T
实现了Send
那么T
就实现了Sync
,这意味着该类型的引用可以安全地在不同的线程之间进行传递,所有Cell<T>
家族的类型都没有实现Sync