Build a read model across multiple aggregates
Some read models can’t be written by r.projection(def) against a
single entity. A “revenue per customer” number aggregates events from
invoices, refunds, and credit notes — three streams. A
“components-with-active-incidents” view crosses component and incident
streams. For these you use r.multiStreamProjection.
Prerequisites
- You’ve read Events and projections so the difference between single-stream and multi-stream projections is clear.
- The events you want to aggregate are already declared with
r.defineEvent somewhere.
The code
Define the projection’s table, then the projection itself:
import { integer, table as pgTable, text, uuid } from "@cosmicdrift/kumiko-framework/db";

export const customerRevenueTable = pgTable("mn_customer_revenue", {
  customerId: uuid("customer_id").primaryKey(),
  tenantId: uuid("tenant_id").notNull(),
  totalCents: integer("total_cents").notNull().default(0),
});

// In feature body:
import { sql } from "drizzle-orm";

r.multiStreamProjection({
  name: "customer-revenue",
  table: customerRevenueTable,
  apply: {
    "invoice:event:paid": async (event, tx) => {
      const p = event.payload as { customerId: string; amountCents: number };
      await tx.insert(customerRevenueTable)
        .values({
          customerId: p.customerId,
          tenantId: event.tenantId,
          totalCents: p.amountCents,
        })
        .onConflictDoUpdate({
          target: customerRevenueTable.customerId,
          set: { totalCents: sql`${customerRevenueTable.totalCents} + ${p.amountCents}` },
        });
    },
    "invoice:event:refunded": async (event, tx) => {
      const p = event.payload as { customerId: string; amountCents: number };
      await tx.update(customerRevenueTable)
        .set({ totalCents: sql`${customerRevenueTable.totalCents} - ${p.amountCents}` })
        .where(sql`customer_id = ${p.customerId}`);
    },
  },
});

How it differs from single-stream
| | r.projection | r.multiStreamProjection |
|---|---|---|
| Lifecycle | inline (write transaction) | async (event-dispatcher cursor) |
| Source | one entity | event-type-driven, cross-aggregate |
| Delivery | exactly-once (TX-atomic) | at-least-once, ordered per MSP |
| Latency | zero (same TX) | TCP roundtrip via LISTEN/NOTIFY |
A multi-stream projection (MSP) runs after the transaction commits and
catches up via a cursor. If the process crashes, the cursor records
where it stopped; on restart the projection replays from there. The
apply function must therefore be idempotent: use onConflictDoUpdate
or update … where, not a blind insert.
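One way to keep an additive update safe when an event is delivered more than once is to remember which events were already applied. The sketch below is an in-memory illustration, not Kumiko code: the `id` field, `RevenueProjection`, and `replayWithRedelivery` are hypothetical names, and it assumes every event carries a unique identifier.

```typescript
// Hypothetical event shape: `id` is assumed to be unique per event.
interface PaidEvent {
  id: string;
  customerId: string;
  amountCents: number;
}

// In-memory stand-in for the projection table plus a processed-event log.
class RevenueProjection {
  readonly totals = new Map<string, number>();
  private readonly seen = new Set<string>();

  // Applying the same event twice leaves the totals unchanged.
  apply(event: PaidEvent): void {
    if (this.seen.has(event.id)) return; // redelivery: already counted
    this.seen.add(event.id);
    const current = this.totals.get(event.customerId) ?? 0;
    this.totals.set(event.customerId, current + event.amountCents);
  }
}

// Simulates at-least-once delivery: every event is applied once, then the
// last event is delivered a second time, as could happen after a restart.
function replayWithRedelivery(events: PaidEvent[]): RevenueProjection {
  const projection = new RevenueProjection();
  for (const event of events) projection.apply(event);
  const last = events[events.length - 1];
  if (last) projection.apply(last); // duplicate delivery
  return projection;
}

const demo = replayWithRedelivery([
  { id: "e1", customerId: "c1", amountCents: 1000 },
  { id: "e2", customerId: "c1", amountCents: 250 },
]);
console.log(demo.totals.get("c1")); // 1250
```

In a real projection the processed-event log would have to live in the same database transaction as the table update, so that both commit or roll back together.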
Side-effect-only projections
You can omit the table: field entirely. A table-less multi-stream
projection is the canonical place for “send a webhook on every paid
invoice” — the apply function does its work, the cursor records
progress, and there is no read model to query:
r.multiStreamProjection({
  name: "stripe-webhooks",
  apply: {
    "invoice:event:paid": async (event, _tx) => {
      await fetch(STRIPE_WEBHOOK_URL, {
        method: "POST",
        body: JSON.stringify(event.payload),
      });
    },
  },
});

This replaces the classic pub/sub subscriber pattern: the cursor gives you ordering, retry, and deduplication out of the box.
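One detail worth keeping in mind: fetch resolves normally on HTTP error statuses such as 500, so a rejected webhook does not raise an error by itself. The sketch below turns a non-2xx response into an exception. It assumes that an error thrown from apply is what stops the cursor from advancing, which this page does not state, and `HttpResult`, `PostFn`, `ensureDelivered`, and `sendWebhook` are hypothetical names.

```typescript
// Minimal response and transport shapes, so the sketch does not depend on
// DOM or Node typings. A fetch Response has matching `ok` and `status` fields.
interface HttpResult {
  ok: boolean;
  status: number;
}
type PostFn = (url: string, body: string) => Promise<HttpResult>;

// Turns a non-2xx response into an exception.
function ensureDelivered(result: HttpResult): void {
  if (!result.ok) {
    throw new Error(`Webhook rejected with HTTP ${result.status}`);
  }
}

// Hypothetical handler body: post the payload and fail loudly on rejection.
async function sendWebhook(post: PostFn, url: string, payload: unknown): Promise<void> {
  const result = await post(url, JSON.stringify(payload));
  ensureDelivered(result);
}
```

With the real transport, `post` could be `(url, body) => fetch(url, { method: "POST", body })`. Because delivery is at-least-once, the receiving endpoint should also tolerate seeing the same payload twice.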
Common gotchas
- event.tenantId is the source of truth for scoping. Always carry it into the rows you write; otherwise you leak across tenants in the read model.
- Apply keys must match registered events. A typo like "invoice:event:payd" is a boot error, not a silent no-op.
- Rebuild is not automatic for MSPs. Schema changes to the projection table are manual: TRUNCATE + reset the cursor + let the dispatcher catch up. Single-stream projections have a kumiko project rebuild command; multi-stream does not yet.
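The apply-key check in the second gotcha can be pictured as a lookup against the set of declared event names at registration time. This is a minimal sketch of that idea, not the framework's implementation: `registeredEvents`, `ApplyMap`, and `validateApplyKeys` are hypothetical names.

```typescript
// Hypothetical registry of declared event names.
const registeredEvents = new Set<string>([
  "invoice:event:paid",
  "invoice:event:refunded",
]);

type ApplyMap = Record<string, (payload: unknown) => void>;

// Throws at registration time if any apply key is not a registered event.
function validateApplyKeys(apply: ApplyMap, registered: ReadonlySet<string>): void {
  const unknown = Object.keys(apply).filter((key) => !registered.has(key));
  if (unknown.length > 0) {
    throw new Error(`Unknown event type(s) in apply: ${unknown.join(", ")}`);
  }
}

// A correctly spelled key passes silently.
validateApplyKeys({ "invoice:event:paid": () => {} }, registeredEvents);

// A typo fails immediately instead of becoming a handler that never fires.
try {
  validateApplyKeys({ "invoice:event:payd": () => {} }, registeredEvents);
} catch (err) {
  console.log((err as Error).message);
  // Unknown event type(s) in apply: invoice:event:payd
}
```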
Live example
A revenue-per-customer projection fed by paid events from the
invoice stream, using onConflictDoUpdate to stay idempotent:
        .where(sql`${invoiceDetailTable["invoiceId"]} = ${event.aggregateId}`);
    },
  },
});

// Multi-stream projection: one row per customer. Fires ASYNC via the
// event-dispatcher (runOnce in tests, NOTIFY/LISTEN in production).
r.multiStreamProjection({
  name: "customer-revenue",
  table: customerRevenueTable,
  apply: {
    [paid.name]: async (event, tx) => {
      // Resolve the customer: pull it from the invoice-detail projection
      // that the inline projection just populated. Cross-projection reads
      // are fine inside apply(); we're at the read model layer.
      const [detail] = await tx
        .select()
        .from(invoiceDetailTable)
        .where(sql`${invoiceDetailTable["invoiceId"]} = ${event.aggregateId}`);
      if (!detail) return;
      const p = typedPayload(event, paid);
      await tx
        .insert(customerRevenueTable)
        .values({
          customer: detail["customer"],
          tenantId: event.tenantId,
          paidInvoices: 1,
          totalCents: p.amountCents,
        })
        .onConflictDoUpdate({
          target: customerRevenueTable["customer"],
          set: {
            paidInvoices: sql`${customerRevenueTable["paidInvoices"]} + 1`,
            totalCents: sql`${customerRevenueTable["totalCents"]} + ${p.amountCents}`,
          },
        });
    },
  },
});

const invoiceExecutor = createEventStoreExecutor(invoiceTable, invoiceEntity, {
  entityName: "showcase-invoice",
});

// --- Write handlers ---

r.writeHandler(
  "invoice:create",

Full source: samples/recipes/event-sourcing.
See also
- Events and projections: single-stream vs. multi-stream contracts in detail.