sqlxmq

A job queue built on sqlx and PostgreSQL.

This library allows a CRUD application to run background jobs without complicating its deployment. The only runtime dependency is PostgreSQL, so this is ideal for applications already using a PostgreSQL database.

Although using a SQL database as a job queue means compromising on latency of delivered jobs, there are several show-stopping issues present in ordinary job queues which are avoided altogether.

With most other job queues, in-flight jobs are state that is not covered by normal database backups. Even if jobs are backed up, there is no way to restore both a database and a job queue to a consistent point-in-time without manually resolving conflicts.

By storing jobs in the database, existing backup procedures will store a perfectly consistent state of both in-flight jobs and persistent data. Additionally, jobs can be spawned and completed as part of other transactions, making it easy to write correct application code.

Leveraging the power of PostgreSQL, this job queue offers several features not present in other job queues.

Features

  • Send/receive multiple jobs at once.

    This reduces the number of queries to the database.

  • Send jobs to be executed at a future date and time.

    Avoids the need for a separate scheduling system.

  • Reliable delivery of jobs.

  • Automatic retries with exponential backoff.

    Number of retries and initial backoff parameters are configurable.

  • Transactional sending of jobs.

    Avoids sending spurious jobs if a transaction is rolled back.

  • Transactional completion of jobs.

    If all side-effects of a job are updates to the database, this provides true exactly-once execution of jobs.

  • Transactional check-pointing of jobs.

    Long-running jobs can check-point their state to avoid having to restart from the beginning if there is a failure: the next retry can continue from the last check-point (see the sketch after this list).

  • Opt-in strictly ordered job delivery.

    Jobs within the same channel will be processed strictly in-order if this option is enabled for the job.

  • Fair job delivery.

    A channel with a lot of jobs ready to run will not starve a channel with fewer jobs.

  • Opt-in two-phase commit.

    This is particularly useful on an ordered channel where a position can be "reserved" in the job order, but not committed until later.

  • JSON and/or binary payloads.

    Jobs can use whichever is most convenient.

  • Automatic keep-alive of jobs.

    Long-running jobs will automatically be "kept alive" to prevent them being retried whilst they're still ongoing.

  • Concurrency limits.

    Specify the minimum and maximum number of concurrent jobs each runner should handle.

  • Built-in job registry via an attribute macro.

    Jobs can be easily registered with a runner, and default configuration specified on a per-job basis.

  • Implicit channels.

    Channels are implicitly created and destroyed when jobs are sent and processed, so no setup is required.

  • Channel groups.

    Easily subscribe to multiple channels at once, thanks to the separation of channel name and channel arguments.

  • NOTIFY-based polling.

    This saves resources when few jobs are being processed.
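
To make the check-pointing feature concrete, here is a minimal sketch of a check-pointed job. It is illustrative only: it assumes the crate's Checkpoint type and the CurrentJob::checkpoint method, and the job name and batch loop are made up.

use sqlxmq::{job, Checkpoint, CurrentJob};

#[job]
async fn import_rows(
    mut current_job: CurrentJob,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    // On a retry, resume from the last check-pointed batch instead of zero.
    let mut next_batch: u32 = current_job.json()?.unwrap_or(0);

    while next_batch < 10 {
        // ... process batch `next_batch` ...
        next_batch += 1;

        // Persist progress: if the job fails later, the next retry starts here.
        let mut checkpoint = Checkpoint::new();
        checkpoint.set_json(&next_batch)?;
        current_job.checkpoint(&checkpoint).await?;
    }

    current_job.complete().await?;
    Ok(())
}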

Getting started

Database schema

This crate expects certain database tables and stored procedures to exist. You can copy the migration files from this crate into your own migrations folder.

All database items created by this crate are prefixed with mq, so as not to conflict with your own schema.
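
If you use sqlx's embedded migrator (the migrate feature), the copied files can then be applied at startup. A minimal sketch, assuming the migrations were copied into this crate's ./migrations directory and `pool` is your connection pool:

// Apply all pending migrations, including the ones copied from sqlxmq.
sqlx::migrate!("./migrations").run(&pool).await?;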

Defining jobs

The first step is to define a function to be run on the job queue.

use sqlxmq::{job, CurrentJob};

// Arguments to the `#[job]` attribute allow setting default job options.
#[job(channel_name = "foo")]
async fn example_job(
    mut current_job: CurrentJob,
) -> sqlx::Result<()> {
    // Decode a JSON payload
    let who: Option<String> = current_job.json()?;

    // Do some work
    println!("Hello, {}!", who.as_deref().unwrap_or("world"));

    // Mark the job as complete
    current_job.complete().await?;

    Ok(())
}
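
Completion can also be tied to your own database writes. Here is a minimal sketch of the transactional-completion feature, assuming a complete_with_transaction method that completes the job and commits the transaction atomically; the users table and job name are made up for illustration.

use sqlxmq::{job, CurrentJob};

#[job]
async fn activate_user(
    mut current_job: CurrentJob,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
    let user_id: i64 = current_job.json()?.ok_or("missing payload")?;

    // Perform the job's side-effects and its completion in one transaction:
    // either both are committed or neither is (exactly-once execution).
    let mut tx = current_job.pool().begin().await?;
    sqlx::query("UPDATE users SET active = TRUE WHERE id = $1")
        .bind(user_id)
        .execute(&mut *tx)
        .await?;
    current_job.complete_with_transaction(tx).await?;

    Ok(())
}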

Listening for jobs

Next we need to create a job runner: this is what listens for new jobs and executes them.

use std::error::Error;

use sqlxmq::JobRegistry;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // You'll need to provide a Postgres connection pool.
    let pool = connect_to_db().await?;

    // Construct a job registry from our single job.
    let mut registry = JobRegistry::new(&[example_job]);
    // Here is where you can configure the registry
    // registry.set_error_handler(...)

    let runner = registry
        // Create a job runner using the connection pool.
        .runner(&pool)
        // Here is where you can configure the job runner
        // Aim to keep 10-20 jobs running at a time.
        .set_concurrency(10, 20)
        // Start the job runner in the background.
        .run()
        .await?;

    // The job runner will continue listening and running
    // jobs until `runner` is dropped.
    Ok(())
}
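
In a real service you would typically hold on to runner until shutdown, since dropping it stops the job runner. A minimal sketch of an alternative ending for main, assuming tokio's signal feature is enabled:

// Keep the runner alive until Ctrl+C, then stop it by dropping the handle.
tokio::signal::ctrl_c().await?;
drop(runner);
Ok(())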

Spawning a job

The final step is to actually run a job.

example_job.builder()
    // This is where we can override job configuration
    .set_channel_name("bar")
    .set_json("John")?
    .spawn(&pool)
    .await?;
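
Because spawning goes through the database, a job can also be sent atomically with other writes (the transactional-sending feature above). A minimal sketch, assuming spawn accepts any sqlx Postgres executor and that a users table exists in your schema:

let mut tx = pool.begin().await?;

sqlx::query("INSERT INTO users (name) VALUES ($1)")
    .bind("John")
    .execute(&mut *tx)
    .await?;

// If the transaction rolls back, the job is never sent.
example_job.builder()
    .set_json("John")?
    .spawn(&mut *tx)
    .await?;

tx.commit().await?;
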
Comments
  • Sqlxmq starts the same job multiple times concurrently

    Hi!

    I just noticed something weird happening on low-performance systems: when starting only a single job on a VM limited to 5% of the CPU (4 cores), it executes the job twice concurrently (I checked the job UUID; it is the same). Automatic keep_alive is activated, and an extra keep_alive in the job itself does not help either. So it seems to start the same job multiple times before it even starts executing.

    This causes issues on a real system under heavy load, as things that should only run once actually execute multiple times concurrently :/

    Thank you in advance and for your work!

    opened by FlixCoder 20
  • Why manual job completion?

    It feels surprising and error-prone. Naturally I'd expect completion on Ok and retry on Err. Is it due to implementation limitations?

    Also, in the current implementation, shouldn't the CurrentJob::complete method rather consume self?

    opened by imbolc 17
  • What is the expected throughput?

    Hi! Thank you for your great crate!

    I am testing sqlxmq_stress and I don't see any high load on the cores.

    My results:

    num_jobs = 1000; set_concurrency(50, 1000)

    min: 8.296434179s
    max: 9.840498547s
    median: 8.851534467s
    95th percentile: 9.600073887s
    throughput: 99.19286908241159/s
    

    num_jobs = 10000; set_concurrency(50, 1000)

    It took more than 2 hours and was still running on a Ryzen 5900HX / SSD.

    I think maybe it is hung? How can I prevent such situations, and what is the expected throughput on recent hardware?

    opened by makorne 15
  • Global objects in jobs

    How would I get an object like a Postgres Pool into a job that is spawned by the registry if my job wants to do SQL queries? (might be a different pool than the sqlxmq pool).

    Sorry for asking here, should I use StackOverflow / Discord / ... ?

    opened by StephanSchmidt 6
  • Duplicate job runners

    Hello @Diggsey,

    I am back with more details. It is perplexing, but I think I see it. A coworker wrote the code and shared a branch. It might be important to note that they are on an M1 Mac Pro. The idea is simple: change the Rust code and start two runners while it is recompiling. When Rust says it is built, they both run at pretty much the same time. On my coworker's system both runners call polling and get the same answer. They both process the same tasks.

    https://github.com/Diggsey/sqlxmq/blob/master/migrations/20210316025847_setup.up.sql#L125

    I was wondering if it was a nanosecond issue, where mq_poll is called in the same nanosecond and Postgres only goes down to a microsecond: both jobs call now() and they both return the same value for now(), but the rows are not updated yet?

    Reading over the code, what prevents two polling tasks from running at the same time? I don't know Postgres well enough to answer that. Should the table select-and-update be instant, or can both mq_poll calls update the same value?

    Thanks for reading. I don't think the code is much to note, but I included the runner and job below. Like I said, I can't reproduce the problem on my desktop with the same code running. It doesn't matter if it's one job or more. My coworker can reproduce it every time; I have not reproduced it once. I am going to ask a third coworker to try to reproduce the issue.

    Thanks again for reading and any ideas about this. Becker

    [edit] I have a pr on my fork with a much simpler fix that I think works well. Please let me know your thoughts.

    #[sqlxmq::job]
    async fn test_job(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
        tracing::info!("start");
        tokio::time::sleep(std::time::Duration::from_secs(120)).await;
        tracing::info!("end");
        current_job.complete().await.unwrap();
        Ok(())
    }
    
    // the runner code
    let database_url = env::var("DATABASE_URL").unwrap();
    let pool = sqlx::postgres::PgPoolOptions::new()
        .connect_lazy(database_url.as_str())
        .unwrap();

    // Used to insert the job; while it runs, we kill the process.
    // Comment out this line and, while it is recompiling, start two runners.
    // test_job.builder().spawn(&pool).await.unwrap();

    let runner = sqlxmq::JobRegistry::new(&[test_job])
        .runner(&pool)
        .run()
        .await
        .unwrap();
    
    opened by sbeckeriv 4
  • Check on job status (by uuid)

    Hi.

    Is there any way I could check on a job's status by UUID? I would like to wait until it is finished or at least poll whether it is already done. Is this possible somehow or easy to integrate?

    Thank you in advance!

    opened by FlixCoder 4
  • Bug: Unique constraint violation on ordered messages

    There's a race condition when you send ordered messages at almost the same time: more than one message can end up with the same after_message_id, causing the constraint

    CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id);
    

    to be violated.

    I have yet to create a minimal test case that reproduces the bug, but I intend to do so.

    opened by agraven 4
  • sqlx and sqlxmq

    Not sure if this is an issue, or how sqlxmq chooses the runtime, but I get

      error: only one of ['runtime-actix-native-tls', 'runtime-async-std-native-tls',
             'runtime-tokio-native-tls', 'runtime-actix-rustls', 'runtime-async-std-rustls',
             'runtime-tokio-rustls'] can be enabled
        --> /home/stephan/.cargo/registry/src/github.com-1ecc6299db9ec823/sqlx-rt-0.5.5/src/lib.rs:21:1
         |
      21 | / compile_error!(
         | |     "only one of ['runtime-actix-native-tls', 'runtime-async-std-native-tls', \
         | |      'runtime-tokio-native-tls', 'runtime-actix-rustls', 'runtime-async-std-rustls', \
         | |      'runtime-tokio-rustls'] can be enabled"
         | | );
         | |__^

    with

      sqlx = { version = "0.5.5", features = [ "postgres", "json", "runtime-tokio-native-tls" ] }
      sqlxmq = "0.2.1"
    

    in Cargo.toml

    opened by StephanSchmidt 4
  • `clear_all` removes initial message

    After SELECT mq_clear_all() a couple of tests fail with Some("Key (after_message_id)=(00000000-0000-0000-0000-000000000000) is not present in table \"mq_msgs\".")

    opened by imbolc 3
  • Rate limiting the number of tasks executed in a given unit of time

    This is a great library and makes task queuing and async processing very easy. However, I was trying to figure out if there was a way to rate-limit the throughput, but couldn't find one.

    Suppose there's a task that calls an API, and that API is rate-limited to 100 calls per 100 secs. If each task takes 1 sec to complete, this is what will happen.

    100 tasks are submitted to the queue at t = 0 s with max concurrency 100 for the runner: 1 s total to complete. 10 tasks are submitted at t = 1 s: all of them will be throttled and fail (depending on the timeout).

    Now suppose there was some kind of rate limiting feature enabled the expected experience would be this -

    100 tasks are submitted to the queue at t = 0 s with max concurrency 100 for the runner: 1 s total to complete. 10 tasks are submitted at t = 1 s: all jobs will wait until the consumption rate goes below the rate limit and they can be scheduled.

    • Is rate limiting possible in the current implementation? Maybe by manipulating the concurrency and retry backoff times?
    • Is it a planned feature?
    opened by twitu 3
  • Document the macro-generated static

    #[warn(missing_docs)] complains about the generated static because it lacks documentation. This should fix it.

    Feel free to change the actual documentation.

    opened by FlixCoder 3
  • Question: prevent duplicate jobs

    Dearest Maintainer,

    I was wondering if there was a way to prevent duplicate jobs where the payload might be the same. In a hand-made system we used a hash to generate an optional number, and each job provided its own value. When it was present, a partial index on the hash and the channel name prevented inserting a duplicate.

    I have tried using the ordering flag for this, but I have found that if one job dies, they all die.

    Please let me know any thoughts. Becker

    opened by sbeckeriv 0
  • refactor: Replace dotenv with dotenvy

    dotenv, one of sqlxmq's dependencies, is unmaintained. The project has been forked, and the fork has been actively maintained since. The public API remains unchanged, so it should work as a drop-in replacement.

    opened by emgrav 0
  • Type-safe context

    Currently, context registration/consumption isn't type-safe and can fail at runtime. Axum somehow figured out how to make it type-safe. I haven't had the chance yet to look into the details, but thought I'd leave this here as a reminder/hint:

    https://tokio.rs/blog/2022-08-whats-new-in-axum-0-6-0-rc1

    opened by johannescpk 0
  • Graceful shutdown

    After changes to job code, previously added payloads may not be valid anymore. A solution could be to stop spawning new jobs and wait until all the remaining jobs are done before restarting the runner. The only way to do it I could find for now is to query the db directly: .. from mq_msgs where attempts > 0. Should we add a method for this so users won't rely on the implementation details?

    opened by imbolc 11
  • Tasks as structs

    Have you considered representing tasks by structs of their context instead of functions? This way we could avoid explicit context deserialization and related bugs. E.g. this would be impossible:

    struct Foo;
    struct Bar;
    
    #[job]
    pub async fn foo(job: CurrentJob) -> Result<..> {
        let context: Foo = job.json()?;
    }
    foo.builder().set_json(&Bar).spawn(&db).await?;
    

    It would be something like:

    #[derive(Job)]
    struct MyJob;
    
    impl CurrentJob for MyJob {
       fn run(self) -> Result<..> {
       }
    }
    
    MyJob { ... }.spawn(&db).await?;
    
    opened by imbolc 12
Owner

Diggory Blake