Hi,
Given a single view composed by two aggregates roots, Vendor and Product:
pub struct VendorView {
pub id: String,
pub name: String,
pub products: Vec<VendorViewProduct>,
}
pub struct VendorViewProduct {
pub id: String,
pub name: String,
pub price: u32,
}
where id
is the aggregate ID of each aggregate root, and the actual view_id is VendorView.id
copied from the Vendor. This view stores all products of a single vendor. The Product aggregate has a vendor_id
field to reference the Vendor.
I also wondered whether Product should not be a aggregate root at all, but in many scenarios/commands the Product itself is the root to be referenced, with its own stock, prices and so on. So it seems for this case the model is easier if Vendor and Products are separate aggregates.
To dispatch events to this view from separate CQRS instances of each aggregate we implement both View<Vendor>
and View<Product>
for VendorView. However we have no way to pass this vendor_id
as view_id
down to the Query because:
pub trait Query<A: Aggregate>: Send + Sync {
async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>]);
}
// cqrs.rs
impl<A, ES> CqrsFramework<A, ES> {
pub async fn execute_with_metadata(...) -> Result<(), AggregateError<A::Error>> {
let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
// here the aggregate_id is the Product's. Ok
let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
// .....
for processor in &self.queries {
let dispatch_events = committed_events.as_slice();
// for most queries, specially the ones exclusive to the Product, this is OK
// but for VendorView this is not ok since Project.id != VendorView.id
processor.dispatch(aggregate_id, dispatch_events).await;
}
}
}
One workaround I ended up doing was:
pub trait Query<A: Aggregate>: Send + Sync {
async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>], secondary_id: Option<&str>);
}
pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
fn secondary_id(&self) -> Option<String> {
None
}
}
impl Aggregate for Product {
fn secondary_id(&self) -> Option<String> {
Some(self.vendor_id.clone())
}
}
impl<A, ES> CqrsFramework<A, ES> {
pub async fn execute_with_metadata(...) -> Result<(), AggregateError<A::Error>> {
// ..... I had to load_aggregate again to get the aggregate with the newly applied event
let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
let aggregate = aggregate_context.aggregate();
let secondary_id = aggregate.secondary_id();
for processor in &self.queries {
let dispatch_events = committed_events.as_slice();
processor
.dispatch(aggregate_id, dispatch_events, secondary_id.as_deref())
.await;
}
}
}
Thanks!