Message/job queue based on bonsaidb, similar to sqlxmq.

Last update: May 29, 2022

Bonsaimq

Simple database message queue based on bonsaidb.

The project is highly influenced by sqlxmq.

Warning: This project is in early alpha and should not be used in production!

Usage

Import the project using:

# adjust the version to the latest version:
bonsaimq = "0.2.0"
# or
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" }

Then you can use the message/job queue as follows:

  • You need job handlers, which are async functions that receive one argument of type CurrentJob and return a Result, as the handler in the example below does. CurrentJob is the interface to the running job: it lets the handler retrieve the job input, complete the job, and so on.
  • The macro job_registry! needs to be used to create a job registry, which maps message names/types to the job handlers and allows spawning new jobs.
  • A job runner needs to be created and run on a bonsai database. It runs in the background as long as the handle is in scope and executes the jobs according to the incoming messages, using the job registry to find the handler for each one.
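
The division of labour between these three pieces can be pictured with a small dependency-free sketch. It does not use bonsaimq at all: Job, Registry, register, and dispatch are hypothetical names chosen for illustration, there is no database, and the handlers are synchronous only to keep the sketch short.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a job handle; it only carries the payload.
struct Job {
    payload: String,
}

/// Handlers are plain synchronous functions here for brevity.
type Handler = fn(&Job) -> Result<String, String>;

/// Hypothetical registry: maps a message name to the handler that serves it.
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    fn new() -> Self {
        Self { handlers: HashMap::new() }
    }

    fn register(&mut self, name: &'static str, handler: Handler) {
        self.handlers.insert(name, handler);
    }

    /// Look up the handler for a message name and run it on the job.
    fn dispatch(&self, name: &str, job: &Job) -> Result<String, String> {
        let handler = self
            .handlers
            .get(name)
            .ok_or_else(|| format!("no handler registered for {name:?}"))?;
        handler(job)
    }
}

fn greet(job: &Job) -> Result<String, String> {
    Ok(format!("Hello {}!", job.payload))
}

fn main() {
    let mut registry = Registry::new();
    registry.register("greet", greet);

    // A runner would do this for every message it finds in the queue.
    let job = Job { payload: "cats".to_string() };
    match registry.dispatch("greet", &job) {
        Ok(output) => println!("{output}"),
        Err(error) => eprintln!("job failed: {error}"),
    }
}
```

In this picture the registry is only a lookup table, and the runner is whatever loop calls dispatch for each queued message.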

Example

Besides the following simple example, see the examples in the examples folder and take a look at the tests.

use bonsaidb::local::{
	config::{Builder, StorageConfiguration},
	AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;

/// Example job function. It receives a handle to the current job, which gives
/// the ability to get the input payload, complete the job and more.
async fn greet(mut job: CurrentJob) -> color_eyre::Result<()> {
	// Load the JSON payload and make sure it is there.
	let name: String = job.payload_json().expect("input should be given")?;
	println!("Hello {name}!");
	job.complete().await?;
	Ok(())
}

// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
job_registry!(JobRegistry, {
	Greetings: "greet" => greet,
});

#[tokio::main]
async fn main() -> Result<()> {
	// Open a local database for this example.
	let db_path = "simple-doc-example.bonsaidb";
	let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;

	// Start the job runner to execute jobs from the messages in the queue in the
	// database.
	let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();

	// Spawn new jobs via a message on the database queue.
	let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;

	// Wait for job to finish execution, polling every 100 ms.
	bonsaimq::await_job(job_id, 100, &db).await?;

	// Clean up.
	job_runner.abort(); // Is done automatically on drop.
	tokio::fs::remove_dir_all(db_path).await?;
	Ok(())
}
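
The example waits for the job by polling at a fixed interval. As a rough illustration of that pattern only, and not of how bonsaimq implements await_job, the following standard-library sketch repeatedly runs a check until it succeeds or a maximum number of attempts is reached. The name poll_until_done and the max_polls parameter are assumptions made for this sketch.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: call `is_done` every `interval` until it returns true
/// or `max_polls` checks have been made. Returns whether the job finished.
fn poll_until_done(
    mut is_done: impl FnMut() -> bool,
    interval: Duration,
    max_polls: u32,
) -> bool {
    for _ in 0..max_polls {
        if is_done() {
            return true;
        }
        // Blocking sleep; this sketch is synchronous on purpose.
        sleep(interval);
    }
    false
}

fn main() {
    // Simulate a job that reports completion on the third check.
    let mut checks = 0;
    let finished = poll_until_done(
        || {
            checks += 1;
            checks >= 3
        },
        Duration::from_millis(100),
        10,
    );
    println!("finished: {finished} after {checks} checks");
}
```

Inside an async runtime, the blocking sleep would be replaced by an async one such as tokio::time::sleep so that the polling task does not block a worker thread.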

Lints

This project uses a set of clippy lints to enforce code quality and style.

Install cargo-lints using cargo install --git https://github.com/FlixCoder/cargo-lints. The lints are defined in lints.toml and can be checked by running cargo lints clippy --all-targets --workspace.

GitHub

https://github.com/FlixCoder/bonsaimq
Comments
  • 1. Checkpoints

    Implement checkpoint and address one bug and TODOs.

    Closes https://github.com/FlixCoder/bonsaimq/issues/6 . Closes https://github.com/FlixCoder/bonsaimq/issues/1 .

    Reviewed by FlixCoder at 2022-06-12 13:33
  • 2. Allow returning errors from jobs

    The job function type should be altered to return a Result. Returned errors should be passed to a configurable error handler function. This might require retrieving the results from the jobs via their JoinHandles, but the handler could also be forwarded to the job-execution tasks so that the JoinHandles can still be dropped. Preferably, the join handles would be kept.

    Reviewed by FlixCoder at 2022-05-29 12:24
  • 3. Implement functionality for job checkpoints

    • [x] CurrentJob needs to allow setting a checkpoint by altering the message payload
    • [x] The runner needs to implement altering of the message payload
    • [x] A test for checkpoints is required

    Reviewed by FlixCoder at 2022-05-29 12:18
  • 4. Determine whether it works to have multiple JobRunners on the same database

    It should mostly work, but currently a runner fetches all available jobs (https://github.com/FlixCoder/bonsaimq/issues/4). Having multiple runners therefore would not make much sense yet, because they do not split the work effectively.

    It would still be interesting to know whether it works in general, especially once a concurrency limit is implemented.

    Reviewed by FlixCoder at 2022-05-29 12:27
  • 5. JobBuilder needs a possibility for defaults

    The JobRegistry could get another optional field as macro input, which sets the job-default-configs. The generated builder function then sets the defaults before returning the builder.

    Reviewed by FlixCoder at 2022-05-29 12:21
  • 6. Jobs should be able to return output

    A new database collection for job output could be created. CurrentJob could expose an interface for writing to the job output. The output should only be read once the job has finished (so it is not in the database anymore). Output should be deleted from the DB once read. A function for waiting for a job and returning the output as well as polling the output for a job ID should be implemented.

    Reviewed by FlixCoder at 2022-05-29 12:20
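
The read-once behaviour proposed in item 6 can be sketched without a database. The following is an in-memory illustration under stated assumptions, not bonsaimq code: OutputStore, write, take, and the u64 job ID are hypothetical, and the sketch leaves out the check that a job has finished before its output may be read.

```rust
use std::collections::HashMap;

/// Hypothetical job identifier; the real ID type may differ.
type JobId = u64;

/// In-memory stand-in for the proposed job-output collection.
#[derive(Default)]
struct OutputStore {
    outputs: HashMap<JobId, String>,
}

impl OutputStore {
    /// Called on behalf of a job to record its output.
    fn write(&mut self, id: JobId, output: String) {
        self.outputs.insert(id, output);
    }

    /// Return the output if present and delete it, so it is read at most once.
    fn take(&mut self, id: JobId) -> Option<String> {
        self.outputs.remove(&id)
    }
}

fn main() {
    let mut store = OutputStore::default();
    store.write(1, "Hello cats!".to_string());

    // The first read returns the output, the second finds nothing.
    println!("{:?}", store.take(1));
    println!("{:?}", store.take(1));
}
```

Combining the read and the delete in a single take operation is one way to meet the "deleted once read" requirement; a database-backed version would need to perform both steps atomically.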