并发编程
Rust 的所有权系统在编译时防止数据竞争,让并发编程更加安全。本章介绍 Rust 中的线程、消息传递和共享状态并发。
use std::thread;use std::time::Duration;
fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("子线程: {}", i); thread::sleep(Duration::from_millis(1)); } });
for i in 1..5 { println!("主线程: {}", i); thread::sleep(Duration::from_millis(1)); }
handle.join().unwrap(); // 等待子线程完成}move 闭包
Section titled “move 闭包”使用 move 将数据所有权转移到线程:
use std::thread;
fn main() { let v = vec![1, 2, 3];
let handle = thread::spawn(move || { println!("向量: {:?}", v); });
// println!("{:?}", v); // 错误!v 已被移动
handle.join().unwrap();}use std::thread;
fn main() { let mut handles = vec![];
for i in 0..10 { let handle = thread::spawn(move || { println!("线程 {} 正在运行", i); }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }}使用通道(channel)在线程间传递消息。
mpsc 通道
Section titled “mpsc 通道”mpsc = Multiple Producer, Single Consumer
use std::sync::mpsc;use std::thread;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hello"); tx.send(val).unwrap(); // println!("{}", val); // 错误!val 已被移动 });
let received = rx.recv().unwrap(); println!("收到: {}", received);}use std::sync::mpsc;use std::thread;use std::time::Duration;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let vals = vec![ String::from("你好"), String::from("来自"), String::from("线程"), ];
for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_millis(200)); } });
// 将 rx 当作迭代器使用 for received in rx { println!("收到: {}", received); }}use std::sync::mpsc;use std::thread;use std::time::Duration;
fn main() { let (tx, rx) = mpsc::channel();
let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![String::from("hi"), String::from("from"), String::from("thread 1")]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_millis(200)); } });
thread::spawn(move || { let vals = vec![String::from("more"), String::from("from"), String::from("thread 2")]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_millis(200)); } });
for received in rx { println!("收到: {}", received); }}use std::sync::mpsc;use std::thread;use std::time::Duration;
fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { thread::sleep(Duration::from_secs(1)); tx.send("hello").unwrap(); });
loop { match rx.try_recv() { Ok(msg) => { println!("收到: {}", msg); break; } Err(mpsc::TryRecvError::Empty) => { println!("还没有消息,继续做其他事..."); thread::sleep(Duration::from_millis(200)); } Err(mpsc::TryRecvError::Disconnected) => { println!("发送端已关闭"); break; } } }}Mutex:互斥锁
Section titled “Mutex:互斥锁”use std::sync::Mutex;
fn main() { let m = Mutex::new(5);
{ let mut num = m.lock().unwrap(); *num = 6; } // 锁自动释放
println!("m = {:?}", m);}Arc<Mutex>:多线程共享
Section titled “Arc<Mutex>:多线程共享”use std::sync::{Arc, Mutex};use std::thread;
fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![];
for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("结果: {}", *counter.lock().unwrap());}RwLock:读写锁
Section titled “RwLock:读写锁”适用于读多写少的场景:
use std::sync::{Arc, RwLock};use std::thread;
fn main() { let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// 多个读取线程 for i in 0..3 { let data = Arc::clone(&data); handles.push(thread::spawn(move || { let values = data.read().unwrap(); println!("读取线程 {}: {:?}", i, *values); })); }
// 一个写入线程 { let data = Arc::clone(&data); handles.push(thread::spawn(move || { let mut values = data.write().unwrap(); values.push(4); println!("写入线程: {:?}", *values); })); }
for handle in handles { handle.join().unwrap(); }}Send 和 Sync Trait
Section titled “Send 和 Sync Trait”实现 Send 的类型可以在线程间转移所有权。
大多数类型都实现了 Send,除了:
Rc<T>:不是线程安全的- 裸指针
实现 Sync 的类型可以安全地被多线程引用。
如果 &T 是 Send,则 T 是 Sync。
| 类型 | Send | Sync |
|---|---|---|
i32 | ✓ | ✓ |
String | ✓ | ✓ |
Rc<T> | ✗ | ✗ |
Arc<T> | ✓ | ✓ |
Mutex<T> | ✓ | ✓ |
RefCell<T> | ✓ | ✗ |
对于简单的数值操作,使用原子类型比 Mutex 更高效:
use std::sync::atomic::{AtomicUsize, Ordering};use std::sync::Arc;use std::thread;
fn main() { let counter = Arc::new(AtomicUsize::new(0)); let mut handles = vec![];
for _ in 0..10 { let counter = Arc::clone(&counter); handles.push(thread::spawn(move || { for _ in 0..1000 { counter.fetch_add(1, Ordering::SeqCst); } })); }
for handle in handles { handle.join().unwrap(); }
println!("结果: {}", counter.load(Ordering::SeqCst));}常见并发模式
Section titled “常见并发模式”生产者-消费者
Section titled “生产者-消费者”use std::sync::mpsc;use std::thread;use std::time::Duration;
fn main() { let (tx, rx) = mpsc::channel();
// 生产者 let producer = thread::spawn(move || { for i in 0..10 { println!("生产: {}", i); tx.send(i).unwrap(); thread::sleep(Duration::from_millis(100)); } });
// 消费者 let consumer = thread::spawn(move || { for received in rx { println!("消费: {}", received); thread::sleep(Duration::from_millis(150)); } });
producer.join().unwrap(); consumer.join().unwrap();}线程池(简化版)
Section titled “线程池(简化版)”use std::sync::{mpsc, Arc, Mutex};use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>,}
struct Worker { id: usize, thread: thread::JoinHandle<()>,}
impl ThreadPool { fn new(size: usize) -> ThreadPool { let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size { let receiver = Arc::clone(&receiver); let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv(); match job { Ok(job) => { println!("Worker {} 执行任务", id); job(); } Err(_) => { println!("Worker {} 停止", id); break; } } }); workers.push(Worker { id, thread }); }
ThreadPool { workers, sender } }
fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); }}
fn main() { let pool = ThreadPool::new(4);
for i in 0..8 { pool.execute(move || { println!("任务 {} 开始", i); thread::sleep(std::time::Duration::from_millis(100)); println!("任务 {} 完成", i); }); }
thread::sleep(std::time::Duration::from_secs(2));}use std::sync::{Arc, Mutex};use std::thread;
fn main() { let a = Arc::new(Mutex::new(0)); let b = Arc::new(Mutex::new(0));
let a1 = Arc::clone(&a); let b1 = Arc::clone(&b);
let handle1 = thread::spawn(move || { let _lock_a = a1.lock().unwrap(); thread::sleep(std::time::Duration::from_millis(100)); let _lock_b = b1.lock().unwrap(); // 等待 b });
let a2 = Arc::clone(&a); let b2 = Arc::clone(&b);
let handle2 = thread::spawn(move || { let _lock_b = b2.lock().unwrap(); thread::sleep(std::time::Duration::from_millis(100)); let _lock_a = a2.lock().unwrap(); // 等待 a -> 死锁! });
handle1.join().unwrap(); handle2.join().unwrap();}避免死锁的方法
Section titled “避免死锁的方法”- 固定锁顺序:总是以相同顺序获取锁
- 使用 try_lock:尝试获取锁,失败则放弃
- 减少锁粒度:减少持有锁的时间
- 使用消息传递代替共享状态
练习 1:多线程计数
Section titled “练习 1:多线程计数”创建 10 个线程,每个打印自己的编号:
use std::thread;
fn main() { let mut handles = vec![];
for i in 0..10 { // 创建线程并打印编号 }
for handle in handles { handle.join().unwrap(); }}练习 2:消息传递
Section titled “练习 2:消息传递”使用 channel 实现生产者-消费者模式:
use std::sync::mpsc;use std::thread;
fn main() { let (tx, rx) = mpsc::channel();
// 创建生产者线程,发送 1-5
// 主线程作为消费者,接收并打印
}练习 3:共享计数器
Section titled “练习 3:共享计数器”使用 Arc<Mutex<T>> 让多个线程安全地增加计数器:
use std::sync::{Arc, Mutex};use std::thread;
fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![];
for _ in 0..10 { // 每个线程增加计数器 100 次 }
for handle in handles { handle.join().unwrap(); }
println!("最终结果: {}", /* 获取计数器值 */); // 应该是 1000}练习 4:线程池(挑战)
Section titled “练习 4:线程池(挑战)”实现一个简单的线程池,包含:
- 固定数量的工作线程
- 任务队列
- execute 方法提交任务
掌握了并发编程后,下一章我们将通过一个实战项目综合运用所学知识。