Simple, extensible multithreaded background job processing library for Rust.

Apalis

Apalis is a simple, extensible multithreaded background job processing library for Rust.

Features

  • Simple and predictable job handling model.
  • Job handlers with a macro-free API.
  • Take full advantage of the tower ecosystem of middleware, services, and utilities.
  • Workers take full advantage of the actor model.
  • Fully Tokio compatible.
  • Optional Web interface to help you manage your jobs.

Apalis job processing is powered by tower::Service which means you have access to the tower and tower-http middleware.

Apalis has support for

  • Redis
  • SQLite
  • PostgreSQL
  • MySQL
  • Bring Your Own Job Source, e.g. Cron or Twitter streams

Getting Started

To get started, add the following to Cargo.toml:

[dependencies]
apalis = { version = "0.3.1", features = ["redis"] }

Usage

use apalis::{redis::RedisStorage, JobError, JobRequest, JobResult, WorkerBuilder, Storage, Monitor, JobContext};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
struct Email {
    to: String,
}

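// Job handler: receives the deserialized job and its context.
async fn email_service(_job: Email, _ctx: JobContext) -> Result<JobResult, JobError> {
    // Do the actual work (e.g. send the email) here, then report success.
    Ok(JobResult::Success)
}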

#[tokio::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "debug");
    env_logger::init();
    let redis = std::env::var("REDIS_URL").expect("Missing env variable REDIS_URL");
    let storage = RedisStorage::new(redis).await.unwrap();
    Monitor::new()
        .register_with_count(2, move || {
            WorkerBuilder::new(storage.clone())
                .build_fn(email_service)
        })
        .run()
        .await
}

Then

// This can be in another part of the program or in another application
async fn produce_route_jobs(storage: &RedisStorage<Email>) {
    let mut storage = storage.clone();
    storage
        .push(Email {
            to: "test@example.com".to_string(),
        })
        .await
        .unwrap();
}

Web UI

If you are running Apalis Board, you can easily manage your jobs. See a working REST API here

Feature flags

  • tracing (enabled by default) - Support Tracing 👀
  • redis - Include Redis storage
  • postgres - Include Postgres storage
  • sqlite - Include SQLite storage
  • mysql - Include MySQL storage
  • sentry - Support for Sentry exception and performance monitoring
  • prometheus - Support Prometheus metrics
  • retry - Support directly retrying jobs
  • timeout - Support timeouts on jobs
  • limit - 💪 Limit the number of jobs
  • filter - Support filtering jobs based on a predicate
  • extensions - Add global extensions to jobs

Storage Comparison

Since we provide a few storage solutions, here is a table comparing them:

Feature          Redis  Sqlite  Postgres  Sled  Mysql  Mongo
Scheduled jobs   ✓      ✓       ✓         x     ✓      x
Retryable jobs   ✓      ✓       ✓         x     ✓      x
Persistence      ✓      ✓       ✓         x     ✓      x
Rerun Dead jobs  ✓      ✓       ✓         x     *      x

Thanks to

  • tower - Tower is a library of modular and reusable components for building robust networking clients and servers.
  • redis-rs - Redis library for Rust
  • sqlx - The Rust SQL Toolkit

Roadmap

v 0.4

  • Improve monitoring
  • Improve Apalis Board
  • Add job progress
  • Add more sources

v 0.3

  • Standardize API (Storage, Worker, Data, Middleware, Context )
  • Introduce SQL
  • Implement layers for Sentry and Tracing.
  • Improve documentation
  • Organized modules and features.
  • Basic Web API Interface
  • Sql Examples
  • Sqlx migrations

v 0.2

  • Redis Example
  • Actix Web Example

Contributing

Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

See also the list of contributors who participated in this project.

This project was formerly actix-redis-jobs. If you want to use that crate name, please contact me.

License

This project is licensed under the MIT License - see the LICENSE.md file for details

Acknowledgments

  • Inspiration: The Redis part of this project is heavily inspired by Curlyq, which is written in Go
Comments
  • Integration tests for each backend

    Currently, integration tests run on CI only for the MySQL backend.

    Let's add integration tests for the following backends

    • [x] Mysql by @autotaker (added by #6)
    • [x] Redis by @geofmureithi (added by #27)
    • [x] Sqlite by @geofmureithi (added by #13)
    • [x] Postgres by @autotaker (added by #11)

    Let's strive to have a separate pull request for each storage. This will help with collaboration. Also, before starting on any backend, please add a comment and get assigned.

    enhancement 
    opened by autotaker 9
  • Autopublish crates to crates.io

    Right now I do it manually. It would be nice if it was automatic. Steps needed:

    1. PR to master
    2. Running CI to master, must pass.
    3. PR merge to master
    4. Create new release
    5. Publish crates to crates.io
    opened by geofmureithi 8
  • Add Integration Tests for Postgres backend

    Part of #9

    This PR adds a test suite with a Postgres action and implements five tests for the Postgres backend.

    Tests

    • test_consume_last_pushed_job
    • test_acknowledge_job
    • test_kill_job
    • test_heartbeat_renqueueorphaned_pulse_last_seen_6min
    • test_heartbeat_renqueueorphaned_pulse_last_seen_4min
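
    The two heartbeat tests bracket a cutoff: a worker last seen 6 minutes ago should have its jobs re-enqueued, while one last seen 4 minutes ago should not. The following is a minimal sketch of that predicate, assuming a five-minute threshold that sits between the two cases. The names ORPHAN_THRESHOLD and is_orphaned are hypothetical and are not taken from the apalis source; the real check is done in SQL.

```rust
use std::time::Duration;

/// Assumed cutoff: five minutes, chosen because it lies between the
/// 4-minute and 6-minute heartbeat test cases. Hypothetical constant.
const ORPHAN_THRESHOLD: Duration = Duration::from_secs(5 * 60);

/// Hypothetical helper: returns true when the time since a worker's last
/// heartbeat exceeds the threshold, meaning its running jobs are orphaned.
fn is_orphaned(since_last_seen: Duration) -> bool {
    since_last_seen > ORPHAN_THRESHOLD
}
```

    With this sketch, a worker seen 360 seconds ago is treated as orphaned and one seen 240 seconds ago is not.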

    Other changes

    Refactoring

    I extracted keep_alive_at(self, worker_id, last_seen) from keep_alive(self, worker_id) to improve testability.

    Changed SQL statement to upsert worker in keep_alive:

    Previously, when keep_alive was called for a worker_id that was already registered, the last_seen column of the worker row was updated with NOW() (the system clock on the Postgres server). That was harmless before, but after the refactoring the last_seen value passed to keep_alive_at(self, worker_id, last_seen) can differ from the current time.

    Therefore, I changed the updated value to EXCLUDED.last_seen, which is the same value that is inserted when the worker_id is registered for the first time.

     ON CONFLICT (id) DO
    -  UPDATE SET last_seen = NOW()
    +  UPDATE SET last_seen = EXCLUDED.last_seen
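
    To illustrate the effect of the change, here is a small in-memory model of the upsert. It is only a sketch: the Workers struct and its HashMap stand in for the workers table, and timestamps are plain integers rather than database values. Only the method name keep_alive_at comes from the PR description.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the workers table: worker id -> last_seen.
/// Hypothetical type used only for illustration.
struct Workers {
    rows: HashMap<String, u64>,
}

impl Workers {
    fn new() -> Self {
        Self { rows: HashMap::new() }
    }

    /// Models `ON CONFLICT (id) DO UPDATE SET last_seen = EXCLUDED.last_seen`:
    /// both the insert path and the conflict path store the supplied value,
    /// never the current clock.
    fn keep_alive_at(&mut self, worker_id: &str, last_seen: u64) {
        self.rows.insert(worker_id.to_string(), last_seen);
    }

    /// Reads back the stored heartbeat, if the worker is registered.
    fn last_seen(&self, worker_id: &str) -> Option<u64> {
        self.rows.get(worker_id).copied()
    }
}
```

    Because the stored value always equals the argument, a test can register a worker with a timestamp in the past and observe exactly that timestamp afterwards.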
    
    opened by autotaker 6
  • Rerunning dead jobs in Mysql is not implemented

    The storage API for MySQL doesn't support this feature.

        // Worker not seen in 5 minutes yet has running jobs
        StorageWorkerPulse::RenqueueOrpharned { count: _ } => {
        //...
        Ok(true)
    

    The goal is to rewrite the query at mysql.rs#L202 to be MySQL-compatible and uncomment it.

    bug good first issue 
    opened by geofmureithi 5
  • how to set many Workers?

    I want to build workers for multiple async functions. How can I do this? The following does not work:

    Monitor::new()
        .register_with_count(6, move |_| {
            WorkerBuilder::new(worker_storage.clone())
                .layer(SentryJobLayer)
                .layer(TraceLayer::new())
                .build_fn(queue::send_email)
                .build_fn(queue::send_message)
                .build_fn(queue::and_more)
        })
    
    documentation 
    opened by rustdreamer 4
  • Extension does not work?

    I can't find any apalis examples that use extensions for shared data.

    There is only an incorrect example in the docs:

    /// Extension data for jobs.
    ///
    /// forked from [axum::Extensions]
    /// # In Context
    ///
    /// This is commonly used to share state across jobs.
    ///
    /// ```rust,ignore
    /// use apalis::{
    ///     Extension,
    ///     WorkerBuilder,
    ///     JobContext
    /// };
    /// use std::sync::Arc;
    ///
    /// // Some shared state used throughout our application
    /// struct State {
    ///     // ...
    /// }
    ///
    /// async fn email_service(email: Email, ctx: JobContext) {
    ///     let state: &Arc<State> = ctx.data_opt().unwrap();
    /// }
    ///
    /// let state = Arc::new(State { /* ... */ });
    ///
    /// let worker = WorkerBuilder::new(storage)
    ///     .layer(Extension(state))
    ///     .build_fn(email_service);
    /// ```
    
    

    Even

    use apalis::{
        Extension,
         WorkerBuilder,
         JobContext
     };
    

    fails with an error saying that apalis does not have Extension.

    Please make a working example with Shared state.

    opened by makorne 3
  • Update to sqlx 0.6

    Currently apalis-sql uses version 0.5 of sqlx, which makes it harder than necessary to use in an application that uses sqlx 0.6.

    I'm unable to use the PostgresStorage::new method due to the differing sqlx versions.

    Workaround

    Pass a connection string to PostgresStorage::connect.

    opened by xanderio 2
  • Add more tests for MySQL backend

    This PR fixes an issue with sharing the connection pool on the MySQL backend, and adds more tests for the MySQL backend.

    This PR also contains a fix for a bug where lock_at was not updated when a job was locked. (See the stream_jobs method.)

    opened by autotaker 2
  • add cd.yaml

    #19 This PR adds a workflow (Continuous delivery) to publish crates to crates.io.

    Spec

    Trigger

    The continuous delivery workflow is triggered by pushing tags to the remote repository. A tag must be of the form vX.Y.Z or vX.Y.Z-SUFFIX. A tag with a suffix denotes a pre-release version and enables --dry-run mode.
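
    The tag rule can be sketched in Rust as follows. This is an illustration only: the workflow itself is a GitHub Actions YAML file, and the Release enum and parse_tag function are hypothetical names that do not appear in it. The sketch assumes X, Y, and Z are plain decimal numbers and that any non-empty suffix selects dry-run mode.

```rust
/// Hypothetical result of classifying a release tag.
#[derive(Debug, PartialEq)]
enum Release {
    /// Plain vX.Y.Z tag: publish for real.
    Publish { version: String },
    /// vX.Y.Z-SUFFIX tag: pre-release, run with --dry-run.
    DryRun { version: String, suffix: String },
}

/// Classifies a tag of the form vX.Y.Z or vX.Y.Z-SUFFIX.
/// Returns None for anything that does not match either form.
fn parse_tag(tag: &str) -> Option<Release> {
    let rest = tag.strip_prefix('v')?;
    // Split off an optional suffix at the first '-'.
    let (version, suffix) = match rest.split_once('-') {
        Some((v, s)) => (v, Some(s)),
        None => (rest, None),
    };
    // Require exactly three non-empty, all-digit components.
    let parts: Vec<&str> = version.split('.').collect();
    let numeric = parts.len() == 3
        && parts
            .iter()
            .all(|p| !p.is_empty() && p.chars().all(|c| c.is_ascii_digit()));
    if !numeric {
        return None;
    }
    match suffix {
        Some(s) if !s.is_empty() => Some(Release::DryRun {
            version: version.to_string(),
            suffix: s.to_string(),
        }),
        Some(_) => None, // trailing '-' with nothing after it
        None => Some(Release::Publish {
            version: version.to_string(),
        }),
    }
}
```

    Under these assumptions, v0.3.5 is classified as a real publish, v0.3.4-rc10 as a dry run, and a tag without the leading v is rejected.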

    Secrets

    Please set CARGO_REGISTRY_TOKEN to the authentication token for crates.io.

    Jobs

    • test
      • This job runs several tests defined in ci.yaml.
    • publish
      • This job publishes crates (apalis-core, apalis-cron, apalis-redis, apalis-sql, and apalis) in an appropriate order.
      • If there are some inconsistencies between the tag version and a package version (written in Cargo.toml), the job fails before pushing the crates.
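
    The consistency check in the publish job can be pictured as below. This is a sketch under assumptions, not the workflow's actual logic: check_versions is a hypothetical function, the package list is passed in by hand instead of being read from Cargo.toml, and the comparison uses everything after the leading v in the tag, including any suffix.

```rust
/// Hypothetical check: every package version must equal the tag version.
/// `packages` holds (crate name, version) pairs; how they are collected
/// from Cargo.toml is outside the scope of this sketch.
fn check_versions(tag: &str, packages: &[(&str, &str)]) -> Result<(), String> {
    // Assumption: the expected version is the tag without its leading 'v'.
    let expected = tag.strip_prefix('v').unwrap_or(tag);
    for (name, version) in packages {
        if *version != expected {
            return Err(format!(
                "{} is at {}, but the tag expects {}",
                name, version, expected
            ));
        }
    }
    Ok(())
}
```

    Returning the first mismatch as an error mirrors the described behavior of failing before any crate is pushed.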

    Test

    I have created v0.3.4-rc10 tag and confirmed that the workflow succeeds. https://github.com/autotaker/apalis/actions/runs/3355470587

    Other changes

    • Trigger of ci.yaml is changed
      • CI for topic branch runs only after creating a pull request. (push trigger is limited to master/develop)
      • Add workflow_call trigger to call CI workflow from CD workflow.
    opened by autotaker 1
  • s/Kill/Killed/ in storage.kill

    This PR fixes a bug with killing a job.

    Bug details

    In the MySQL/Postgres/Sqlite3 backends, Storage::kill(self, worker_id, job_id) sets the status to 'Kill', which is incompatible with the definition of JobStatus::Killed. https://github.com/geofmureithi/apalis/blob/3cfc8df9a9bcb288ea24c3892e2446c45247b947/packages/apalis-core/src/request.rs#L12-L26

    As a result, deserialization of job context will panic while converting Kill to JobState. https://github.com/geofmureithi/apalis/blob/3cfc8df9a9bcb288ea24c3892e2446c45247b947/packages/apalis-sql/src/from_row.rs#L53
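
    The mismatch can be reproduced with a simplified stand-in for the state type. The enum below is an assumption made for illustration: its variant list and error message are not copied from apalis-core, and the real conversion lives in the files linked above. It only shows why a stored 'Kill' string cannot be parsed when the type expects 'Killed'.

```rust
use std::str::FromStr;

/// Simplified, hypothetical model of the job state enum.
#[derive(Debug, PartialEq)]
enum JobState {
    Pending,
    Running,
    Done,
    Failed,
    Killed,
}

impl FromStr for JobState {
    type Err = String;

    /// Accepts only the exact variant names; anything else is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "Pending" => Ok(JobState::Pending),
            "Running" => Ok(JobState::Running),
            "Done" => Ok(JobState::Done),
            "Failed" => Ok(JobState::Failed),
            "Killed" => Ok(JobState::Killed),
            other => Err(format!("invalid job state: {}", other)),
        }
    }
}
```

    If the caller unwraps the parse result, as a row-to-context conversion might, the 'Kill' value turns into a panic, which matches the reported symptom.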

    bug 
    opened by autotaker 1
  • s/Job/jobs/ in mysql.rs

    I found a bug with table name in mysql.rs.

    In the default configuration (lower_case_table_names=0) of MySQL on Linux, table names are case-sensitive. The table that manages jobs should be named jobs, but some SQL statements refer to it as Jobs.

    As a result, this library does not work with such MySQL servers.

    With this PR, I replaced every occurrence of Jobs in the SQL statements with jobs.

    opened by autotaker 1
  • Example without code

    Hi! I found a strange example of yours that has no code, and nobody has asked about it.

    Maybe I am missing something, and everybody is now expected to use telepathy to follow the examples? :)

    twitter-screenshot-bot

    opened by makorne 0
  • Usage of sqlx migration with postgres

    It is currently not possible to use apalis on the same Postgres database as the main application.

    This is because the sqlx Migrator does not currently support multiple applications using the same database. As the Postgres migrations already use a separate schema, it would be nice to be able to set up apalis in the same database as the main application itself.

    I currently see two options:

    1. Users have to copy our migrations into their sqlx migration folder.
    2. We don't use the sqlx migrator for Postgres until sqlx supports multiple applications in the same database.

    This isn't a problem for SQLite or MySQL, as neither of these supports PostgreSQL-like schemas.

    See: https://github.com/launchbadge/sqlx/issues/1698

    help wanted 
    opened by xanderio 0
  • Add surrealdb storage

    It would be great if SurrealDB support could be added. It can run in memory, as a single node backed by a file, or distributed; it has event support and is written in Rust.

    SurrealDB in 100 Seconds gives a good overview: https://www.youtube.com/watch?v=C7WFwgDRStM

    good first issue 
    opened by prabirshrestha 1
  • Axum application stuck after SIGINT and needs to be killed with SIGTERM

    When trying to terminate the application with Ctrl + C (or by sending SIGINT directly) the monitor shuts down, but the application is stuck until it receives a SIGTERM signal.

    $ cargo run --package axum-example
        Finished dev [unoptimized + debuginfo] target(s) in 0.18s
         Running `target/debug/axum-example`
    2022-10-07T13:33:36.526309Z DEBUG axum_example: listening on 127.0.0.1:3000
    2022-10-07T13:33:36.526681Z DEBUG apalis_core::worker::monitor: Listening shut down command (ctrl + c)
    2022-10-07T13:33:46.759826Z DEBUG apalis_core::worker::monitor: Workers shutdown complete
    ^C^C
    

    I tested this with the axum example and my own app, both have the same result.

    The actix-web example does properly exit, so maybe axum needs some special handling for signals.

    $ cargo run --package actix-web-example
        Finished dev [unoptimized + debuginfo] target(s) in 0.19s
         Running `target/debug/actix-web-example`
    [2022-10-07T13:40:11Z INFO  actix_server::builder] Starting 4 workers
    [2022-10-07T13:40:11Z INFO  actix_server::server] Actix runtime found; starting in Actix runtime
    [2022-10-07T13:40:11Z DEBUG apalis_core::worker::monitor] Listening shut down command (ctrl + c)
    [2022-10-07T13:40:13Z INFO  actix_server::server] SIGINT received; starting forced shutdown
    [2022-10-07T13:40:13Z INFO  actix_server::worker] Shutting down idle worker
    [2022-10-07T13:40:13Z DEBUG actix_server::accept] Paused accepting connections on 127.0.0.1:8000
    [2022-10-07T13:40:13Z INFO  actix_server::worker] Shutting down idle worker
    [2022-10-07T13:40:13Z INFO  actix_server::accept] Accept thread stopped
    [2022-10-07T13:40:13Z INFO  actix_server::worker] Shutting down idle worker
    [2022-10-07T13:40:13Z INFO  actix_server::worker] Shutting down idle worker
    [2022-10-07T13:40:13Z DEBUG apalis_core::worker::monitor] Workers shutdown complete
    
    bug documentation 
    opened by valeth 2
  • Add documentation and examples of how workers process jobs

    Currently the documentation only shows one worker, send_email. It would be good to showcase more types of jobs being consumed.

        Monitor::new()
            .register_with_count(5, move |_| {
                WorkerBuilder::new(sqlite.clone())
                    .layer(TraceLayer::new())
                    .build_fn(send_email)
            })
            .run()
            .await
    
    documentation 
    opened by prabirshrestha 3
Releases(v0.3.5)
  • v0.3.5(Dec 23, 2022)

    What's Changed

    • s/Kill/Killed/ in storage.kill by @autotaker in https://github.com/geofmureithi/apalis/pull/10
    • Add Integration Tests for Postgres backend by @autotaker in https://github.com/geofmureithi/apalis/pull/11
    • Add more tests for MySQL backend by @autotaker in https://github.com/geofmureithi/apalis/pull/15
    • add cd.yaml by @autotaker in https://github.com/geofmureithi/apalis/pull/21
    • Introducing Sqlite integration tests by @geofmureithi in https://github.com/geofmureithi/apalis/pull/13
    • Introducing Redis tests by @geofmureithi in https://github.com/geofmureithi/apalis/pull/27
    • [Draft] v0.3.5 by @geofmureithi in https://github.com/geofmureithi/apalis/pull/24
    • Refactor alot of panics by @geofmureithi in https://github.com/geofmureithi/apalis/pull/31
    • Minor fixes by @geofmureithi in https://github.com/geofmureithi/apalis/pull/32

    Full Changelog: https://github.com/geofmureithi/apalis/compare/v0.3.4...v0.3.5

  • v0.3.4(Sep 20, 2022)

    What's Changed

    • s/Job/jobs/ in mysql.rs by @autotaker in https://github.com/geofmureithi/apalis/pull/4
    • Mysql rerun dead jobs by @autotaker in https://github.com/geofmureithi/apalis/pull/6
    • Bump up to 0.3.4 by @geofmureithi in https://github.com/geofmureithi/apalis/pull/7

    New Contributors

    • @autotaker made their first contribution in https://github.com/geofmureithi/apalis/pull/4

    Full Changelog: https://github.com/geofmureithi/apalis/compare/0.3.3...v0.3.4

  • 0.3.3(Aug 13, 2022)

  • v0.3.1(Jul 9, 2022)

    • Introduced Postgres helpers for adding jobs in sql.
    • Improved documentation and Api

    What's Changed

    • Chore/clippy n improvements by @geofmureithi in https://github.com/geofmureithi/apalis/pull/2

    Full Changelog: https://github.com/geofmureithi/apalis/compare/v0.3.0...v0.3.1

  • v0.3.0(Jun 5, 2022)

Owner
For my old account see @geofmureithi-zz