Thin wrapper around [`tokio::process`] to make it streamable

Overview

process-stream

Wraps tokio::process::Command to future::stream.

This library provide ProcessExt to create your own custom process

Install

process-stream = "0.3.1"

Example usage:

From Vec or Vec<&str>

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let ls_home: Process = vec!["/bin/ls", "."].into();

    let mut stream = ls_home.spawn_and_stream()?;

    while let Some(output) = stream.next().await {
        println!("{output}")
    }

    Ok(())
}

From Path/PathBuf/str

>().await; println!("{outputs:#?}"); Ok(()) }">
use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut process: Process = "/bin/ls".into();

    // block until process completes
    let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;

    println!("{outputs:#?}");

    Ok(())
}

New

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut ls_home = Process::new("/bin/ls");
    ls_home.arg("~/");

    let mut stream = ls_home.spawn_and_stream()?;

    while let Some(output) = stream.next().await {
        println!("{output}")
    }

    Ok(())
}

Kill

use process_stream::{Process, ProcessExt, StreamExt};
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut long_process = Process::new("/bin/app");

    let mut stream = long_process.spawn_and_stream()?;

    tokio::spawn(async move {
      while let Some(output) = stream.next().await {
        println!("{output}")
      }
    })

    // process some outputs
    tokio::time::sleep(std::time::Duration::new(10, 0)).await;

    // close the process
    long_process.kill().await;

    Ok(())
}

Communicate with running process

use process_stream::{Process, ProcessExt, StreamExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut process: Process = Process::new("sort");

    // Set stdin (by default is set to null)
    process.stdin(Stdio::piped());

    // Get Stream;
    let mut stream = process.spawn_and_stream().unwrap();

    // Get writer from stdin;
    let mut writer = process.take_stdin().unwrap();

    // Spawn new async task and move stream to it
    let reader_thread = tokio::spawn(async move {
        while let Some(output) = stream.next().await {
            if output.is_exit() {
                println!("DONE")
            } else {
                println!("{output}")
            }
        }
    });

    // Spawn new async task and move writer to it
    let writer_thread = tokio::spawn(async move {
        writer.write(b"b\nc\na\n").await.unwrap();
        writer.write(b"f\ne\nd\n").await.unwrap();
    });

    // Wait till all threads finish
    writer_thread.await.unwrap();
    reader_thread.await.unwrap();

    // Result
    // a
    // b
    // c
    // d
    // e
    // f
    // DONE
    Ok(())
}
You might also like...
Alternative StreamMap fork of tokio-stream

streammap-ext This is a fork of StreamMap from tokio-stream crate. The only difference between the implementations is that this version of StreamMap n

Graceful shutdown util for Rust projects using the Tokio Async runtime.

Shutdown management for graceful shutdown of tokio applications. Guard creating and usage is lock-free and the crate only locks when: the shutdown sig

Async Rust cron scheduler running on Tokio.

Grizzly Cron Scheduler A simple and easy to use scheduler, built on top of Tokio, that allows you to schedule async tasks using cron expressions (with

Utilities and tools based around Amazon S3 to provide convenience APIs in a CLI

s3-utils Utilities and tools based around Amazon S3 to provide convenience APIs in a CLI. This tool contains a small set of command line utilities for

The Dutch secret service (AIVD) has a yearly puzzle challenge around Christmas
The Dutch secret service (AIVD) has a yearly puzzle challenge around Christmas

AIVD kerstpuzzel 2021 18 solver The Dutch secret service (AIVD) has a yearly puzzle challenge around Christmas, called the 'AIVD kerstpuzzel'. This re

Fast, compact and all-around subdomain enumeration tool written in Rust
Fast, compact and all-around subdomain enumeration tool written in Rust

Fast, compact and all-around subdomain enumeration tool written in Rust, which uses dns bruteforce, internet search and recursive http content search.

🚧 (Alpha stage software) Binary that supports remote filesystem and process operations. 🚧

distant Binary to connect with a remote machine to edit files and run programs. 🚧 (Alpha stage software) This program is in rapid development and may

bustd is a lightweight process killer daemon for out-of-memory scenarios for Linux!

bustd: Available memory or bust! bustd is a lightweight process killer daemon for out-of-memory scenarios for Linux! Features Small memory usage! bust

Fox Ear is a Linux process behavior trace tool powered by eBPF
Fox Ear is a Linux process behavior trace tool powered by eBPF

Fox Ear Fox Ear is a Linux process behavior trace tool powered by eBPF. Banner image by Birger Strahl on Unsplash. Features Log process and its subpro

Comments
  • Question on sending input and listening to a process

    Question on sending input and listening to a process

    What I want:

    1. send some commands to a process;
    2. listen to the feedback from the process(even I don't send command first);

    For the 1st, Seems stdin() is the right method? Is there an example, For the 2ed, the example using next() seems ok. But in my toy process:

    fn main() {
        let mut rng = rand::thread_rng();
        // let v = vec!["ni", "hao", "san", "niu", "fdsfs"];
        let v = vec![1,2,3,4,5,6,7,8,9,10,11,12];
    
        loop {
            let mut input_str = String::new();
            io::stdin()
                .read_line(&mut input_str)
                .expect("get input failed");
            if input_str == "hi\r\n" {
                println!("got it");
            } else {
                let s = v.iter().choose_multiple(&mut rng, 1);
                println!("{}", s[0]);
            }
            sleep(Duration::from_secs(1));
        }
    }
    

    When using next() with it,even I didn't input anything, the process would keep output , which is not what I want. How to fix it?

    opened by dbsxdbsx 9
  • How to solve

    How to solve "stream did not contain valid UTF-8"?

    Sometimes the feedback from the process would give some output not follow UTF-8 codec. Is there a way to work around it? For example, when the output is detected as not UTF-8, then I could decode it with another codec.

    opened by dbsxdbsx 2
Releases(v0.3.0)
  • v0.2.3(Jun 17, 2022)

    Support writing to process

    
    #[tokio::main]
    async fn main() -> io::Result<()> {
        let mut process: Process = Process::new("sort");
    
        // Set stdin (by default is set to null)
        process.stdin(Stdio::piped());
    
        // Get Stream;
        let mut stream = process.spawn_and_stream().unwrap();
    
        // Get writer from stdin;
        let mut writer = process.take_stdin().unwrap();
    
        // Spawn new async task and move stream to it
        let reader_thread = tokio::spawn(async move {
            while let Some(output) = stream.next().await {
                if output.is_exit() {
                    println!("DONE")
                } else {
                    println!("{output}")
                }
            }
        });
    
        // Spawn new async task and move writer to it
        let writer_thread = tokio::spawn(async move {
            writer.write(b"b\nc\na\n").await.unwrap();
            writer.write(b"f\ne\nd\n").await.unwrap();
        });
    
        // Wait till all threads finish
        writer_thread.await.unwrap();
        reader_thread.await.unwrap();
    
    
        Ok(())
    }
    

    Full Changelog: https://github.com/tami5/process-stream/compare/v0.2.2...v0.2.3

    Source code(tar.gz)
    Source code(zip)
  • v0.2.0(May 23, 2022)

Owner
Wandering, exploring, making mistakes and actively pushing limits. (formerly tami5)
null
A thin-hypervisor that runs on aarch64 CPUs.

How to build the hypervisor By Rust toolchain (TBD) By docker Requirements Docker (Tested by Docker version 20.10.8, build 3967b7d28e) I tested by non

RIKEN R-CCS 54 Dec 12, 2022
The classic game Pong, written in lambda calculus, and a thin layer of Rust.

What? The good old game Pong, written in lambda calculus, and a thin layer of Rust. Why? I was bored. No, seriously, why? Everyone keeps saying that l

null 2 Aug 14, 2022
prelate-rs is an idiomatic, asynchronous Rust wrapper around the aoe4world API. Very much a WIP at this stage.

prelate-rs is an idiomatic, asynchronous Rust wrapper around the aoe4world API. Very much a WIP at this stage. Project Status We currently support the

William Findlay 4 Dec 29, 2022
A Wasm component optimizer (mostly a wrapper around wasm-opt)

component-opt An optimizer for Wasm Components Current Status This project currently only offers one optimization and does not allow it to be configur

Robin Brown 6 Mar 4, 2024
Mix async code with CPU-heavy thread pools using Tokio + Rayon

tokio-rayon Mix async code with CPU-heavy thread pools using Tokio + Rayon Resources Documentation crates.io TL;DR Sometimes, you're doing async stuff

Andy Barron 74 Jan 2, 2023
Provides utility functions to perform a graceful shutdown on an tokio-rs based service

tokio-graceful-shutdown IMPORTANT: This crate is in an early stage and not ready for production. This crate provides utility functions to perform a gr

null 61 Jan 8, 2023
An asynchronous IO utilities crate powered by tokio.

An asynchronous IO utilities crate powered by tokio.

Harry 2 Aug 18, 2022
dark-std an Implementation of asynchronous containers build on tokio

dark-std dark-std is an Implementation of asynchronous containers build on tokio. It uses a read-write separation design borrowed from Golang SyncHash

darkrpc 4 Dec 13, 2022
Pure Rust library for Apache ZooKeeper built on tokio

zookeeper-async Async Zookeeper client written 100% in Rust, based on tokio. This library is intended to be equivalent with the official (low-level) Z

Kamil Rojewski 16 Dec 16, 2022
Rc version `tokio-rs/bytes`

RcBytes The aim for this crate is to implement a Rc version bytes, which means that the structs in this crate does not implement the Sync and Send. Th

Al Liu 2 Aug 1, 2022