Threads
Spawn a short-lived thread
The example uses the crossbeam crate, which provides data structures and functions
for concurrent and parallel programming. Scope::spawn spawns a new scoped thread
that is guaranteed to terminate before the call to crossbeam::scope returns,
meaning that you can reference data from the calling function.
This example recursively splits the array in half and finds the maximum of each
half in a separate thread.
    fn main() {
        let arr = &[1, 25, -4, 10];
        let max = find_max(arr);
        assert_eq!(max, Some(25));
    }

    fn find_max(arr: &[i32]) -> Option<i32> {
        const THRESHOLD: usize = 2;

        if arr.len() <= THRESHOLD {
            return arr.iter().cloned().max();
        }

        let mid = arr.len() / 2;
        let (left, right) = arr.split_at(mid);

        crossbeam::scope(|s| {
            let thread_l = s.spawn(|_| find_max(left));
            let thread_r = s.spawn(|_| find_max(right));

            let max_l = thread_l.join().unwrap()?;
            let max_r = thread_r.join().unwrap()?;

            Some(max_l.max(max_r))
        }).unwrap()
    }
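For comparison, the same divide-and-conquer idea can be sketched with the standard library's std::thread::scope (available since Rust 1.63) instead of crossbeam. This is an alternative technique, not what the recipe itself uses; the function body simply mirrors find_max from the example.

```rust
use std::thread;

// Recursively splits the slice and finds the maximum of each half
// in its own scoped thread. Uses std::thread::scope rather than crossbeam.
fn find_max(arr: &[i32]) -> Option<i32> {
    const THRESHOLD: usize = 2;

    if arr.len() <= THRESHOLD {
        return arr.iter().copied().max();
    }

    let (left, right) = arr.split_at(arr.len() / 2);

    // Both threads are joined before `scope` returns, so borrowing
    // `left` and `right` from the caller is allowed.
    thread::scope(|s| {
        let handle_l = s.spawn(|| find_max(left));
        let handle_r = s.spawn(|| find_max(right));

        let max_l = handle_l.join().unwrap()?;
        let max_r = handle_r.join().unwrap()?;

        Some(max_l.max(max_r))
    })
}

fn main() {
    let arr = [1, 25, -4, 10];
    assert_eq!(find_max(&arr), Some(25));
}
```

Unlike crossbeam::scope, std::thread::scope returns the closure's value directly rather than a Result, and the closures passed to spawn take no argument.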
Create a parallel pipeline
This example uses the crossbeam and crossbeam-channel crates to create a parallel pipeline, similar to the one described in the ZeroMQ guide. There is a data source and a data sink, with data being processed by two worker threads in parallel on its way from the source to the sink.
We use bounded channels with a capacity of one, created with
crossbeam_channel::bounded. The producer must be on its own thread because
it produces messages faster than the workers can process them (each worker
sleeps for half a second before it starts receiving). As a result, once the
channel is full, the producer blocks on the call to
crossbeam_channel::Sender::send until one of the workers takes the message
out of the channel. Also note that the data in the channel is consumed by
whichever worker receives it first, so each message is delivered to a single
worker rather than to both workers.
Reading from the channels via the crossbeam_channel::Receiver::iter iterator
will block, either waiting for new messages or until the channel is closed.
Because the senders are declared outside crossbeam::scope and would otherwise
outlive it, we must manually close the channels via drop to prevent the entire
program from blocking on the worker for-loops. You can think of the calls to
drop as signaling that no more messages will be sent.
    use std::thread;
    use std::time::Duration;
    use crossbeam_channel::bounded;

    fn main() {
        let (snd1, rcv1) = bounded(1);
        let (snd2, rcv2) = bounded(1);
        let n_msgs = 4;
        let n_workers = 2;

        crossbeam::scope(|s| {
            // Producer thread
            s.spawn(|_| {
                for i in 0..n_msgs {
                    snd1.send(i).unwrap();
                    println!("Source sent {}", i);
                }
                // Close the channel - this is necessary to exit
                // the for-loop in the worker
                drop(snd1);
            });

            // Parallel processing by 2 threads
            for _ in 0..n_workers {
                // Send to sink, receive from source
                let (sendr, recvr) = (snd2.clone(), rcv1.clone());
                // Spawn workers in separate threads
                s.spawn(move |_| {
                    thread::sleep(Duration::from_millis(500));
                    // Receive until channel closes
                    for msg in recvr.iter() {
                        println!("Worker {:?} received {}.",
                                 thread::current().id(), msg);
                        sendr.send(msg * 2).unwrap();
                    }
                });
            }
            // Close the channel, otherwise sink will never
            // exit the for-loop
            drop(snd2);

            // Sink
            for msg in rcv2.iter() {
                println!("Sink received {}", msg);
            }
        }).unwrap();
    }
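The role of drop is easier to see in a smaller, single-worker pipeline. The following is a minimal sketch that uses only the standard library (std::sync::mpsc::sync_channel and std::thread::scope) rather than crossbeam; the function name run_pipeline is hypothetical and chosen for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Sends every input through one worker that doubles it, then collects
// the results in the calling thread (the sink).
fn run_pipeline(inputs: Vec<u32>) -> Vec<u32> {
    // Bounded channels with capacity 1: `send` blocks while the slot is full.
    let (src_tx, src_rx) = sync_channel::<u32>(1);
    let (sink_tx, sink_rx) = sync_channel::<u32>(1);

    thread::scope(|s| {
        // Producer: owns `src_tx`, which is dropped when the closure
        // returns. That closes the source channel.
        s.spawn(move || {
            for v in inputs {
                src_tx.send(v).unwrap();
            }
        });

        // Worker: receives until the source channel is closed.
        let worker_tx = sink_tx.clone();
        s.spawn(move || {
            for v in src_rx.iter() {
                worker_tx.send(v * 2).unwrap();
            }
        });

        // The original sender is still alive here. Without this drop,
        // the sink loop below would wait forever for more messages.
        drop(sink_tx);

        // Sink: iteration ends once every sender has been dropped.
        sink_rx.iter().collect::<Vec<u32>>()
    })
}

fn main() {
    let out = run_pipeline(vec![1, 2, 3, 4]);
    assert_eq!(out, vec![2, 4, 6, 8]);
}
```

With a single worker the output order matches the input order; with several workers sharing the source channel, as in the recipe, the order at the sink is not guaranteed.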
Pass data between two threads
This example demonstrates the use of crossbeam-channel in a single producer, single
consumer (SPSC) setting. We build off the "Spawn a short-lived thread" example by
using crossbeam::scope and Scope::spawn to manage the producer thread. Data is
exchanged between the two threads using a crossbeam_channel::unbounded channel,
meaning there is no limit to the number of storable messages. The producer thread
sleeps for 100 milliseconds in between messages.
    use std::{thread, time};
    use crossbeam_channel::unbounded;

    fn main() {
        let (snd, rcv) = unbounded();
        let n_msgs = 5;
        crossbeam::scope(|s| {
            s.spawn(|_| {
                for i in 0..n_msgs {
                    snd.send(i).unwrap();
                    thread::sleep(time::Duration::from_millis(100));
                }
            });
        }).unwrap();
        for _ in 0..n_msgs {
            let msg = rcv.recv().unwrap();
            println!("Received {}", msg);
        }
    }
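In the code above, the receive loop runs after crossbeam::scope has already joined the producer, which works because an unbounded channel can buffer every message. As a hedged variation, the sketch below receives inside the scope, so the consumer drains messages while the producer is still running. It uses the standard library's std::sync::mpsc::channel and std::thread::scope in place of crossbeam, and the name produce_and_collect is hypothetical.

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

// Spawns one producer that sends 0..n_msgs and collects the messages
// in the calling thread while the producer is still running.
fn produce_and_collect(n_msgs: u32) -> Vec<u32> {
    let (snd, rcv) = channel();

    thread::scope(|s| {
        s.spawn(move || {
            for i in 0..n_msgs {
                snd.send(i).unwrap();
                thread::sleep(Duration::from_millis(10));
            }
            // `snd` is dropped here, which ends the iteration below.
        });

        // Consumer: blocks for each message until the sender is dropped.
        rcv.iter().collect::<Vec<u32>>()
    })
}

fn main() {
    let received = produce_and_collect(5);
    assert_eq!(received, vec![0, 1, 2, 3, 4]);
}
```

Receiving inside the scope also works with a bounded channel, whereas receiving only after the producer has been joined would deadlock once a bounded channel fills up.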
Maintain global mutable state
Declare global state using lazy_static. lazy_static creates a globally available
static ref, which requires a Mutex to allow mutation (also see RwLock). The Mutex
wrapper ensures the state cannot be accessed by multiple threads simultaneously,
preventing race conditions. A MutexGuard must be acquired to read or mutate the
value stored in a Mutex.
    use error_chain::error_chain;
    use lazy_static::lazy_static;
    use std::sync::Mutex;

    error_chain!{ }

    lazy_static! {
        static ref FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());
    }

    fn insert(fruit: &str) -> Result<()> {
        let mut db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;
        db.push(fruit.to_string());
        Ok(())
    }

    fn main() -> Result<()> {
        insert("apple")?;
        insert("orange")?;
        insert("peach")?;
        {
            let db = FRUIT.lock().map_err(|_| "Failed to acquire MutexGuard")?;

            db.iter().enumerate().for_each(|(i, item)| println!("{}: {}", i, item));
        }
        insert("grape")?;
        Ok(())
    }
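On Rust 1.63 or newer, Mutex::new is a const fn, so a plain static can hold the same state without lazy_static. The following is a minimal sketch of that alternative, not the recipe's own approach; it replaces error_chain with a String error, and the snapshot helper is hypothetical.

```rust
use std::sync::Mutex;

// A plain static works because Mutex::new and Vec::new are const fns.
static FRUIT: Mutex<Vec<String>> = Mutex::new(Vec::new());

fn insert(fruit: &str) -> Result<(), String> {
    // The guard is released when `db` goes out of scope.
    let mut db = FRUIT
        .lock()
        .map_err(|_| "Failed to acquire MutexGuard".to_string())?;
    db.push(fruit.to_string());
    Ok(())
}

// Hypothetical helper: returns a copy of the current contents.
fn snapshot() -> Result<Vec<String>, String> {
    let db = FRUIT
        .lock()
        .map_err(|_| "Failed to acquire MutexGuard".to_string())?;
    Ok(db.clone())
}

fn main() -> Result<(), String> {
    insert("apple")?;
    insert("orange")?;
    for (i, item) in snapshot()?.iter().enumerate() {
        println!("{}: {}", i, item);
    }
    Ok(())
}
```

Returning a clone from snapshot keeps the lock held only briefly, at the cost of copying the vector.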
Calculate SHA256 sum of iso files concurrently
This example calculates the SHA256 digest of every file with an iso extension
under a directory (/home/user/Downloads in the code). A thread pool is created
with as many threads as there are cores in the system, as reported by
num_cpus::get. WalkDir::new walks the directory tree, and ThreadPool::execute
is called for each matching file to read it and compute its SHA256 hash.
    use walkdir::WalkDir;
    use std::fs::File;
    use std::io::{BufReader, Read, Error};
    use std::path::Path;
    use threadpool::ThreadPool;
    use std::sync::mpsc::channel;
    use ring::digest::{Context, Digest, SHA256};

    // Verify the iso extension
    fn is_iso(entry: &Path) -> bool {
        match entry.extension() {
            Some(e) if e.to_string_lossy().to_lowercase() == "iso" => true,
            _ => false,
        }
    }

    fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<(Digest, P), Error> {
        let mut buf_reader = BufReader::new(File::open(&filepath)?);
        let mut context = Context::new(&SHA256);
        let mut buffer = [0; 1024];

        loop {
            let count = buf_reader.read(&mut buffer)?;
            if count == 0 {
                break;
            }
            context.update(&buffer[..count]);
        }

        Ok((context.finish(), filepath))
    }

    fn main() -> Result<(), Error> {
        let pool = ThreadPool::new(num_cpus::get());

        let (tx, rx) = channel();

        for entry in WalkDir::new("/home/user/Downloads")
            .follow_links(true)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| !e.path().is_dir() && is_iso(e.path()))
        {
            let path = entry.path().to_owned();
            let tx = tx.clone();
            pool.execute(move || {
                let digest = compute_digest(path);
                tx.send(digest).expect("Could not send data!");
            });
        }

        drop(tx);
        for t in rx.iter() {
            let (sha, path) = t?;
            println!("{:?} {:?}", sha, path);
        }
        Ok(())
    }
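To make the execute-then-collect pattern concrete without external crates, here is a minimal sketch of a fixed-size pool built from the standard library. MiniPool and sum_of_squares are hypothetical names, and this is not how the threadpool crate is implemented; it only illustrates workers pulling jobs from a shared queue and results flowing back over a channel.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical minimal pool: workers share one job queue.
struct MiniPool {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl MiniPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "pool needs at least one worker");
        let (sender, receiver) = channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held only while taking the next job.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // job queue closed
                    }
                })
            })
            .collect();
        MiniPool { sender: Some(sender), workers }
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }

    // Closes the job queue and waits for all workers to finish.
    fn join(mut self) {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

// Squares each input on the pool and sums the results.
fn sum_of_squares(inputs: &[u64], n_workers: usize) -> u64 {
    let pool = MiniPool::new(n_workers);
    let (tx, rx) = channel();
    for &n in inputs {
        let tx = tx.clone();
        pool.execute(move || tx.send(n * n).expect("Could not send data!"));
    }
    // Drop the original sender so the iteration below can end.
    drop(tx);
    let total: u64 = rx.iter().sum();
    pool.join();
    total
}

fn main() {
    // std stand-in for num_cpus::get; falls back to 1 if unknown.
    let n = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    assert_eq!(sum_of_squares(&[1, 2, 3, 4], n), 30);
}
```

As in the recipe, each job gets its own clone of the result sender, and dropping the original sender is what lets the receiving loop terminate.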
Draw fractal dispatching work to a thread pool
This example generates an image by drawing a fractal from the Julia set, using a thread pool to distribute the computation.
Allocate memory for an output image of the given width and height with ImageBuffer::new.
Rgb::from_channels builds the RGB value of each pixel.
Create a ThreadPool with a thread count equal to the number of cores, as reported by num_cpus::get.
ThreadPool::execute receives each row of pixels as a separate job.
mpsc::channel carries the computed pixels back to the main thread, and Receiver::recv retrieves them.
ImageBuffer::put_pixel uses the data to set the pixel color.
ImageBuffer::save writes the image to output.png.
    use error_chain::error_chain;
    use std::sync::mpsc::{channel, RecvError};
    use threadpool::ThreadPool;
    use num::complex::Complex;
    use image::{ImageBuffer, Pixel, Rgb};

    error_chain! {
        foreign_links {
            MpscRecv(RecvError);
            Io(std::io::Error);
            Image(image::ImageError);
        }
    }

    // Function converting intensity values to RGB
    // Based on http://www.efg2.com/Lab/ScienceAndEngineering/Spectra.htm
    fn wavelength_to_rgb(wavelength: u32) -> Rgb<u8> {
        let wave = wavelength as f32;

        let (r, g, b) = match wavelength {
            380..=439 => ((440. - wave) / (440. - 380.), 0.0, 1.0),
            440..=489 => (0.0, (wave - 440.) / (490. - 440.), 1.0),
            490..=509 => (0.0, 1.0, (510. - wave) / (510. - 490.)),
            510..=579 => ((wave - 510.) / (580. - 510.), 1.0, 0.0),
            580..=644 => (1.0, (645. - wave) / (645. - 580.), 0.0),
            645..=780 => (1.0, 0.0, 0.0),
            _ => (0.0, 0.0, 0.0),
        };

        let factor = match wavelength {
            380..=419 => 0.3 + 0.7 * (wave - 380.) / (420. - 380.),
            701..=780 => 0.3 + 0.7 * (780. - wave) / (780. - 700.),
            _ => 1.0,
        };

        let (r, g, b) = (normalize(r, factor), normalize(g, factor), normalize(b, factor));
        Rgb::from_channels(r, g, b, 0)
    }

    // Maps Julia set distance estimation to intensity values
    fn julia(c: Complex<f32>, x: u32, y: u32, width: u32, height: u32, max_iter: u32) -> u32 {
        let width = width as f32;
        let height = height as f32;

        let mut z = Complex {
            // scale and translate the point to image coordinates
            re: 3.0 * (x as f32 - 0.5 * width) / width,
            im: 2.0 * (y as f32 - 0.5 * height) / height,
        };

        let mut i = 0;
        for t in 0..max_iter {
            if z.norm() >= 2.0 {
                break;
            }
            z = z * z + c;
            i = t;
        }
        i
    }

    // Normalizes color intensity values within RGB range
    fn normalize(color: f32, factor: f32) -> u8 {
        ((color * factor).powf(0.8) * 255.) as u8
    }

    fn main() -> Result<()> {
        let (width, height) = (1920, 1080);
        let mut img = ImageBuffer::new(width, height);
        let iterations = 300;

        let c = Complex::new(-0.8, 0.156);

        let pool = ThreadPool::new(num_cpus::get());
        let (tx, rx) = channel();

        for y in 0..height {
            let tx = tx.clone();
            pool.execute(move || for x in 0..width {
                let i = julia(c, x, y, width, height, iterations);
                let pixel = wavelength_to_rgb(380 + i * 400 / iterations);
                tx.send((x, y, pixel)).expect("Could not send data!");
            });
        }

        for _ in 0..(width * height) {
            let (x, y, pixel) = rx.recv()?;
            img.put_pixel(x, y, pixel);
        }
        let _ = img.save("output.png")?;
        Ok(())
    }