Rust Concurrency Programming
Overview
Rust provides safe and efficient support for concurrent programming. Through its ownership system and type system, Rust can prevent data races and other concurrency issues at compile time. In this chapter you will learn about threads, message passing, shared state, and other concurrency concepts in Rust.
🧵 Thread Basics
Creating and Managing Threads
rust
use std::thread;
use std::time::Duration;
/// Demonstrates spawning a child thread and joining it from the main thread.
/// The two loops interleave nondeterministically; `join` blocks until the child exits.
fn basic_threading() {
// Create new thread
let handle = thread::spawn(|| {
for i in 1..10 {
println!("Child thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
// Main thread continues execution
for i in 1..5 {
println!("Main thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
// Wait for child thread to complete
// `join` returns Err only if the child panicked; unwrap propagates that panic here.
handle.join().unwrap();
println!("All threads finished");
}Transferring Data Between Threads
rust
/// Shows how `move` transfers ownership of captured data into a spawned thread,
/// making the value unusable in the parent afterwards (enforced at compile time).
fn thread_data_transfer() {
let v = vec![1, 2, 3];
// Use move to transfer ownership
let handle = thread::spawn(move || {
println!("Vector: {:?}", v);
});
handle.join().unwrap();
// println!("v: {:?}", v); // Compile error! v has been moved
}
/// Spawns ten threads, each computing and returning a value, then joins
/// them in spawn order and collects the results from the JoinHandles.
fn multiple_threads() {
let mut handles = vec![];
for i in 0..10 {
// `move` captures `i` by value (i32 is Copy), so each closure gets its own copy.
let handle = thread::spawn(move || {
let thread_id = thread::current().id();
println!("Thread {} (ID: {:?}): Computing {} * 2 = {}", i, thread_id, i, i * 2);
thread::sleep(Duration::from_millis(100));
i * 2 // Return value
});
handles.push(handle);
}
// Collect all results
// `join` yields each thread's return value; results follow spawn order.
let results: Vec<i32> = handles
.into_iter()
.map(|handle| handle.join().unwrap())
.collect();
println!("All results: {:?}", results);
}📨 Message Passing
Basic Channels
rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Sends a single String from a spawned thread to the main thread over an
/// asynchronous (unbounded) mpsc channel.
fn basic_channels() {
// Create channel
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("Hello");
// `send` moves the value; ownership passes through the channel to the receiver.
tx.send(val).unwrap();
// println!("val: {}", val); // Compile error! val has been moved
});
// Receive message
// `recv` blocks until a value arrives or the sending side is dropped.
let received = rx.recv().unwrap();
println!("Received message: {}", received);
}
/// Streams several messages through a channel; the receiving `for` loop
/// ends automatically once the sender is dropped (when the thread exits).
fn multiple_messages() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("Hello"),
String::from("from"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// Iterate to receive messages
// Iterating a Receiver blocks for each item and stops on disconnect.
for received in rx {
println!("Received: {}", received);
}
}Multiple Producers Single Consumer
rust
/// Demonstrates multiple producers sharing one mpsc channel by cloning the
/// Sender. The receive loop ends once every Sender clone has been dropped.
fn multiple_producers() {
    let (tx, rx) = mpsc::channel();
    // Clone sender so a second thread can send on the same channel.
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("Thread 1: Message 1"),
            String::from("Thread 1: Message 2"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("Thread 2: Message 1"),
            String::from("Thread 2: Message 2"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // Receive all messages; the iterator stops when both senders are dropped.
    for received in rx {
        println!("Received: {}", received);
    }
}
Sync Channels
rust
use std::sync::mpsc::{sync_channel, TryRecvError};
/// Demonstrates a rendezvous channel: `sync_channel(0)` has no buffer, so
/// `send` blocks until the receiver is actually ready to take the value.
fn sync_channels() {
// Sync channel, capacity 0: a rendezvous — send and recv must meet.
let (tx, rx) = sync_channel(0);
let handle = thread::spawn(move || {
println!("Before sending message");
tx.send(1).unwrap(); // Blocks until receiver is ready
println!("After sending message");
});
// Delay to make it visible that the sender is blocked in the meantime.
thread::sleep(Duration::from_secs(2));
println!("Ready to receive");
let msg = rx.recv().unwrap();
println!("Received: {}", msg);
handle.join().unwrap();
}
/// Shows non-blocking receive with `try_recv`, which returns immediately
/// with Ok, Empty, or Disconnected instead of blocking like `recv`.
fn non_blocking_receive() {
let (tx, rx) = mpsc::channel();
// Non-blocking receive: nothing has been sent yet, so this hits the Empty arm.
match rx.try_recv() {
Ok(msg) => println!("Received: {}", msg),
Err(TryRecvError::Empty) => println!("Channel is empty"),
Err(TryRecvError::Disconnected) => println!("Channel disconnected"),
}
// Send message
tx.send("Hello").unwrap();
// Now can receive
match rx.try_recv() {
Ok(msg) => println!("Received: {}", msg),
Err(TryRecvError::Empty) => println!("Channel is empty"),
Err(TryRecvError::Disconnected) => println!("Channel disconnected"),
}
}🔒 Shared State
Mutex
rust
use std::sync::{Arc, Mutex};
use std::thread;
/// Basic single-threaded Mutex usage: lock, mutate through the guard, and
/// rely on the guard's Drop to release the lock at the end of the scope.
fn basic_mutex() {
// Mutex protects data
let m = Mutex::new(5);
{
// `lock` returns a guard; Err means a previous holder panicked (lock poisoning).
let mut num = m.lock().unwrap();
*num = 6;
} // Lock is released here
println!("m = {:?}", m);
}
/// Ten threads increment a shared counter; Arc shares ownership of the
/// Mutex across threads and the Mutex serializes the increments.
fn shared_mutex() {
// Use Arc to share Mutex across multiple threads
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
// Each thread gets its own Arc handle (refcount bump, no data copy).
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// All threads have been joined, so the final count is deterministically 10.
println!("Final count: {}", *counter.lock().unwrap());
}RwLock
rust
use std::sync::RwLock;
use std::collections::HashMap;
/// RwLock demo: writers take exclusive `write` locks, readers take shared
/// `read` locks. Writers and readers race, so some reads may miss entries.
fn rwlock_example() {
let data = Arc::new(RwLock::new(HashMap::new()));
let mut handles = vec![];
// Threads writing data
for i in 0..5 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut map = data.write().unwrap();
map.insert(i, i * i);
println!("Write: {} -> {}", i, i * i);
});
handles.push(handle);
}
// Threads reading data
// NOTE: a reader may run before the matching writer, so `get` can be None.
for i in 0..10 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
let map = data.read().unwrap();
if let Some(value) = map.get(&(i % 5)) {
println!("Read: {} -> {}", i % 5, value);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}Atomic Types
rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Ten threads each add 1000 to a shared atomic counter without any lock;
/// `fetch_add` makes every increment indivisible, so the total is exact.
fn atomic_operations() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..1000 {
// SeqCst is the strongest (and simplest to reason about) ordering.
counter.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// Always prints 10000 because every increment is atomic.
println!("Atomic counter: {}", counter.load(Ordering::SeqCst));
}
/// Compare-and-swap demo: each thread tries to change the value from `i`
/// to `i + 10`. The value starts at 0, so only a thread whose `expected`
/// matches the current value succeeds; the others report failure.
fn compare_and_swap() {
    let value = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for i in 0..10 {
        let value = Arc::clone(&value);
        let handle = thread::spawn(move || {
            let expected = i;
            let new_value = i + 10;
            // Use `compare_exchange`, not `compare_exchange_weak`: the weak
            // form is allowed to fail spuriously even when the value matches,
            // which would make this one-shot demo print misleading failures.
            // The weak variant is only appropriate inside a retry loop.
            match value.compare_exchange(
                expected,
                new_value,
                Ordering::SeqCst,
                Ordering::SeqCst
            ) {
                Ok(prev) => println!("Thread {} successfully updated: {} -> {}", i, prev, new_value),
                Err(current) => println!("Thread {} update failed, current value: {}", i, current),
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final value: {}", value.load(Ordering::SeqCst));
}🔄 Concurrency Patterns
Producer-Consumer Pattern
rust
use std::sync::{Arc, Condvar, Mutex};
use std::collections::VecDeque;
/// Bounded blocking FIFO buffer shared between producer and consumer threads.
/// `push` blocks while the buffer is full; `pop` blocks while it is empty.
struct Buffer<T> {
    queue: Mutex<VecDeque<T>>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: usize,
}

impl<T> Buffer<T> {
    /// Creates an empty buffer that holds at most `capacity` items.
    fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    /// Appends `item`, blocking while the buffer is at capacity.
    fn push(&self, item: T) {
        let mut guard = self.queue.lock().unwrap();
        // Re-check the predicate after every wakeup (handles spurious wakeups).
        while guard.len() >= self.capacity {
            guard = self.not_full.wait(guard).unwrap();
        }
        guard.push_back(item);
        // A consumer blocked in `pop` can now make progress.
        self.not_empty.notify_one();
    }

    /// Removes and returns the oldest item, blocking while the buffer is empty.
    fn pop(&self) -> T {
        let mut guard = self.queue.lock().unwrap();
        while guard.is_empty() {
            guard = self.not_empty.wait(guard).unwrap();
        }
        let front = guard.pop_front().unwrap();
        // A producer blocked in `push` can now make progress.
        self.not_full.notify_one();
        front
    }
}
/// Classic producer/consumer: one thread pushes 0..10 into a bounded
/// Buffer while another pops; the Buffer's condvars provide backpressure.
fn producer_consumer_pattern() {
let buffer = Arc::new(Buffer::new(5));
// Producer
let buffer_producer = Arc::clone(&buffer);
let producer = thread::spawn(move || {
for i in 0..10 {
println!("Produce: {}", i);
// Blocks if the buffer already holds 5 items.
buffer_producer.push(i);
thread::sleep(Duration::from_millis(100));
}
});
// Consumer
// The consumer is slower (150ms vs 100ms), so the buffer tends to fill up.
let buffer_consumer = Arc::clone(&buffer);
let consumer = thread::spawn(move || {
for _ in 0..10 {
let item = buffer_consumer.pop();
println!("Consume: {}", item);
thread::sleep(Duration::from_millis(150));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}Work Stealing Pattern
rust
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
/// Per-worker task queue backed by a shared global queue.
/// Local work is taken LIFO (`pop_back`); global work is taken FIFO (`pop_front`).
struct WorkStealing<T> {
    local_queue: Mutex<VecDeque<T>>,
    global_queue: Arc<Mutex<VecDeque<T>>>,
}

impl<T> WorkStealing<T> {
    /// Builds a worker view over the shared `global_queue`.
    fn new(global_queue: Arc<Mutex<VecDeque<T>>>) -> Self {
        Self {
            local_queue: Mutex::new(VecDeque::new()),
            global_queue,
        }
    }

    /// Enqueues a task on this worker's private queue.
    fn push_local(&self, item: T) {
        self.local_queue.lock().unwrap().push_back(item);
    }

    /// Takes the most recently pushed local task, if any (LIFO order).
    fn pop_local(&self) -> Option<T> {
        self.local_queue.lock().unwrap().pop_back()
    }

    /// Returns the next task: local queue first, falling back to the global queue.
    fn steal_work(&self) -> Option<T> {
        self.pop_local().or_else(|| {
            // No local work left; take the oldest task from the shared queue.
            self.global_queue.lock().unwrap().pop_front()
        })
    }
}
/// Four workers drain a shared global queue of 20 tasks.
/// NOTE(review): each worker only ever takes from its own (always-empty)
/// local queue and the global queue — no worker steals from another
/// worker's local queue, so this shows the API rather than true stealing.
fn work_stealing_pattern() {
let global_queue = Arc::new(Mutex::new(VecDeque::new()));
// Add some work
{
let mut queue = global_queue.lock().unwrap();
for i in 0..20 {
queue.push_back(i);
}
}
let mut handles = vec![];
// Create worker threads
for thread_id in 0..4 {
let global_queue = Arc::clone(&global_queue);
let worker = WorkStealing::new(global_queue);
let handle = thread::spawn(move || {
let mut processed = 0;
// A worker exits as soon as both of its queues are momentarily empty.
while let Some(work) = worker.steal_work() {
println!("Thread {} processing work: {}", thread_id, work);
thread::sleep(Duration::from_millis(10));
processed += 1;
}
println!("Thread {} finished, processed {} tasks", thread_id, processed);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}⚡ Concurrency Performance Optimization
Thread Pool
rust
use std::sync::{mpsc, Arc, Mutex};
/// A fixed-size pool of worker threads that execute submitted closures.
pub struct ThreadPool {
// Kept so the worker JoinHandles stay owned rather than detached.
workers: Vec<Worker>,
// Sending half of the job channel; all workers share the receiving half.
sender: mpsc::Sender<Job>,
}
// A job is any one-shot closure that can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Creates a pool with `size` worker threads.
///
/// # Panics
/// Panics if `size` is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
// mpsc allows a single consumer, so workers share the Receiver via Arc<Mutex<_>>.
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
/// Submits a closure for execution on some idle worker thread.
///
/// # Panics
/// Panics if the receiving side has hung up (all workers gone).
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
/// A single pool thread that repeatedly pulls jobs off the shared channel.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that loops: lock the receiver, take one job, run it.
    /// The lock guard is a temporary, so it is released before the job runs,
    /// allowing other workers to receive concurrently.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // `recv` returns Err once the ThreadPool (the only Sender) is
            // dropped. The previous `recv().unwrap()` made every worker
            // thread panic at shutdown; breaking out of the loop lets the
            // thread exit cleanly instead.
            let message = receiver.lock().unwrap().recv();
            match message {
                Ok(job) => {
                    println!("Worker {} executing task", id);
                    job();
                }
                Err(_) => {
                    println!("Worker {} shutting down", id);
                    break;
                }
            }
        });
        Worker { id, thread }
    }
}
/// Submits eight 1-second tasks to a 4-thread pool.
/// NOTE(review): the pool has no graceful shutdown/join API, so this
/// sleeps long enough (two 1s batches + margin) for the tasks to finish.
fn thread_pool_example() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Task {} started", i);
thread::sleep(Duration::from_secs(1));
println!("Task {} completed", i);
});
}
thread::sleep(Duration::from_secs(3));
}Lock-Free Data Structures
rust
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;
/// A Treiber-style lock-free stack built on an atomic head pointer.
///
/// WARNING(review): this is a teaching example, not production-ready.
/// `pop` reads `(*head).next` after loading `head`; a concurrent `pop`
/// may already have freed that node (use-after-free), and a reused node
/// address can trigger the ABA problem. Real implementations need a safe
/// memory-reclamation scheme (hazard pointers, epochs — e.g. crossbeam).
pub struct LockFreeStack<T> {
head: AtomicPtr<Node<T>>,
}
// Heap node; `next` is a raw pointer because nodes are shared across threads.
struct Node<T> {
data: T,
next: *mut Node<T>,
}
impl<T> LockFreeStack<T> {
/// Creates an empty stack (null head pointer).
pub fn new() -> Self {
Self {
head: AtomicPtr::new(ptr::null_mut()),
}
}
/// Pushes `data` by splicing a new heap node in with a CAS retry loop.
pub fn push(&self, data: T) {
// Leak the Box into a raw pointer; ownership transfers to the stack.
let new_node = Box::into_raw(Box::new(Node {
data,
next: ptr::null_mut(),
}));
loop {
let head = self.head.load(Ordering::Acquire);
// SAFETY: `new_node` came from Box::into_raw above and is not yet
// visible to any other thread, so this exclusive write is sound.
unsafe {
(*new_node).next = head;
}
// Weak CAS is appropriate here: a spurious failure just retries.
match self.head.compare_exchange_weak(
head,
new_node,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(_) => continue,
}
}
}
/// Pops the most recently pushed value, or None if the stack is empty.
pub fn pop(&self) -> Option<T> {
loop {
let head = self.head.load(Ordering::Acquire);
if head.is_null() {
return None;
}
// SAFETY(review): NOT fully sound under contention — another thread
// may pop and free `head` between the load above and this read.
let next = unsafe { (*head).next };
match self.head.compare_exchange_weak(
head,
next,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => {
// CAS succeeded: this thread now owns `head`; reclaim the Box.
let data = unsafe { Box::from_raw(head).data };
return Some(data);
}
Err(_) => continue,
}
}
}
}
// SAFETY: the stack only hands out owned `T` values and synchronizes the
// head pointer with atomics, so it is Send/Sync whenever `T: Send`.
unsafe impl<T: Send> Send for LockFreeStack<T> {}
unsafe impl<T: Send> Sync for LockFreeStack<T> {}
/// Concurrently pushes 0..10 onto and pops up to 10 values off the
/// lock-free stack. Pops race with pushes, so a pop may find it empty.
fn lock_free_example() {
let stack = Arc::new(LockFreeStack::new());
let mut handles = vec![];
// Push data
for i in 0..10 {
let stack = Arc::clone(&stack);
let handle = thread::spawn(move || {
stack.push(i);
println!("Push: {}", i);
});
handles.push(handle);
}
// Pop data
// NOTE: not every pop is guaranteed a value; items left on the stack are
// never freed here (the stack has no Drop impl) and leak until exit.
for _ in 0..10 {
let stack = Arc::clone(&stack);
let handle = thread::spawn(move || {
if let Some(value) = stack.pop() {
println!("Pop: {}", value);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}📝 Chapter Summary
Through this chapter, you should have mastered:
Basic Concurrency
- ✅ Creating and managing threads
- ✅ Message passing mechanisms (Channel)
- ✅ Shared state synchronization (Mutex, RwLock)
- ✅ Usage of atomic operations
Advanced Concurrency Patterns
- ✅ Producer-consumer pattern
- ✅ Work stealing pattern
- ✅ Thread pool implementation
- ✅ Lock-free data structures
Performance and Safety
- ✅ Rust's memory safety guarantees
- ✅ Prevention of data races
- ✅ Concurrency performance optimization techniques
- ✅ Choosing appropriate concurrency primitives
Best Practices
- Prefer message passing
- Use shared state cautiously
- Leverage the type system for safety
- Choose synchronization primitives based on the scenario
Continue Learning: Next Chapter - Rust Macros