Pegasus: A Multi-Node Command Runner

Run a list of commands on a set of SSH nodes. With a bit of optional parametrization.

Demo

(asciinema demo recording)

Features

  • Passwordless SSH is all you need.
  • Simple config for simple use cases, flexible config for advanced ones.
  • Two modes:
    • Broadcast mode runs each command on every node.
    • Queue mode runs each command once on the next free node.
  • Modify the file-based queue (queue.yaml) while Pegasus is running.
  • Parametrize hosts and commands.

Getting Started with Examples

To use Pegasus,

  1. Clone this repo (I'll soon release binaries, too).
  2. Set up passwordless SSH for your nodes.
  3. Populate hosts.yaml and queue.yaml, and run Pegasus.

Pegasus removes one entry at a time from the top of queue.yaml and moves it to consumed.yaml as it begins executing that entry.
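For example, with two entries queued, the files might look roughly like this right after Pegasus picks up the first entry (illustrative contents; see the Details section for the exact consumed.yaml format):

# queue.yaml (before)
- echo one
- echo two

# queue.yaml (after the first entry is picked up)
- echo two

# consumed.yaml (grows as entries are consumed)
- command:
    - echo one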

Queue Mode: Getting a Bag of Jobs Done

Run four Python commands using two nodes.

# hosts.yaml
- node-1
- node-2
# queue.yaml
- . /opt/miniconda3/etc/profile.d/conda.sh; python train.py --bs 8
- . /opt/miniconda3/etc/profile.d/conda.sh; python train.py --bs 16
- . /opt/miniconda3/etc/profile.d/conda.sh; python train.py --bs 32
- . /opt/miniconda3/etc/profile.d/conda.sh; python train.py --bs 64
$ cargo run -- q  # stands for Queue
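With two nodes and four queued commands, each node pulls the next command from the top of queue.yaml as soon as it becomes free. Assuming roughly equal runtimes, the assignment might look like this (illustrative; the actual order depends on which node frees up first):

node-1: python train.py --bs 8    then  python train.py --bs 32
node-2: python train.py --bs 16   then  python train.py --bs 64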

Broadcast Mode: Terraforming Nodes

Run identical commands for multiple nodes.

# queue.yaml
- mkdir workspace
- cd workspace && git clone https://github.com/jaywonchung/dotfiles.git
- . workspace/dotfiles/install.sh
$ cargo run -- b  # stands for Broadcast
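Broadcast mode reads the same hosts.yaml. With the two nodes from the earlier example, each entry above runs on both nodes, and the next entry starts only after every node has finished the current one (see Broadcast Mode under Details):

# hosts.yaml (same file as before)
- node-1
- node-2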

Parallelizing Execution with Node Parameters

Split nodes into sub-nodes that run commands in parallel. Below, two hostnames times two containers yield four sub-nodes, so four SSH connections are kept and four commands run in parallel.

# hosts.yaml
- hostname:
    - node-1
    - node-2
  container:
    - gpu0
    - gpu1

When parametrizing nodes, just make sure you specify the hostname key.

You can use these parameters in your commands. By the way, the templating engine is Handlebars.

# queue.yaml
- docker exec {{ container }} python train.py --bs 8
- docker exec {{ container }} python train.py --bs 16
- docker exec {{ container }} python train.py --bs 32
- docker exec {{ container }} python train.py --bs 64

With four sub-nodes and four jobs, all jobs start executing at the same time.
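Concretely, Handlebars fills in {{ container }} with each sub-node's parameter value, so the dispatched commands look roughly like this (which sub-node gets which command depends on scheduling order):

node-1 / gpu0: docker exec gpu0 python train.py --bs 8
node-1 / gpu1: docker exec gpu1 python train.py --bs 16
node-2 / gpu0: docker exec gpu0 python train.py --bs 32
node-2 / gpu1: docker exec gpu1 python train.py --bs 64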

Parametrizing Commands for Conciseness

If you can parametrize nodes, why not commands?

# queue.yaml
- command:
    - docker exec {{ container }} python train.py --bs {{ bs }}
  bs: [8, 16, 32, 64]

This results in exactly the same jobs as the example above. When parametrizing commands, just make sure you specify the command key.
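Pegasus expands the entry into the cross product of its command list and parameter lists, so this generates four commands (the {{ container }} node parameter is filled in later, when each command is dispatched to a sub-node):

docker exec {{ container }} python train.py --bs 8
docker exec {{ container }} python train.py --bs 16
docker exec {{ container }} python train.py --bs 32
docker exec {{ container }} python train.py --bs 64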

Quiz

How many commands will execute in Queue mode?

# hosts.yaml
- hostname:
    - node-1
    - node-2
  laziness:
    - 1
- hostname:
    - node-3
  laziness:
    - 2
# queue.yaml
- echo hi from {{ hostname }}
- command:
    - for i in $(seq {{ low }} {{ high }}); do echo $i; sleep {{ laziness }}; done
    - echo bye from {{ hostname }}
  low:
    - 1
    - 2
  high:
    - 3
    - 4

Note that although echo bye from {{ hostname }} doesn't actually use the low or high parameters, it still runs once per parameter combination, i.e. 2 * 2 = 4 times.

The answer is 1 + 2 * 2 * 2 = 9: one command from the first entry, plus 2 commands times 2 values of low times 2 values of high from the second entry.
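For reference, here is roughly what the whole queue expands to (the ordering is illustrative, and node parameters like {{ hostname }} and {{ laziness }} are only filled in when each command is dispatched to a host):

echo hi from {{ hostname }}
for i in $(seq 1 3); do echo $i; sleep {{ laziness }}; done
echo bye from {{ hostname }}
for i in $(seq 1 4); do echo $i; sleep {{ laziness }}; done
echo bye from {{ hostname }}
for i in $(seq 2 3); do echo $i; sleep {{ laziness }}; done
echo bye from {{ hostname }}
for i in $(seq 2 4); do echo $i; sleep {{ laziness }}; done
echo bye from {{ hostname }}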

Lock Mode: Modifying the Queue

queue.yaml is actually the queue.

Pegasus removes the first entry in queue.yaml whenever there's a free host available. If you delete entries before Pegasus pulls them, they will not execute. If you add entries to queue.yaml, they will execute.

Q. Why do I need this?

Think about when the number of remaining commands is less than the number of free nodes. Without a way to submit more jobs to Pegasus, those free nodes will stay idle until all the commands finish and you start a fresh instance of Pegasus.

By providing a way to add to the queue while commands are still running, users may achieve higher node utilization. Being able to delete from the queue is just a byproduct; adding to the queue is the key feature.

Q. But that's a race condition on queue.yaml.

Lock mode will lock queue.yaml and launch a command line editor for you.

$ cargo run -- l --editor nvim  # l stands for Lock

Editor priority is --editor > $EDITOR > vim. When you save and exit, the queue lock is released and Pegasus is allowed access to queue.yaml.
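For example (a sketch; any editor on your PATH should work):

$ cargo run -- l                  # uses $EDITOR if set, otherwise vim
$ cargo run -- l --editor nano    # --editor takes precedence over $EDITOR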

Q. What if Pegasus terminates before I add to queue.yaml?

Enable daemon mode, and Pegasus will not terminate even if queue.yaml is empty. It will keep waiting for you to populate queue.yaml again and then execute the new entries.

$ cargo run -- q --daemon

Details

queue.yaml

This is the queue file. Entries in queue.yaml are consumed from the top, one by one. Also, entries are consumed only when a new host is available to execute new commands. Consumed entries are immediately appended to consumed.yaml in "canonical form", where every entry has a command key. Thus you might do something like tail -n 2 consumed.yaml > queue.yaml to re-execute your previous single-line command.
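As a concrete sketch, a consumed single-line command ends up in consumed.yaml like this, which is why tail -n 2 grabs exactly one such entry:

# consumed.yaml (canonical form; last entry)
- command:
    - python train.py --bs 64

$ tail -n 2 consumed.yaml > queue.yaml   # re-queue the most recent command (do this under Lock Mode or while Pegasus is not running)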

As mentioned earlier, always use the Lock Mode when you need to modify queue.yaml.

Broadcast Mode

In broadcast mode, hosts are kept in sync with each other. That is, the next command is fetched from queue.yaml and executed on all hosts when all the hosts are done executing the previous command.

Consider the following situation:

              fast-host   slow-host
- command1     success     success
- command2     success      fail!
- command3     success
- command4     running

In this case, we would want to prepend an undo command for command2 (e.g., rm -rf repo || true) and restart from there, but fast-host is already far ahead, making things complicated. Thus, especially when you're terraforming nodes with Pegasus, keeping hosts in sync should be beneficial.

There is also a -e or --error-aborts flag in Broadcast Mode, which aborts Pegasus automatically when a host fails on a command.
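For example, to terraform nodes and stop everything as soon as any host fails a step (a sketch of the flag described above):

$ cargo run -- b -e   # same as --error-aborts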

Ctrl-c Behavior

Pegasus tries to implement graceful termination upon ctrl-c. The following happens:

  1. User presses ctrl-c on terminal.
  2. Pegasus's ctrl-c handler wakes up and sends out a cancellation notice.
  3. The scheduling loop detects this notice and breaks right before attempting to fetch from queue.yaml.
    • It may take some time for the scheduling loop to detect this based on what state it's currently in, but it is guaranteed that once the cancellation notice has been sent out, queue.yaml will not change and new commands will not start executing.
  4. Commands that are already running will run until completion. SSH sessions will close their connections whenever they're free.
    • If you really want everything to burn down, consider running something like killall pegasus; killall ssh; rm -rf .ssh-connection*.
  5. When all commands finish, Pegasus will exit.
Comments
  • Upgrade to openssh v0.9.0 with native-mux feature?

    openssh v0.9.0-rc1 is released with a new feature native-mux.

    It provides new functions like Session::connect_mux and SessionBuilder::connect_mux which communicates with the ssh multiplex master directly, through the control socket, instead of spawning a new process to communicate with it.

    The advantage of this is more robust error reporting, better performance and less memory usage.

    The old implementation (process-mux) checks the exit status of ssh for an indication of error, then parses its output and the output of the ssh multiplex master to return an error.

    This method is obviously not as robust as native-mux, which communicates with the ssh multiplex master directly through its multiplex protocol.

    The better performance and lower memory usage come mostly from avoiding the creation of a new process for every command spawned on the remote, every Session::check, and every Session::request_port_forwarding.

    The new release also adds a new function, Session::request_port_forwarding, which supports local/remote forwarding of TCP and Unix socket streams.

    There are also other changes to API:

    • A new type Stdio is used for setting stdin/stdout/stderr.
    • ChildStd* types are now aliases for tokio_pipe::{PipeRead, PipeWrite}.
    • Command::spawn and Command::status now conform to std::process::Command and tokio::process::Command, in which stdin, stdout, and stderr are inherited by default.
    • Command::spawn is now an async method.
    • RemoteChild::wait now takes self by value.
    • Error is now marked #[non_exhaustive] and new variants are added.

    I know this is a huge release and upgrading is going to be quite difficult, but I sincerely want you to try it out, as the new implementation requires feedback.

    enhancement 
    opened by NobodyXu 10
  • Graceful shutdown on ctrl-c

    This PR attempts to gracefully shutdown SSH sessions when the user presses ctrl-c.

    The overall flow of cancellation propagation is:

    1. The user presses ctrl-c.
    2. The handler closure (set by ctrlc::set_handler) assigns true to the variable cancelled: Arc<Mutex<bool>>.
    3. The scheduling loop checks the value of cancelled at the beginning of every iteration. When true, it breaks.
    4. All channel handles that are used to communicate with the SSH session tasks are explicitly dropped.
    5. For each SSH session task, whenever it sends to or receives from any of its channels, the channel will return an Err and the task will break out of the task execution loop.
    6. For each SSH session, the session object will be dropped, terminating the SSH session.

    As a side note, along the way, this PR fixes stream in Session by explicitly locking stdout. Without this, multiple calls to print! and println! are not coalesced, which led to output lines from different commands mixing with each other.

    Closes #3.

    @NobodyXu Will you be interested in reviewing? Just asking since you have reviewed a lot of my code recently (and it benefited both me and Pegasus so much). Plus, I can add you as a collaborator if you'd like :)

    opened by jaywonchung 9
  • Graceful shutdown

    Currently, Pegasus doesn't handle ctrl-c very well. When the user presses ctrl-c, Pegasus terminates without cancelling the SSH session tasks. So the ssh sessions remain open, .ssh-connection* directories stay as is, and in order to kill the processes spawned on remote nodes, I need to go into each node and kill them manually.

    Potential solutions

    • One method would be to propagate the signal to all child ssh processes. Proper error handling inside the tasks would probably serve as an okay solution.
    • Another method I'd like to explore is to see if using the native-mux feature of the upcoming openssh crate will make graceful shutdown any easier.
    enhancement 
    opened by jaywonchung 1
  • Bump openssh to v0.9.0 and enable feature native-mux

    Fixed #1

    Enabling feature native-mux requires once_cell to be bumped, so I bumped the Cargo.lock using cargo update.

    I also disabled default-feature process-mux of openssh, which then requires me to modify src/session.rs to use the new SSHSession::connect_mux API.

    openssh v0.9.0 also

    • uses openssh::Stdio instead of std::process::Stdio in Command::{stdin, stdout, stderr}.
    • updates the API of Command::spawn to an async method.

    Signed-off-by: Jiahao XU [email protected]

    opened by NobodyXu 0
  • Reduce num of features enabled for dep tokio

    Avoid pulling in the features io-std, net, signal, and fs, which add bloat to the binary and are completely unused.

    Signed-off-by: Jiahao XU [email protected]

    opened by NobodyXu 0
  • Consuming jobs in a finer granularity

    # queue.yaml
    - command:
        - long_job {{ param }}
        - another_long_job {{ param }}
      param:
        - resnet50
        - ViT
    

    Currently, Pegasus will consume the entire entry (virtually the entire file) from queue.yaml. However, consuming minimally would be better. For instance, queue.yaml could be left in the following state:

    # queue.yaml
    - command:
        - another_long_job {{ param }}
      param:
        - resnet50
        - ViT
    

    Then, queue.yaml would only become empty in the next round of get_one_job.

    enhancement 
    opened by jaywonchung 0
  • Run the scheduling loop until all commands finish executing

    Currently, given that daemon mode is not set, the scheduling loops of both broadcast mode and queue mode terminate right after sending out the last command to a free SSH session. But the time gap between sending out the last command and the final command finishing execution can be vast.

    In this time window, users will want to submit more jobs. Obviously since Pegasus seems to be up and running, it is more intuitive for users to assume that it will admit more jobs.

    enhancement 
    opened by jaywonchung 0
  • Ctrl-c is the wrong key to trigger cancellation

    Pressing ctrl-c when Pegasus is running will also send ctrl-c to the entire foreground process group, thus also sending SIGINT to the ssh processes.

    Instead of waiting for ctrl-c with the tokio ctrl-c catcher, find another way to catch the user's intent to cancel.

    Possible solutions:

    • Listen to stdin for something random. Like q<Enter>.
    • Create a pegasus stop command. This should make sure to identify the specific pegasus process that is running in the current working directory. This will probably require a pid file.
      • It's okay to assume that there is only one instance of pegasus per directory (if not, that means more than one pegasus process is manipulating queue.yaml and consumed.yaml).
    enhancement 
    opened by jaywonchung 4
  • Find a way to display progress

    For a large parametrized command that generates a lot of jobs and takes days, the user will want to figure out the progress of each command generated: queued, running, or done.

    Internal bookkeeping is easy. Maybe keep a file into which we dump the commands in progress, and add a mode in pegasus that parses and displays that file.

    enhancement 
    opened by jaywonchung 0
  • Support dynamically updating `hosts.yaml`

    Use case:

    • Add a node to hosts.yaml when you get access to more nodes, and the jobs you are currently running will automatically go there.
    • Remove a node from hosts.yaml when someone asks you to vacate the node.

    Removing a node from hosts.yaml does not kill the job. However, it is guaranteed that the removed node will not be given a new job once it's been removed from hosts.yaml.

    enhancement 
    opened by jaywonchung 0
  • Check SSH connection before sched loop

    Before entering the scheduling loop, Pegasus should create and connect the SSH session to see if it connects. If not, it should gracefully terminate all previous connections and exit. The connected SSH session object can be moved into the tokio task.

    enhancement 
    opened by jaywonchung 0
  • Cancelling commands

    Cancelling commands run by Pegasus is very difficult. You essentially have to ssh into each node, manually figure out the PIDs of the commands, and kill them.

    Nested commands, so to speak, make things more complicated. For instance, docker exec sh -c "python train.py" will run the following commands:

    • Run by user: sh -c docker exec sh -c "python train.py"
    • Run by user: docker exec sh -c "python train.py"
    • Run by root: sh -c "python train.py"
    • Run by root: python train.py

    Only killing the fourth python train.py command will truly achieve cancellation. The bottom line is, it is difficult for Pegasus to infer how to properly terminate a command.

    Potential solutions

    • We might ask the user for a cancellation command in queue.yaml. For example, sudo kill $(pgrep -f 'train.py'). Then the ctrl_c handler will create a new connection to the hosts and run the designated cancellation command.
    • Somehow figure out the PGID of the sh process and run sudo kill -- -PGID. Can we pgrep -f with the entire command? Shell escaping might become a problem. (pgrep -f with every single word in the command and kill the intersection of all PIDs returned?)
    enhancement 
    opened by jaywonchung 1
Owner
Jae-Won Chung
PhD student in CS. Systems plus Deep Learning. @SymbioticLab