Single-reader, multi-writer & single-reader, multi-verifier; broadcasts reads to multiple writeable destinations in parallel

Overview

Bus Writer

This Rust crate provides a generic single-reader, multi-writer with support for callbacks for monitoring progress. It also provides a generic single-reader, multi-verifier, so that you can verify the results using a similar technique. You provide any type which implements io::Read as the source, and a collection of destinations which implement io::Write. Callbacks may be given to control the cancellation of the writes and verifies, and to monitor the progress of each destination.

Why

The use case at System76 for this crate is our Popsicle[0] utility. Popsicle is a CLI + GTK3 utility which reads from an ISO and writes the ISO to all designated USB devices in parallel. Flashing USB drives serially can be very time-consuming, while doing it in parallel with traditional tools causes significant amounts of read I/O, because each tool reads the source separately. There also aren't any decent utilities, CLI or GTK, that can handle the process in a simple manner.

Implementation

Critical to the implementation is our usage of the bus crate for its Bus channel type. As written in the documentation for bus, it is a "lock-free, bounded, single-producer, multi-consumer, broadcast channel"[1]. The goal of the BusWriter is to read from the source and transmit Arc'd buffers of the data that was read to every destination.
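The broadcast idea can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: it replaces the lock-free Bus with one bounded std::sync::mpsc channel per destination, and the broadcast function and its parameters are hypothetical names chosen for this example.

```rust
use std::io::{self, Read, Write};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

/// Reads `source` in chunks of up to `bucket_size` bytes and hands every
/// chunk to every destination. Hypothetical helper, not part of bus_writer.
pub fn broadcast<R, W>(
    mut source: R,
    destinations: Vec<W>,
    bucket_size: usize,
    buckets: usize,
) -> io::Result<Vec<W>>
where
    R: Read,
    W: Write + Send + 'static,
{
    let mut senders = Vec::new();
    let mut workers = Vec::new();

    for mut dest in destinations {
        // Bounded queue: at most `buckets` unread chunks per destination.
        let (tx, rx) = sync_channel::<Arc<Vec<u8>>>(buckets);
        senders.push(tx);
        workers.push(thread::spawn(move || -> io::Result<W> {
            // This loop ends once the matching sender has been dropped.
            for chunk in rx {
                dest.write_all(&chunk)?;
            }
            dest.flush()?;
            Ok(dest)
        }));
    }

    let mut buffer = vec![0u8; bucket_size];
    loop {
        let read = source.read(&mut buffer)?;
        if read == 0 {
            break;
        }
        // The data is copied once per chunk; each destination then only
        // receives a clone of the Arc, not a copy of the bytes.
        let chunk = Arc::new(buffer[..read].to_vec());
        for tx in &senders {
            // A send error means that worker has already stopped; its own
            // io::Error is reported when the thread is joined below.
            let _ = tx.send(Arc::clone(&chunk));
        }
    }

    // Closing the channels lets every worker loop finish.
    drop(senders);

    // Wait for all writer threads before returning a result.
    workers
        .into_iter()
        .map(|worker| worker.join().expect("writer thread panicked"))
        .collect()
}
```

Calling broadcast(Cursor::new(data), vec![Vec::new(), Vec::new()], 4096, 4) hands back two vectors that each contain a full copy of data. The real crate avoids the per-destination queues by using a single broadcast channel.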

Each destination spawns a thread that listens for these buckets of data and writes them. Each of these threads transmits events to a separate monitoring thread, which invokes the provided callbacks so that the caller can handle the events (for example, to drive a progress bar for each device).
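The event flow described above might look roughly like the following sketch. It assumes a plain std::sync::mpsc channel and scoped threads (Rust 1.63 or later). The Event enum and run_workers function are hypothetical stand-ins for this example, not the crate's types, and the workers only pretend to write by adding up chunk lengths.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical progress events, loosely modelled on the messages a
/// destination thread could report.
#[derive(Debug, PartialEq)]
pub enum Event {
    Written { id: usize, bytes_written: u64 },
    Completed { id: usize },
}

/// Spawns one worker per entry in `work` plus a monitor thread that feeds
/// every event to `callback`. Each inner Vec lists the chunk lengths that
/// worker pretends to write.
pub fn run_workers<F>(work: Vec<Vec<u64>>, mut callback: F)
where
    F: FnMut(Event) + Send,
{
    let (tx, rx) = channel();

    thread::scope(|scope| {
        // Monitor thread: the only place where the callback is invoked.
        scope.spawn(move || {
            for event in rx {
                callback(event);
            }
        });

        for (id, chunks) in work.into_iter().enumerate() {
            let tx = tx.clone();
            scope.spawn(move || {
                let mut bytes_written = 0;
                for len in chunks {
                    bytes_written += len;
                    let _ = tx.send(Event::Written { id, bytes_written });
                }
                let _ = tx.send(Event::Completed { id });
            });
        }

        // Drop the original sender so the monitor loop ends once every
        // worker has finished and dropped its clone.
        drop(tx);
    });
}
```

Routing all events through one monitor thread means the callback never runs concurrently with itself, so it can update shared UI state without extra locking.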

The main event loop blocks when the bus has reached its limit of unread messages in the channel. This prevents your application from exhausting available memory, which can easily happen if it takes longer to write to the destinations than it does to read from your source. As a result, it is possible to write large files to multiple destinations without buffering the entire file into memory in advance.

Blocking on broadcasts when an upper bound is reached does mean that devices that write too quickly will block once they have exhausted all of the messages on their end. Therefore, your writes will be limited to the speed of the slowest device.
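The effect of the upper bound can be shown with a standard bounded channel. This is only an illustration under the assumption that a std::sync::mpsc sync_channel behaves similarly to the Bus in this respect; the fill_then_drain_one function is a hypothetical name. It uses the non-blocking try_send so that the full state is visible instead of stalling the caller.

```rust
use std::sync::mpsc::sync_channel;

/// Fills a bounded channel until it refuses more messages, then lets the
/// consumer take one message and tries to send again.
/// Returns (messages accepted before the channel was full,
///          whether a send succeeded after one message was consumed).
pub fn fill_then_drain_one(capacity: usize) -> (usize, bool) {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);

    // Producer side: try_send fails once `capacity` messages are unread.
    // A blocking send would wait here instead, which is what keeps memory
    // usage bounded when the reader outpaces the writers.
    let mut accepted = 0;
    while tx.try_send(vec![0u8; 16]).is_ok() {
        accepted += 1;
    }

    // Consumer side: taking one message frees one slot.
    let drained = rx.try_recv().is_ok();
    let sent_after_drain = drained && tx.try_send(vec![0u8; 16]).is_ok();

    (accepted, sent_after_drain)
}
```

With a capacity of 4, four messages are accepted, and a fifth only fits after the consumer has taken one. In a broadcast setting a slot is freed only after every consumer has read the message, which is why the slowest destination sets the pace.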

Once reading has finished, the function will wait for all background threads to finish before returning a result. This is to ensure that all events have been received and processed.

Unsafe

There is no usage of unsafe in our source code, though dependencies like bus and crossbeam may use it.

Configurability

By default, a bucket size of 16 MiB is used. This is configurable using the BusWriter::with_bucket() method, which allows you to provide your own mutable reference to a buffer for storing reads in the main event loop. This also allows you to reuse an existing buffer, so it is an optimization choice for those who want it.

Additionally, up to 4 buckets are stored within the inner Bus at a given time. The BusWriter::buckets() method can configure the number of buckets that you want to use instead.

References

Example

extern crate bus_writer;

use bus_writer::*;
use std::io::{BufReader, Cursor, Read};
use std::fs::{self, File};
use std::process::exit;

fn main() {
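    // `.iter()` yields references on every Rust edition, matching the
    // `(&x, &y)` pattern used in the fold below.
    let data: Vec<u8> = [0u8; 1024 * 1024 * 5].iter()
        .zip([1u8; 1024 * 1024 * 5].iter())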
        .cycle()
        .take(50 * 1024 * 1024)
        .fold(Vec::with_capacity(100 * 1024 * 1024), |mut acc, (&x, &y)| {
            acc.push(x);
            acc.push(y);
            acc
        });

    let mut source = Cursor::new(&data);

    let files = ["a", "b", "c", "d", "e", "f", "g", "h"];
    let mut temp_files = [
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[0]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[1]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[2]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[3]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[4]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[5]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[6]).unwrap(),
        fs::OpenOptions::new().read(true).write(true).create(true).open(files[7]).unwrap(),
    ];

    let mut errored = false;
    let result = BusWriter::new(
        &mut source,
        &mut temp_files,
        // Reports progress of each device so that callers may create their own progress bars
        // for each destination being written to, as seen in System76's Popsicle GTK UI.
        |event| match event {
            BusWriterMessage::Written { id, bytes_written } => {
                println!("{}: {} total bytes written", files[id], bytes_written);
            }
            BusWriterMessage::Completed { id } => {
                println!("{}: Completed", files[id]);
            }
            BusWriterMessage::Errored { id, why } => {
                println!("{} errored: {}", files[id], why);
                errored = true;
            }
        },
        // Executed at certain points while writing to check if the process needs to be cancelled
        || false
    ).write();

    if let Err(why) = result {
        eprintln!("writing failed: {}", why);
        exit(1);
    } else if errored {
        eprintln!("an error occurred");
        exit(1);
    }

    eprintln!("finished writing; validating files");

    let result = BusVerifier::new(
        source,
        &mut temp_files,
        |event| match event {
            BusVerifierMessage::Read { id, bytes_read } => {
                println!("{}: {} bytes verified", files[id], bytes_read);
            }
            BusVerifierMessage::Valid { id } => {
                println!("{}: Validated", files[id]);
            }
            BusVerifierMessage::Invalid { id } => {
                println!("{}: Invalid", files[id]);
                errored = true;
            }
            BusVerifierMessage::Errored { id, why } => {
                println!("{} errored while verifying: {}", files[id], why);
                errored = true;
            }
        },
        || false
    ).verify();

    if let Err(why) = result {
        eprintln!("verifying failed: {}", why);
        exit(1);
    } else if errored {
        eprintln!("Error occurred");
        exit(1);
    }

    eprintln!("All files validated!");
}