Rust Parallel Iterator With Output Sequential Consistency

Overview

par_iter_sync: Parallel Iterator With Sequential Output

Crates like rayon do not offer a synchronization mechanism. This crate makes it easy to mix parallelism with synchronization, for example by executing tasks concurrently while synchronizing at certain steps.

Consider the case where multiple threads share a cache that can be read only after prior tasks have written to it (e.g., the reads of task 4 depend on the writes of tasks 1-4).

Using IntoParallelIteratorSync trait

// in concurrency: task1 write | task2 write | task3 write | task4 write
//                      \_____________\_____________\_____________\
//             task4 read depends on task 1-4 write  \___________
//                                                               \
// in concurrency:              | task2 read  | task3 read  | task4 read

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::{Arc, Mutex};
use std::collections::HashSet;

// there are 100 tasks
let tasks = 0..100;

// an in-memory cache for integers
let cache: Arc<Mutex<HashSet<i32>>> = Arc::new(Mutex::new(HashSet::new()));
let cache_clone = cache.clone();

// iterate through tasks
tasks.into_par_iter_sync(move |task_number| {

    // write the integer to the cache, in parallel
    cache.lock().unwrap().insert(task_number);
    // return the task number to the next iterator
    Ok(task_number)

}).into_par_iter_sync(move |task_number| { // <- synced to sequential order

    // reads
    assert!(cache_clone.lock().unwrap().contains(&task_number));
    Ok(())

// append a `for_each` to actually run the whole chain
}).for_each(|_| ());

Sequential Consistency

The output order is guaranteed to be the same as the upstream iterator, but the execution order is not sequential.
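
As a rough illustration of this guarantee (a standard-library sketch, not the crate's implementation), the hypothetical helper `ordered_parallel_map` below starts one thread per input and joins the handles in spawn order. Outputs come back in input order even when later inputs finish first.

```rust
use std::thread;

/// Runs `f` on every input in its own thread and returns the outputs
/// in input order, regardless of which thread finishes first.
/// `ordered_parallel_map` is a hypothetical name, not part of par_iter_sync.
fn ordered_parallel_map<T, R, F>(inputs: Vec<T>, f: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T) -> R + Send + Clone + 'static,
{
    // Spawn every worker first so that they all run concurrently.
    let handles: Vec<thread::JoinHandle<R>> = inputs
        .into_iter()
        .map(|input| {
            let f = f.clone();
            thread::spawn(move || f(input))
        })
        .collect();

    // Joining in spawn order is what fixes the output order.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

For instance, calling it with the inputs `[3, 2, 1, 0]` and a closure that sleeps longer for larger values still yields the results in the order of the inputs.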

Examples

Mix Syncing and Parallelism By Chaining

use par_iter_sync::IntoParallelIteratorSync;

(0..100).into_par_iter_sync(|i| {
    Ok(i)                     // <~ async execution
}).into_par_iter_sync(|i| {   // <- sync order
    Ok(i)                     // <~ async execution
}).into_par_iter_sync(|i| {   // <- sync order
    Ok(i)                     // <~ async execution
});                           // <- sync order

Use std::iter::IntoIterator interface

use par_iter_sync::IntoParallelIteratorSync;

let mut count = 0;

// for loop
for i in (0..100).into_par_iter_sync(|i| Ok(i)) {
    assert_eq!(i, count);
    count += 1;
}

// sum
let sum: i32 = (1..=100).into_par_iter_sync(|i| Ok(i)).sum();

// take and collect
let results: Vec<i32> = (0..10).into_par_iter_sync(|i| Ok(i)).take(5).collect();

assert_eq!(sum, 5050);
assert_eq!(results, vec![0, 1, 2, 3, 4])

Closure Captures Variables

Captured variables are cloned to each worker thread automatically.

use par_iter_sync::IntoParallelIteratorSync;
use std::sync::Arc;

// use `Arc` to save RAM
let resource_captured = Arc::new(vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3]);
let len = resource_captured.len();

let result_iter = (0..len).into_par_iter_sync(move |i| {
    // `resource_captured` is moved into the closure
    // and cloned to worker threads.
    let read_from_resource = resource_captured.get(i).unwrap();
    Ok(*read_from_resource)
});

// the result is produced in sequential order
let collected: Vec<i32> = result_iter.collect();
assert_eq!(collected, vec![3, 1, 4, 1, 5, 9, 2, 6, 5, 3])

Fast Fail During Exception

The iterator stops once the inner function returns an Err.

use par_iter_sync::IntoParallelIteratorSync;
use log::warn;

/// this function returns `Err` when it reads 1000
fn error_at_1000(n: i32) -> Result<i32, ()> {
    if n == 1000 {
        // you may log this error
        warn!("Some Error Occurs");
        Err(())
    } else {
        Ok(n)
    }
}

let results: Vec<i32> = (0..10000).into_par_iter_sync(move |a| {
    Ok(a)
}).into_par_iter_sync(move |a| {
    // error at 1000
    error_at_1000(a)
}).into_par_iter_sync(move |a| {
    Ok(a)
}).collect();

let expected: Vec<i32> = (0..1000).collect();
assert_eq!(results, expected)

You may choose to skip error

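If you do not want the iterator to stop on Err, a workaround is to wrap the result in an outer Ok, so that the error is passed downstream as an ordinary item.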

use par_iter_sync::IntoParallelIteratorSync;

let results: Vec<Result<i32, ()>> = (0..5).into_par_iter_sync(move |n| {
    // error at 3, but skip
    if n == 3 {
        Ok(Err(()))
    } else {
        Ok(Ok(n))
    }
}).collect();

assert_eq!(results, vec![Ok(0), Ok(1), Ok(2), Err(()), Ok(4)])

Implementation Note

Output Buffering

  • Each worker uses a synced (bounded) single-producer mpsc channel to buffer its outputs, so a thread that is waiting for its turn to be polled is not blocked until its buffer is full. The channel size is hard-coded to 10 for each thread.
  • The number of threads equals the number of logical cores.
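
The buffering scheme can be sketched with the standard library alone. This is only an illustration under assumptions: `BUFFER_SIZE`, `worker_count`, and `spawn_buffered_worker` are hypothetical names, and `std::thread::available_parallelism` stands in for however the crate actually counts logical cores.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Per-worker buffer capacity; 10 mirrors the hard-coded size described above.
const BUFFER_SIZE: usize = 10;

/// One worker per logical core, falling back to 1 if the count is unknown.
fn worker_count() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}

/// Spawns one worker that squares `inputs` and buffers the results
/// in its own bounded channel.
fn spawn_buffered_worker(inputs: Vec<u32>) -> (thread::JoinHandle<()>, Receiver<u32>) {
    let (sender, receiver) = sync_channel(BUFFER_SIZE);
    let handle = thread::spawn(move || {
        for n in inputs {
            // `send` blocks only once BUFFER_SIZE results are waiting unread.
            if sender.send(n * n).is_err() {
                break; // the consumer has gone away
            }
        }
    });
    (handle, receiver)
}
```

With ten or fewer inputs, the worker in this sketch can finish before the consumer has polled a single result, because everything fits in the buffer.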

Synchronization Mechanism

  • When a thread fetches a task, it registers its thread ID (thread_number) and the task ID (task_number) in an mpsc channel.
  • When next() is called, the consumer fetches the next thread ID and task ID from the task registry (task_order). It then receives from the channel of that thread and checks that the current task (current) matches the task ID, to ensure that no thread has run into an exception.
  • If next() detects that some thread has not produced a result because of an exception, it calls kill(), which stops threads from fetching new tasks, flushes the remaining tasks, and joins the worker threads.
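
The registry idea above can be sketched as follows. This is a simplified standard-library illustration, not the crate's code: the function `ordered_double` is hypothetical, the work is just doubling a number, error handling and kill() are left out, and `threads` is assumed to be at least 1.

```rust
use std::sync::mpsc::{channel, sync_channel};
use std::sync::{Arc, Mutex};
use std::thread;

/// Doubles every task on `threads` workers and returns results in task order.
fn ordered_double(tasks: Vec<u64>, threads: usize) -> Vec<u64> {
    // Shared task source; its lock also serialises registration.
    let source = Arc::new(Mutex::new(tasks.into_iter().enumerate()));
    // Task registry: (thread_number, task_number) in the order tasks were fetched.
    let (registry_tx, task_order) = channel::<(usize, usize)>();

    let mut outputs = Vec::new();
    let mut handles = Vec::new();
    for thread_number in 0..threads {
        let (tx, rx) = sync_channel::<(usize, u64)>(10);
        outputs.push(rx);
        let source = Arc::clone(&source);
        let registry_tx = registry_tx.clone();
        handles.push(thread::spawn(move || loop {
            // Fetch and register under one lock, so registry order equals task order.
            let (task_number, input) = {
                let mut guard = source.lock().unwrap();
                match guard.next() {
                    Some((id, input)) => {
                        registry_tx.send((thread_number, id)).unwrap();
                        (id, input)
                    }
                    None => break,
                }
            };
            // The work itself runs outside the lock, in parallel.
            if tx.send((task_number, input * 2)).is_err() {
                break;
            }
        }));
    }
    drop(registry_tx); // only the workers keep the registry open now

    // Consumer side: each iteration plays the role of one `next()` call.
    let mut results = Vec::new();
    let mut current = 0;
    for (thread_number, task_number) in task_order {
        let (id, value) = outputs[thread_number].recv().unwrap();
        assert_eq!(id, task_number); // the thread delivered the registered task
        assert_eq!(task_number, current); // tasks are consumed in sequence
        results.push(value);
        current += 1;
    }
    for handle in handles {
        handle.join().unwrap();
    }
    results
}
```

Registering while the task lock is still held is a design choice of this sketch: it makes the registry order identical to the task order, so the consumer only has to follow the registry to get sequential output.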

Error Handling and Dropping

  • When any exception occurs, producers are stopped from fetching new tasks.
  • Before the structure is dropped, all producers are stopped from fetching tasks, all remaining tasks are flushed, and all threads are joined.
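
A minimal sketch of this stop, flush, and join sequence, using only the standard library, might look like the following. `WorkerPool` and its fields are hypothetical names, the "tasks" are just numbers drawn from a shared counter, and all workers share one output channel for brevity (the crate is described above as using one channel per thread).

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical owner of the worker threads.
struct WorkerPool {
    stop: Arc<AtomicBool>,
    output: Receiver<usize>,
    handles: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    fn start(threads: usize, next_task: Arc<AtomicUsize>) -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let (tx, rx) = sync_channel(10);
        let handles: Vec<JoinHandle<()>> = (0..threads)
            .map(|_| {
                let (stop, tx, next_task) = (stop.clone(), tx.clone(), next_task.clone());
                thread::spawn(move || {
                    // Check the stop flag before fetching each new task.
                    while !stop.load(Ordering::SeqCst) {
                        let task = next_task.fetch_add(1, Ordering::SeqCst);
                        if tx.send(task).is_err() {
                            break;
                        }
                    }
                })
            })
            .collect();
        WorkerPool { stop, output: rx, handles }
    }
}

impl Drop for WorkerPool {
    fn drop(&mut self) {
        // 1. Stop producers from fetching new tasks.
        self.stop.store(true, Ordering::SeqCst);
        // 2. Flush buffered results; this also unblocks workers stuck in `send`.
        //    The loop ends once every worker has dropped its sender.
        for _ in self.output.iter() {}
        // 3. Join all worker threads.
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}
```

Flushing before joining matters in this sketch: a worker blocked on a full buffer can only observe the stop flag after the consumer has drained some of its output.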