Synchronization Primitives

Lock-free counter with AtomicUsize

For a simple shared counter, an AtomicUsize avoids the overhead of a Mutex entirely. Atomic operations such as AtomicUsize::fetch_add complete as a single indivisible step, so concurrent increments never lose updates and no lock is required.

Every atomic operation takes an Ordering that controls how the compiler and CPU may reorder surrounding memory accesses. Ordering::Relaxed guarantees the atomicity of the operation itself but imposes no ordering relative to other memory operations, which is all a standalone counter needs. Ordering::SeqCst (sequentially consistent) is the strongest: on top of acquire/release semantics it establishes a single global order across all SeqCst operations. When an atomic guards access to other data, a Release store paired with an Acquire load is usually enough to make that data visible; reach for SeqCst when several atomics must also be observed in one order that every thread agrees on.

This counter needs nothing stronger than Relaxed. The increments are Ordering::Relaxed because only their atomicity matters, and the final read is Relaxed too: JoinHandle::join already establishes a happens-before edge, so once every thread has joined, the load is guaranteed to observe all of their increments. Using SeqCst for that load would only suggest an ordering requirement that does not exist.

use anyhow::{anyhow, Result};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

fn main() -> Result<()> {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                // Relaxed is enough: we only care that the count is exact.
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }

    for handle in handles {
        handle.join().map_err(|_| anyhow!("thread panicked"))?;
    }

    // join() above synchronizes the threads, so a Relaxed load sees the total.
    let total = counter.load(Ordering::Relaxed);
    println!("total: {}", total);
    assert_eq!(total, 10_000);
    Ok(())
}
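
The ordering discussion above also touches on the case where an atomic guards other data. A minimal sketch of that case, assuming a hypothetical publish_and_read helper (not part of the recipe) and using only std: one thread writes a payload and then sets a flag with Ordering::Release, while the other spins on the flag with Ordering::Acquire before reading the payload.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical helper: one thread publishes a value, another waits for it.
fn publish_and_read() -> usize {
    let payload = Arc::new(AtomicUsize::new(0));
    let ready = Arc::new(AtomicBool::new(false));

    let producer = {
        let (payload, ready) = (Arc::clone(&payload), Arc::clone(&ready));
        thread::spawn(move || {
            payload.store(42, Ordering::Relaxed);
            // Release: everything written above becomes visible to any
            // thread that later observes `true` with an Acquire load.
            ready.store(true, Ordering::Release);
        })
    };

    // Acquire pairs with the Release store; spin until the flag is set.
    while !ready.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    // The flag was observed, so the payload write is guaranteed visible.
    let seen = payload.load(Ordering::Relaxed);

    producer.join().expect("producer panicked");
    seen
}

fn main() {
    assert_eq!(publish_and_read(), 42);
    println!("payload observed after flag: 42");
}
```

The spin loop is only there to keep the sketch short; a real consumer would more likely block on a channel or a Condvar than burn CPU waiting for a flag.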

Guard compound state with Arc<Mutex<T>>

When the shared value is more than a single atomic-sized field — a struct whose fields must stay consistent with one another — a Mutex is the right tool. Arc gives every thread shared ownership of the same value; Mutex ensures only one thread mutates it at a time. For a plain integer counter, prefer AtomicUsize (above); reach for Arc<Mutex<T>> once an update touches several fields that must change together.

Each thread acquires the lock through Mutex::lock, updates both fields of the shared Stats, and releases the lock when the returned MutexGuard drops. Holding one lock across the whole update is what keeps count and total from ever being observed out of sync — something separate atomics could not guarantee. A poisoned lock (a thread panicked while holding it) surfaces as an error rather than a panic. When the shared state grows into its own long-lived component, the Actor Pattern is the usual scale-up path.

use anyhow::{anyhow, Result};
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug, Default)]
struct Stats {
    count: u64,
    total: u64,
}

fn main() -> Result<()> {
    let stats = Arc::new(Mutex::new(Stats::default()));
    let mut handles = Vec::new();

    for value in 1..=10 {
        let stats = Arc::clone(&stats);
        let handle = thread::spawn(move || -> Result<()> {
            let mut stats = stats.lock().map_err(|_| anyhow!("mutex poisoned"))?;
            // Both fields update under one lock, so count and total can
            // never be observed out of step with each other.
            stats.count += 1;
            stats.total += value;
            Ok(())
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().map_err(|_| anyhow!("thread panicked"))??;
    }

    let stats = stats.lock().map_err(|_| anyhow!("mutex poisoned"))?;
    println!("{:?}", *stats);
    assert_eq!(stats.count, 10);
    assert_eq!(stats.total, 55);
    Ok(())
}
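
To make the poisoning behaviour described above concrete, here is a small sketch using only std. The poison_and_recover helper is hypothetical: it deliberately panics in a worker that holds the lock, then shows that Mutex::lock returns an error afterwards and that the data is still reachable through PoisonError::into_inner if the caller decides it is safe to use.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: returns (was the mutex poisoned?, value recovered).
fn poison_and_recover() -> (bool, u32) {
    let shared = Arc::new(Mutex::new(0u32));

    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let mut guard = shared.lock().expect("first lock cannot be poisoned");
            *guard = 7;
            // Panicking while the guard is alive poisons the mutex.
            panic!("simulated failure while holding the lock");
        })
    };
    // join() reports the panic as Err; it is expected here, so ignore it.
    let _ = worker.join();

    let was_poisoned = shared.is_poisoned();
    let value = match shared.lock() {
        Ok(guard) => *guard,
        // The caller opts in to reading state a panicking thread left behind.
        Err(poisoned) => *poisoned.into_inner(),
    };
    (was_poisoned, value)
}

fn main() {
    let (was_poisoned, value) = poison_and_recover();
    println!("poisoned: {was_poisoned}, recovered value: {value}");
}
```

Whether recovering is appropriate depends on the invariant the lock protects; the recipe above takes the conservative route and turns poisoning into an error.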

Concurrent reads with Arc<RwLock<T>>

When data is read often and written rarely, RwLock is a better fit than Mutex. It allows any number of concurrent readers or a single exclusive writer. Readers acquire a shared lock with RwLock::read; a writer acquires an exclusive lock with RwLock::write, which blocks until every reader and any other writer has released the lock.

RwLock gives no fairness or priority guarantee, and the policy is platform-dependent. On some targets a steady stream of readers can keep the lock held continuously and starve a waiting writer, so reach for RwLock only when reads genuinely dominate; under heavy contention a plain Mutex (or a dedicated fair lock) can be the safer choice.

This recipe spawns several reader threads that observe the shared vector simultaneously, plus one writer thread that appends to it under an exclusive lock.

use anyhow::{anyhow, Result};
use std::sync::{Arc, RwLock};
use std::thread;

fn main() -> Result<()> {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let mut handles = Vec::new();

    for id in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || -> Result<()> {
            let reader = data.read().map_err(|_| anyhow!("lock poisoned"))?;
            println!("reader {} sees {:?}", id, *reader);
            Ok(())
        }));
    }

    let writer_data = Arc::clone(&data);
    handles.push(thread::spawn(move || -> Result<()> {
        let mut writer = writer_data.write().map_err(|_| anyhow!("lock poisoned"))?;
        writer.push(4);
        Ok(())
    }));

    for handle in handles {
        handle.join().map_err(|_| anyhow!("thread panicked"))??;
    }

    let final_state = data.read().map_err(|_| anyhow!("lock poisoned"))?;
    println!("final: {:?}", *final_state);
    Ok(())
}
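
The "many readers or one writer" rule can be observed directly with the non-blocking RwLock::try_read and RwLock::try_write. The sketch below is illustrative only: probe_lock is a hypothetical helper, and it probes from a single thread purely to keep the outcome deterministic (blocking read calls should not be nested on one thread like this).

```rust
use std::sync::RwLock;

// Hypothetical helper: returns
// (second reader admitted, writer refused while read-locked, writer admitted after).
fn probe_lock() -> (bool, bool, bool) {
    let lock = RwLock::new(vec![1, 2, 3]);

    // Hold one shared lock for the duration of the probes.
    let first_reader = lock.read().expect("lock poisoned");

    // A second shared lock is compatible with the first. The temporary
    // guard is dropped at the end of the statement.
    let second_reader_admitted = lock.try_read().is_ok();

    // An exclusive lock is not: try_write reports it would block.
    let writer_refused = lock.try_write().is_err();

    // Once the last reader is gone, a writer can get in.
    drop(first_reader);
    let writer_admitted = lock.try_write().is_ok();

    (second_reader_admitted, writer_refused, writer_admitted)
}

fn main() {
    let (second_reader, writer_refused, writer_after) = probe_lock();
    println!("second reader admitted: {second_reader}");
    println!("writer refused while read-locked: {writer_refused}");
    println!("writer admitted after readers left: {writer_after}");
}
```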

Communicate between threads with mpsc channels

An mpsc (multiple producer, single consumer) channel moves values from one or more producer threads to a single consumer. mpsc::channel returns a Sender and a Receiver; producers push work with Sender::send. The “multiple producer” half comes from cloning the Sender: every clone feeds the same Receiver.

The idiomatic way to consume a channel is to iterate the Receiver: the loop blocks for each value and ends cleanly once every Sender (including clones) has been dropped. That drop is the channel’s natural shutdown signal, so the original Sender is dropped explicitly after the producers are spawned.

use anyhow::{anyhow, Result};
use std::sync::mpsc;
use std::thread;

fn main() -> Result<()> {
    let (tx, rx) = mpsc::channel();

    // One Sender clone per producer thread; all feed the single Receiver.
    let producers: Vec<_> = (0..3)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || -> Result<()> {
                for item in 0..3 {
                    tx.send((id, item)).map_err(|e| anyhow!("send failed: {e}"))?;
                }
                Ok(())
            })
        })
        .collect();

    // Drop the original Sender so the channel closes once the clones do;
    // otherwise the loop below would block forever waiting for more senders.
    drop(tx);

    // Blocking, idiomatic consumption: iterate until every Sender is gone.
    let mut received = 0;
    for (id, item) in rx {
        println!("producer {id} sent {item}");
        received += 1;
    }

    for producer in producers {
        producer.join().map_err(|_| anyhow!("producer panicked"))??;
    }
    assert_eq!(received, 9);
    Ok(())
}

For the common “block, but wake up periodically to do maintenance” case, reach for Receiver::recv_timeout rather than polling. Save Receiver::try_recv for the situation it is actually built for: a consumer that runs its own loop and only wants to fold in whatever has arrived, without ever stalling on the channel. The loop below models that: each iteration does a unit of its own work and then picks up at most one message if one is ready, so a backlog is worked off one message per frame. TryRecvError::Empty means “nothing queued, carry on”; TryRecvError::Disconnected means every Sender is gone and the queue is empty.

use anyhow::{anyhow, Result};
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;

fn main() -> Result<()> {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || -> Result<()> {
        for command in ["load", "render", "save"] {
            tx.send(command).map_err(|e| anyhow!("send failed: {e}"))?;
            thread::sleep(Duration::from_millis(10));
        }
        Ok(())
    });

    let mut frame = 0u32;
    loop {
        // The consumer's own work — this is why try_recv beats a blocking
        // recv() here: the loop must keep ticking even with no input.
        frame += 1;
        thread::sleep(Duration::from_millis(3)); // stand-in for real work

        match rx.try_recv() {
            Ok(command) => println!("frame {frame}: handling {command}"),
            Err(TryRecvError::Empty) => continue,
            Err(TryRecvError::Disconnected) => break,
        }
    }

    producer.join().map_err(|_| anyhow!("producer panicked"))??;
    println!("ran {frame} frames");
    Ok(())
}
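
For comparison, the “block, but wake up periodically” case mentioned above might look like the following sketch with Receiver::recv_timeout. The run_with_maintenance helper, the timings, and the maintenance counter are all assumptions made for illustration; error handling uses expect to keep the example std-only.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical helper: returns the commands handled and the number of
// timeouts, which stand in for periodic maintenance passes.
fn run_with_maintenance() -> (Vec<&'static str>, u32) {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for command in ["load", "render", "save"] {
            tx.send(command).expect("receiver dropped early");
            thread::sleep(Duration::from_millis(20));
        }
        // tx is dropped here, which disconnects the channel.
    });

    let mut handled = Vec::new();
    let mut maintenance_passes = 0;
    loop {
        // Sleep until a message arrives or 5 ms pass, whichever is first.
        match rx.recv_timeout(Duration::from_millis(5)) {
            Ok(command) => handled.push(command),
            // Nothing arrived in time: do the periodic work, then wait again.
            Err(RecvTimeoutError::Timeout) => maintenance_passes += 1,
            // Every Sender is gone and the queue is empty.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    producer.join().expect("producer panicked");
    (handled, maintenance_passes)
}

fn main() {
    let (handled, passes) = run_with_maintenance();
    println!("handled {handled:?} with {passes} maintenance passes");
}
```

Unlike the try_recv loop, this consumer uses no CPU while idle: the thread sleeps inside recv_timeout and only wakes for a message or a timeout.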

Coordinate thread phases with Barrier

A Barrier synchronizes a fixed number of threads at a common point. Each thread calls Barrier::wait; the call blocks until exactly that many threads have reached it, then releases them all together. This guarantees every thread finishes phase 1 before any thread begins phase 2.

The barrier is constructed for three threads, so each of the three workers blocks in Barrier::wait until its peers catch up.

use anyhow::{anyhow, Result};
use std::sync::{Arc, Barrier};
use std::thread;

fn main() -> Result<()> {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = Vec::new();

    for id in 0..3 {
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            println!("thread {} finished phase 1", id);
            // No thread proceeds until all three have arrived.
            barrier.wait();
            println!("thread {} starting phase 2", id);
        }));
    }

    for handle in handles {
        handle.join().map_err(|_| anyhow!("thread panicked"))?;
    }
    Ok(())
}
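
The phase guarantee can be checked rather than just printed. In this sketch (the phase_one_counts helper and the shared counter are assumptions added for illustration), every worker bumps a counter at the end of phase 1 and reads it back at the start of phase 2; because no thread passes the barrier early, each one should see the full count.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

// Hypothetical helper: returns, per worker, how many phase-1 completions
// that worker observed when it entered phase 2.
fn phase_one_counts(workers: usize) -> Vec<usize> {
    let barrier = Arc::new(Barrier::new(workers));
    let finished = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let barrier = Arc::clone(&barrier);
            let finished = Arc::clone(&finished);
            thread::spawn(move || {
                // Phase 1: record that this worker is done.
                finished.fetch_add(1, Ordering::SeqCst);
                // Block until all workers have recorded completion.
                barrier.wait();
                // Phase 2: every phase-1 increment has already happened.
                finished.load(Ordering::SeqCst)
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}

fn main() {
    let counts = phase_one_counts(3);
    println!("counts seen in phase 2: {counts:?}");
    assert!(counts.iter().all(|&seen| seen == 3));
}
```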

Signal a waiting thread with Condvar

A Condvar (condition variable) lets a thread sleep until another thread signals that some shared state has changed, without busy-waiting. It is always paired with a Mutex that protects the state being watched.

The waiting thread locks the mutex, then loops while the condition is false, releasing the lock and sleeping inside Condvar::wait. The signaling thread locks the same mutex, updates the state, drops the guard, and calls Condvar::notify_one to wake the waiter. The while loop guards against spurious wakeups: the waiter re-checks the condition every time it wakes.

Dropping the guard before notifying is a small but worthwhile habit: if the lock is still held when Condvar::notify_one fires, the woken thread wakes only to block again immediately on re-acquiring the mutex. Releasing first lets it proceed straight away. The result is correct either way — only the hand-off is tidier.

use anyhow::{anyhow, Result};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() -> Result<()> {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    let signaler = Arc::clone(&pair);
    let worker = thread::spawn(move || -> Result<()> {
        let (lock, cvar) = &*signaler;
        let mut ready = lock.lock().map_err(|_| anyhow!("mutex poisoned"))?;
        *ready = true;
        // Release the lock before signaling so the woken thread can proceed
        // immediately instead of blocking to re-acquire it.
        drop(ready);
        cvar.notify_one();
        Ok(())
    });

    let (lock, cvar) = &*pair;
    let mut ready = lock.lock().map_err(|_| anyhow!("mutex poisoned"))?;
    while !*ready {
        ready = cvar.wait(ready).map_err(|_| anyhow!("mutex poisoned"))?;
    }
    println!("worker signaled readiness");

    worker.join().map_err(|_| anyhow!("worker panicked"))??;
    Ok(())
}
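
The explicit while loop can also be written with Condvar::wait_while, which takes the guard plus a closure and keeps waiting for as long as the closure returns true, re-checking after every wakeup. A sketch of the same hand-off in that style, using a hypothetical wait_for_signal helper and expect in place of the recipe's error mapping:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical helper: returns the flag value the waiter observed.
fn wait_for_signal() -> bool {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));

    let signaler = Arc::clone(&pair);
    let worker = thread::spawn(move || {
        let (lock, cvar) = &*signaler;
        // The temporary guard is dropped at the end of this statement,
        // so the lock is already released when notify_one runs.
        *lock.lock().expect("mutex poisoned") = true;
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    // Sleeps while the closure returns true, i.e. while `ready` is false.
    // Spurious wakeups are absorbed because the closure is re-evaluated.
    let ready = cvar
        .wait_while(lock.lock().expect("mutex poisoned"), |ready| !*ready)
        .expect("mutex poisoned");
    let observed = *ready;
    drop(ready);

    worker.join().expect("worker panicked");
    observed
}

fn main() {
    assert!(wait_for_signal());
    println!("worker signaled readiness");
}
```

If the worker sets the flag before the main thread locks the mutex, the closure returns false on its first check and wait_while returns without sleeping, so the signal cannot be missed.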