Build a read model across multiple aggregates

Some read models can’t be written by r.projection(def) against a single entity. A “revenue per customer” number aggregates events from invoices, refunds, and credit notes — three streams. A “components-with-active-incidents” view crosses component and incident streams. For these you use r.multiStreamProjection.
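To make the "three streams, one number" idea concrete, here is a minimal in-memory sketch of the fold such a read model performs. It does not use the framework; the `DomainEvent` shape and the `credit-note:event:issued` name are assumptions made for illustration, and only the two invoice event names come from this recipe.

```typescript
// Hypothetical event shape for illustration; not the framework's actual type.
type DomainEvent = {
  type: string;
  tenantId: string;
  payload: { customerId: string; amountCents: number };
};

// Signed contribution of each event type to revenue. The two invoice names
// appear in this recipe; the credit-note name is made up for the example.
const revenueSign: Record<string, number> = {
  "invoice:event:paid": 1,
  "invoice:event:refunded": -1,
  "credit-note:event:issued": -1,
};

// Fold events from any stream into one running total per customer.
function revenuePerCustomer(events: DomainEvent[]): Map<string, number> {
  const totals = new Map<string, number>();
  for (const e of events) {
    const sign = revenueSign[e.type];
    if (sign === undefined) continue; // event type this read model ignores
    const prev = totals.get(e.payload.customerId) ?? 0;
    totals.set(e.payload.customerId, prev + sign * e.payload.amountCents);
  }
  return totals;
}

const sampleTotals = revenuePerCustomer([
  { type: "invoice:event:paid", tenantId: "t1", payload: { customerId: "c1", amountCents: 10000 } },
  { type: "invoice:event:refunded", tenantId: "t1", payload: { customerId: "c1", amountCents: 2500 } },
  { type: "credit-note:event:issued", tenantId: "t1", payload: { customerId: "c1", amountCents: 500 } },
  { type: "invoice:event:paid", tenantId: "t1", payload: { customerId: "c2", amountCents: 4000 } },
  { type: "invoice:event:created", tenantId: "t1", payload: { customerId: "c2", amountCents: 4000 } },
]);
```

The projection is keyed by event type rather than by entity, which is why one handler map can draw on several aggregates.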

  • You’ve read Events and projections so the difference between single-stream and multi-stream projections is clear.
  • The events you want to aggregate are already declared with r.defineEvent somewhere.

Define the projection’s table, then the projection itself:

src/features/billing/projections/customer-revenue.ts
import { sql } from "drizzle-orm";
import { integer, table as pgTable, uuid } from "@cosmicdrift/kumiko-framework/db";

// One row per customer; totalCents is the running revenue in cents.
export const customerRevenueTable = pgTable("mn_customer_revenue", {
  customerId: uuid("customer_id").primaryKey(),
  tenantId: uuid("tenant_id").notNull(),
  totalCents: integer("total_cents").notNull().default(0),
});

// In feature body:
r.multiStreamProjection({
  name: "customer-revenue",
  table: customerRevenueTable,
  apply: {
    // Paid invoice: create the customer's row or add to its running total.
    "invoice:event:paid": async (event, tx) => {
      const p = event.payload as { customerId: string; amountCents: number };
      await tx.insert(customerRevenueTable)
        .values({
          customerId: p.customerId,
          tenantId: event.tenantId,
          totalCents: p.amountCents,
        })
        .onConflictDoUpdate({
          target: customerRevenueTable.customerId,
          set: { totalCents: sql`${customerRevenueTable.totalCents} + ${p.amountCents}` },
        });
    },
    // Refund: subtract from the existing row; a missing row is a no-op.
    "invoice:event:refunded": async (event, tx) => {
      const p = event.payload as { customerId: string; amountCents: number };
      await tx.update(customerRevenueTable)
        .set({ totalCents: sql`${customerRevenueTable.totalCents} - ${p.amountCents}` })
        .where(sql`${customerRevenueTable.customerId} = ${p.customerId}`);
    },
  },
});
|           | r.projection               | r.multiStreamProjection            |
| Lifecycle | inline (write transaction) | async (event-dispatcher cursor)    |
| Source    | one entity                 | event-type-driven, cross-aggregate |
| Delivery  | exactly-once (TX-atomic)   | at-least-once, ordered per MSP     |
| Latency   | zero (same TX)             | TCP roundtrip via LISTEN/NOTIFY    |

A multi-stream projection runs after the transaction commits and catches up via a cursor. If the process crashes, the cursor records where it stopped; on restart the projection replays from there. The apply function must therefore be idempotent: use onConflictDoUpdate or update … where, not a blind insert.
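The following in-memory sketch illustrates why at-least-once delivery calls for an idempotent apply. It is not the framework's dispatcher: the `StoredEvent` shape, the `id` field, and the crash simulation are assumptions for illustration. It shows one common guard, recording the IDs of events already applied, which keeps an additive total correct even when an event is delivered twice.

```typescript
// Hypothetical shapes; the framework's real event and cursor types may differ.
type StoredEvent = { seq: number; id: string; customerId: string; amountCents: number };

class RevenueProjection {
  cursor = 0; // sequence number of the last event whose progress was recorded
  totals = new Map<string, number>();
  private applied = new Set<string>(); // ids of events already folded into totals

  // Idempotent apply: a redelivered event is recognised by id and skipped.
  apply(e: StoredEvent): void {
    if (this.applied.has(e.id)) return;
    this.applied.add(e.id);
    this.totals.set(e.customerId, (this.totals.get(e.customerId) ?? 0) + e.amountCents);
  }

  // Process everything after the cursor. crashAfterSeq simulates a process
  // that dies after apply() ran but before the cursor was advanced.
  catchUp(log: StoredEvent[], crashAfterSeq?: number): void {
    for (const e of log) {
      if (e.seq <= this.cursor) continue;
      this.apply(e);
      if (e.seq === crashAfterSeq) return; // simulated crash: cursor not advanced
      this.cursor = e.seq;
    }
  }
}

const eventLog: StoredEvent[] = [
  { seq: 1, id: "evt-1", customerId: "c1", amountCents: 1000 },
  { seq: 2, id: "evt-2", customerId: "c1", amountCents: 500 },
  { seq: 3, id: "evt-3", customerId: "c1", amountCents: 250 },
];

const projection = new RevenueProjection();
projection.catchUp(eventLog, 2); // dies after applying evt-2; cursor stays at 1
const cursorAfterCrash = projection.cursor;
projection.catchUp(eventLog); // restart: evt-2 is delivered a second time
const totalAfterRestart = projection.totals.get("c1");
```

Without the `applied` check, the second delivery of evt-2 would add its amount again. In a database-backed projection the same guard could be a processed-events table written in the same transaction as the read-model row; whether that is needed depends on how the framework commits the cursor.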

You can omit the table: field entirely. A table-less multi-stream projection is the canonical place for “send a webhook on every paid invoice” — the apply function does its work, the cursor records progress, and there is no read model to query:

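r.multiStreamProjection({
  name: "stripe-webhooks",
  apply: {
    "invoice:event:paid": async (event, _tx) => {
      const res = await fetch(STRIPE_WEBHOOK_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(event.payload),
      });
      // fetch only rejects on network errors; throw on HTTP errors too, so a
      // failed delivery surfaces as a failed apply instead of passing silently.
      if (!res.ok) throw new Error(`Webhook failed with HTTP ${res.status}`);
    },
  },
});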

This replaces the classic pub/sub subscriber pattern: the cursor gives you ordering, retry, and deduplication out of the box.
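Since delivery is at-least-once, a receiver may still see the same webhook twice after a crash and replay. A minimal sketch of one way to handle that is shown here: send a stable key with each request so the receiver can drop repeats. The `PaidEvent` shape, its `id` field, and the use of an `Idempotency-Key` header are assumptions for illustration, not something the framework is known to provide.

```typescript
// Hypothetical event shape; `id` is assumed to be a stable, unique event identifier.
type PaidEvent = { id: string; tenantId: string; payload: { amountCents: number } };

type WebhookRequest = { method: "POST"; headers: Record<string, string>; body: string };

// Build the request for one event. The same event always yields the same
// Idempotency-Key, so a receiver can recognise a redelivered webhook.
function buildWebhookRequest(event: PaidEvent): WebhookRequest {
  return {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Idempotency-Key": event.id,
    },
    body: JSON.stringify(event.payload),
  };
}

// Receiver-side sketch: remember keys that were already handled.
function makeReceiver(): (req: WebhookRequest) => "processed" | "duplicate" {
  const seen = new Set<string>();
  return (req) => {
    const key = req.headers["Idempotency-Key"];
    if (seen.has(key)) return "duplicate";
    seen.add(key);
    return "processed";
  };
}

const receive = makeReceiver();
const webhookReq = buildWebhookRequest({
  id: "evt-42",
  tenantId: "t1",
  payload: { amountCents: 1999 },
});
const firstDelivery = receive(webhookReq);
const secondDelivery = receive(webhookReq); // same event replayed after a crash
```

The request object has the same fields as the options passed to `fetch`, so it could be handed to the apply function's `fetch` call directly.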

  • event.tenantId is the source of truth for scoping. Always carry it into the rows you write — otherwise you leak across tenants in the read model.
  • Apply keys must match registered events. A typo like "invoice:event:payd" is a boot error, not a silent no-op.
  • Rebuild is not automatic for MSPs. Schema changes to the projection table are manual: TRUNCATE + reset the cursor + let the dispatcher catch up. Single-stream projections have a kumiko project rebuild command; multi-stream does not yet.
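The manual rebuild in the last point comes down to three steps, sketched here against in-memory stand-ins. The `ProjectionState` and `LoggedEvent` types are hypothetical; in a real deployment the rows live in the projection table and the cursor in framework-managed storage whose name is not given in this recipe.

```typescript
// Hypothetical in-memory stand-ins for the projection table, the cursor,
// and the event log; the real storage is framework-specific.
type LoggedEvent = { seq: number; type: string; customerId: string; amountCents: number };
type ProjectionState = { cursor: number; rows: Map<string, number> };

// Apply one event and record progress.
function applyEvent(state: ProjectionState, e: LoggedEvent): void {
  const sign =
    e.type === "invoice:event:paid" ? 1 :
    e.type === "invoice:event:refunded" ? -1 : 0;
  if (sign !== 0) {
    state.rows.set(e.customerId, (state.rows.get(e.customerId) ?? 0) + sign * e.amountCents);
  }
  state.cursor = e.seq; // unhandled types still advance the cursor
}

// What the dispatcher does on its own: process everything after the cursor.
function catchUp(state: ProjectionState, log: LoggedEvent[]): void {
  for (const e of log) {
    if (e.seq > state.cursor) applyEvent(state, e);
  }
}

// The manual rebuild, step by step.
function rebuild(state: ProjectionState, log: LoggedEvent[]): void {
  state.rows.clear();  // 1. TRUNCATE the projection table
  state.cursor = 0;    // 2. reset the cursor
  catchUp(state, log); // 3. let the dispatcher catch up from the start
}

const history: LoggedEvent[] = [
  { seq: 1, type: "invoice:event:paid", customerId: "c1", amountCents: 1000 },
  { seq: 2, type: "invoice:event:refunded", customerId: "c1", amountCents: 300 },
];

// A stale read model, e.g. written by an older version of the apply logic.
const staleState: ProjectionState = { cursor: 2, rows: new Map([["c1", 999]]) };
catchUp(staleState, history); // no effect: the cursor is already at the end
const beforeRebuild = staleState.rows.get("c1");
rebuild(staleState, history);
const afterRebuild = staleState.rows.get("c1");
```

Resetting the cursor without truncating would replay every event on top of the stale rows, which is why the two steps belong together.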

A revenue-per-customer projection fed by paid events from the invoice stream, using onConflictDoUpdate to stay idempotent:

// ... (excerpt starts mid-file, inside the preceding projection definition)
        .where(sql`${invoiceDetailTable["invoiceId"]} = ${event.aggregateId}`);
    },
  },
});

// Multi-stream projection: one row per customer. Fires ASYNC via the
// event-dispatcher (runOnce in tests, NOTIFY/LISTEN in production).
r.multiStreamProjection({
  name: "customer-revenue",
  table: customerRevenueTable,
  apply: {
    [paid.name]: async (event, tx) => {
      // Resolve the customer: pull it from the invoice-detail projection
      // that the inline projection just populated. Cross-projection reads
      // are fine inside apply() — we're at the read model layer.
      const [detail] = await tx
        .select()
        .from(invoiceDetailTable)
        .where(sql`${invoiceDetailTable["invoiceId"]} = ${event.aggregateId}`);
      if (!detail) return;
      const p = typedPayload(event, paid);
      await tx
        .insert(customerRevenueTable)
        .values({
          customer: detail["customer"],
          tenantId: event.tenantId,
          paidInvoices: 1,
          totalCents: p.amountCents,
        })
        .onConflictDoUpdate({
          target: customerRevenueTable["customer"],
          set: {
            paidInvoices: sql`${customerRevenueTable["paidInvoices"]} + 1`,
            totalCents: sql`${customerRevenueTable["totalCents"]} + ${p.amountCents}`,
          },
        });
    },
  },
});

const invoiceExecutor = createEventStoreExecutor(invoiceTable, invoiceEntity, {
  entityName: "showcase-invoice",
});

// --- Write handlers ---
r.writeHandler(
  "invoice:create",
  // ... (excerpt ends here)

Full source: samples/recipes/event-sourcing.