Fang - Background job processing library for Rust.

Related tags

Task scheduling fang
Overview

fang

Fang

Background job processing library for Rust.

Currently, it uses Postgres to store state. But in the future, more backends will be supported.

Installation

  1. Add this to your Cargo.toml
[dependencies]
fang = "0.3"
typetag = "0.1"
serde = { version = "1.0", features = ["derive"] }
  1. Create fang_tasks table in the Postgres database. The migration can be found in the migrations directory.

Usage

Defining a job

Every job should implement fang::Runnable trait which is used by fang to execute it.

    use fang::Error;
    use fang::Runnable;
    use serde::{Deserialize, Serialize};


    #[derive(Serialize, Deserialize)]
    struct Job {
        pub number: u16,
    }

    #[typetag::serde]
    impl Runnable for Job {
        fn run(&self) -> Result<(), Error> {
            println!("the number is {}", self.number);

            Ok(())
        }
    }

As you can see from the example above, the trait implementation has #[typetag::serde] attribute which is used to deserialize the job.

Enqueuing a job

To enqueue a job use Postgres::enqueue_task

use fang::Postgres;

...

Postgres::enqueue_task(&Job { number: 10 }).unwrap();

The example above creates a new postgres connection on every call. If you want to reuse the same postgres connection to enqueue several jobs use Postgres struct instance:

let postgres = Postgres::new();

for id in &unsynced_feed_ids {
    postgres.push_task(&SyncFeedJob { feed_id: *id }).unwrap();
}

Starting workers

Every worker runs in a separate thread. In case of panic, they are always restarted.

Use WorkerPool to start workers. WorkerPool::new accepts one parameter - the number of workers.

use fang::WorkerPool;

WorkerPool::new(10).start();

Configuration

To configure workers, instead of WorkerPool::new which uses default values, use WorkerPool.new_with_params. It accepts two parameters - the number of workers and WorkerParams struct.

Configuring the type of workers

You can start workers for a specific types of tasks. These workers will be executing only tasks of the specified type.

Add task_type method to the Runnable trait implementation:

String { "number".to_string() } } ">
...

#[typetag::serde]
impl Runnable for Job {
    fn run(&self) -> Result<(), Error> {
        println!("the number is {}", self.number);

        Ok(())
    }

    fn task_type(&self) -> String {
        "number".to_string()
    }
}

Set task_type to the WorkerParamas:

let mut worker_params = WorkerParams::new();
worker_params.set_task_type("number".to_string());

WorkerPool::new_with_params(10, worker_params).start();

Without setting task_type workers will be executing any type of task.

Configuring retention mode

By default, all successfully finished tasks are removed from the DB, failed tasks aren't.

There are three retention modes you can use:

pub enum RetentionMode {
    KeepAll,        \\ doesn't remove tasks
    RemoveAll,      \\ removes all tasks
    RemoveFinished, \\ default value
}

Set retention mode with set_retention_mode:

let mut worker_params = WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);

WorkerPool::new_with_params(10, worker_params).start();

Configuring sleep values

You can use use SleepParams to confugure sleep values:

pub struct SleepParams {
    pub sleep_period: u64,     \\ default value is 5
    pub max_sleep_period: u64, \\ default value is 15
    pub min_sleep_period: u64, \\ default value is 5
    pub sleep_step: u64,       \\ default value is 5
}p

If there are no tasks in the DB, a worker sleeps for sleep_period and each time this value increases by sleep_step until it reaches max_sleep_period. min_sleep_period is the initial value for sleep_period. All values are in seconds.

Use set_sleep_params to set it:

let sleep_params = SleepParams {
    sleep_period: 2,
    max_sleep_period: 6,
    min_sleep_period: 2,
    sleep_step: 1,
};
let mut worker_params = WorkerParams::new();
worker_params.set_sleep_params(sleep_params);

WorkerPool::new_with_params(10, worker_params).start();

Potential/future features

  • Retries
  • Scheduled tasks
  • Extendable/new backends

Contributing

  1. Fork it!
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

Author

Ayrat Badykov (@ayrat555)

Comments
  • Simplifying api

    Simplifying api

    One thing I have found it confusing is that both producer and consumer uses AsyncRunnable which has methods such as cron and uniq. This makes it confusing on if the value is taken from producer or from consumer. Would be great if the apis were a bit more like faktory-rs.

    You create a producer that has that enqueue jobs with all the metadata.

    use faktory::{Producer, Job};
    let mut p = Producer::connect(None).unwrap();
    p.enqueue(Job::new("foobar", vec!["data1"])).unwrap();
    

    and to consume you register the handler only.

    use faktory::ConsumerBuilder;
    use std::io;
    let mut c = ConsumerBuilder::default();
    c.register("foobar", |job| -> io::Result<()> {
        println!("{:?}", job);
        Ok(())
    });
    let mut c = c.connect(None).unwrap();
    if let Err(e) = c.run(&["default"]) {
        println!("worker failed: {}", e);
    }
    

    One thing I like about faktory-rs is that it is few lines of code. To add other features such as schedule the following could be used.

    p.enqueue(Job::new("foobar", args)
        .on_queue("queue1")
        .schedule_at(Utc::now());
    

    Job that is enqueued should be a different struct than the job that was dequeued.

    opened by prabirshrestha 4
  • chore(cargo): update uuid requirement from 0.8 to 1.1

    chore(cargo): update uuid requirement from 0.8 to 1.1

    Updates the requirements on uuid to permit the latest version.

    Release notes

    Sourced from uuid's releases.

    1.1.0

    What's Changed

    New Contributors

    Full Changelog: https://github.com/uuid-rs/uuid/compare/1.0.0...1.1.0

    Commits
    • 99977bb Merge pull request #602 from uuid-rs/cargo/1.1.0
    • e7b58f4 prepare for 1.1.0 release
    • 7d037f5 Merge pull request #599 from dfaust/to_bytes_le
    • 7a2d4e4 Merge pull request #600 from Razican/doc_fix
    • d988df2 Fixed documentation link
    • b0ca5a0 add bytes_le roundtrip test
    • 9810dc8 add to_bytes_le method
    • e0f0837 Merge pull request #597 from uuid-rs/KodrAus-patch-1
    • 1a54a9f note that the Error display impl is public API
    • 9e0dc29 Merge pull request #596 from KodrAus/cargo/1.0.0
    • Additional commits viewable in compare view

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies rust 
    opened by dependabot[bot] 3
  • Support graceful shutdown of worker

    Support graceful shutdown of worker

    Did some work to support gracefully exiting individual worker threads, allowing in progress tasks to finish before exiting. Even though tasks should be idempotent, supporting graceful shutdown during normal scale-down events is useful. This PR allows currently running tasks to finish before allowing the process to exit.

    Also added thiserror and a FangError enum to allow Fang to return structured errors. Bumped version to 0.5.0 as I added the WorkerPool::shutdown public method, as well as updated WorkerPool::start to return a Result with a FangError.

    Here's an example usage:

    use fang::WorkerPool;
    use signal_hook::{consts::signal::*, iterator::Signals};
    
    fn main() {
        let mut job_pool = WorkerPool::new(10);
        job_pool.start().unwrap();
    
        let mut signals = Signals::new(&[SIGINT, SIGTERM])?;
        for sig in signals.forever() {
            match sig {
                SIGINT => {
                    println!("Received SIGINT");
                    job_pool.shutdown().unwrap();
                    break;
                },
                SIGTERM => {
                    println!("Received SIGTERM");
                    job_pool.shutdown().unwrap();
                    break;
                },
                _ => unreachable!(format!("Received unexpected signal: {:?}", sig)),
            }
        }
    }
    
    opened by jess-sol 3
  • chore(cargo): update diesel requirement from 1.4 to 2.0

    chore(cargo): update diesel requirement from 1.4 to 2.0

    Updates the requirements on diesel to permit the latest version.

    Release notes

    Sourced from diesel's releases.

    1.4.8

    This release fixes an incompatibility between features passed to diesel and diesel_migrations while using cargos new resolver feature (resolver = "2") which will become the new default with the upcoming Rust 2021 edition

    Changelog

    Sourced from diesel's changelog.

    [1.4.8] - 2021-09-20

    Fixed

    • Fixed a incompatibly between diesel and diesel_migrations when building both crates with cargos new resolver = "2" enabled. This change ensures compatibility with the upcomming 2021 rust edition.

    [1.4.7] - 2021-06-08

    Fixed

    • Updated libsqlite3-sys to allow version 0.22
    • Updated ipnetwork to allow version 0.18

    [1.4.6] - 2021-03-05

    Fixed

    • Fixed a use-after-free issue in the QueryableByName implementation of our Sqlite backend
    • Updated several dependencies

    [1.4.5] - 2020-06-09

    Fixed

    • Update several dependencies
    • Fixed an issue where transactions that would fail to commit would leave the connection in a broken non-committed non-rolled-back state.
    • Fix a bug that result in leaking sockets/file descriptors on failed connection attempts for postgresql
    • Fix an incompatibility with newer libmysqlclient versions
    • Remove some potential harmful usages of mem::uninitialized

    [1.4.4] - 2020-03-22

    Fixed

    • Update several dependencies
    • Fixed a bug with printing embeded migrations

    [1.4.3] - 2019-10-11

    Fixed

    • Updated several dependencies
    • Fixed an issue where the postgresql backend exploits implementation defined behaviour
    • Fixed issue where rustdoc failed to build the documentation
    • diesel_derives and diesel_migrations are updated to syn 1.0

    ... (truncated)

    Commits

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies rust 
    opened by dependabot[bot] 2
  • Implement async worker

    Implement async worker

    1. let's call it AsyncWorker. I think it will store AsyncQueue and some configuration parameters

    this struct can be created with TypedBuilder

    1. create looping that will be fetching tasks and executing them

    2. if there are no tasks, it should sleep

    opened by ayrat555 2
  • Question: Can tasks be scheduled in distributed systems with multiple workers on different machines.

    Question: Can tasks be scheduled in distributed systems with multiple workers on different machines.

    Looking at the documentation and blog-post as well as skimming through the code, I wasn't able to answer the question if fang allows scheduling tasks across distributed systems, with multiple workers on different machines.

    Is there some synchronization to make this work and to prevent duplicate work across multiple workers, as well as a mechanism for another worker to pick up a task if the worker that started it dies before reporting back the task as finished?

    opened by FSMaxB 1
  • Define

    Define "background processing" in README, documentation and website

    Define "background processing" in README, documentation and website.

    Consider linking to, and/or incorporating content from, https://fang.badykov.com/blog/async-processing/

    As someone new to systems and desktop/server application programming, I didn't properly understand the term "background processing" in this context (I only had a notion of what it might be). The readme and documentation currently assumes the reader already understands what "background processing" is and what it is for.

    And it turns out you've already got a good definition in your first blog post.

    In software engineering background processing is a common approach for solving several problems:

    • Carry out periodic tasks. For example, deliver notifications, update cached values.
    • Defer expensive work so your application stays responsive while performing calculations in the background

    It might be that perhaps if I don't understand the term I'm not the audience, however, now that I do understand what it's for, I can see myself using it. I think spelling it out would also be beneficial to developers whose first language isn't English as well.

    opened by tigregalis 1
  • chore(cargo): update postgres-types requirement from 0.X.X to 0.2.4

    chore(cargo): update postgres-types requirement from 0.X.X to 0.2.4

    Updates the requirements on postgres-types to permit the latest version.

    Commits
    • 5b95068 Release postgres-types v0.2.4
    • 34efb05 rustfmt
    • d6a6e9d Fix smol_str feature
    • 91b2187 Merge pull request #937 from NAlexPear/encode_format_types
    • 8158eed Move parameter count assertion above format encoding derivation
    • 569689d encode format with types
    • 8f955eb Merge pull request #935 from Some-Dood/deps/prefer-sub-crates
    • 44eac98 Fix: run cargo fmt
    • 5f3e7ae Fix: address Clippy warnings
    • a0f028a Fix(tokio-postgres): declare dependency on net feature
    • Additional commits viewable in compare view

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies rust 
    opened by dependabot[bot] 1
  • chore(cargo): update postgres-types requirement from 0.X.X to 0.2.3

    chore(cargo): update postgres-types requirement from 0.X.X to 0.2.3

    Updates the requirements on postgres-types to permit the latest version.

    Commits
    • 695067c Release postgres-types v0.2.3
    • 1d8aa0a Release postgres-derive v0.4.2
    • da78d4e Release postgres-protocol v0.6.4
    • 765395f Merge pull request #888 from mati865/uuid-1-impls
    • 1d9c93d Add conversions from Uuid 1.0
    • 024794a Merge pull request #886 from neondatabase/funbringer/fix-some-warnings
    • cf381ce Hide tokio_postgres::client::SocketConfig behind "runtime" feature
    • 812dfa7 Update ci.yml
    • 38da7fa Merge pull request #875 from halfmatthalfcat/ltree-support
    • 6fae655 Fix tests, replace match with matches!
    • Additional commits viewable in compare view

    Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


    Dependabot commands and options

    You can trigger Dependabot actions by commenting on this PR:

    • @dependabot rebase will rebase this PR
    • @dependabot recreate will recreate this PR, overwriting any edits that have been made to it
    • @dependabot merge will merge this PR after your CI passes on it
    • @dependabot squash and merge will squash and merge this PR after your CI passes on it
    • @dependabot cancel merge will cancel a previously requested merge and block automerging
    • @dependabot reopen will reopen this PR if it is closed
    • @dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
    • @dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
    • @dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
    dependencies rust 
    opened by dependabot[bot] 1
  • fix Error struct export for `asynk`

    fix Error struct export for `asynk`

    There conflicts with blocking module's Error struct

    15 | use fang::Error as FangError;
       |     ^^^^^^-----^^^^^^^^^^^^^
       |     |     |
       |     |     help: a similar name exists in the module: `error`
       |     no `Error` in the root
    
    
    opened by ayrat555 1
  • Implement a worker pool for async processing

    Implement a worker pool for async processing

    A worker pool should

    • initialize a pool of db connection for workers
    • spawn a required number of workers (https://github.com/ayrat555/fang/issues/38)
    • restart them in case of panics
    opened by ayrat555 1
  • Add SurrealDB support

    Add SurrealDB support

    Would be great if Surrealdb support can be added. It can run in memory, single node with file as well as distributed, has events support and is written in Rust.

    SurrealDB in 100 Seconds gives a good overview.- https://www.youtube.com/watch?v=C7WFwgDRStM

    opened by prabirshrestha 0
  • Add support for SQLite

    Add support for SQLite

    I see that fang uses traits to implement AsyncQueueable. It would be great is SQLite is supported as it allows to build applications that just works without additional installation and configuration and they can always upgrade to better postgres support if needed.

    opened by prabirshrestha 4
Releases(0.10.0)
Owner
Ayrat Badykov
Ayrat Badykov
Ergo is a low-code IFTTT/Zapier style application, built with Rust and Svelte

Ergo is a low-code IFTTT/Zapier style application, built with Rust and Svelte. Tasks are customizable with Javascript and can contain state machines for more advanced task handling.

Daniel Imfeld 100 Dec 26, 2022
Task runner and process manager for Rust

Steward Task runner and process manager for Rust. If you're not happy managing your infrastructure with a pile of bash scripts, this crate might be he

Alex Fedoseev 24 Dec 26, 2022
delicate A lightweight and distributed task scheduling platform written in rust

A lightweight and distributed task scheduling platform written in rust.

BinCheng 529 Jan 9, 2023
Rust single-process scheduling. Ported from schedule for Python

Rust single-process scheduling. Ported from schedule for Python, in turn inspired by clockwork (Ruby), and "Rethinking Cron" by Adam Wiggins.

Ben Lovy 13 May 30, 2022
Simple, extensible multithreaded background job processing library for Rust.

Apalis Apalis is a simple, extensible multithreaded background job processing library for Rust. Features Simple and predictable job handling model. Jo

null 50 Jan 2, 2023
An MVP-worthy background job server for PostgreSQL, written in Rust

Pointguard An MVP-worthy background job server for PostgreSQL, written in Rust A simple background job server (database) on top of PostgreSQL, that ca

Gal Schlezinger 23 Nov 25, 2023
Background task processing for Rust applications with Tokio, Diesel, and PostgreSQL.

Async persistent background task processing for Rust applications with Tokio. Queue asynchronous tasks to be processed by workers. It's designed to be

Rafael Carício 22 Mar 27, 2023
Easy c̵̰͠r̵̛̠ö̴̪s̶̩̒s̵̭̀-t̶̲͝h̶̯̚r̵̺͐e̷̖̽ḁ̴̍d̶̖̔ ȓ̵͙ė̶͎ḟ̴͙e̸̖͛r̶̖͗ë̶̱́ṉ̵̒ĉ̷̥e̷͚̍ s̷̹͌h̷̲̉a̵̭͋r̷̫̊ḭ̵̊n̷̬͂g̵̦̃ f̶̻̊ơ̵̜ṟ̸̈́ R̵̞̋ù̵̺s̷̖̅ţ̸͗!̸̼͋

Rust S̵̓i̸̓n̵̉ I̴n̴f̶e̸r̵n̷a̴l mutability! Howdy, friendly Rust developer! Ever had a value get m̵̯̅ð̶͊v̴̮̾ê̴̼͘d away right under your nose just when

null 294 Dec 23, 2022
A simple, efficient Rust library for handling asynchronous job processing and task queuing.

job_queue Setup cargo add job_queue Usage Create a job use job_queue::{Error, Job, typetag, async_trait, serde}; #[derive(Debug, serde::Deserialize,

Georges KABBOUCHI 3 Nov 30, 2023
ddi is a wrapper for dd. It takes all the same arguments, and all it really does is call dd in the background

ddi A safer dd Introduction If you ever used dd, the GNU coreutil that lets you copy data from one file to another, then you may have encountered a ty

Tomás Ralph 80 Sep 8, 2022
A job queue built on sqlx and PostgreSQL.

sqlxmq A job queue built on sqlx and PostgreSQL. This library allows a CRUD application to run background jobs without complicating its deployment. Th

Diggory Blake 85 Jan 5, 2023
🕵️Scrape multiple media providers on a cron job and dispatch webhooks when changes are detected.

Jiu is a multi-threaded media scraper capable of juggling thousands of endpoints from different providers with unique restrictions/requirements.

Xetera 47 Dec 6, 2022
A CI inspired approach for local job automation.

nauman A CI inspired approach for local job automation. Features • Installation • Usage • FAQ • Examples • Job Syntax About nauman is an easy-to-use j

Egor Dmitriev 8 Oct 24, 2022
An unsafe botched job that doesn't rely on types being 'static lifetime.

An unsafe botched job that doesn't rely on types being 'static lifetime. Will panic if provided a 0 field struct. I will fix this when I figure out how.

twhite 0 Feb 4, 2022
Message/job queue based on bonsaidb, similar to sqlxmq.

Bonsaimq Simple database message queue based on bonsaidb. The project is highly influenced by sqlxmq. Warning: This project is in early alpha and shou

Flix 6 Nov 8, 2022
A lightweight job framework for Bevy.

bevy_jobs A lightweight job framework for Bevy. Getting started Defining a job: pub struct FetchRequestJob { pub url: String, } impl bevy_jobs::J

Corey Farwell 3 Aug 31, 2022
Wojak quits his job at McDonalds and becomes a Takeaway driver.

DeliveryGuy ?? Summary This repo is a crypto cross-exchange arbitrage implementation in Rust. work for Binance | OKX | Bybit | GateIO | Kucoin How to

null 20 Oct 29, 2022
job control from anywhere!

job-security - job control from anywhere! job-security is a tool that lets you put your running programs into background, then bring them to the foreg

Yuxuan Shui 15 Apr 23, 2023
Rust Imaging Library's Python binding: A performant and high-level image processing library for Python written in Rust

ril-py Rust Imaging Library for Python: Python bindings for ril, a performant and high-level image processing library written in Rust. What's this? Th

Cryptex 13 Dec 6, 2022
Rust-nlp is a library to use Natural Language Processing algorithm with RUST

nlp Rust-nlp Implemented algorithm Distance Levenshtein (Explanation) Jaro / Jaro-Winkler (Explanation) Phonetics Soundex (Explanation) Metaphone (Exp

Simon Paitrault 34 Dec 20, 2022