Kameo 🧚🏻


Fault-tolerant Async Actors Built on Tokio

  • Async: Built on Tokio; actors run asynchronously in their own isolated spawned tasks.
  • Supervision: Link actors, creating dependencies through parent/child/sibling relationships.
  • MPSC Bounded/Unbounded Channels: Uses mpsc channels for messaging between actors, with configurable boundedness.
  • Concurrent Queries: Supports concurrent processing of queries when mutable state isn't necessary.
  • Panic Safe: Catches panics internally, allowing actors to be restarted.

Installing

Install using cargo add:

cargo add kameo

or adding to your dependencies manually:

[dependencies]
kameo = "*"

Defining an Actor without Macros

use kameo::Actor;
use kameo::message::{Context, Message};

// Define the actor state
struct Counter {
    count: i64,
}

impl Actor for Counter {}

// Define messages
struct Inc { amount: u32 }

impl Message<Inc> for Counter {
    type Reply = i64;

    async fn handle(&mut self, msg: Inc, _ctx: Context<'_, Self, Self::Reply>) -> Self::Reply {
        self.count += msg.amount as i64;
        self.count
    }
}

Defining an Actor with Macros

use kameo::{messages, Actor};

// Define the actor state
#[derive(Actor)]
struct Counter {
    count: i64,
}

// Define messages
#[messages]
impl Counter {
    #[message]
    fn inc(&mut self, amount: u32) -> i64 {
        self.count += amount as i64;
        self.count
    }
}
See generated macro code
// Derive Actor
impl kameo::actor::Actor for Counter {
    type Mailbox = kameo::actor::UnboundedMailbox<Self>;

    fn name(&self) -> Cow<'_, str> {
        Cow::Borrowed("Counter")
    }
}

// Messages
struct Inc { amount: u32 }

impl kameo::message::Message<Inc> for Counter {
    type Reply = i64;

    async fn handle(&mut self, msg: Inc, _ctx: Context<'_, Self, Self::Reply>) -> Self::Reply {
        self.inc(msg.amount)
    }
}

Spawning an Actor & Messaging

let counter_ref = kameo::spawn(Counter { count: 0 });

let count = counter_ref.ask(Inc { amount: 42 }).send().await?;
println!("Count is {count}");

Benchmarks

13x higher throughput when compared with Actix

(benchmark chart)

The chart above shows a basic benchmark of sending a message to an actor in Kameo and in Actix. Always benchmark for yourself.

Benchmark results

Sending a message to an actor

Benchmark              Time
Kameo Unsync Message   432.26 ns
Kameo Sync Message     503.89 ns
Kameo Query            1.3000 µs
Actix Message          5.7545 µs

Processing the Fibonacci sequence up to 20 in an actor

Benchmark              Time
Kameo Unsync Message   18.229 µs
Kameo Sync Message     18.501 µs
Kameo Query            19.257 µs
Actix Message          27.442 µs

Contributing

Contributions are welcome! Feel free to submit pull requests, create issues, or suggest improvements.

License

kameo is dual-licensed under either:

  • Apache License, Version 2.0
  • MIT License

at your option.

Comments
  • Backpressure in ActorPools


    For single-actors I think the unbounded channels work fine, because implicitly they're sequential, conceptually (of course you can do more complex things).

    However, for actor pools, I want the ability to fill up all the actors in a given pool and not keep spamming messages until they're ready to accept more. I can't use .send().await like I would with a normal actor, because that effectively makes the pool meaningless. I can't use send_async unless it's trivially small data, because it won't have any backpressure and will effectively leak memory.

    In this case, I think it's fine to have a model of something like...

    pool.send_async()
    pool.wait_until_available().await
    

    This does of course have an implicit race condition, which probably doesn't matter for a lot of use cases, but it could be baked into another send method like...

    pool.send_when_available().await
    

    For users who want more backpressure than the pool actor count provides, they can easily add a buffered channel on top of this (or another actor itself that buffers).

    enhancement good first issue 
    opened by johnpyp 6
  • Sync API


    I have a use case where I have an actor managing some GPU state and I want to interact with it from the synchronous context of a winit application. The way I can currently do this is by essentially using a tokio::sync::oneshot channel and spawning a task with tokio which populates the channel with the result of sending a message to the actor. I suspect that internally kameo is probably using a oneshot channel or similar already, and it might be possible to interact with it to receive the response in a sync context.

    I would propose a similar API to https://docs.rs/kameo/latest/kameo/actor/struct.ActorRef.html#method.send, which could be called ActorRef::send_sync. This would return an object that could be polled to see if it is complete. Polling the future directly in a sync context is not possible due to the lack of the async runtime's context that must be passed in.

    enhancement 
    opened by vadixidav 6
  • unbounded channel usage


    Hello,

    thanks for sharing the project. I'm tired of typing my own message passing with channels, an enum and tasks over and over again ;-)

    What's the rationale behind using unbounded channels? I debugged channel usage a lot and always came to the conclusion that a stuck task/actor that is backpressured from somewhere is easier to find than something like a memory leak from an unbounded channel.

    thanks

    @flxo

    question 
    opened by flxo 6
  • Remove Sync trait from into_boxed_err method signature in DeriveReply


    The derive Reply include Sync trait, but BoxDebug doesn't include it.

    #[inline]
    fn into_boxed_err(self) -> ::std::option::Option<::std::boxed::Box<dyn ::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static>> {
        ::std::option::Option::None
    }
    
    opened by liutaon 3
  • Attach `Streams` to actors


    To process a Stream in a Kameo actor, I think the only current solution is to spawn a (Tokio) task and forward each item to the actor. Each item must be sent through the actor's queue. Spawning the task, moving an ActorRef inside, etc. is boilerplate and boring.

    I remember that at least actix has a feature to attach streams to actors and wondered if that could be considered a feature for kameo as well?

    If yes:

    I hacked a prototype and noticed that every message in kameo gets boxed, which surprised me because boxing is generally considered a possible performance hit. Is there no way around that? Sadly I had to do the same for the stream feature.

    In the prototype an actor is not notified when a stream completes. This could easily be achieved by using StreamNotifyClose. This would be an API decision. I think it's a nice feature, but it adds noise and possible confusion because the handler needs an Option around each message. This implicitness must be known and may be hard to grasp for beginners.

    Thanks for your thoughts!

    opened by flxo 3
  • can not found generic type in struct


    #[derive(Actor)]
    pub struct MyActor<T> {
        arena: Vec<T>,
    }
    
    #[messages]
    impl<T: Send + 'static> MyActor<T> {
        #[message]
        pub fn add(&mut self, t: T) {}
    }
    

    and I get the compile error:

    error[E0412]: cannot find type `T` in this scope
       |
    24 |     pub fn add(&mut self, t: T) {}
       |                              ^ not found in this scope
    
    opened by Liyixin95 2
  • how to implement (override) anything from `Kameo::Actor` if using the derive macro


    If I declare an actor with the derive macro, it's not possible to have an additional impl block that overrides the default implementations from here.

    Am I forced to not use the derive macro if I need a custom impl for anything in Actor?

    opened by flxo 2
  • async handler fn by macro


    The macro https://github.com/tqwewe/kameo/blob/bb4c8ffdbda8bd4f56cde00ffe8bc38e7d557197/crates/kameo/examples/macro.rs#L19

    example shows how to define a handler fn which is not async. Is that on purpose and what's the reason for that? I would have expected this to be async or return something impl Future<...>

    opened by flxo 2
  • feat!: return `SendError` from send methods allowing replies to be received blocking


    ActorRef's send methods which receive replies now return SendResult, allowing message replies to be received either asynchronously or blocking.

    actor_ref.send(msg).await?; // Async recv
    actor_ref.send(msg).blocking_recv()?; // Blocking recv
    

    Closes https://github.com/tqwewe/kameo/issues/25

    opened by tqwewe 1
  • feat!: add stream messages to forward messages from a stream to an actor


    Closes https://github.com/tqwewe/kameo/issues/15

    This PR removes stateless actors, as the implementation for it conflicts with features required by this PR. This is a breaking change.

    A new trait StreamMessage is provided, which can be implemented by actors to allow them to be attached to streams using the ActorRef::attach_stream method.

    ping @flxo

    opened by tqwewe 1
  • Profiling and benchmarking of sync message passing


    Hello,

    Just want to share this. I was curious about the overhead of kameo compared to "plain" channels and a task.

    1. Profiling

    Let's see where kameo spends its CPU time. The following minimal program was built in release mode (+ debug=true in the profile) and executed on macOS on a 2.4 GHz 8-core Intel Core i9.

    use kameo::{
        message::{Context, Message},
        Actor,
    };
    
    #[derive(Default)]
    pub struct MyActor;
    
    impl Actor for MyActor {}
    
    impl Message<()> for MyActor {
        type Reply = ();
    
        async fn handle(&mut self, _: (), _ctx: Context<'_, Self, Self::Reply>) -> Self::Reply {}
    }
    
    #[tokio::main(flavor = "current_thread")]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let my_actor_ref = kameo::spawn(MyActor::default());
        loop {
            my_actor_ref.send(()).await?;
        }
    }
    

    produces this profile. Captured with samply(❤️).

    You can see the two big blocks, kameo::actor::actor_ref::ActorRef::send::{{closure}} and <kameo::actor_kind::SyncActor as kameo::actor_kind::ActorState>::handle_message::{{closure}}, in the flame graph beside the usual Tokio runtime stuff. Interesting: ~5.4% overhead due to Box::new in the tx path, and ~8.3% (+4% free) for boxing on the rx side.

    2. Comparison to a plain channel + Tokio task

    I used criterion to compare a simple echo actor processing sync calls with a plain Tokio task that sends a reply on a received oneshot Sender. I think this gives an impression of the overhead of the convenience that kameo brings.

    use criterion::Criterion;
    use criterion::{criterion_group, criterion_main};
    use kameo::{
        message::{Context, Message},
        Actor,
    };
    use tokio::sync::mpsc;
    use tokio::sync::oneshot;
    use tokio::task;
    
    fn actor(c: &mut Criterion) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let _guard = rt.enter();
    
        struct BenchActor;
    
        impl Actor for BenchActor {}
    
        impl Message<u32> for BenchActor {
            type Reply = u32;
    
            async fn handle(&mut self, msg: u32, _ctx: Context<'_, Self, Self::Reply>) -> Self::Reply {
                msg
            }
        }
        let actor_ref = kameo::actor::spawn(BenchActor {});
    
        c.bench_function("actor_sync_messages", |b| {
            b.to_async(&rt).iter(|| async {
                actor_ref.send(0).await.unwrap();
            });
        });
    }
    
    fn plain(c: &mut Criterion) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let _guard = rt.enter();
    
        // Echo task - pendant to the actor.
        let (tx, mut rx) = mpsc::unbounded_channel::<(u32, oneshot::Sender<u32>)>();
        task::spawn(async move {
            while let Some((msg, tx)) = rx.recv().await {
                tx.send(msg).unwrap();
            }
        });
    
        c.bench_function("plain_sync_messages_unbounded", |b| {
            b.to_async(&rt).iter(|| async {
                let (reply_tx, reply_rx) = oneshot::channel();
                tx.send((0, reply_tx)).unwrap();
                reply_rx.await.unwrap();
            });
        });
    
        // Echo task bounded - pendant to the actor.
        let (tx, mut rx) = mpsc::channel::<(u32, oneshot::Sender<u32>)>(10);
        task::spawn(async move {
            while let Some((msg, tx)) = rx.recv().await {
                tx.send(msg).unwrap();
            }
        });
        
        c.bench_function("plain_sync_messages_bounded", |b| {
            b.to_async(&rt).iter(|| async {
                let (reply_tx, reply_rx) = oneshot::channel();
                tx.send((0, reply_tx)).await.unwrap();
                reply_rx.await.unwrap();
            });
        });
    }
    
    criterion_group! {
        name = benches;
        config = Criterion::default();
        targets = actor, plain
    }
    
    criterion_main!(benches);
    

    Question: Did I do something wrong here?

    Output:

    actor_sync_messages     time:   [750.69 ns 756.38 ns 763.39 ns]
                            change: [+0.1536% +2.0887% +4.0079%] (p = 0.03 < 0.05)
                            Change within noise threshold.
    Found 14 outliers among 100 measurements (14.00%)
      3 (3.00%) low mild
      6 (6.00%) high mild
      5 (5.00%) high severe
    
    plain_sync_messages_unbounded
                            time:   [270.36 ns 271.76 ns 273.30 ns]
                            change: [+4.3599% +5.3530% +6.4111%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 14 outliers among 100 measurements (14.00%)
      2 (2.00%) low mild
      10 (10.00%) high mild
      2 (2.00%) high severe
    
    plain_sync_messages_bounded
                            time:   [309.61 ns 312.05 ns 314.86 ns]
                            change: [-3.6089% -1.9707% -0.2564%] (p = 0.02 < 0.05)
                            Change within noise threshold.
    Found 7 outliers among 100 measurements (7.00%)
      5 (5.00%) high mild
      2 (2.00%) high severe
    

    The difference is quite large (~2.5×). Clearly everything between you and the raw channel costs performance, but I wouldn't have expected that much. I didn't examine async messages.

    I'm evaluating all this because we're thinking about using it in a networking application where throughput matters.

    Let me know if I missed something or there's a systematic error.

    cheers,

    @flxo

    opened by flxo 1
Releases (v0.9.0)
  • v0.9.0(Jun 25, 2024)

    Added

    • BREAKING: Add support for bounded/unbounded mailboxes (#29)
    • Add Send + 'static bounds to Reply trait
    • Add pubsub actor (#31)
    • Add support for async pool factory functions (#33)
    • Add async spawn_with function (#34)

    Changed

    • BREAKING: Return SendError from send methods allowing replies to be received blocking (#27)

    Fixed

    • Buffered messages not being applied correctly (#32)

    Misc

    • Update CHANGELOG.md
    • Move crates out of nested crates dir

    See the full CHANGELOG.md

  • v0.8.1(May 24, 2024)

  • v0.8.0(Apr 19, 2024)

    • @liutaon made their first contribution in #21

    Added

    • Allow ActorPool itself to be spawned as an actor
    • Add SendError::flatten method
    • Implement internal buffering whilst actor is starting up

    Changed

    • BREAKING: Use StreamMessage enum instead of trait
    • BREAKING: Use Display implementation for handler errors

    Removed

    • Remove Sync requirement from Reply macro #21

    Fixed

    • is_alive returning the opposite value

    Documentation

    • Add example to Reply trait code docs

    Misc

    • Add CHANGELOG.md
    • Update cliff.toml
    • Add newline for new contributors in cliff config

    See the full CHANGELOG.md

  • v0.7.0(Apr 15, 2024)

    Added

    • BREAKING: Add values to StreamMessage::on_start and StreamMessage::on_finish
    • Add support for actor generics in messages macro
    • Add stream messages to forward messages from a stream to an actor

    Removed

    • BREAKING: Remove stateless actors </>

    Misc

    • Remove unused dependency trait-variant
    • Add overhead benchmark
    • Remove commented stateless actor code
    • Add git cliff integration
    • Add CHANGELOG.md

    See the full CHANGELOG.md

  • v0.6.0(Apr 15, 2024)

    Added

    • BREAKING: Add delegated reply with context type

    Changed

    • BREAKING: Move all types to separate modules and improve documentation

    Removed

    • BREAKING: Remove Spawn trait and use spawn functions

    Documentation

    • Improve docs for spawn functions
    • Add note to Actor derive macro
    • Add missing Context param from docs

    See the full CHANGELOG.md
