Synchronization Primitives
Lock-free counter with AtomicUsize
For a simple shared counter, an AtomicUsize avoids the overhead of a
Mutex entirely. Atomic operations such as AtomicUsize::fetch_add complete
as a single indivisible step, so concurrent increments never lose updates and no
lock is required.
Every atomic operation takes an Ordering that controls how the compiler and
CPU may reorder surrounding memory accesses. Ordering::Relaxed guarantees the
atomicity of the operation itself but imposes no ordering relative to other
memory operations — perfect for a standalone counter. Ordering::SeqCst
(sequentially consistent) is the strongest, establishing a single global order
across all SeqCst operations; reach for it when an atomic guards access to
other data and you need those accesses to be visible in a predictable order.
This counter needs neither. The increments are Ordering::Relaxed because only
their atomicity matters, and the final read is Relaxed too: JoinHandle::join
already establishes a happens-before edge, so once every thread has joined the
load is guaranteed to observe all of their increments. Using SeqCst for that
load would only suggest an ordering requirement that does not exist.
use anyhow::{anyhow, Result};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
fn main() -> Result<()> {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
for _ in 0..1000 {
// Relaxed is enough: we only care that the count is exact.
counter.fetch_add(1, Ordering::Relaxed);
}
}));
}
for handle in handles {
handle.join().map_err(|_| anyhow!("thread panicked"))?;
}
// join() above synchronizes the threads, so a Relaxed load sees the total.
let total = counter.load(Ordering::Relaxed);
println!("total: {}", total);
assert_eq!(total, 10_000);
Ok(())
}
Guard compound state with Arc<Mutex<T>>
When the shared value is more than a single atomic-sized field — a struct whose
fields must stay consistent with one another — a Mutex is the right tool.
Arc gives every thread shared ownership of the same value; Mutex ensures
only one thread mutates it at a time. For a plain integer counter, prefer
AtomicUsize (above); reach for Arc<Mutex<T>> once an update touches several
fields that must change together.
Each thread acquires the lock through Mutex::lock, updates both fields of
the shared Stats, and releases the lock when the returned MutexGuard drops.
Holding one lock across the whole update is what keeps count and total from
ever being observed out of sync — something separate atomics could not
guarantee. A poisoned lock (a thread panicked while holding it) surfaces as an
error rather than a panic. When the shared state grows into its own long-lived
component, the Actor Pattern is the usual scale-up path.
use anyhow::{anyhow, Result};
use std::sync::{Arc, Mutex};
use std::thread;
#[derive(Debug, Default)]
struct Stats {
count: u64,
total: u64,
}
fn main() -> Result<()> {
let stats = Arc::new(Mutex::new(Stats::default()));
let mut handles = Vec::new();
for value in 1..=10 {
let stats = Arc::clone(&stats);
let handle = thread::spawn(move || -> Result<()> {
let mut stats = stats.lock().map_err(|_| anyhow!("mutex poisoned"))?;
// Both fields update under one lock, so count and total can
// never be observed out of step with each other.
stats.count += 1;
stats.total += value;
Ok(())
});
handles.push(handle);
}
for handle in handles {
handle.join().map_err(|_| anyhow!("thread panicked"))??;
}
let stats = stats.lock().map_err(|_| anyhow!("mutex poisoned"))?;
println!("{:?}", *stats);
assert_eq!(stats.count, 10);
assert_eq!(stats.total, 55);
Ok(())
}
Concurrent reads with Arc<RwLock<T>>
When data is read often and written rarely, RwLock is a better fit than
Mutex. It allows any number of concurrent readers or a single exclusive
writer. Readers acquire a shared lock with RwLock::read; a writer acquires an
exclusive lock with RwLock::write that blocks until every reader has released
its lock.
RwLock gives no fairness or priority guarantee, and the policy is
platform-dependent. On some targets a steady stream of readers can keep the lock
held continuously and starve a waiting writer, so reach for RwLock only when
reads genuinely dominate; under heavy contention a plain Mutex (or a
dedicated fair lock) can be the safer choice.
This recipe spawns several reader threads that observe the shared vector simultaneously, plus one writer thread that appends to it under an exclusive lock.
use anyhow::{anyhow, Result};
use std::sync::{Arc, RwLock};
use std::thread;
fn main() -> Result<()> {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = Vec::new();
for id in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || -> Result<()> {
let reader = data.read().map_err(|_| anyhow!("lock poisoned"))?;
println!("reader {} sees {:?}", id, *reader);
Ok(())
}));
}
let writer_data = Arc::clone(&data);
handles.push(thread::spawn(move || -> Result<()> {
let mut writer = writer_data.write().map_err(|_| anyhow!("lock poisoned"))?;
writer.push(4);
Ok(())
}));
for handle in handles {
handle.join().map_err(|_| anyhow!("thread panicked"))??;
}
let final_state = data.read().map_err(|_| anyhow!("lock poisoned"))?;
println!("final: {:?}", *final_state);
Ok(())
}
Communicate between threads with mpsc channels
A mpsc (multiple producer, single consumer) channel moves values from one or
more producer threads to a single consumer. channel returns a Sender and
a Receiver; producers push work with Sender::send. The “multiple
producer” half comes from cloning the Sender — every clone feeds the same
Receiver.
The idiomatic way to consume a channel is to iterate the Receiver: the
loop blocks for each value and ends cleanly once every Sender (including
clones) has been dropped. That drop is the channel’s natural shutdown signal, so
the original Sender is dropped explicitly after the producers are spawned.
use anyhow::{anyhow, Result};
use std::sync::mpsc;
use std::thread;
fn main() -> Result<()> {
let (tx, rx) = mpsc::channel();
// One Sender clone per producer thread; all feed the single Receiver.
let producers: Vec<_> = (0..3)
.map(|id| {
let tx = tx.clone();
thread::spawn(move || -> Result<()> {
for item in 0..3 {
tx.send((id, item)).map_err(|e| anyhow!("send failed: {e}"))?;
}
Ok(())
})
})
.collect();
// Drop the original Sender so the channel closes once the clones do;
// otherwise the loop below would block forever waiting for more senders.
drop(tx);
// Blocking, idiomatic consumption: iterate until every Sender is gone.
let mut received = 0;
for (id, item) in rx {
println!("producer {id} sent {item}");
received += 1;
}
for producer in producers {
producer.join().map_err(|_| anyhow!("producer panicked"))??;
}
assert_eq!(received, 9);
Ok(())
}
For the common “block, but wake up periodically to do maintenance” case, reach
for Receiver::recv_timeout rather than polling. Save Receiver::try_recv
for the situation it is actually built for: a consumer that runs its own loop
and only wants to fold in whatever has arrived, without ever stalling on the
channel. The loop below models that — each iteration does a unit of its own work
and then drains any messages that happen to be ready. TryRecvError::Empty
means “nothing queued, carry on”; TryRecvError::Disconnected means every
Sender is gone.
use anyhow::{anyhow, Result};
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
use std::time::Duration;
fn main() -> Result<()> {
let (tx, rx) = mpsc::channel();
let producer = thread::spawn(move || -> Result<()> {
for command in ["load", "render", "save"] {
tx.send(command).map_err(|e| anyhow!("send failed: {e}"))?;
thread::sleep(Duration::from_millis(10));
}
Ok(())
});
let mut frame = 0u32;
loop {
// The consumer's own work — this is why try_recv beats a blocking
// recv() here: the loop must keep ticking even with no input.
frame += 1;
thread::sleep(Duration::from_millis(3)); // stand-in for real work
match rx.try_recv() {
Ok(command) => println!("frame {frame}: handling {command}"),
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => break,
}
}
producer.join().map_err(|_| anyhow!("producer panicked"))??;
println!("ran {frame} frames");
Ok(())
}
Coordinate thread phases with Barrier
A Barrier synchronizes a fixed number of threads at a common point. Each
thread calls Barrier::wait; the call blocks until exactly that many threads
have reached it, then releases them all together. This guarantees every thread
finishes phase 1 before any thread begins phase 2.
The barrier is constructed for three threads, so each of the three workers blocks
in Barrier::wait until its peers catch up.
use anyhow::{anyhow, Result};
use std::sync::{Arc, Barrier};
use std::thread;
fn main() -> Result<()> {
let barrier = Arc::new(Barrier::new(3));
let mut handles = Vec::new();
for id in 0..3 {
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
println!("thread {} finished phase 1", id);
// No thread proceeds until all three have arrived.
barrier.wait();
println!("thread {} starting phase 2", id);
}));
}
for handle in handles {
handle.join().map_err(|_| anyhow!("thread panicked"))?;
}
Ok(())
}
Signal a waiting thread with Condvar
A Condvar (condition variable) lets a thread sleep until another thread
signals that some shared state has changed, without busy-waiting. It is always
paired with a Mutex that protects the state being watched.
The waiting thread locks the mutex, then loops while the condition is false,
releasing the lock and sleeping inside Condvar::wait. The signaling thread
locks the same mutex, updates the state, drops the guard, and calls
Condvar::notify_one to wake the waiter. The while loop guards against
spurious wakeups: the waiter re-checks the condition every time it wakes.
Dropping the guard before notifying is a small but worthwhile habit: if the
lock is still held when Condvar::notify_one fires, the woken thread wakes
only to block again immediately on re-acquiring the mutex. Releasing first lets
it proceed straight away. The result is correct either way — only the hand-off is
tidier.
use anyhow::{anyhow, Result};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() -> Result<()> {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let signaler = Arc::clone(&pair);
let worker = thread::spawn(move || -> Result<()> {
let (lock, cvar) = &*signaler;
let mut ready = lock.lock().map_err(|_| anyhow!("mutex poisoned"))?;
*ready = true;
// Release the lock before signaling so the woken thread can proceed
// immediately instead of blocking to re-acquire it.
drop(ready);
cvar.notify_one();
Ok(())
});
let (lock, cvar) = &*pair;
let mut ready = lock.lock().map_err(|_| anyhow!("mutex poisoned"))?;
while !*ready {
ready = cvar.wait(ready).map_err(|_| anyhow!("mutex poisoned"))?;
}
println!("worker signaled readiness");
worker.join().map_err(|_| anyhow!("worker panicked"))??;
Ok(())
}