A lock-free multi-producer multi-consumer unbounded queue.

Overview

lf-queue

A lock-free multi-producer multi-consumer unbounded queue.

Examples

[dependencies]
lf-queue = "0.1"

Single Producer - Single Consumer:

use lf_queue::Queue;

fn main() {
    const COUNT: usize = 1_000;
    let queue: Queue<usize> = Queue::new();

    for i in 0..COUNT {
        queue.push(i);
    }

    for i in 0..COUNT {
        assert_eq!(i, queue.pop().unwrap());
    }

    assert!(queue.pop().is_none());
}

Multi Producer - Single Consumer:

use lf_queue::Queue;
use std::thread;

fn main() {
    const COUNT: usize = 1_000;
    const CONCURRENCY: usize = 4;

    let queue: Queue<usize> = Queue::new();

    let ths: Vec<_> = (0..CONCURRENCY)
        .map(|_| {
            let q = queue.clone();
            thread::spawn(move || {
                for i in 0..COUNT {
                    q.push(i);
                }
            })
        })
        .collect();

    for th in ths {
        th.join().unwrap();
    }

    for _ in 0..COUNT * CONCURRENCY {
        assert!(queue.pop().is_some());
    }

    assert!(queue.pop().is_none());
}

Single Producer - Multi Consumer:

use lf_queue::Queue;
use std::thread;

fn main() {
    const COUNT: usize = 1_000;
    const CONCURRENCY: usize = 4;

    let queue: Queue<usize> = Queue::new();

    for i in 0..COUNT * CONCURRENCY {
        queue.push(i);
    }

    let ths: Vec<_> = (0..CONCURRENCY)
        .map(|_| {
            let q = queue.clone();
            thread::spawn(move || {
                for _ in 0..COUNT {
                    loop {
                        if q.pop().is_some() {
                            break;
                        }
                    }
                }
            })
        })
        .collect();

    for th in ths {
        th.join().unwrap();
    }

    assert!(queue.pop().is_none());
}

Multi Producer - Multi Consumer:

use lf_queue::Queue;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    const COUNT: usize = 1_000;
    const CONCURRENCY: usize = 4;

    let queue: Queue<usize> = Queue::new();
    let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());

    // Spawn one consumer and one producer per iteration and keep both
    // handles, so every thread is joined before the counters are checked.
    let ths: Vec<_> = (0..CONCURRENCY)
        .flat_map(|_| {
            let q = queue.clone();
            let its = items.clone();
            let consumer = thread::spawn(move || {
                for _ in 0..COUNT {
                    // Retry until an item is available.
                    let n = loop {
                        if let Some(x) = q.pop() {
                            break x;
                        } else {
                            thread::yield_now();
                        }
                    };
                    its[n].fetch_add(1, Ordering::SeqCst);
                }
            });

            let q = queue.clone();
            let producer = thread::spawn(move || {
                for i in 0..COUNT {
                    q.push(i);
                }
            });

            vec![consumer, producer]
        })
        .collect();

    for th in ths {
        th.join().unwrap();
    }

    for c in &*items {
        assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
    }

    assert!(queue.pop().is_none());
}

Acknowledgement

This lock-free queue implementation in Rust took inspiration from the concurrent-queue crate and is intended for educational purposes. The code documentation helps you discover the algorithm used to implement a concurrent lock-free queue in Rust, but it might not yet be beginner-friendly. More details and learning materials will be added over time.
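
For readers new to the term, lock-free code usually replaces a mutex with a compare-and-swap retry loop. The sketch below is not taken from the lf-queue source; it only illustrates that general pattern with the standard library's atomics. The helper name `claim_slot` and the shared `tail` counter are hypothetical and exist only for this illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical helper (not part of lf-queue): atomically claims the next
/// index from a shared counter with a compare-and-swap retry loop.
fn claim_slot(tail: &AtomicUsize) -> usize {
    let mut current = tail.load(Ordering::Relaxed);
    loop {
        match tail.compare_exchange_weak(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            // The swap succeeded: `claimed` is the value we replaced.
            Ok(claimed) => return claimed,
            // The swap did not go through (another thread advanced the
            // counter, or a spurious failure): retry from the observed value.
            Err(actual) => current = actual,
        }
    }
}

fn main() {
    const THREADS: usize = 4;
    const PER_THREAD: usize = 1_000;

    let tail = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..THREADS)
        .map(|_| {
            let tail = Arc::clone(&tail);
            thread::spawn(move || {
                (0..PER_THREAD)
                    .map(|_| claim_slot(&tail))
                    .collect::<Vec<usize>>()
            })
        })
        .collect();

    let mut claimed: Vec<usize> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    claimed.sort_unstable();

    // Every index was handed out exactly once, without taking a lock.
    assert_eq!(claimed, (0..THREADS * PER_THREAD).collect::<Vec<_>>());
}
```

A thread that loses the race never blocks; it retries with the value it just observed, so some thread always makes progress. A real queue has to coordinate more state than a single counter, so treat this only as a starting point before reading the crate's own documentation.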

License

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be licensed as above, without any additional terms or conditions.

Note that, as of now, my focus is on improving the documentation of this crate, not adding any additional feature. Please open an issue and start a discussion before working on any significant PR.

Contributions are welcome.

Comments
  • RUSTSEC-2020-0159: Potential segfault in `localtime_r` invocations

    | Details | |
    | ------- | ----------------------------------------------- |
    | Package | chrono |
    | Version | 0.4.19 |
    | URL | https://github.com/chronotope/chrono/issues/499 |
    | Date | 2020-11-10 |

    Impact

    Unix-like operating systems may segfault due to dereferencing a dangling pointer in specific circumstances. This requires an environment variable to be set in a different thread than the affected functions. This may occur without the user's knowledge, notably in a third-party library.

    Workarounds

    No workarounds are known.

    References

    See advisory page for additional details.

    opened by github-actions[bot]
Releases(v0.1.0)
  • v0.1.0(Nov 13, 2021)

    What's Changed

    • Initial implementation by @broucz in https://github.com/broucz/lf-queue/pull/1

    New Contributors

    • @broucz made their first contribution in https://github.com/broucz/lf-queue/pull/1

    Full Changelog: https://github.com/broucz/lf-queue/commits/v0.1.0
