Workflow

A workflow is anything that reacts to events. Stateless or stateful, one trigger or many, no timeouts or seven — same primitive, three sophistication levels:

| Sophistication | What it does | What you write |
| --- | --- | --- |
| Observer / translator | One event in → side effects out (publish, dispatch, enqueue) | on(Event, handler); no data, no states |
| Reaction | One event in → side effects + control flow | on(Event, handler) returning a state (rare) |
| Saga | Multiple events correlated by key, state machine, timeouts, retries | full closure: data, states, top-level + state-scoped on() |

EventStorming readers: this is the purple sticky and the saga in one declaration. Whether the framework spins up an XState machine or a plain dispatch table is a runtime decision based on what you declared.

Why one primitive, not three

Reactions, translators, and sagas all answer the same question: "what happens after this event?". The split into three primitives was artificial. Unifying them has three benefits:

  • Workflows grow without rewriting. A stateless on(StationCreated, …) that becomes "wait for activation within 7 days, else expire" stays in the same file as the same primitive.
  • Studio renders one diagram. No special case for translators vs reactions vs sagas — every workflow has inputs, transitions, outputs.
  • One mental model. on(Event, …) everywhere. Closure shape mirrors defineActor.

Shape — stateless

The common case. No state, just effects.

```ts
import { defineWorkflow } from "@nwire/forge";

export const onWashComplete = defineWorkflow("on-wash-complete",
  ({ on, publish, enqueue }) => {
    on(CarWasWashed, async (event) => {
      await publish(WashRecorded_v1({
        washId:    event.washId,
        stationId: event.stationId,
        car:       event.car,
        status:    event.status,
        washedAt:  event.washedAt,
      }));
      await enqueue(normalizeTraffic({ stationId: event.stationId }));
    });
  },
);
```

No data, no states block. The framework detects this and uses a plain async dispatch — no XState instance, no state tracking, near-zero overhead.
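
As a rough illustration of what a plain dispatch table could look like, the sketch below keeps a map from event type to handlers and runs them in registration order. `createDispatchTable`, `WorkflowEvent`, and the string event types are hypothetical names, not `@nwire/forge` internals, and the handlers are synchronous only to keep the sketch short.

```typescript
// Hypothetical sketch, not the @nwire/forge implementation.
type WorkflowEvent = { type: string; payload: Record<string, unknown> };
type Handler = (event: WorkflowEvent) => void;

function createDispatchTable() {
  const handlers = new Map<string, Handler[]>();
  return {
    // Register a handler for one event type.
    on(type: string, handler: Handler): void {
      const list = handlers.get(type) ?? [];
      list.push(handler);
      handlers.set(type, list);
    },
    // Run every handler registered for the event; returns how many ran.
    dispatch(event: WorkflowEvent): number {
      const list = handlers.get(event.type) ?? [];
      for (const handler of list) handler(event);
      return list.length;
    },
  };
}

const published: string[] = [];
const table = createDispatchTable();
table.on("CarWasWashed", (e) => {
  published.push(`WashRecorded:${String(e.payload["washId"])}`);
});

const handled = table.dispatch({ type: "CarWasWashed", payload: { washId: "w1" } });
const ignored = table.dispatch({ type: "StationWasCreated", payload: {} });
```

There is no per-instance state here: each dispatch is a lookup plus function calls, which is why this mode can stay cheap.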

Shape — multi-input translator

The pattern where N events fan into one integration event:

```ts
export const onCatalogChanged = defineWorkflow("on-catalog-changed",
  ({ on, publish }) => {

    on(StationWasCreated, async (e) => {
      await publish(StationCatalogChanged_v1({
        stationId: e.stationId, change: "created",
        name: e.name, location: e.location, at: e.createdAt,
      }));
    });

    on(StationWasUpdated, async (e) => {
      await publish(StationCatalogChanged_v1({
        stationId: e.stationId, change: "updated", at: e.updatedAt,
      }));
    });

    on(StationWasDeleted, async (e) => {
      await publish(StationCatalogChanged_v1({
        stationId: e.stationId, change: "deleted", at: e.deletedAt,
      }));
    });
  },
);
```

Three independent on() declarations, one workflow name, one Studio node.

Shape — saga (stateful)

When the workflow needs to remember things and react differently based on what it's seen so far:

```ts
export const subscriptionRenewal = defineWorkflow("subscription-renewal",
  ({ data, states, on, assign, send, publish, schedule, complete }) => {

    const { notified, retrying, paid, suspended } = states;
    const PaymentOverdue = timeout("payment-overdue", "7d");
    const RetryOverdue   = timeout("retry-overdue", "2d");

    // ── Always-active: PaymentConfirmed wins in any state ──────────────
    on(PaymentConfirmed, async () => {
      await send(activateSubscription({ subscriptionId: data.subscriptionId }));
      await publish(SubscriptionRenewed({ subscriptionId: data.subscriptionId }));
      return paid;
    });

    // ── Entry: kicks off the flow, lands in `notified` ─────────────────
    on(RenewalDue, async (e) => {
      await assign({ subscriptionId: e.subscriptionId, amount: e.amount, attempts: 0 });
      await send(sendRenewalNotice({ ...e }));
      await schedule(PaymentOverdue);
      return notified;
    });

    // ── State body = onEnter + state-scoped subscriptions ──────────────
    notified(() => {
      on(PaymentFailed,  () => retrying);
      on(PaymentOverdue, () => retrying);
    });

    retrying(async () => {
      if (data.attempts >= 3) {
        await send(suspendSubscription({ subscriptionId: data.subscriptionId }));
        await publish(SubscriptionSuspended({ subscriptionId: data.subscriptionId }));
        return suspended;
      }
      await assign({ attempts: data.attempts + 1 });
      await send(sendPaymentReminder({ subscriptionId: data.subscriptionId }));
      await schedule(RetryOverdue);

      on(PaymentFailed, () => retrying);
      on(RetryOverdue,  () => retrying);
    });

    // ── Built-in: fires when entering ANY final state ──────────────────
    on(complete, async () => {
      await publish(RenewalFinished({ subscriptionId: data.subscriptionId }));
    });
  },
  {
    correlate: (map) => {
      map(RenewalDue,       (e) => e.subscriptionId);
      map(PaymentConfirmed, (e) => e.subscriptionId);
      map(PaymentFailed,    (e) => e.subscriptionId);
    },
    data: z.object({
      subscriptionId: z.string(),
      amount:         z.number(),
      attempts:       z.number().default(0),
    }),
    states: {
      notified:  {},
      retrying:  {},
      paid:      { final: true },
      suspended: { final: true },
    },
  },
);
```
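
The correlate option in the example maps each incoming event to a key, and that key selects the saga instance. A minimal sketch of such a lookup follows. It assumes an instance is created the first time a key is seen (the text above does not say whether only entry events may create one), and `createCorrelator` and the `Instance` shape are hypothetical.

```typescript
// Hypothetical sketch of key-based correlation; not the framework's code.
type SagaEvent = { type: string; subscriptionId: string };
type KeyFn = (e: SagaEvent) => string;
type Instance = { key: string; seen: string[] };

function createCorrelator() {
  const keyFns = new Map<string, KeyFn>();
  const instances = new Map<string, Instance>();
  return {
    // Mirrors map(Event, keyFn) from the correlate option.
    map(type: string, keyFn: KeyFn): void {
      keyFns.set(type, keyFn);
    },
    // Find (or create) the instance for an event; undefined if the event is unmapped.
    route(e: SagaEvent): Instance | undefined {
      const keyFn = keyFns.get(e.type);
      if (keyFn === undefined) return undefined;
      const key = keyFn(e);
      let instance = instances.get(key);
      if (instance === undefined) {
        instance = { key, seen: [] };
        instances.set(key, instance);
      }
      instance.seen.push(e.type);
      return instance;
    },
  };
}

const correlator = createCorrelator();
correlator.map("RenewalDue", (e) => e.subscriptionId);
correlator.map("PaymentFailed", (e) => e.subscriptionId);

const first = correlator.route({ type: "RenewalDue", subscriptionId: "sub_1" });
const second = correlator.route({ type: "PaymentFailed", subscriptionId: "sub_1" });
const other = correlator.route({ type: "RenewalDue", subscriptionId: "sub_2" });
const unmapped = correlator.route({ type: "Unrelated", subscriptionId: "sub_1" });
```

Events for `sub_1` land on the same instance, `sub_2` gets its own, and an event without a mapping is not routed at all.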

The four rules

1. on() is context-aware

Same primitive at both levels — its scope is determined by where you call it:

| Where you call on() | Scope |
| --- | --- |
| Top level of the workflow closure | Always-active. Fires in any state. |
| Inside state(() => {…}) | State-scoped. Fires only while in that state. |

2. Overlap resolves like XState — state-scoped wins

If an event has both a top-level handler AND a state-scoped one for the current state, only the state-scoped handler runs. The top-level handler is shadowed. This matches XState's "most-specific match wins" walk up the hierarchy. Only one transition fires per event — there's no stacking.

If you need cross-cutting behavior (audit log on every PaymentFailed regardless of state), put it inside the handler or use telemetry — don't duplicate on() declarations.
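
The shadowing rule can be pictured as a two-step lookup: check the current state's handlers first, then fall back to the top level. The sketch below is an assumption about the shape of that lookup, with hypothetical names (`Registry`, `resolveScope`) and a hypothetical top-level `PaymentFailed` handler added to show the overlap.

```typescript
// Hypothetical sketch of handler lookup; names are illustrative only.
type Scope = "state" | "top-level";
type Registry = {
  topLevel: Set<string>;             // events with an always-active handler
  byState: Map<string, Set<string>>; // state name -> events handled in that state
};

// Returns which handler would run, or undefined if nothing matches.
function resolveScope(reg: Registry, current: string, eventType: string): Scope | undefined {
  if (reg.byState.get(current)?.has(eventType)) return "state"; // most specific wins
  if (reg.topLevel.has(eventType)) return "top-level";
  return undefined;
}

const registry: Registry = {
  topLevel: new Set(["PaymentConfirmed", "PaymentFailed"]),
  byState: new Map([["retrying", new Set(["PaymentFailed", "RetryOverdue"])]]),
};

// State-scoped handler shadows the top-level one while in `retrying`.
const inRetrying = resolveScope(registry, "retrying", "PaymentFailed");
// No state-scoped handler in `notified`, so the top-level one runs.
const inNotified = resolveScope(registry, "notified", "PaymentFailed");
// Top-level only: fires in any state.
const anywhere = resolveScope(registry, "retrying", "PaymentConfirmed");
```

The function returns a single scope, never both, which is the "no stacking" part of the rule.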

3. State body is the onEnter

Calling notified(() => { … }) registers the closure body as the entry effect for that state. Anything inside runs when the workflow transitions INTO notified: assign, send, publish, schedule, and the on() registrations that scope subscriptions to this state.

4. Return value = transition

A handler that returns a state from states transitions the workflow there. Returning nothing keeps the current state. Returning a final: true state ends the workflow and fires the complete event.

```ts
on(PaymentConfirmed, async () => {
  // …
  return paid;           // transition + complete fires
});

on(SomeEvent, async () => {
  // …
  // implicit return = stay put
});
```
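
One way to picture how a runtime might interpret a handler's return value is sketched below. States are plain strings here for brevity (the real API returns state objects from `states`), and `applyReturn` and `Outcome` are hypothetical names.

```typescript
// Hypothetical sketch of "return value = transition"; not the framework's code.
type StateConfig = { final?: boolean };
type Outcome = { state: string; completed: boolean };

function applyReturn(
  current: string,
  returned: string | undefined,
  states: Record<string, StateConfig>,
): Outcome {
  // Returning nothing keeps the current state.
  if (returned === undefined) return { state: current, completed: false };
  const config = states[returned];
  if (config === undefined) throw new Error(`unknown state: ${returned}`);
  // Entering a final state ends the workflow, so `complete` would fire.
  return { state: returned, completed: config.final === true };
}

const renewalStates: Record<string, StateConfig> = {
  notified: {},
  retrying: {},
  paid: { final: true },
  suspended: { final: true },
};

const stayed = applyReturn("notified", undefined, renewalStates);
const moved = applyReturn("notified", "retrying", renewalStates);
const finished = applyReturn("retrying", "paid", renewalStates);
```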

The verbs

Workflows share most verbs with actors plus a few of their own:

| Verb | Where | Purpose |
| --- | --- | --- |
| on(Event, handler, opts?) | top-level + state body | Subscribe to an event |
| assign({...}) | inside any handler | Mutate data (in a saga) |
| send(action) | inside any handler | Cross-module: deliver to module, it decides |
| execute(action) | inside module workflows only | Sync dispatch within this module |
| enqueue(action) | inside module workflows only | Async push to queue within this module |
| publish(event) | inside any handler | Emit integration event |
| schedule(timeout) | inside any handler (saga only) | Start a named timeout |
| complete | as event name in on(complete, …) | Subscribe to "workflow finished" |

Stateless vs stateful — what changes

The boundary follows from what you actually declare:

  • Neither data nor states declared → no assign() available, no state persistence. The framework runs a plain async dispatch table. Faster, simpler, no XState overhead.
  • data declared OR states declared → XState machine. State transitions, timer scheduling, correlation by key, replay on restart.

You don't choose; the framework infers the mode from what you declare.
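
The inference rule fits in a few lines. This is a sketch of the decision only, under the assumption that the presence of either option is all that matters; `inferRuntime` and the `Runtime` labels are hypothetical names.

```typescript
// Hypothetical sketch of the stateless/stateful decision.
type WorkflowOptions = {
  data?: unknown;                               // stands in for the data schema
  states?: Record<string, { final?: boolean }>;
};
type Runtime = "dispatch-table" | "state-machine";

function inferRuntime(options?: WorkflowOptions): Runtime {
  const stateful = options?.data !== undefined || options?.states !== undefined;
  return stateful ? "state-machine" : "dispatch-table";
}

const statelessMode = inferRuntime();
const dataOnly = inferRuntime({ data: { subscriptionId: "sub_1" } });
const statesOnly = inferRuntime({ states: { paid: { final: true } } });
```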

complete — the finalization event

Every workflow can subscribe to complete — a framework-emitted pseudo-event that fires when the workflow enters any state marked final: true. Use it for publishing completion events, releasing resources, or logging the outcome.

For stateless workflows, complete fires after every successful on() handler invocation, which is equivalent to "after each event handled."

```ts
on(complete, async () => {
  await publish(RenewalFinished({ subscriptionId: data.subscriptionId }));
});
```

opts — operational concerns

The third arg to on() carries operational metadata, mirroring defineAction:

```ts
on(PaymentFailed, async () => { … }, {
  in:             [notified, retrying],   // state filter (default: all states)
  retry:          { max: 3, backoff: "exponential" },
  dlq:            "renewal-dlq",
  idempotencyKey: (e) => `renewal-${e.subscriptionId}-${e.attempt}`,
});
```

| Key | Purpose |
| --- | --- |
| in: states[] | Equivalent to placing the on() inside each state's body. Lets you scope without nesting. |
| retry | Handler-body retry policy (network calls, downstream actions). |
| dlq | Dead-letter target on permanent failure. |
| idempotencyKey | Pure function of the event for deduplication. |
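
To make the retry and idempotencyKey options concrete, here is a small sketch of how a runtime could apply them. It assumes `max` counts retries after the first attempt and that exponential backoff doubles a 100 ms base delay; neither detail is stated above. `runWithRetry`, `retryDelayMs`, and `createDeduper` are hypothetical helpers, and the sketch records delays instead of sleeping.

```typescript
// Hypothetical sketch; the framework's real retry semantics may differ.
type RetryPolicy = { max: number; backoff: "exponential" };

// Delay before retry number `attempt` (1-based). baseMs is an assumed default.
function retryDelayMs(attempt: number, baseMs = 100): number {
  return baseMs * 2 ** (attempt - 1);
}

// Runs the handler up to 1 + policy.max times and records the waits between tries.
function runWithRetry(handler: () => void, policy: RetryPolicy): { ok: boolean; waits: number[] } {
  const waits: number[] = [];
  for (let attempt = 0; attempt <= policy.max; attempt++) {
    try {
      handler();
      return { ok: true, waits };
    } catch {
      if (attempt < policy.max) waits.push(retryDelayMs(attempt + 1));
    }
  }
  return { ok: false, waits }; // a real runtime would hand the event to the DLQ here
}

// Deduplicates events by a key that is a pure function of the event.
function createDeduper<E>(keyOf: (e: E) => string): (e: E) => boolean {
  const seenKeys = new Set<string>();
  return (e: E): boolean => {
    const key = keyOf(e);
    if (seenKeys.has(key)) return false; // duplicate: skip the handler
    seenKeys.add(key);
    return true;
  };
}

let calls = 0;
const flaky = (): void => {
  calls += 1;
  if (calls < 3) throw new Error("downstream unavailable");
};
const outcome = runWithRetry(flaky, { max: 3, backoff: "exponential" });

type FailedEvent = { subscriptionId: string; attempt: number };
const isNew = createDeduper((e: FailedEvent) => `renewal-${e.subscriptionId}-${e.attempt}`);
const firstSeen = isNew({ subscriptionId: "sub_1", attempt: 1 });
const duplicate = isNew({ subscriptionId: "sub_1", attempt: 1 });
```

Because the key is derived only from the event, a redelivered event produces the same key and is dropped before the handler runs.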

Module vs app workflows

Workflows can live at two scopes:

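| Permission | Module workflow (in defineModule) | App workflow (in defineApp) |
| --- | --- | --- |
| on(domain events) | | |
| on(integration events) | | |
| execute(internal actions) | yes | no |
| enqueue(internal actions) | yes | no |
| send(public actions) | yes | yes |
| publish(integration events) | yes | yes |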

Scope is determined by registration, not by annotation. You put the workflow where it belongs; the framework enforces the rules.
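
Enforcement by registration could amount to a lookup against the scope a workflow was registered under. The sketch below covers only the four verbs whose scope the verbs table states explicitly (execute and enqueue are module-only; send and publish work in any handler). `WorkflowScope`, `allowedVerbs`, and `canUse` are hypothetical names.

```typescript
// Hypothetical sketch of scope enforcement, based on the verbs table above.
type WorkflowScope = "module" | "app";
type Verb = "execute" | "enqueue" | "send" | "publish";

const allowedVerbs: Record<WorkflowScope, ReadonlySet<Verb>> = {
  module: new Set<Verb>(["execute", "enqueue", "send", "publish"]),
  app: new Set<Verb>(["send", "publish"]),
};

// True if a workflow registered at `scope` may call `verb`.
function canUse(scope: WorkflowScope, verb: Verb): boolean {
  return allowedVerbs[scope].has(verb);
}

const moduleExecute = canUse("module", "execute");
const appExecute = canUse("app", "execute");
const appPublish = canUse("app", "publish");
```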

Workflow vs actor

Both share the closure shape (data, states, assign, transitions via return). The distinction:

| | Actor | Workflow |
| --- | --- | --- |
| Triggered by | Methods called from action handlers | Events |
| State key | Aggregate id (one actor instance per id) | Correlation key (one workflow instance per saga key) |
| Emits | Domain events (recordThat) | Integration events via publish, no recordThat |
| Lifecycle | "Permanent", bound to entity | Transient, ends in a final state |

If the thing you're modeling has an identity that persists (the station, the submission), it's probably an actor. If it's a process that runs to completion (the renewal flow, the wash-recorded translation), it's a workflow.

Testing

```ts
describe("subscription-renewal", () => {
  it("suspends after 3 retries", async () => {
    const h = await harness({ app: testApp });
    await h.emit(RenewalDue, { subscriptionId: "sub_1", amount: 99 });
    await h.idle();

    await h.advanceTime("7d"); await h.idle();
    await h.advanceTime("2d"); await h.idle();
    await h.advanceTime("2d"); await h.idle();
    await h.advanceTime("2d"); await h.idle();

    const wf = h.workflow("subscription-renewal", "sub_1");
    expect(wf.state).toBe("suspended");
    expect(wf.data.attempts).toBe(3);
  });
});
```
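
The advanceTime calls in the test depend on a clock the harness controls, so a 7-day timeout can fire instantly. A minimal sketch of such a virtual clock is shown below. `createVirtualClock` and `parseDuration` are hypothetical, and only the "d" suffix appears in this document; the other units are assumptions.

```typescript
// Hypothetical sketch of a controllable clock; not the harness implementation.
const UNIT_MS: Record<string, number> = { s: 1000, m: 60000, h: 3600000, d: 86400000 };

// Parses strings like "7d" into milliseconds.
function parseDuration(text: string): number {
  const match = /^(\d+)([smhd])$/.exec(text);
  if (match === null) throw new Error(`bad duration: ${text}`);
  const amount = match[1];
  const unit = match[2];
  if (amount === undefined || unit === undefined) throw new Error(`bad duration: ${text}`);
  const unitMs = UNIT_MS[unit];
  if (unitMs === undefined) throw new Error(`bad unit: ${unit}`);
  return Number(amount) * unitMs;
}

function createVirtualClock() {
  let now = 0;
  let timers: { name: string; dueAt: number }[] = [];
  const fired: string[] = [];
  return {
    // Start a named timeout relative to the current virtual time.
    schedule(name: string, delay: string): void {
      timers.push({ name, dueAt: now + parseDuration(delay) });
    },
    // Move time forward and fire every timer that is now due, earliest first.
    advance(by: string): void {
      now += parseDuration(by);
      const due = timers.filter((t) => t.dueAt <= now).sort((a, b) => a.dueAt - b.dueAt);
      timers = timers.filter((t) => t.dueAt > now);
      for (const t of due) fired.push(t.name);
    },
    fired,
  };
}

const clock = createVirtualClock();
clock.schedule("payment-overdue", "7d");
clock.advance("6d");
const beforeDeadline = clock.fired.length;
clock.advance("1d");
const afterDeadline = [...clock.fired];
```

In the renewal test, the same idea explains the sequence: one 7-day advance triggers PaymentOverdue, and each 2-day advance triggers the RetryOverdue scheduled by the previous entry into retrying.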

MIT licensed.