Skip to content

Rust Concurrency Programming

Overview

Rust provides safe and efficient concurrency programming support. Through the ownership system and type system, Rust can prevent data races and other concurrency issues at compile time. This chapter will learn about threads, message passing, shared state, and other concurrency programming concepts in Rust.

🧵 Thread Basics

Creating and Managing Threads

rust
use std::thread;
use std::time::Duration;

fn basic_threading() {
    // Create new thread
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Child thread: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // Main thread continues execution
    for i in 1..5 {
        println!("Main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    // Wait for child thread to complete
    handle.join().unwrap();
    println!("All threads finished");
}

Transferring Data Between Threads

rust
fn thread_data_transfer() {
    let v = vec![1, 2, 3];

    // Use move to transfer ownership
    let handle = thread::spawn(move || {
        println!("Vector: {:?}", v);
    });

    handle.join().unwrap();

    // println!("v: {:?}", v); // Compile error! v has been moved
}

fn multiple_threads() {
    let mut handles = vec![];

    for i in 0..10 {
        let handle = thread::spawn(move || {
            let thread_id = thread::current().id();
            println!("Thread {} (ID: {:?}): Computing {} * 2 = {}", i, thread_id, i, i * 2);
            thread::sleep(Duration::from_millis(100));
            i * 2 // Return value
        });
        handles.push(handle);
    }

    // Collect all results
    let results: Vec<i32> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();

    println!("All results: {:?}", results);
}

📨 Message Passing

Basic Channels

rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn basic_channels() {
    // Create channel
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("Hello");
        tx.send(val).unwrap();
        // println!("val: {}", val); // Compile error! val has been moved
    });

    // Receive message
    let received = rx.recv().unwrap();
    println!("Received message: {}", received);
}

fn multiple_messages() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("Hello"),
            String::from("from"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Iterate to receive messages
    for received in rx {
        println!("Received: {}", received);
    }
}

Multiple Producers Single Consumer

rust
fn multiple_producers() {
    let (tx, rx) = mpsc::channel();

    // Clone sender
    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("Thread 1: Message 1"),
            String::from("Thread 1: Message 2"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("Thread 2: Message 1"),
            String::from("Thread 2: Message 2"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Receive all messages
    for received in rx {
        println!("Received: {}", received);
    }
}

Async Channels

rust
use std::sync::mpsc::{sync_channel, TryRecvError};

fn sync_channels() {
    // Sync channel, capacity 0 (blocking)
    let (tx, rx) = sync_channel(0);

    let handle = thread::spawn(move || {
        println!("Before sending message");
        tx.send(1).unwrap(); // Blocks until receiver is ready
        println!("After sending message");
    });

    thread::sleep(Duration::from_secs(2));
    println!("Ready to receive");
    let msg = rx.recv().unwrap();
    println!("Received: {}", msg);

    handle.join().unwrap();
}

fn non_blocking_receive() {
    let (tx, rx) = mpsc::channel();

    // Non-blocking receive
    match rx.try_recv() {
        Ok(msg) => println!("Received: {}", msg),
        Err(TryRecvError::Empty) => println!("Channel is empty"),
        Err(TryRecvError::Disconnected) => println!("Channel disconnected"),
    }

    // Send message
    tx.send("Hello").unwrap();

    // Now can receive
    match rx.try_recv() {
        Ok(msg) => println!("Received: {}", msg),
        Err(TryRecvError::Empty) => println!("Channel is empty"),
        Err(TryRecvError::Disconnected) => println!("Channel disconnected"),
    }
}

🔒 Shared State

Mutex

rust
use std::sync::{Arc, Mutex};
use std::thread;

fn basic_mutex() {
    // Mutex protects data
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    } // Lock is released here

    println!("m = {:?}", m);
}

fn shared_mutex() {
    // Use Arc to share Mutex across multiple threads
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final count: {}", *counter.lock().unwrap());
}

RwLock

rust
use std::sync::RwLock;
use std::collections::HashMap;

fn rwlock_example() {
    let data = Arc::new(RwLock::new(HashMap::new()));
    let mut handles = vec![];

    // Threads writing data
    for i in 0..5 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            let mut map = data.write().unwrap();
            map.insert(i, i * i);
            println!("Write: {} -> {}", i, i * i);
        });
        handles.push(handle);
    }

    // Threads reading data
    for i in 0..10 {
        let data = Arc::clone(&data);
        let handle = thread::spawn(move || {
            let map = data.read().unwrap();
            if let Some(value) = map.get(&(i % 5)) {
                println!("Read: {} -> {}", i % 5, value);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

Atomic Types

rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn atomic_operations() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Atomic counter: {}", counter.load(Ordering::SeqCst));
}

fn compare_and_swap() {
    let value = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for i in 0..10 {
        let value = Arc::clone(&value);
        let handle = thread::spawn(move || {
            let expected = i;
            let new_value = i + 10;

            match value.compare_exchange_weak(
                expected,
                new_value,
                Ordering::SeqCst,
                Ordering::SeqCst
            ) {
                Ok(prev) => println!("Thread {} successfully updated: {} -> {}", i, prev, new_value),
                Err(current) => println!("Thread {} update failed, current value: {}", i, current),
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final value: {}", value.load(Ordering::SeqCst));
}

🔄 Concurrency Patterns

Producer-Consumer Pattern

rust
use std::sync::{Arc, Condvar, Mutex};
use std::collections::VecDeque;

struct Buffer<T> {
    queue: Mutex<VecDeque<T>>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: usize,
}

impl<T> Buffer<T> {
    fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    fn push(&self, item: T) {
        let mut queue = self.queue.lock().unwrap();

        // Wait for buffer to have space
        while queue.len() >= self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }

        queue.push_back(item);
        self.not_empty.notify_one();
    }

    fn pop(&self) -> T {
        let mut queue = self.queue.lock().unwrap();

        // Wait for buffer to have data
        while queue.is_empty() {
            queue = self.not_empty.wait(queue).unwrap();
        }

        let item = queue.pop_front().unwrap();
        self.not_full.notify_one();
        item
    }
}

fn producer_consumer_pattern() {
    let buffer = Arc::new(Buffer::new(5));

    // Producer
    let buffer_producer = Arc::clone(&buffer);
    let producer = thread::spawn(move || {
        for i in 0..10 {
            println!("Produce: {}", i);
            buffer_producer.push(i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Consumer
    let buffer_consumer = Arc::clone(&buffer);
    let consumer = thread::spawn(move || {
        for _ in 0..10 {
            let item = buffer_consumer.pop();
            println!("Consume: {}", item);
            thread::sleep(Duration::from_millis(150));
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

Work Stealing Pattern

rust
use std::sync::{Arc, Mutex};
use std::collections::VecDeque;

struct WorkStealing<T> {
    local_queue: Mutex<VecDeque<T>>,
    global_queue: Arc<Mutex<VecDeque<T>>>,
}

impl<T> WorkStealing<T> {
    fn new(global_queue: Arc<Mutex<VecDeque<T>>>) -> Self {
        Self {
            local_queue: Mutex::new(VecDeque::new()),
            global_queue,
        }
    }

    fn push_local(&self, item: T) {
        let mut queue = self.local_queue.lock().unwrap();
        queue.push_back(item);
    }

    fn pop_local(&self) -> Option<T> {
        let mut queue = self.local_queue.lock().unwrap();
        queue.pop_back()
    }

    fn steal_work(&self) -> Option<T> {
        // Try local queue first
        if let Some(item) = self.pop_local() {
            return Some(item);
        }

        // Then try global queue
        let mut global = self.global_queue.lock().unwrap();
        global.pop_front()
    }
}

fn work_stealing_pattern() {
    let global_queue = Arc::new(Mutex::new(VecDeque::new()));

    // Add some work
    {
        let mut queue = global_queue.lock().unwrap();
        for i in 0..20 {
            queue.push_back(i);
        }
    }

    let mut handles = vec![];

    // Create worker threads
    for thread_id in 0..4 {
        let global_queue = Arc::clone(&global_queue);
        let worker = WorkStealing::new(global_queue);

        let handle = thread::spawn(move || {
            let mut processed = 0;

            while let Some(work) = worker.steal_work() {
                println!("Thread {} processing work: {}", thread_id, work);
                thread::sleep(Duration::from_millis(10));
                processed += 1;
            }

            println!("Thread {} finished, processed {} tasks", thread_id, processed);
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

⚡ Concurrency Performance Optimization

Thread Pool

rust
use std::sync::{mpsc, Arc, Mutex};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            println!("Worker {} executing task", id);
            job();
        });

        Worker { id, thread }
    }
}

fn thread_pool_example() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Task {} started", i);
            thread::sleep(Duration::from_secs(1));
            println!("Task {} completed", i);
        });
    }

    thread::sleep(Duration::from_secs(3));
}

Lock-Free Data Structures

rust
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;

pub struct LockFreeStack<T> {
    head: AtomicPtr<Node<T>>,
}

struct Node<T> {
    data: T,
    next: *mut Node<T>,
}

impl<T> LockFreeStack<T> {
    pub fn new() -> Self {
        Self {
            head: AtomicPtr::new(ptr::null_mut()),
        }
    }

    pub fn push(&self, data: T) {
        let new_node = Box::into_raw(Box::new(Node {
            data,
            next: ptr::null_mut(),
        }));

        loop {
            let head = self.head.load(Ordering::Acquire);
            unsafe {
                (*new_node).next = head;
            }

            match self.head.compare_exchange_weak(
                head,
                new_node,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(_) => continue,
            }
        }
    }

    pub fn pop(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            if head.is_null() {
                return None;
            }

            let next = unsafe { (*head).next };

            match self.head.compare_exchange_weak(
                head,
                next,
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    let data = unsafe { Box::from_raw(head).data };
                    return Some(data);
                }
                Err(_) => continue,
            }
        }
    }
}

unsafe impl<T: Send> Send for LockFreeStack<T> {}
unsafe impl<T: Send> Sync for LockFreeStack<T> {}

fn lock_free_example() {
    let stack = Arc::new(LockFreeStack::new());
    let mut handles = vec![];

    // Push data
    for i in 0..10 {
        let stack = Arc::clone(&stack);
        let handle = thread::spawn(move || {
            stack.push(i);
            println!("Push: {}", i);
        });
        handles.push(handle);
    }

    // Pop data
    for _ in 0..10 {
        let stack = Arc::clone(&stack);
        let handle = thread::spawn(move || {
            if let Some(value) = stack.pop() {
                println!("Pop: {}", value);
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

📝 Chapter Summary

Through this chapter, you should have mastered:

Basic Concurrency

  • ✅ Creating and managing threads
  • ✅ Message passing mechanisms (Channel)
  • ✅ Shared state synchronization (Mutex, RwLock)
  • ✅ Usage of atomic operations

Advanced Concurrency Patterns

  • ✅ Producer-consumer pattern
  • ✅ Work stealing pattern
  • ✅ Thread pool implementation
  • ✅ Lock-free data structures

Performance and Safety

  • ✅ Rust's memory safety guarantees
  • ✅ Prevention of data races
  • ✅ Concurrency performance optimization techniques
  • ✅ Choosing appropriate concurrency primitives

Best Practices

  1. Prefer message passing
  2. Use shared state cautiously
  3. Leverage the type system for safety
  4. Choose synchronization primitives based on the scenario

Continue Learning: Next Chapter - Rust Macros

Content is for learning and research only.