Skip to content

并发编程

Rust 的所有权系统在编译时防止数据竞争,让并发编程更加安全。本章介绍 Rust 中的线程、消息传递和共享状态并发。

use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("子线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("主线程: {}", i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap(); // 等待子线程完成
}

使用 move 将数据所有权转移到线程:

use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("向量: {:?}", v);
});
// println!("{:?}", v); // 错误!v 已被移动
handle.join().unwrap();
}
use std::thread;
fn main() {
let mut handles = vec![];
for i in 0..10 {
let handle = thread::spawn(move || {
println!("线程 {} 正在运行", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}

使用通道(channel)在线程间传递消息。

mpsc = Multiple Producer, Single Consumer

use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hello");
tx.send(val).unwrap();
// println!("{}", val); // 错误!val 已被移动
});
let received = rx.recv().unwrap();
println!("收到: {}", received);
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("你好"),
String::from("来自"),
String::from("线程"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
// 将 rx 当作迭代器使用
for received in rx {
println!("收到: {}", received);
}
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![String::from("hi"), String::from("from"), String::from("thread 1")];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
thread::spawn(move || {
let vals = vec![String::from("more"), String::from("from"), String::from("thread 2")];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
for received in rx {
println!("收到: {}", received);
}
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
tx.send("hello").unwrap();
});
loop {
match rx.try_recv() {
Ok(msg) => {
println!("收到: {}", msg);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("还没有消息,继续做其他事...");
thread::sleep(Duration::from_millis(200));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("发送端已关闭");
break;
}
}
}
}
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
} // 锁自动释放
println!("m = {:?}", m);
}
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", *counter.lock().unwrap());
}

适用于读多写少的场景:

use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// 多个读取线程
for i in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let values = data.read().unwrap();
println!("读取线程 {}: {:?}", i, *values);
}));
}
// 一个写入线程
{
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut values = data.write().unwrap();
values.push(4);
println!("写入线程: {:?}", *values);
}));
}
for handle in handles {
handle.join().unwrap();
}
}

实现 Send 的类型可以在线程间转移所有权。

大多数类型都实现了 Send,除了:

  • Rc<T>:不是线程安全的
  • 裸指针

实现 Sync 的类型可以安全地被多线程引用。

如果 &TSend,则 TSync

类型SendSync
i32
String
Rc<T>
Arc<T>
Mutex<T>
RefCell<T>

对于简单的数值操作,使用原子类型比 Mutex 更高效:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
for _ in 0..1000 {
counter.fetch_add(1, Ordering::SeqCst);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("结果: {}", counter.load(Ordering::SeqCst));
}
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 生产者
let producer = thread::spawn(move || {
for i in 0..10 {
println!("生产: {}", i);
tx.send(i).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// 消费者
let consumer = thread::spawn(move || {
for received in rx {
println!("消费: {}", received);
thread::sleep(Duration::from_millis(150));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
let receiver = Arc::clone(&receiver);
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => {
println!("Worker {} 执行任务", id);
job();
}
Err(_) => {
println!("Worker {} 停止", id);
break;
}
}
});
workers.push(Worker { id, thread });
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("任务 {} 开始", i);
thread::sleep(std::time::Duration::from_millis(100));
println!("任务 {} 完成", i);
});
}
thread::sleep(std::time::Duration::from_secs(2));
}
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let a = Arc::new(Mutex::new(0));
let b = Arc::new(Mutex::new(0));
let a1 = Arc::clone(&a);
let b1 = Arc::clone(&b);
let handle1 = thread::spawn(move || {
let _lock_a = a1.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(100));
let _lock_b = b1.lock().unwrap(); // 等待 b
});
let a2 = Arc::clone(&a);
let b2 = Arc::clone(&b);
let handle2 = thread::spawn(move || {
let _lock_b = b2.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(100));
let _lock_a = a2.lock().unwrap(); // 等待 a -> 死锁!
});
handle1.join().unwrap();
handle2.join().unwrap();
}
  1. 固定锁顺序:总是以相同顺序获取锁
  2. 使用 try_lock:尝试获取锁,失败则放弃
  3. 减少锁粒度:减少持有锁的时间
  4. 使用消息传递代替共享状态

创建 10 个线程,每个打印自己的编号:

use std::thread;
fn main() {
let mut handles = vec![];
for i in 0..10 {
// 创建线程并打印编号
}
for handle in handles {
handle.join().unwrap();
}
}

使用 channel 实现生产者-消费者模式:

use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 创建生产者线程,发送 1-5
// 主线程作为消费者,接收并打印
}

使用 Arc<Mutex<T>> 让多个线程安全地增加计数器:

use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
// 每个线程增加计数器 100 次
}
for handle in handles {
handle.join().unwrap();
}
println!("最终结果: {}", /* 获取计数器值 */); // 应该是 1000
}

实现一个简单的线程池,包含:

  • 固定数量的工作线程
  • 任务队列
  • execute 方法提交任务

掌握了并发编程后,下一章我们将通过一个实战项目综合运用所学知识。