Workflow
A workflow is anything that reacts to events. Stateless or stateful, one trigger or many, no timeouts or seven — same primitive, three sophistication levels:
| Sophistication | What it does | What you write |
|---|---|---|
| Observer / translator | One event in → side effects out (publish, dispatch, enqueue) | on(Event, handler) — no data, no states |
| Reaction | One event in → side effects + control flow | on(Event, handler) returning a state (rare) |
| Saga | Multiple events correlated by key, state machine, timeouts, retries | full closure: data, states, top-level + state-scoped on() |
EventStorming readers: this is the purple sticky and the saga in one declaration. Whether the framework spins up an XState machine or a plain dispatch table is a runtime decision based on what you declared.
Why one primitive, not three
Reactions, translators, and sagas all answer the same question: "what happens after this event?" The split into three primitives was an artifact rather than a real distinction. By unifying them:
- Workflows grow without rewriting. A stateless on(StationCreated, …) that becomes "wait for activation within 7 days, else expire" stays in the same file as the same primitive.
- Studio renders one diagram. No special case for translators vs reactions vs sagas: every workflow has inputs, transitions, outputs.
- One mental model. on(Event, …) everywhere. Closure shape mirrors defineActor.
Shape — stateless
The common case. No state, just effects.
import { defineWorkflow } from "@nwire/forge";
export const onWashComplete = defineWorkflow("on-wash-complete",
({ on, publish, enqueue }) => {
on(CarWasWashed, async (event) => {
await publish(WashRecorded_v1({
washId: event.washId,
stationId: event.stationId,
car: event.car,
status: event.status,
washedAt: event.washedAt,
}));
await enqueue(normalizeTraffic({ stationId: event.stationId }));
});
},
);
No data, no states block. The framework detects this and uses a plain async dispatch: no XState instance, no state tracking, near-zero overhead.
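To make the "plain async dispatch" idea concrete, here is a minimal sketch of a lookup table keyed by event type. It is not the framework's implementation: `createDispatchTable`, `EventEnvelope`, and the string event names are hypothetical, and handlers are synchronous only to keep the sketch short.

```typescript
// Hypothetical sketch: a stateless workflow reduced to a lookup table.
// None of these names come from @nwire/forge.
type EventEnvelope = { type: string; payload: Record<string, unknown> };
type Handler = (event: EventEnvelope) => void;

function createDispatchTable() {
  const table = new Map<string, Handler[]>();
  return {
    // Register a handler for an event type; several handlers may share a type.
    on(type: string, handler: Handler): void {
      const list = table.get(type) ?? [];
      list.push(handler);
      table.set(type, list);
    },
    // Look up the handlers and run them; no instance state is read or written.
    // Returns how many handlers ran.
    dispatch(event: EventEnvelope): number {
      const handlers = table.get(event.type) ?? [];
      for (const handler of handlers) handler(event);
      return handlers.length;
    },
  };
}

// Usage: one subscription, one matching event, one unrelated event.
const published: string[] = [];
const workflow = createDispatchTable();
workflow.on("CarWasWashed", (e) => {
  published.push(`WashRecorded_v1:${String(e.payload.washId)}`);
});

const handled = workflow.dispatch({ type: "CarWasWashed", payload: { washId: "w1" } });
const ignored = workflow.dispatch({ type: "Unrelated", payload: {} });
```

Nothing here is keyed by workflow instance, which is why a stateless workflow needs no persistence or correlation.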
Shape — multi-input translator
The pattern where N events fan into one integration event:
export const onCatalogChanged = defineWorkflow("on-catalog-changed",
({ on, publish }) => {
on(StationWasCreated, async (e) => {
await publish(StationCatalogChanged_v1({
stationId: e.stationId, change: "created",
name: e.name, location: e.location, at: e.createdAt,
}));
});
on(StationWasUpdated, async (e) => {
await publish(StationCatalogChanged_v1({
stationId: e.stationId, change: "updated", at: e.updatedAt,
}));
});
on(StationWasDeleted, async (e) => {
await publish(StationCatalogChanged_v1({
stationId: e.stationId, change: "deleted", at: e.deletedAt,
}));
});
},
);
Three independent on() declarations, one workflow name, one Studio node.
Shape — saga (stateful)
When the workflow needs to remember things and react differently based on what it's seen so far:
import { z } from "zod"; // assumption: the `z` used for the data schema below is zod
export const subscriptionRenewal = defineWorkflow("subscription-renewal",
({ data, states, on, assign, send, publish, schedule, complete }) => {
const { notified, retrying, paid, suspended } = states;
const PaymentOverdue = timeout("payment-overdue", "7d");
const RetryOverdue = timeout("retry-overdue", "2d");
// ── Always-active: PaymentConfirmed wins in any state ──────────────
on(PaymentConfirmed, async () => {
await send(activateSubscription({ subscriptionId: data.subscriptionId }));
await publish(SubscriptionRenewed({ subscriptionId: data.subscriptionId }));
return paid;
});
// ── Entry: kicks off the flow, lands in `notified` ─────────────────
on(RenewalDue, async (e) => {
await assign({ subscriptionId: e.subscriptionId, amount: e.amount, attempts: 0 });
await send(sendRenewalNotice({ ...e }));
await schedule(PaymentOverdue);
return notified;
});
// ── State body = onEnter + state-scoped subscriptions ──────────────
notified(() => {
on(PaymentFailed, () => retrying);
on(PaymentOverdue, () => retrying);
});
retrying(async () => {
if (data.attempts >= 3) {
await send(suspendSubscription({ subscriptionId: data.subscriptionId }));
await publish(SubscriptionSuspended({ subscriptionId: data.subscriptionId }));
return suspended;
}
await assign({ attempts: data.attempts + 1 });
await send(sendPaymentReminder({ subscriptionId: data.subscriptionId }));
await schedule(RetryOverdue);
on(PaymentFailed, () => retrying);
on(RetryOverdue, () => retrying);
});
// ── Built-in: fires when entering ANY final state ──────────────────
on(complete, async () => {
await publish(RenewalFinished({ subscriptionId: data.subscriptionId }));
});
},
{
correlate: (map) => {
map(RenewalDue, (e) => e.subscriptionId);
map(PaymentConfirmed, (e) => e.subscriptionId);
map(PaymentFailed, (e) => e.subscriptionId);
},
data: z.object({
subscriptionId: z.string(),
amount: z.number(),
attempts: z.number().default(0),
}),
states: {
notified: {},
retrying: {},
paid: { final: true },
suspended: { final: true },
},
},
);
The four rules
1. on() is context-aware
Same primitive at both levels — its scope is determined by where you call it:
| Where you call on() | Scope |
|---|---|
| Top level of the workflow closure | Always-active. Fires in any state. |
| Inside state(() => {…}) | State-scoped. Fires only while in that state. |
2. Overlap resolves like XState — state-scoped wins
If an event has both a top-level handler AND a state-scoped one for the current state, only the state-scoped handler runs. The top-level handler is shadowed. This matches XState's "most-specific match wins" walk up the hierarchy. Only one transition fires per event — there's no stacking.
If you need cross-cutting behavior (audit log on every PaymentFailed regardless of state), put it inside the handler or use telemetry — don't duplicate on() declarations.
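The shadowing rule can be pictured as a two-step lookup. The sketch below is only a model of the rule as stated here; `Registry`, `HandlerRef`, and `resolve` are hypothetical names, not framework API.

```typescript
// Hypothetical model of rule 2: a state-scoped handler shadows a top-level one.
type HandlerRef = { label: string };

interface Registry {
  topLevel: Map<string, HandlerRef>;             // always-active handlers, by event type
  scoped: Map<string, Map<string, HandlerRef>>;  // state name -> event type -> handler
}

// Pick at most one handler: the current state's handler if it exists,
// otherwise the top-level handler, otherwise nothing.
function resolve(reg: Registry, state: string, eventType: string): HandlerRef | undefined {
  return reg.scoped.get(state)?.get(eventType) ?? reg.topLevel.get(eventType);
}

const registry: Registry = {
  topLevel: new Map<string, HandlerRef>([["PaymentFailed", { label: "top-level" }]]),
  scoped: new Map<string, Map<string, HandlerRef>>([
    ["notified", new Map<string, HandlerRef>([["PaymentFailed", { label: "notified-scoped" }]])],
  ]),
};

const inNotified = resolve(registry, "notified", "PaymentFailed")?.label; // scoped handler wins
const inRetrying = resolve(registry, "retrying", "PaymentFailed")?.label; // falls back to top level
const unknownEvent = resolve(registry, "notified", "Other");              // no handler at all
```

Because `resolve` returns a single handler, there is no way for two handlers to stack on one event, which is the property the rule describes.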
3. State body is the onEnter
Calling notified(() => { … }) registers the closure body as the entry effect for that state. Anything inside runs when the workflow transitions INTO notified — assign, send, publish, schedule, and on() registrations that scope subscriptions to this state.
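As a rough illustration of "state body is the onEnter", the sketch below stores each body and runs it on entry, replacing the previous state's subscriptions. `MiniMachine`, `defineState`, and `enter` are hypothetical names, and the assumption that old subscriptions are dropped on entry is inferred from "fires only while in that state".

```typescript
// Hypothetical model of rule 3; names are illustrative, not framework API.
type StateBody = (on: (eventType: string) => void) => void;

class MiniMachine {
  current = "idle";
  scopedEvents: string[] = []; // subscriptions owned by the current state
  entryLog: string[] = [];     // which entry effects have run, in order
  private bodies = new Map<string, StateBody>();

  // Counterpart of `notified(() => { … })`: store the body as the entry effect.
  defineState(name: string, body: StateBody): void {
    this.bodies.set(name, body);
  }

  // Entering a state drops the old state's subscriptions, then runs the body,
  // which registers fresh state-scoped subscriptions.
  enter(name: string): void {
    this.current = name;
    this.scopedEvents = [];
    this.entryLog.push(name);
    const body = this.bodies.get(name);
    if (body) body((eventType) => { this.scopedEvents.push(eventType); });
  }
}

const machine = new MiniMachine();
machine.defineState("notified", (on) => { on("PaymentFailed"); on("PaymentOverdue"); });
machine.defineState("retrying", (on) => { on("PaymentFailed"); on("RetryOverdue"); });

machine.enter("notified");
const whileNotified = machine.scopedEvents.slice();
machine.enter("retrying");
const whileRetrying = machine.scopedEvents.slice();
```

Re-entering a state (for example a handler in `retrying` that returns `retrying`) would run the body again under this model, which matches how the saga example increments `attempts` on each retry.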
4. Return value = transition
A handler that returns a state from states transitions the workflow there. Returning nothing keeps the current state. Returning a final: true state ends the workflow and fires the complete event.
on(PaymentConfirmed, async () => {
// …
return paid; // transition + complete fires
});
on(SomeEvent, async () => {
// …
// implicit return = stay put
});
The verbs
Workflows share most verbs with actors plus a few of their own:
| Verb | Where | Purpose |
|---|---|---|
| on(Event, handler, opts?) | top-level + state body | Subscribe to an event |
| assign({...}) | inside any handler | Mutate data (in a saga) |
| send(action) | inside any handler | Cross-module: deliver to the target module, which decides how to handle it |
| execute(action) | inside module workflows only | Sync dispatch within this module |
| enqueue(action) | inside module workflows only | Async push to queue within this module |
| publish(event) | inside any handler | Emit integration event |
| schedule(timeout) | inside any handler (saga only) | Start a named timeout |
| complete | as event name in on(complete, …) | Subscribe to "workflow finished" |
Stateless vs stateful — what changes
The boundary follows what you actually declare:
- No data schema (and no states) declared → no assign() available, no state persistence. The framework runs a plain async dispatch table. Faster, simpler, no XState overhead.
- data declared OR states declared → XState machine. State transitions, timer scheduling, correlation by key, replay on restart.
You don't choose; the framework infers the mode from what you declare.
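The inference can be sketched as a small pure function over the options object. This is an assumption about shape only: `WorkflowOptions`, `Runtime`, and `inferRuntime` are hypothetical names, and the real framework may inspect more than these two fields.

```typescript
// Hypothetical sketch of the stateless/stateful inference; not the framework's code.
interface WorkflowOptions {
  data?: unknown;                                // e.g. a schema object
  states?: Record<string, { final?: boolean }>;
}

type Runtime = "dispatch-table" | "state-machine";

// Declaring either `data` or `states` is enough to select the state machine.
function inferRuntime(options: WorkflowOptions = {}): Runtime {
  const hasData = options.data !== undefined;
  const hasStates = options.states !== undefined;
  return hasData || hasStates ? "state-machine" : "dispatch-table";
}

const stateless = inferRuntime();
const withStates = inferRuntime({ states: { paid: { final: true } } });
const withData = inferRuntime({ data: { subscriptionId: "string" } });
```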
complete — the finalization event
Every workflow can subscribe to complete — a framework-emitted pseudo-event that fires when the workflow enters any state marked final: true. Use it for publishing completion events, releasing resources, or logging the outcome.
For stateless workflows, complete fires after every successful on() handler invocation. Equivalent to "after each event handled."
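Putting the return-value rule and the `complete` rule side by side, one way to model "what happens after a handler returns" is the function below. It is a sketch of the semantics described in this document, with hypothetical names (`afterHandler`, `StepResult`) and `null` standing in for "stateless".

```typescript
// Hypothetical model of when `complete` fires; not the framework's implementation.
type StateConfig = Record<string, { final?: boolean }>;

interface StepResult { state: string | null; completeFired: boolean }

function afterHandler(
  current: string | null,        // null models a stateless workflow
  returned: string | undefined,  // state returned by the handler, if any
  states: StateConfig | undefined,
): StepResult {
  // Stateless: every successful handler invocation counts as a completion.
  if (states === undefined) return { state: null, completeFired: true };
  // Stateful, nothing returned: stay in the current state, nothing completes.
  if (returned === undefined) return { state: current, completeFired: false };
  // Stateful, state returned: transition; only final states fire `complete`.
  return { state: returned, completeFired: states[returned]?.final === true };
}

const renewalStates: StateConfig = {
  notified: {}, retrying: {}, paid: { final: true }, suspended: { final: true },
};

const stay = afterHandler("notified", undefined, renewalStates);
const move = afterHandler("notified", "retrying", renewalStates);
const finish = afterHandler("retrying", "paid", renewalStates);
const statelessRun = afterHandler(null, undefined, undefined);
```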
on(complete, async () => {
await publish(RenewalFinished({ subscriptionId: data.subscriptionId }));
});
opts: operational concerns
The third arg to on() carries operational metadata, mirroring defineAction:
on(PaymentFailed, async () => { … }, {
in: [notified, retrying], // state filter (default: all states)
retry: { max: 3, backoff: "exponential" },
dlq: "renewal-dlq",
idempotencyKey: (e) => `renewal-${e.subscriptionId}-${e.attempt}`,
});
| Key | Purpose |
|---|---|
| in: states[] | Equivalent to placing the on() inside each state's body. Lets you scope without nesting. |
| retry | Handler-body retry policy (network calls, downstream actions). |
| dlq | Dead-letter target on permanent failure. |
| idempotencyKey | Pure function of the event for deduplication. |
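To show how `idempotencyKey`, `retry`, and `dlq` could interact, here is a hedged sketch of a handler wrapper. `makeGuardedHandler` is a hypothetical helper, `max` is assumed to mean total attempts, backoff waiting is omitted, and the dead-letter hand-off is reduced to a return value.

```typescript
// Hypothetical sketch of dedup-then-retry; not how the framework implements opts.
interface RetryPolicy { max: number } // assumption: total attempts, not extra retries

type Outcome = "handled" | "duplicate" | "dead-lettered";

function makeGuardedHandler<E>(
  handler: (event: E) => void,
  opts: { idempotencyKey: (event: E) => string; retry: RetryPolicy },
): (event: E) => Outcome {
  const seen = new Set<string>();
  return (event: E): Outcome => {
    const key = opts.idempotencyKey(event);
    if (seen.has(key)) return "duplicate"; // same key: skip the handler body
    for (let attempt = 1; attempt <= opts.retry.max; attempt++) {
      try {
        handler(event);
        seen.add(key); // record the key only after a successful run
        return "handled";
      } catch {
        // Swallow and retry; a real runtime would wait according to its backoff policy.
      }
    }
    return "dead-lettered"; // attempts exhausted: this is where a dlq target would receive the event
  };
}

type PaymentFailedEvent = { subscriptionId: string; attempt: number };

let calls = 0;
const guarded = makeGuardedHandler<PaymentFailedEvent>(
  () => { calls++; if (calls === 1) throw new Error("transient failure"); },
  { idempotencyKey: (e) => `renewal-${e.subscriptionId}-${e.attempt}`, retry: { max: 3 } },
);
const first = guarded({ subscriptionId: "sub_1", attempt: 1 });  // fails once, then succeeds
const replay = guarded({ subscriptionId: "sub_1", attempt: 1 }); // same key, handler not called

const alwaysFails = makeGuardedHandler<PaymentFailedEvent>(
  () => { throw new Error("permanent failure"); },
  { idempotencyKey: (e) => e.subscriptionId, retry: { max: 3 } },
)({ subscriptionId: "sub_2", attempt: 1 });
```

Keeping the key function pure, as the table asks, is what makes the `seen` check safe: the same event always maps to the same key.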
Module vs app workflows
Workflows can live at two scopes:
| Permission | Module workflow (in defineModule) | App workflow (in defineApp) |
|---|---|---|
| on(domain events) | ✅ | ❌ |
| on(integration events) | ✅ | ✅ |
| execute(internal actions) | ✅ | ❌ |
| enqueue(internal actions) | ✅ | ❌ |
| send(public actions) | ✅ | ✅ |
| publish(integration events) | ✅ | ✅ |
Scope is determined by registration, not by annotation. You put the workflow where it belongs; the framework enforces the rules.
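The permission table can be encoded directly as data. The sketch below is one possible encoding, with hypothetical names (`Scope`, `Capability`, `can`); it says nothing about how or when the framework actually enforces these rules.

```typescript
// Hypothetical encoding of the permission table above; names are illustrative.
type Scope = "module" | "app";
type Capability =
  | "on:domain" | "on:integration"
  | "execute" | "enqueue" | "send" | "publish";

const allowed: Record<Scope, ReadonlySet<Capability>> = {
  module: new Set<Capability>([
    "on:domain", "on:integration", "execute", "enqueue", "send", "publish",
  ]),
  app: new Set<Capability>(["on:integration", "send", "publish"]),
};

// True when a workflow registered at `scope` may use `capability`.
function can(scope: Scope, capability: Capability): boolean {
  return allowed[scope].has(capability);
}

const moduleCanExecute = can("module", "execute");
const appCanExecute = can("app", "execute");
const appCanPublish = can("app", "publish");
const appCanSeeDomainEvents = can("app", "on:domain");
```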
Workflow vs actor
Both share the closure shape (data, states, assign, transitions via return). The distinction:
| | Actor | Workflow |
|---|---|---|
| Triggered by | Methods called from action handlers | Events |
| State key | Aggregate id (one actor instance per id) | Correlation key (one workflow instance per saga key) |
| Emits | Domain events (recordThat) | Integration events via publish, no recordThat |
| Lifecycle | "Permanent" — bound to entity | Transient — ends in a final state |
If the thing you're modeling has an identity that persists (the station, the submission), it's probably an actor. If it's a process that runs to completion (the renewal flow, the wash-recorded translation), it's a workflow.
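The "one workflow instance per correlation key" row can be illustrated with a small router. This is a simplified sketch: `route`, `Instance`, and the event shapes are hypothetical, and creating an instance on the first event seen for a key is an assumption made for brevity.

```typescript
// Hypothetical sketch: route events to workflow instances by correlation key.
type RenewalEvent =
  | { type: "RenewalDue"; subscriptionId: string; amount: number }
  | { type: "PaymentFailed"; subscriptionId: string };

interface Instance { key: string; eventsSeen: number }

const instances = new Map<string, Instance>();

// Plays the role of `correlate`: derive the instance key from the event.
const correlationKey = (event: RenewalEvent): string => event.subscriptionId;

function route(event: RenewalEvent): Instance {
  const key = correlationKey(event);
  let instance = instances.get(key);
  if (!instance) {
    // Assumption: the first event for an unseen key creates the instance.
    instance = { key, eventsSeen: 0 };
    instances.set(key, instance);
  }
  instance.eventsSeen += 1;
  return instance;
}

route({ type: "RenewalDue", subscriptionId: "sub_1", amount: 99 });
route({ type: "PaymentFailed", subscriptionId: "sub_1" });
route({ type: "RenewalDue", subscriptionId: "sub_2", amount: 49 });

const instanceCount = instances.size;
const sub1Events = instances.get("sub_1")?.eventsSeen;
```

Two events for `sub_1` land on the same instance, while `sub_2` gets its own, which is the behavior the correlation key is meant to provide.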
Testing
describe("subscription-renewal", () => {
it("suspends after 3 retries", async () => {
const h = await harness({ app: testApp });
await h.emit(RenewalDue, { subscriptionId: "sub_1", amount: 99 });
await h.idle();
await h.advanceTime("7d"); await h.idle();
await h.advanceTime("2d"); await h.idle();
await h.advanceTime("2d"); await h.idle();
await h.advanceTime("2d"); await h.idle();
const wf = h.workflow("subscription-renewal", "sub_1");
expect(wf.state).toBe("suspended");
expect(wf.data.attempts).toBe(3);
});
});
See also
- defineWorkflow reference
- Actor: the sibling primitive for entities with identity
- Action: what workflows dispatch
- Event: what workflows subscribe to and publish