Message/job queue based on bonsaidb, similar to sqlxmq.

Overview

Bonsaimq

Simple database message queue based on bonsaidb.

The project is highly influenced by sqlxmq.

Warning: This project is in early alpha and should not be used in production!

Usage

Import the project using:

# adjust the version to the latest version:
bonsaimq = "0.2.0"
# or
bonsaimq = { git = "https://github.com/FlixCoder/bonsaimq" }

Then you can use the message/job queue as follows:

  • You need job handlers, which are async functions that receive one argument of type CurrentJob and return no value on success (the example below returns Result<()> so that errors can be propagated). CurrentJob is the handler's interface to the running job: it is used to retrieve the job input, complete the job, and so on.
  • The macro job_registry! needs to be used to create a job registry, which maps message names/types to the job handlers and allows spawning new jobs.
  • A job runner needs to be created and run on a bonsai database. It runs in the background as long as the handle is in scope and executes the jobs according to the incoming messages. It acts on the job registry.
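
The relationship between handlers, registry and runner described above can be illustrated with a small standard-library-only sketch. This is not bonsaimq's implementation or API: the Message struct, the synchronous Handler type, the hand-written Registry enum and the run_all function are all hypothetical stand-ins, and the queue is an in-memory VecDeque rather than a database.

```rust
use std::collections::VecDeque;

/// Hypothetical queued message: a registered name plus an optional text payload.
struct Message {
	name: &'static str,
	payload: Option<String>,
}

/// Hypothetical handler signature for this sketch (synchronous, returns text).
type Handler = fn(Option<&str>) -> String;

fn greet(payload: Option<&str>) -> String {
	format!("Hello {}!", payload.unwrap_or("stranger"))
}

fn farewell(payload: Option<&str>) -> String {
	format!("Goodbye {}!", payload.unwrap_or("stranger"))
}

/// Hand-written stand-in for a registry: it maps names to handlers and back.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Registry {
	Greetings,
	Farewells,
}

impl Registry {
	/// The message name under which this entry is queued.
	fn name(self) -> &'static str {
		match self {
			Registry::Greetings => "greet",
			Registry::Farewells => "farewell",
		}
	}

	/// Look up a registry entry by message name.
	fn from_name(name: &str) -> Option<Self> {
		match name {
			"greet" => Some(Registry::Greetings),
			"farewell" => Some(Registry::Farewells),
			_ => None,
		}
	}

	/// The handler function to execute for this entry.
	fn handler(self) -> Handler {
		match self {
			Registry::Greetings => greet,
			Registry::Farewells => farewell,
		}
	}
}

/// Stand-in for a runner: drain the queue and dispatch each message by name.
/// Messages with unknown names are skipped.
fn run_all(queue: &mut VecDeque<Message>) -> Vec<String> {
	let mut outputs = Vec::new();
	while let Some(message) = queue.pop_front() {
		if let Some(entry) = Registry::from_name(message.name) {
			outputs.push((entry.handler())(message.payload.as_deref()));
		}
	}
	outputs
}

fn main() {
	let mut queue = VecDeque::new();
	queue.push_back(Message { name: Registry::Greetings.name(), payload: Some("cats".to_string()) });
	queue.push_back(Message { name: "unknown", payload: None });
	queue.push_back(Message { name: Registry::Farewells.name(), payload: None });
	for line in run_all(&mut queue) {
		println!("{line}");
	}
}
```

The point of the sketch is only the name-to-handler mapping: whoever spawns a job needs the name, and whoever runs jobs needs the reverse lookup.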

Example

Besides the following simple example, see the examples in the examples folder and take a look at the tests.

use bonsaidb::local::{
	config::{Builder, StorageConfiguration},
	AsyncDatabase,
};
use bonsaimq::{job_registry, CurrentJob, JobRegister, JobRunner, MessageQueueSchema};
use color_eyre::Result;

/// Example job function. It receives a handle to the current job, which gives
/// the ability to get the input payload, complete the job and more.
async fn greet(mut job: CurrentJob) -> color_eyre::Result<()> {
	// Load the JSON payload and make sure it is there.
	let name: String = job.payload_json().expect("input should be given")?;
	println!("Hello {name}!");
	job.complete().await?;
	Ok(())
}

// The JobRegistry provides a way to spawn new jobs and provides the interface
// for the JobRunner to find the functions to execute for the jobs.
job_registry!(JobRegistry, {
	Greetings: "greet" => greet,
});

#[tokio::main]
async fn main() -> Result<()> {
	// Open a local database for this example.
	let db_path = "simple-doc-example.bonsaidb";
	let db = AsyncDatabase::open::<MessageQueueSchema>(StorageConfiguration::new(db_path)).await?;

	// Start the job runner to execute jobs from the messages in the queue in the
	// database.
	let job_runner = JobRunner::new(db.clone()).run::<JobRegistry>();

	// Spawn new jobs via a message on the database queue.
	let job_id = JobRegistry::Greetings.builder().payload_json("cats")?.spawn(&db).await?;

	// Wait for job to finish execution, polling every 100 ms.
	bonsaimq::await_job(job_id, 100, &db).await?;

	// Clean up.
	job_runner.abort(); // Is done automatically on drop.
	tokio::fs::remove_dir_all(db_path).await?;
	Ok(())
}
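
The example waits for the job by polling every 100 ms. As a rough illustration of that polling pattern in general (not of how await_job is implemented), here is a standard-library-only sketch. The wait_until function, its timeout parameter and the blocking thread::sleep are assumptions made for this sketch; an async program would use an async sleep instead.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: call `is_done` every `interval` until it returns true
/// or `timeout` has elapsed. Returns true if the condition was met.
fn wait_until(mut is_done: impl FnMut() -> bool, interval: Duration, timeout: Duration) -> bool {
	let deadline = Instant::now() + timeout;
	loop {
		if is_done() {
			return true;
		}
		if Instant::now() >= deadline {
			return false;
		}
		thread::sleep(interval);
	}
}

fn main() {
	// A flag standing in for "the job is no longer in the queue".
	let finished = Arc::new(AtomicBool::new(false));
	let worker_flag = Arc::clone(&finished);
	let worker = thread::spawn(move || {
		thread::sleep(Duration::from_millis(30));
		worker_flag.store(true, Ordering::SeqCst);
	});

	let done = wait_until(
		|| finished.load(Ordering::SeqCst),
		Duration::from_millis(10),
		Duration::from_secs(2),
	);
	worker.join().expect("worker thread should not panic");
	println!("job finished: {done}");
}
```

A shorter interval notices completion sooner at the cost of more checks; the timeout is an addition of this sketch so that a job that never finishes cannot block the caller forever.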

Lints

This project uses a set of clippy lints for higher code quality and a consistent style.

Install cargo-lints using cargo install --git https://github.com/FlixCoder/cargo-lints. The lints are defined in lints.toml and can be checked by running cargo lints clippy --all-targets --workspace.

Comments
  • Checkpoints

    Implement checkpoint and address one bug and TODOs.

    Closes https://github.com/FlixCoder/bonsaimq/issues/6 . Closes https://github.com/FlixCoder/bonsaimq/issues/1 .

    opened by FlixCoder 0
  • Allow returning errors from jobs

    The Job function type should be altered to return a Result. The returned errors should be handled by a set error handler function somehow. This might include retrieving the results from the jobs via the JoinHandles, but possibly the handler can be forwarded to the job-execution-tasks so the JoinHandles can still be dropped. Preferably, the join handles would be kept.

    opened by FlixCoder 0
  • Implement functionality for job checkpoints

    • [x] CurrentJob needs to allow setting a checkpoint by altering the message payload
    • [x] The runner needs to implement altering of the message payload
    • [x] A test for checkpoints is required
    opened by FlixCoder 0
  • Determine whether it works to have multiple JobRunners on the same database

    It should mostly work, but currently it fetches all available jobs (https://github.com/FlixCoder/bonsaimq/issues/4). So, it wouldn't make a lot of sense to have multiple runners, because they don't split work effectively.

    It would be interesting to know whether it works in general, though, especially once a concurrency limit is implemented.

    opened by FlixCoder 0
  • JobBuilder needs a possibility for defaults

    The JobRegistry could get another optional field as macro input, which sets the job-default-configs. The generated builder function then sets the defaults before returning the builder.

    opened by FlixCoder 0
  • Jobs should be able to return output

    A new database collection for job output could be created. CurrentJob could expose an interface for writing to the job output. The output should only be read once the job has finished (so it is not in the database anymore). Output should be deleted from the DB once read. A function for waiting for a job and returning the output as well as polling the output for a job ID should be implemented.

    opened by FlixCoder 0
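
The read-once semantics proposed in the last issue above (output is written by the job, read after it finishes, and deleted once read) could look roughly like the following sketch. It is purely illustrative: OutputStore, JobId, write and take are hypothetical names, and an in-memory HashMap stands in for the proposed database collection.

```rust
use std::collections::HashMap;

/// Hypothetical job identifier for this sketch.
type JobId = u64;

/// In-memory stand-in for a job-output collection.
#[derive(Default)]
struct OutputStore {
	outputs: HashMap<JobId, String>,
}

impl OutputStore {
	/// Called by the job to record its output.
	fn write(&mut self, id: JobId, output: impl Into<String>) {
		self.outputs.insert(id, output.into());
	}

	/// Read the output once; the entry is removed, so a second read finds nothing.
	fn take(&mut self, id: JobId) -> Option<String> {
		self.outputs.remove(&id)
	}
}

fn main() {
	let mut store = OutputStore::default();
	store.write(7, "Hello cats!");
	println!("{:?}", store.take(7)); // Some("Hello cats!")
	println!("{:?}", store.take(7)); // None
}
```
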
Releases(v0.2.0)
Owner
Flix
Backend Developer - Rust
A priority queue for Rust with efficient change function.

PriorityQueue This crate implements a Priority Queue with a function to change the priority of an object. Priority and items are stored in an IndexMap

null 139 Dec 30, 2022
wait-free spsc linked-list queue with individually reusable nodes

A wait-free single-producer single-consumer linked-list queue with individually reusable nodes.

glowcoil 22 Dec 26, 2022
disk backed wal queue

Repository Template Queue like disk backed WAL Pronounced Quál - from the German word for agony - because it is. Operations The basic concept is si

The Tremor Project 8 Jun 4, 2022
A lock-free multi-producer multi-consumer unbounded queue.

lf-queue A lock-free multi-producer multi-consumer unbounded queue. Examples [dependencies] lf-queue = "0.1" Single Producer - Single Consumer: use lf

Pierre Brouca 2 Sep 11, 2022
SegVec data structure for rust. Similar to Vec, but allocates memory in chunks of increasing size.

segvec This crate provides the SegVec data structure. It is similar to Vec, but allocates memory in chunks of increasing size, referred to as "segment

Jacob Ryan McCollum 30 Dec 16, 2022
A simple Vec-based Map inspired on JavaScript for rust.

A simple alternative to HashMap inspired on JavaScript's Map.

Squioole 2 Oct 3, 2021
Novusk-based OS running the Ardaku engine.

Arc Novusk-based OS running the Ardaku engine. Build Environment You will need the nightly toolchain and sources as well as cargo-binutils. rustup too

Ardaku Systems 12 Jan 1, 2023
A job queue built on sqlx and PostgreSQL.

sqlxmq A job queue built on sqlx and PostgreSQL. This library allows a CRUD application to run background jobs without complicating its deployment. Th

Diggory Blake 85 Jan 5, 2023
A better message queue built by rust

bettermq A better message queue built by rust I start this project to study Rust

Sun Junyi 13 Dec 16, 2022
A scalable message queue powered by a segmented, partitioned, replicated and immutable log.

A scalable message queue powered by a segmented, partitioned, replicated and immutable log. This is currently a work in progress. laminarmq is intende

Arindam Das 20 Dec 16, 2022
A lightweight distributed message queue. Like AWS SQS and RSMQ but on Postgres.

Postgres Message Queue (PGMQ) A lightweight distributed message queue. Like AWS SQS and RSMQ but on Postgres. Features Lightweight - Built with Rust a

Tembo 15 Jul 25, 2023
A mini-game demo of BonsaiDb + Gooey

"Minority Game" Demo Why build this? This repository originally was created for a presentation at the December Rust Gamedev Meetup. @ecton recorded th

Khonsu Labs 2 Jan 2, 2022
A (flash) message framework for actix-web. A port to Rust of Django's message framework.

actix-web-flash-messages Flash messages for actix-web Web applications sometimes need to show a one-time notification to the user - e.g. an error mess

Luca Palmieri 31 Dec 29, 2022
RocksDB-based queue with python bindings

RocksQ An inproc RocksDB-based queue with Python bindings. It is implemented in Rust. Features: max capacity limit in number of elements; size calcula

In-Sight 4 Oct 17, 2023
Fang - Background job processing library for Rust.

Fang Background job processing library for Rust. Currently, it uses Postgres to store state. But in the future, more backends will be supported.

Ayrat Badykov 421 Dec 28, 2022
🕵️Scrape multiple media providers on a cron job and dispatch webhooks when changes are detected.

Jiu is a multi-threaded media scraper capable of juggling thousands of endpoints from different providers with unique restrictions/requirements.

Xetera 47 Dec 6, 2022
A CI inspired approach for local job automation.

nauman A CI inspired approach for local job automation. Features • Installation • Usage • FAQ • Examples • Job Syntax About nauman is an easy-to-use j

Egor Dmitriev 8 Oct 24, 2022
An unsafe botched job that doesn't rely on types being 'static lifetime.

An unsafe botched job that doesn't rely on types being 'static lifetime. Will panic if provided a 0 field struct. I will fix this when I figure out how.

twhite 0 Feb 4, 2022
Simple, extensible multithreaded background job processing library for Rust.

Apalis Apalis is a simple, extensible multithreaded background job processing library for Rust. Features Simple and predictable job handling model. Jo

null 50 Jan 2, 2023
A lightweight job framework for Bevy.

bevy_jobs A lightweight job framework for Bevy. Getting started Defining a job: pub struct FetchRequestJob { pub url: String, } impl bevy_jobs::J

Corey Farwell 3 Aug 31, 2022