A system to programmatically run data pipelines

Overview

Factotum


A DAG-running tool designed to efficiently run complex jobs with non-trivial dependency trees.

The zen of Factotum

  1. A Turing-complete job is not a job, it's a program
  2. A job must be composable from other jobs
  3. A job exists independently of any job schedule

User quickstart

Assuming you're running 64-bit Linux:

wget https://github.com/snowplow/factotum/releases/download/0.6.0/factotum_0.6.0_linux_x86_64.zip
unzip factotum_0.6.0_linux_x86_64.zip
./factotum --version

Factotum requires one argument: a Factotum factfile that describes the job to run. For example, to run the sample sleep.factfile:

wget https://raw.githubusercontent.com/snowplow/factotum/master/samples/sleep.factfile
./factotum run sleep.factfile

Variables can be supplied to the job file using --env JSON (or -e JSON). The JSON here is free-form, but its keys must correspond to the placeholders you've set in your job.

For example, the following will print "hello world!":

wget https://raw.githubusercontent.com/snowplow/factotum/master/samples/variables.factfile
./factotum run variables.factfile --env '{ "message": "hello world!" }'
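The placeholder syntax is Mustache-style. A minimal factfile wired up for the command above might look like the following (a sketch assuming the same structure as the other samples, not necessarily the exact contents of variables.factfile):

```json
{
    "schema": "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/1-0-0",
    "data": {
        "name": "Variables demo",
        "tasks": [
            {
                "name": "print message",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "{{ message }}" ],
                "dependsOn": [],
                "onResult": {
                    "terminateJobWithSuccess": [],
                    "continueJob": [ 0 ]
                }
            }
        ]
    }
}
```

With a factfile like this, the value passed via --env '{ "message": "hello world!" }' is substituted for {{ message }} before the task executes.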

Starting from an arbitrary task can be done using the --start TASK or -s TASK arguments, where TASK is the name of the task you'd like to start at.

For example, to start at the "echo beta" task in this job, you can run the following:

wget https://raw.githubusercontent.com/snowplow/factotum/master/samples/echo.factfile
./factotum run echo.factfile --start "echo beta"

To get a quick overview of the options provided, you can use the --help or -h argument:

./factotum --help

For more information on this file format and how to write your own jobs, see the Factfile format section below.

Factfile format

Factfiles are self-describing JSON files that declare a series of tasks and their dependencies. For example:

{
    "schema": "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/1-0-0",
    "data": {
        "name": "Factotum demo",
        "tasks": [
            {
                "name": "echo alpha",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "alpha" ],
                "dependsOn": [],
                "onResult": {
                    "terminateJobWithSuccess": [],
                    "continueJob": [ 0 ]
                }
            },
            {
                "name": "echo beta",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "beta" ],
                "dependsOn": [ "echo alpha" ],
                "onResult": {
                    "terminateJobWithSuccess": [],
                    "continueJob": [ 0 ]
                }
            },
            {
                "name": "echo omega",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "and omega!" ],
                "dependsOn": [ "echo beta" ],
                "onResult": {
                    "terminateJobWithSuccess": [],
                    "continueJob": [ 0 ]
                }
            }
        ]
    }
}

This example defines three tasks that run shell commands: echo alpha, echo beta, and echo omega. echo alpha has no dependencies, so it runs immediately. echo beta depends on echo alpha, and so waits for it to complete; echo omega in turn depends on echo beta, and waits for it to complete before executing. Each task's onResult block maps the command's exit codes to outcomes: codes listed in continueJob (here 0) allow the job to continue, while codes listed in terminateJobWithSuccess end the job early and mark it as successful; any other exit code fails the job.

Given the above, the tasks will execute in the following sequence: echo alpha, echo beta, and finally echo omega. Tasks in Factotum can have multiple dependencies, and tasks that are parallelizable will run concurrently. Check out the samples for more sample factfiles, or the wiki for a more complete description of the factfile format.
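To make the concurrency point concrete, here is a sketch of a diamond-shaped job (an illustration, not one of the shipped samples): "echo b" and "echo c" both depend only on "echo a", so Factotum can run them concurrently, while "echo d" waits for both to finish:

```json
{
    "schema": "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/1-0-0",
    "data": {
        "name": "Diamond demo",
        "tasks": [
            {
                "name": "echo a",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "a" ],
                "dependsOn": [],
                "onResult": { "terminateJobWithSuccess": [], "continueJob": [ 0 ] }
            },
            {
                "name": "echo b",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "b" ],
                "dependsOn": [ "echo a" ],
                "onResult": { "terminateJobWithSuccess": [], "continueJob": [ 0 ] }
            },
            {
                "name": "echo c",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "c" ],
                "dependsOn": [ "echo a" ],
                "onResult": { "terminateJobWithSuccess": [], "continueJob": [ 0 ] }
            },
            {
                "name": "echo d",
                "executor": "shell",
                "command": "echo",
                "arguments": [ "d" ],
                "dependsOn": [ "echo b", "echo c" ],
                "onResult": { "terminateJobWithSuccess": [], "continueJob": [ 0 ] }
            }
        ]
    }
}
```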

Developer quickstart

Factotum is written in Rust.

Using Vagrant

  • Clone this repository - git clone [email protected]:snowplow/factotum.git
  • cd factotum
  • Set up a Vagrant box and ssh into it - vagrant up && vagrant ssh
    • This will take a few minutes
  • cd /vagrant
  • Compile and run a demo - cargo run -- run samples/echo.factfile

Using stable Rust without Vagrant

  • Install Rust
    • on Linux/Mac - curl -sSf https://static.rust-lang.org/rustup.sh | sh
  • Clone this repository - git clone [email protected]:snowplow/factotum.git
  • cd factotum
  • Compile and run a demo - cargo run -- run samples/echo.factfile

Copyright and license

Factotum is copyright 2016-2021 Snowplow Analytics Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Comments
  • Configurable max event size for webhook updates

    Waiting for webhook to finish sending events... done! Warning: some events failed to send
    

    I think it's related to overly-large payloads (too much stdout/err). We should be truncating this fairly aggressively...

    opened by alexanderdean 12
  • Add ability to visualize DAG at CLI

    We should be able to:

    • Visualize a DAG at the CLI when editing a DAG
    • The failure output should visualize the DAG
    • The success output should be able to optionally visualize the DAG, maybe with a --verbose setting
    code-complete 
    opened by alexanderdean 10
  • Release/0.4.0

    Hey @alexanderdean this PR adds support for --constraint="host,{{some_host}}"

    At the moment if a host-value set is found we go ahead and attempt to assert this fact. If this fails for whatever reason we exit with return code 3 and print the error message.

    Please let me know if you would like to see any different behaviour in this.

    opened by jbeemster 7
  • Unfriendly error if user attempts to add argument to command

    Changing:

    "command": "/opt/mt-scripts/common/scripts/r77/emr-etl-runner-r77.sh",
    

    to:

    "command": "/opt/mt-scripts/common/scripts/r77/emr-etl-runner-r77.sh staging",
    

    throws:

    thread '<main>' panicked at 'called `Result::unwrap()` on an `Err` value: Error { repr: Os { code: 2, message: "No such file or directory" } }', ../src/libcore/result.rs:746
    note: Run with `RUST_BACKTRACE=1` for a backtrace.
    
    opened by alexanderdean 5
  • Define job schema

    Current working idea:

    {
      "schema": "iglu:com.snowplowanalytics.factotum/job/jsonschema/1-0-0",
      "data": {
        "name": "My First DAG",
        "arguments": [ "clientTag" ],
        "assignments": {
          "configDir": "/opt/mt-configs2/{{ $.arguments.clientTag }}",
          "scriptDir": "/opt/mt-scripts/acme"
        },
        "steps": [
          {
            "name": "EmrEtlRunner",
            "type": "shell",
            "command": "{{ $.assignments.scriptDir }}/acme-emr-etl-runner.sh",
            "arguments": [ "{{ $.assignments.configDir }}" ],
            "dependsOn": [],
            "response": {
              "noOp": [ 3 ]
            }
          },
          {
            "name": "StorageLoader",
            "type": "shell",
            "command": "{{ $.assignments.scriptDir }}/acme-storage-loader.sh",
            "arguments": [ "{{ $.assignments.configDir }}" ],
            "dependsOn": [ "EmrEtlRunner" ]
          },
          {
            "name": "SQL Runner",
            "type": "shell",
            "command": "/opt/sql-runner-0.2.0/sql-runner",
            "arguments": [ "--playbook", "{{ $.assignments.configDir }}/sql-runner/playbooks/stage-1.yml", "--sqlroot", "{{ $.aliases.configDir }}/sql-runner/sql" ],
            "dependsOn": [ "StorageLoader" ]
          }
        ]
      }
    }
    
    dag-formats 
    opened by alexanderdean 5
  • Release/0.5.0

    Factotum "server mode":

    • [x] http server
    • [x] json
      • [x] parse
      • [x] construct
      • [x] validate
    • [x] state
      • ~~log file/database~~
      • [x] consul
    • [x] dispatcher
      • [x] one-to-many queue
      • [x] add to queue
      • [x] get queue status
    • [x] worker process job
    • [x] cli
    • [x] ~~merge into main factotum Cargo.toml~~ move to separate repo
    • [ ] ci/cd
    • [ ] docs
    opened by ungn 4
  • Add "transition" information to webhook output

    Splitting up updates into "task_updates" and "job_updates".

    Job updates contain the previous state, and the current state (this change is the reason the event was emitted) (e.g. RUNNING->SUCCEEDED)

    Task updates contain a list of task transitions (the reason for the event to be emitted) e.g. RUNNING->SUCCEEDED_NO_OP

    code-complete 
    opened by ninjabear 4
  • Broken links in README

    Links to echo.factfile and sleep.factfile refer to the 0.2.0 branch, which has been deleted. They will also break in 0.3.0 when the corresponding branch is deleted.

    code-complete 
    opened by chuwy 3
  • Placeholder for client bug with clojure-cloudfront-tmp.factfile

    This factfile:

    {
        "schema": "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/1-0-0",
        "data": {
            "name": "snplow2",
            "tasks": [
                {
                    "name": "cloudfront-emr-etl-runner",
                    "executor": "shell",
                    "command": "/opt/mt-scripts/common/scripts/r77/emr-etl-runner-r77.sh",
                    "arguments": [ "a-cloudfront", "staging" ],
                    "dependsOn": [],
                    "onResult": {
                        "terminateJobWithSuccess": [ 3, 4 ],
                        "continueJob": [ 0 ]
                    }
                },
                {
                    "name": "clojure-storage-loader",
                    "executor": "shell",
                    "command": "/opt/mt-scripts/common/scripts/r77/storage-loader-r77.sh",
                    "arguments": [ "a-clojure" ],
                    "dependsOn": [],
                    "onResult": {
                        "terminateJobWithSuccess": [],
                        "continueJob": [ 0 ]
                    }
                },
                {
                    "name": "cloudfront-storage-loader",
                    "executor": "shell",
                    "command": "/opt/mt-scripts/common/scripts/r77/storage-loader-r77.sh",
                    "arguments": [ "a-cloudfront" ],
                    "dependsOn": [ "cloudfront-emr-etl-runner", "clojure-storage-loader" ],
                    "onResult": {
                        "terminateJobWithSuccess": [],
                        "continueJob": [ 0 ]
                    }
                }
            ]
        }
    }
    

    causes a file not found error when run.

    opened by alexanderdean 3
  • Use .factfile as file extension in README and all tests and samples

    Should Factotum factfiles be:

    • .factotum
    • .factfile
    • .fact
    • .job
    • .ff
    • .json (this one is unrelated, but it makes sure most editors pick it up as json)
    • something else?
    code-complete 
    opened by ninjabear 3
  • Add "server mode" to Factotum

    We need to create a daemon that can accept jobs remotely and start them immediately.

    This can be called "factotumd", "factotum server" or another snazzy name (this may change).

    We should stick to a process boundary for now (i.e. the server should execute jobs using child processes of the factotum CLI tool). This could change at a later date.

    The intent behind this is to permit starting (pre-shared) Factotum jobs on a remote machine, so a user wouldn't need to ssh in and run them manually, and to act as a springboard for later work in creating a whole server-based Factotum ecosystem.

    /cc @alexanderdean

    opened by ninjabear 2
  • Allow templating of command path

    In systems with variable system paths (like if we are running out of Docker) it can be useful to be able to template the command parameter so that local testing and testing in remote systems do not require manual changes to work as the path always needs to be absolute.

    opened by jbeemster 0
  • Hyper upgrade (RUSTSEC-2021-0079)

    We need to upgrade hyper to a modern version (part of the webhook plumbing).

    This may well result in the adoption of reqwest (built on hyper to make things easier).

    opened by jamessnowplow 0
  • Respect SIGTERM & SIGINT so that application can exit cleanly

    If Factotum is running in a containerized environment it can be issued a SIGTERM. When this happens the application is killed instantly, with no chance of sending the webhooks it needs to send, and no chance for the scripts/applications it is running to exit cleanly.

    Factotum should respect and if possible propagate this signal so that it can exit cleanly.

    Docs here: https://rust-cli.github.io/book/in-depth/signals.html

    opened by jbeemster 0
  • Add ability to limit step concurrency to prevent overloading limited systems

    When multiple steps in the DAG execute concurrently, it can overload very limited dockerized systems, as it becomes difficult to predict how much compute will be needed at different stages of the execution.

    By limiting concurrency this can be better controlled.

    opened by jbeemster 0