30天拿下Rust之并发
============
💡 如果想阅读最新的文章,或者有技术问题需要交流和沟通,可搜索并关注微信公众号“希望睿智”。
概述
随着多核处理器和分布式系统的普及,并发编程成为了现代软件开发中不可或缺的一部分。然而,并发编程也是一项极具挑战性的任务,因为它涉及到数据共享、线程同步和竞态条件等复杂问题。在这些挑战面前,Rust以其独特的内存安全性和并发原语,为开发者提供了一个安全、高效且优雅的并发编程环境。
线程
线程是Rust中最基本的并发单元。在Rust中,可以使用std::thread::spawn函数来创建一个新的线程。这个函数接收一个闭包作为参数,这个闭包会在新线程中执行。通过使用std::thread::spawn函数,开发者可以轻松地创建新的线程来执行并发任务。这些线程在操作系统级别进行调度,可以实现真正的并行执行。
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个新线程
thread::spawn(|| {
println!("from a thread");
// 为了让线程运行足够长的时间,以便观察其输出,我们可以让线程休眠一段时间
thread::sleep(Duration::from_secs(1));
});
// 主线程继续执行其他任务
println!("from main thread");
// 等待子线程完成
thread::sleep(Duration::from_secs(2));
}
在上面的示例代码中,我们创建了一个新线程来打印一条消息,并让主线程继续执行。注意:我们没有等待新线程完成,所以主线程可能会在新线程之前或之后结束。
在实际应用中,我们可能需要使用join方法来确保线程结束。join方法是线程句柄的一个方法,用于阻塞当前线程(调用join的线程),直到被join的线程完成执行。换句话说,join方法会等待另一个线程结束。一旦被等待的线程结束,join方法就会立即返回。
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个新线程
let handle = thread::spawn(|| {
for i in 1..5 {
println!("from a thread: {}", i);
thread::sleep(Duration::from_secs(1));
}
});
// 主线程继续执行其他任务
println!("from main thread");
// 使用unwrap简化代码,等待子线程完成
handle.join().unwrap();
}
在上面的示例代码中,我们创建了一个新线程,每隔1秒钟打印一条消息,一共打印4次。在主线程中,我们打印了一条消息,然后使用join方法等待子线程结束。执行本程序后,输出大致如下。
from main thread
from a thread: 1
from a thread: 2
from a thread: 3
from a thread: 4
我们还可以通过闭包(匿名函数)向线程中传入参数。当使用std::thread::spawn创建一个新线程时,我们可以传递一个闭包作为参数,这个闭包可以捕获外部作用域中的变量,并将它们作为参数传递给新线程中执行的代码。
use std::thread;
use std::time::Duration;
fn main() {
let text: &str = "Hello World";
let thread_id = thread::current().id();
let handle = thread::spawn(move || {
// 在这里,message 和 thread_id 是从外部作用域捕获的变量
println!("{}", text);
println!("{:?}", thread_id);
// 线程也可以有自己的局部变量
let text_local = "Hello Github";
println!("{}", text_local);
thread::sleep(Duration::from_secs(1));
});
// 主线程继续执行其他任务
println!("from main thread");
// 等待子线程完成
handle.join().unwrap();
}
在上面的示例代码中,我们定义了一个字符串text和一个线程 ID thread_id,并将它们作为闭包的捕获变。move关键字用于确保这些变量被移动到闭包中,这样它们就可以在新线程中使用了。注意:如果没有move,这些变量可能会被借用,而Rust的借用规则不允许在多个线程中同时借用同一个变量。闭包中的代码会在新线程中执行,并且可以访问从外部作用域捕获的变量text和thread_id。同时,新线程也可以有自己的局部变量,比如:text_local。
通过这种方式,我们可以向线程中传入任意数量的参数,只要它们能够被安全地移动到闭包中即可。这包括:基本数据类型、复杂的数据结构,甚至是其他线程句柄或同步原语。注意:当向线程中传入引用类型的参数时(比如:在堆上分配的数据的引用),我们需要确保这些引用在线程执行期间仍然有效,否则可能会出现悬挂引用或数据竞争的问题。在大多数情况下,使用值的移动而不是引用是更安全的选择。
通道
在Rust中,通道是用于在不同线程之间进行通信的一种机制。它们由两个端点组成:一个发送端和一个接收端。发送端用于向通道发送消息,而接收端用于从通道接收消息。这种通信方式在并发编程中非常有用,允许线程之间安全地传递数据。
Rust标准库提供了两种主要类型的通道:std::sync::mpsc和crossbeam_channel。
std::sync::mpsc提供的是多生产者单消费者(Multiple Producer Single Consumer)通道,这意味着多个发送者可以向一个接收者发送数据。这种通道在std::sync模块中定义,适用于传统的同步线程间通信场景。
use std::thread;
use std::sync::mpsc;
fn main() {
// 创建一个新的通道
let (tx, rx) = mpsc::channel();
// 在新线程中发送数据
thread::spawn(move || {
tx.send(66).unwrap();
});
let received_value = rx.recv().unwrap();
// 主线程接收数据,输出:66
println!("{}", received_value);
}
在上面的示例代码中,我们首先通过调用mpsc::channel()方法创建了一个通道。这个通道返回一个发送端tx和一个接收端rx。发送端用于发送数据,而接收端用于接收数据。接着,我们使用thread::spawn来创建一个新的线程。这个新线程会执行传递给它的闭包。在闭包内部,我们调用tx.send(66)来发送一个整数66到通道。unwrap()用于处理可能的错误,但在实际代码中,应该更优雅地处理错误。最后,在主线程中,我们调用rx.recv()来从通道接收数据。同样,我们使用unwrap()来处理可能的错误。
注意:Rust中通道的recv方法是阻塞的。当调用rx.recv()时,如果通道中没有可用的数据,接收者线程将会阻塞,直到有数据可用为止。这种阻塞行为确保了数据按照发送的顺序被接收,并且只有在数据实际可用时,接收者才会继续执行。为了避免阻塞,Rust还提供了其他方法,比如:try_recv、recv_timeout。try_recv方法尝试立即返回一个挂起的值,而不会阻塞调用线程。如果没有可用的数据,它将返回一个错误。recv_timeout方法则尝试在指定的超时时间内等待一个值。如果超时时间内没有接收到数据,它将返回一个错误。这些方法提供了更多的灵活性,可以根据具体的需求选择使用。
crossbeam_channel是Rust中一个流行的并发通道库,它提供了高效、无锁的通道实现,用于在并发任务之间传递消息。关于这个库的具体使用,我们会在后续其他专栏中专门介绍,这里就不再赘述了。
互斥锁
在Rust中,互斥锁是用于同步访问共享资源的机制,确保在任意时刻只有一个线程可以访问特定的数据。Rust标准库中的互斥锁可以通过std::sync::Mutex类型来实现,其中T是被保护的数据类型。当一个线程获取到互斥锁时,其他尝试获取该锁的线程会被阻塞,直到持有锁的线程释放它。
use std::sync::{Mutex, Arc};
use std::thread;
use std::time::Duration;
fn main() {
let counter = Arc::new(Mutex::new(0));
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
thread::spawn(move || {
let mut num = counter_clone.lock().unwrap();
*num += 1;
});
}
thread::sleep(Duration::from_secs(1));
let final_count = (*counter.lock().unwrap()) as usize;
println!("{}", final_count);
}
在上面的示例代码中,Arc(原子引用计数)用于跨线程安全地共享Mutex包装的计数器实例,而lock()方法用于获取互斥锁并返回一个MutexGuard,它是对内部数据的一个可变引用。当MutexGuard超出作用域时,互斥锁会自动释放,允许其他线程获取锁并访问共享资源。unwrap()用于在获取锁失败时引发panic,但在实际应用中通常会使用更稳健的错误处理方式。
读写锁
读写锁是一种更为精细的同步原语,它允许多个读取者同时访问共享资源,但同一时间内只允许一个写入者访问,以此来提高并发性能。相比于互斥锁,读写锁在读取操作密集且写入操作较少的情况下能提供更好的并发性能。
在Rust标准库中,读写锁由std::sync::RwLock类型实现,其中T是被保护的数据类型。
use std::sync::{RwLock, Arc};
use std::thread;
fn main() {
// 创建一个被读写锁保护的整数
let shared_data = Arc::new(RwLock::new(66));
// 创建读取线程
let reader_threads = (0..5).map(|_| {
let shared_data_clone = Arc::clone(&shared_data);
thread::spawn(move || {
// 获取读锁
let data = shared_data_clone.read().unwrap();
println!("read: {}", *data);
// 读取完成后,读锁会自动释放
})
});
// 创建写入线程
let writer_thread = {
let shared_data_write = Arc::clone(&shared_data);
thread::spawn(move || {
// 获取写锁
let mut data = shared_data_write.write().unwrap();
*data += 1;
println!("write data: {}", *data);
// 写入完成后,写锁会自动释放
})
};
// 等待所有读取线程完成
for reader in reader_threads {
reader.join().unwrap();
}
// 等待写入线程完成
writer_thread.join().unwrap();
// 读取已更新的数据
let final_data = shared_data.read().unwrap();
println!("{}", *final_data);
}
在上面的示例代码中,我们首先创建了一个被Arc
接着,我们使用Arc::clone()创建shared_data的克隆引用,这样每个读取线程都能拥有独立的引用,并且它们指向同一个受保护的数据。然后,使用thread::spawn()创建5个读取线程,每个线程内部获取读锁,这会阻塞线程直到获得读锁。当read方法返回的RwLockReadGuard超出作用域时,读锁会自动释放。
同样的,我们使用Arc::clone()创建写入线程所需的shared_data_write引用。然后,使用thread::spawn创建一个写入线程,线程内部获取写锁,这会阻塞线程直到获得写锁。递增被锁定的整数后,我们打印更新后的数据。当write方法返回的RwLockWriteGuard超出作用域时,写锁会自动释放。
最后,我们等待所有读取线程和写入线程完成,并从共享数据中读取已更新的整数值进行了打印输出。
总结
Rust以其强大的内存安全性和丰富的并发原语,为开发者提供了一个安全、高效且优雅的并发编程环境。通过合理利用Rust的并发特性,开发者可以编写出高性能、高可靠性的并发应用程序,满足现代软件开发的需求。然而,并发编程仍然是一项具有挑战性的任务,需要开发者具备深厚的编程经验和良好的设计思维。
原文链接: https://juejin.cn/post/7367236229291540506
文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17068.html