cqrs

A lightweight, opinionated CQRS and event sourcing framework targeting serverless architectures.

Overview

Command Query Responsibility Segregation (CQRS) is a pattern in Domain Driven Design that uses separate write and read models for application objects and interconnects them with events. Event sourcing uses the generated events as the source of truth for the state of the application.

Together these provide a number of benefits:

  • Removes coupling between tests and application logic allowing limitless refactoring.
  • Greater isolation of the aggregate.
  • Ability to create views that more accurately model our business environment.
  • A horizontally scalable read path.
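The separation can be sketched in plain Rust. This is a toy bank-account domain invented for illustration, not this crate's actual API: the write model validates commands and emits events, and a differently shaped read model is built purely from those events.

```rust
// Toy domain for illustration only (not this crate's API): the write
// model validates commands and emits events; the read model is built
// solely from the event stream.

#[derive(Debug, Clone)]
enum AccountEvent {
    Deposited { amount: u64 },
    Withdrew { amount: u64 },
}

// Write model (the aggregate).
#[derive(Default)]
struct Account {
    balance: u64,
}

impl Account {
    fn handle_deposit(&self, amount: u64) -> Vec<AccountEvent> {
        vec![AccountEvent::Deposited { amount }]
    }

    fn handle_withdraw(&self, amount: u64) -> Result<Vec<AccountEvent>, String> {
        if amount > self.balance {
            return Err("insufficient funds".to_string());
        }
        Ok(vec![AccountEvent::Withdrew { amount }])
    }

    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Deposited { amount } => self.balance += amount,
            AccountEvent::Withdrew { amount } => self.balance -= amount,
        }
    }
}

// Event sourcing: current state is a pure fold over the event log.
fn replay(events: &[AccountEvent]) -> Account {
    let mut account = Account::default();
    for event in events {
        account.apply(event);
    }
    account
}

// Read model: a separate shape, updated from the same events.
#[derive(Default)]
struct AccountSummary {
    deposits: u64,
    withdrawals: u64,
}

impl AccountSummary {
    fn update(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Deposited { amount } => self.deposits += amount,
            AccountEvent::Withdrew { amount } => self.withdrawals += amount,
        }
    }
}
```

Because the read model depends only on events, any number of differently shaped views can be rebuilt from the same log, which is what enables the horizontally scalable read path.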

Three backing data stores are supported.

Comments
  • Some questions and suggestions

    First some basic suggestion: please enable Discussion for this project and/or make some Matrix channel etc. for users to hang out in, ask for help and socialize. Had it been already there, I would have just asked there. :)

I wanted to start using "event processing" (write stuff down in a log, and then have other stuff follow that log and react to it) in my Rust projects. I did not specifically think about ES/CQRS, but I think what I wanted to achieve might fit this model.

    Your crate looks like a solid, more informed base and I went through the book, read through the demo code and some of the source code and I think I can find myself here. I like that it's lightweight, to the point and supports Postgres. Great job.

    I basically want to write a CI/CD bot to power my fake Rust/Nix software shop. I'm thinking - I consume github webhooks as "Commands", I track aggregates like "PR" (PR, commits in it, all the pushes, all the builds).

Q1: Are Queries eager and reactive? I mean - on every new event, can I expect the "simple query" to be called eagerly, and in its code can I emit side effects like starting a new CI build? Is it guaranteed? The ["it's useless in production"](https://github.com/serverlesstechnology/cqrs-demo/blob/50029766363aa0ff324ccd264bdeed532e5a236e/src/queries.rs#L12) comment is a bit confusing. I guess the framework itself does not keep track of which events each query has already processed, so I would have to persist some "pointer" to skip emitting side effects for events I already reacted to before (some idempotency-like considerations), but otherwise that's it? BTW, you might want to consider building a concept of a "Reactor" or something that would wrap a Query and keep track of its position in the event stream automatically - seems rather generic.
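A minimal sketch of that "Reactor" idea (all names are hypothetical, not part of the crate): a wrapper remembers the last event sequence it has processed, so a re-dispatched or replayed event does not trigger the side effect twice.

```rust
// Hypothetical "Reactor" sketch (not part of the crate): wraps a
// side-effecting handler and remembers the last event sequence it has
// processed, so a re-dispatched or replayed event is not acted on twice.

struct Reactor<F: FnMut(u64, &str)> {
    // In practice this position would be persisted (e.g. a row in
    // Postgres) and reloaded on startup.
    last_seen: u64,
    handler: F,
}

impl<F: FnMut(u64, &str)> Reactor<F> {
    fn new(persisted_position: u64, handler: F) -> Self {
        Self { last_seen: persisted_position, handler }
    }

    // Called for every dispatched event; skips anything already handled.
    fn on_event(&mut self, sequence: u64, payload: &str) {
        if sequence <= self.last_seen {
            return; // already reacted: the side effect must not fire again
        }
        (self.handler)(sequence, payload);
        self.last_seen = sequence; // persist the new position here
    }
}
```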

    Q2: How do I approach multiple event streams including splitting between multiple "microservices"? The demos are kind of simplistic and include only one aggregate. I might just need to read more literature, since it's more of an architecture question, but if you could give me a pointer or two, I would appreciate it.

    I guess I can have multiple programs using cqrs library pointing at the same DB in Postgres and one (or even more) of them writing events, and all following new events. I do realize that I can have things like CDC with Kafka/Kinesis etc. but it's kind of heavy for my tiny internal tools use, at least for now.

    Anyway, thanks for your work on this project, and sooner or later I'll figure it out and maybe post some links with results.

    opened by dpc 5
  • How to compose a single view from multiple aggregates where an aggregate references another aggregate?

    Hi,

    Given a single view composed by two aggregates roots, Vendor and Product:

    pub struct VendorView {
        pub id: String,
        pub name: String,
        pub products: Vec<VendorViewProduct>,
    }
    
    pub struct VendorViewProduct {
        pub id: String,
        pub name: String,
        pub price: u32,
    }
    

    where id is the aggregate ID of each aggregate root, and the actual view_id is VendorView.id copied from the Vendor. This view stores all products of a single vendor. The Product aggregate has a vendor_id field to reference the Vendor.

I also wondered whether Product should be an aggregate root at all, but in many scenarios/commands the Product itself is the root to be referenced, with its own stock, prices and so on. So it seems that for this case the model is easier if Vendor and Product are separate aggregates.

    To dispatch events to this view from separate CQRS instances of each aggregate we implement both View<Vendor> and View<Product> for VendorView. However we have no way to pass this vendor_id as view_id down to the Query because:

    pub trait Query<A: Aggregate>: Send + Sync {
        async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>]);
    }
    
    // cqrs.rs
impl<A, ES> CqrsFramework<A, ES> {
    pub async fn execute_with_metadata(...) -> Result<(), AggregateError<A::Error>> {
        // here the aggregate_id is the Product's. Ok
        let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
        // .....
        for processor in &self.queries {
            let dispatch_events = committed_events.as_slice();
            // for most queries, especially the ones exclusive to the Product, this is OK
            // but for VendorView this is not ok since Product.id != VendorView.id
            processor.dispatch(aggregate_id, dispatch_events).await;
        }
    }
}
    

    One workaround I ended up doing was:

    pub trait Query<A: Aggregate>: Send + Sync {
        async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>], secondary_id: Option<&str>);
    }
    
    pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
        fn secondary_id(&self) -> Option<String> {
            None
        }
    }
    
    impl Aggregate for Product {
        fn secondary_id(&self) -> Option<String> {
            Some(self.vendor_id.clone())
        }
    }
    
    impl<A, ES> CqrsFramework<A, ES> {
        pub async fn execute_with_metadata(...) -> Result<(), AggregateError<A::Error>> {
            // ..... I had to load_aggregate again to get the aggregate with the newly applied event
            let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
            let aggregate = aggregate_context.aggregate();
            let secondary_id = aggregate.secondary_id();
           
            for processor in &self.queries {
                let dispatch_events = committed_events.as_slice();
                processor
                    .dispatch(aggregate_id, dispatch_events, secondary_id.as_deref())
                    .await;
            }
        }
    }
    

    Thanks!

    opened by jonaslimads 4
  • Garbage collect old events

    Hi,

    I've just read the book about this repo and am really excited about it!

    However I am a bit worried about the size of the event store. Is there some kind of garbage collection for old events?

For instance, let's say we have a system where only a known set of agents may write commands. For some time it makes sense to keep the events, since due to network latency some commands may arrive delayed. However, once we have received a message from every writing agent, we can safely assume that the state of the system up to that point will no longer change. We could therefore calculate the state at that point, take it as the new "start", and delete the events before it, saving a lot of storage. It's similar to the snapshot concept I read about, but aimed at saving storage rather than computation time.

Or am I overestimating the problem of indefinitely growing event/command logs?
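The compaction described here can be sketched with a toy in-memory log. `safe_up_to` stands in for the sequence number that every writing agent is known to have passed; everything at or below it is folded into a snapshot and dropped.

```rust
// Toy compaction sketch: fold every event at or below a safe sequence
// number into a snapshot, and keep only the newer events.

#[derive(Debug)]
struct Snapshot {
    at_sequence: u64,
    balance: i64, // stand-in for the aggregated state
}

fn compact(
    snapshot: Snapshot,
    events: Vec<(u64, i64)>, // (sequence, delta) pairs
    safe_up_to: u64,         // every writer is known to have passed this point
) -> (Snapshot, Vec<(u64, i64)>) {
    let mut snapshot = snapshot;
    let mut remaining = Vec::new();
    for (sequence, delta) in events {
        if sequence <= safe_up_to {
            // Absorb the event into the snapshot; it can then be deleted.
            snapshot.balance += delta;
            snapshot.at_sequence = sequence;
        } else {
            remaining.push((sequence, delta));
        }
    }
    (snapshot, remaining)
}
```

Unlike the usual snapshot feature (which saves replay time but keeps the events), here the absorbed events are actually deleted, trading auditability and replayability for storage.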

    opened by peteole 4
  • Propagating the timestamp through to the EventEnvelope

    Hi, I'm working on some views where I want to send the traditional created_at and updated_at fields to the front end. All the data for this can be easily derived from the timestamp field, so I'd like to use it, but it isn't surfaced by the EventEnvelope. I realise that currently there is no dependency on any DateTime libraries, so this isn't a trivial change, but it seems like the alternative is to store the timestamp in each event, which is less than ideal. Do you have any other thoughts on how this can be approached?
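One workaround, assuming the only extension point is the string metadata map persisted alongside each event: attach a timestamp when executing the command, and let views parse it back out. A sketch (the "time" key is an arbitrary choice):

```rust
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

// Sketch: carry a unix timestamp in the command metadata (a plain
// string map), so it is persisted with each event and available to
// views without adding a DateTime dependency to the framework.
fn timestamped_metadata() -> HashMap<String, String> {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the unix epoch")
        .as_secs();
    let mut metadata = HashMap::new();
    metadata.insert("time".to_string(), secs.to_string());
    metadata
}

// A view derives created_at/updated_at by parsing the same key back out.
fn created_at(metadata: &HashMap<String, String>) -> Option<u64> {
    metadata.get("time")?.parse().ok()
}
```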

    Thanks!

    documentation 
    opened by bennichols 4
  • Either make EventStore public or have some kind of accessor?

Hi again! One use case I've run into recently is that sometimes I want to build an aggregate without a command to send it. It might be in lieu of a query, where a query would be overkill, or as part of a query processor chain. In order to do this, I can call load_aggregate on the EventStore object and call the aggregate() method. Unfortunately, there is no public handle to the EventStore member in the CqrsFramework struct, so I have to create duplicate EventStore objects that I pass along to my axum routes. This works fine, but it would be a lot nicer if either the EventStore was public or I could access it somehow. What are your thoughts?
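The accessor being asked for could be as small as a borrow of the store. A sketch with stand-in types, not the crate's actual definitions:

```rust
// Stand-in for the persistence-backed event store.
struct EventStore;

impl EventStore {
    // Stand-in for rebuilding an aggregate from its events.
    fn load_aggregate(&self, aggregate_id: &str) -> String {
        format!("aggregate:{}", aggregate_id)
    }
}

struct CqrsFramework {
    store: EventStore,
}

impl CqrsFramework {
    // The one-line accessor the issue asks for: callers can load an
    // aggregate without issuing a command.
    fn store(&self) -> &EventStore {
        &self.store
    }
}
```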

    invalid 
    opened by bennichols 3
  • Is this crate no longer maintained?

I noticed that there is a CQRS-ES2 crate that is forked from this project; that project has made big updates around ownership and other large changes, so should this project be considered unmaintained, and should we instead be using that other crate?

    I guess what I'm really asking here is if the CQRS-ES2 crate should be considered the modern replacement of this one that we should actually use instead?

    I'm seeking guidance on the right CQRS crate to use for a new project.

    opened by duaneking 3
  • Possibility of queries losing an event.

    https://github.com/serverlesstechnology/cqrs/blob/86d83dc5bb773742513bd2a60e42d219dff92e58/src/cqrs.rs#L187

            let committed_events = self
                .store
                .commit(resultant_events, aggregate_context, metadata)
                .await?;
            for processor in &self.queries {
                let dispatch_events = committed_events.as_slice();
                processor.dispatch(aggregate_id, dispatch_events).await;
            }
    

If the program crashes between committing and dispatching to queries, the queries might never see those events. Is this on purpose? I would expect that to guarantee at-least-once delivery the dispatch should be done first and the commit later, but I'm somewhat of a noob.
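Keeping the commit-first ordering, at-least-once delivery can still be recovered by persisting a dispatch cursor and replaying from it on restart. A sketch with a toy in-memory store (all names hypothetical):

```rust
// Toy in-memory store (hypothetical names): events are committed first,
// and a persisted cursor records how far the queries have been fed.
struct Store {
    committed: Vec<String>,  // the durable event log
    dispatched_up_to: usize, // persisted dispatch cursor
}

impl Store {
    // On restart, re-deliver every committed-but-undispatched event.
    // Combined with idempotent queries this yields at-least-once delivery.
    fn recover<F: FnMut(&str)>(&mut self, mut dispatch: F) {
        for event in &self.committed[self.dispatched_up_to..] {
            dispatch(event.as_str());
        }
        self.dispatched_up_to = self.committed.len();
    }
}
```

Dispatch-first ordering has the opposite failure mode: a crash after dispatch but before commit means queries have seen events that were never made durable.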

    opened by dpc 2
  • About event replay and read model changes

    A view payload {"name": "John"} is compatible with the view

    struct ProductView {
        name: String,
    }
    

    But it will be incompatible if we change this read model to

    struct ProductView {
        name: String,
        price: u32
    }
    

It will then result in DeserializationError(Error("missing field `events`", line: 0, column: 0)). Is it safe to assume that we should add a deserialization fallback in case it fails, or even an explicit "view upcast" (I don't even know if this is technically correct) in the same fashion as the event upcast?

    Incompatible payloads will exist from the moment the new read model is deployed until the events are replayed to rebuild the views, which can take some time to be fully replayed and updated.
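One explicit form of such a "view upcast" is to keep the old shape around and convert, in the same spirit as the event-upcast pattern. A sketch (`ProductViewV1` and the default price of 0 are assumptions). With serde, marking the new field `#[serde(default)]` achieves a similar fallback without a second type:

```rust
// Old shape of the read model, kept only to deserialize stale payloads.
struct ProductViewV1 {
    name: String,
}

// Current shape.
#[derive(Debug, PartialEq)]
struct ProductView {
    name: String,
    price: u32,
}

// The "view upcast": an infallible conversion that fills fields the old
// payload lacks (0 is an arbitrary placeholder default).
impl From<ProductViewV1> for ProductView {
    fn from(old: ProductViewV1) -> Self {
        ProductView { name: old.name, price: 0 }
    }
}
```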

    About event replay, I see that there is only PersistedEventRepository::get_events (by single aggregate_id) and ViewRepository::update_view (for single view). I forked the repos to add (it's still WIP):

    // PersistedEventRepository
       async fn get_multiple_aggregate_events<A: Aggregate>(
            &self,
            aggregate_ids: Vec<&str>,
        ) -> Result<HashMap<String, Vec<SerializedEvent>>, PersistenceError>;
        
    // ViewRepository
        async fn update_views(&self, views: Vec<(V, ViewContext)>) -> Result<(), PersistenceError>;
    

    So we can batch select and update aggregate and views. What are your thoughts?

    enhancement 
    opened by jonaslimads 2
  • Support for multiple event tables?

Hi, thanks for all your work. I'm learning a lot about how cqrs/es works by using your crate and applying it to a real-world modelling scenario. I saw in your book that you say:

A single table can be used for all aggregate events and this is usually the ideal setup for development and testing. For production systems it is recommended that each aggregate have a table solely dedicated to its events.

    I looked through the code, but it appears that this isn't supported at this point unless I'm missing something. I didn't see anything else in the upcoming 0.3 commit log either, but I may have missed something. Is this in development or on the roadmap? Cheers!

    enhancement 
    opened by bennichols 2
  • Aggregate Commands may need Deserialize trait

    Sending events over channels is possible because DomainEvents can be serialized and deserialized before sending and after receiving.

    However, in cases where a command is to be sent, a channel sending and receiving aggregate commands cannot generally be constructed and has to be handled on a case by case basis for each aggregate command type.

    One way to fix this is to add a constraint to the Aggregate trait such as:

    #[async_trait]
    pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
        /// Specifies the inbound command used to make changes in the state of the Aggregate.
        type Command: Serialize + DeserializeOwned + fmt::Debug + Sync + Send;
    ...
    }
    

    See https://github.com/serverlesstechnology/cqrs/pull/32

    opened by tavakyan 1
  • How about updating queries FROM the event store and not from the command handler ?

    Hello guys

We were PoCing around your crate when we found this:

    https://github.com/serverlesstechnology/cqrs/blob/09fc47b886739d9fb34d7cc05015b05162a13f48/src/cqrs.rs#L154

So the events are dispatched to the projections from the command handler? What if the machine stops? What if the view database has issues? What will happen when the service restarts?

    opened by jrouaix 1
  • [experiment] remove upcast API

    this is an experiment to test the approach described in #46

Removing the upcast API reduces the amount of code to maintain by around 500 lines. It would require some documentation updates to show users how to implement upcasting using serde.

    Benefits:

    • simplify the API (smaller surface, less maintenance)
    • replaces a stringly-typed callback interface with a strongly-typed declarative approach
    • (optionally) separates internal and serialised representations of an event

    Todo:

    • [ ] update documentation to show upcast pattern using serde
    • [ ] investigate helpers/macros for reducing boilerplate in pattern
    opened by danieleades 1
  • Alternative pattern for Event upcasting

    I wanted to present an alternative pattern for event upcasting. This relies on serde's ability to deserialize untagged enum representations.

I've used this pattern before for backwards compatibility of configuration files.

The gist is to separate the internal representation of an event from its serialised representation. Its serialised representation is an untagged union of all historical versions of the Event. You then add an infallible conversion from the union to the current version, and let serde do the rest.

    use serde::{Deserialize, Serialize};
    
    mod legacy {
        //! Previous versions of the `Event` enum, for backwards compatibility
        use serde::{Deserialize, Serialize};
    
        #[derive(Serialize, Deserialize)]
        pub enum V1 {}
    
        #[derive(Serialize, Deserialize)]
        pub enum V2 {}
    }
    
    // This is version 3 of the 'event'
    #[derive(Serialize, Deserialize)]
    #[serde(from = "EventRep")]
    pub enum Event {}
    
    #[derive(Serialize, Deserialize)]
    #[serde(untagged)]
    enum EventRep {
        V1(legacy::V1),
        V2(legacy::V2),
        V3(Event)
    }
    
    impl From<EventRep> for Event {
        fn from(value: EventRep) -> Self {
            match value {
                EventRep::V1(_) => todo!(),
                EventRep::V2(_) => todo!(),
                EventRep::V3(event) => event,
            }
        }
    }
    

For fallible conversions, you could also use #[serde(try_from = "EventRep")].

    Implementing upcasting this way simplifies the implementation of the framework, and removes the 'stringly' typed upcasting API in favour of a strongly-typed pattern. The downside is possibly more cognitive load on downstream users to implement this themselves and to get it right.

    Obviously this is a breaking change, but i'm interested to get your thoughts.

I'd say it's likely possible to simplify some of the boilerplate with a derive macro, if such a thing doesn't already exist in the wild.

    opened by danieleades 3
  • move the demo application into this repo

    the demo application should probably be moved into this repo, and this repo modified to be a cargo workspace

    this ensures that this library and the demo application are updated in lockstep.

    I had a quick go at this, but ran into some subtle version issues. I suspect postgres-es would also need to be added to the workspace to prevent multiple different versions of cqrs being pulled into the dependency tree

    enhancement 
    opened by danieleades 2
  • add continuous integration workflows

    adds github continuous integration workflows to check formatting and tests

    note that this PR is a prerequisite for merging #36 (since this PR prevents dependabot from inadvertently bumping the minimum compiler version for downstream users). #36 is very high-value for this crate since it is security conscious. Dependabot will automatically open PRs to address security vulnerabilities (as well as plain old outdated dependencies)

    you can see the results of these checks here - https://github.com/danieleades/cqrs/pull/1

    opened by danieleades 1
Owner
Serverless Technology