XingPiaoLiang's

Back

Fearless Concurrency#

Fearless concurrency allows as to write code that is free of subtle bugs and is easy to refactor without introducing new bugs.

本篇将包括以下四个主要内容:

  • How to create threads in Rust
  • Message-passing concurrency
  • Shared-state concurrency
  • Sync and Send Trait

Use Thread In Rust#

不同于 Java Go 等语言,在 Rust 中,在 Rust 的标准库中,其语言线程模型和 OS 是 1:1 的关系,==也就是说 Rust 标准库中一条线程,直接对应的就是 OS 本身的一条线程==。(当然在 Rust’s async system 以及其他的一些 crates 中,都对其他的线程模型做了对应的实现和更改)

spawn#

spawn 是标准库下 thread 模块中一个关联函数(associated function) 使用它可以创建一个新的线程,通过闭包传递实现。

pub fn main() -> Result<(), Error> {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_millis(1));
        }
    });
    for i in 1..10 {
        println!("hi number {i} from the parent thread!");
        thread::sleep(Duration::from_millis(1));
    }
    Ok(())
}
rust

JoinHandle#

使用 Join 等待所有线程执行完毕之后,该(主)线程才随之关闭。

fn main() -> Result<(), Error> {
    let handle: JoinHandle<()> = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {i} from the spawned thread!");
            thread::sleep(Duration::from_secs(1));
        }
    });
    for i in 1..10 {
        println!("hi number {i} from the parent thread!");
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap_or_default();
    Ok(())
}
rust

Always try to use move in spawn closure#

pub fn main() -> Result<()> {
	let v = vec![1, 2, 3];
	let handle = thread::spawn(move || {
		println!("{v:?}");
	} )
	handle.join().unwrap();
	Ok(())
}
rust

在创建线程的过程中,我们经常使用 move 关键字,将主线程中的对象的 ownership 转移到闭包中去。因为 Rust 并不知道子线程需要运行多久,所以向子线程中借用对象,使用对象的引用是不被允许的。

Channel ( Message-Passing )#

std::sync::mpsc::channel 标准库中的 channel 定义在 mpsc 模块之下。mpsc 意为 Mutible Producer Single Consumer,也就是说这样的 Channel 是拥有多个生产者和只有一个消费者。

use std::sync::mpsc::channel;
fn main() -> Result<(), Error> {
    let (tx, rx) = channel::<i32>(); // 这里必须显式声明为传递的类型

    thread::spawn(move || {
        for i in 1..10 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    for i in rx {
        println!("Got {i:?}")
    }
    Ok(())
}
rust

使用 channel() 得到的是一个元组 (Sender<T>, Receiver<T>)。

  • rx.recv() 阻塞当前线程,等待生产者发送消息。
  • rx.try_recv() 直接返回一个 Result<T, Error> 并不会直接阻塞当前线程。
  • 上述代码中直接将 rx 当作一个迭代器,使用 for 循环进行遍历。

同时,也可以通过 clone 生产者,在不同的线程中使用同一个 channel:

    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
rust

Mutex ( Shared-State Concurrency)#

Do not communicate by sharing memory.

#Arc #Rc

Rust 同样提供了较为安全的 Mutex 互斥锁机制。使用方法如下:

fn main() {
    let m = std::sync::Mutex::new(5);

	{
		let mut num = m.lock().unwrap();
		*num += 1;
	}
	println!("{num:?}")
}
rust

在多个线程中共享一个 Mutex。那么问题来了,在上面提到,在创建新线程时,我们一定需要将新线程所需要的对象的所有权 move 到新线程中去。那么多个线程如何能够共享一个 Mutex?[[智能指针]],就是答案。

在平常场景中,我们使用 Rc<T> 来达到将一个对象的所有权传递给多个拥有者的目的。但是在多线程情况下,Rc<T> 就不能使用了,取而代之的是 Arc<T> 这里的 A 代表着 Atomic,显而易见,该智能指针就是为了多线程场景而创造的。他们俩的 API 完全一致,可以无缝切换。

use std::sync::{Arc, Mutex}; 
use std::thread;
fn main() -> Result<(), Error> {
    let counter = Arc::new(Mutex::new(0));
    let mut handles: Vec<JoinHandle<()>> = vec![];

    for _ in 0..10 {
        let cnt = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = cnt.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());

    Ok(())
}
rust

RefCell<T> Rc<T> Mutex<T> Arc<T>#

在上面的示例中,counter 并不是一个可变的对象,但是我们却可以改变它的值,这是因为 Mutex<T> 提供了 Interior mutability 和 Cell 类对象类似。 我们可以通过 Mutex<T> 来更改 Arc<T> 中的值,正如我们可以通过 RefCell<T> 更改 Rc<T> 中的值。

Sync & Send#

Sync 以及 Send Trait 都定义在 std::marker 模块中,这两个 trait 是 Rust 中并发编程的基本.

  • Send所有实现了 Send 的对象,他们的所有权都可以在不同的线程之间进行传递,几乎所有 Rust 中的内置类型都是 Send 但也有例外,比如上面提到的 Rc<T> 就并没有实现 Send
  • Sync:任何类型 T 只要 &T 实现了 Send 那么 T 就实现了 Sync这意味着该类型的引用可以安全地在不同的线程之间进行传递,所有 Cell<T> 家族的类型都没有实现 Sync
[Rust] Fearless Concurrency
https://astro-pure.js.org/blog/rust-concurrency
Author erasernoob
Published at July 15, 2025
Comment seems to stuck. Try to refresh?✨