Primitives.org.ai

Workflow

Orchestrate complex processes with durable execution or state machines

Orchestrate complex processes that span multiple functions, systems, and time horizons with two complementary paradigms: durable execution for code-first workflows and state machines for explicit state management.

Durable Execution

Write workflows as regular code. The runtime handles persistence, retries, and recovery automatically:

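import { workflow, step } from 'ai-workflows'
// createAccount, sendEmail, welcomeTemplate, humanTask, and activateAccount
// are assumed to be defined or imported elsewhere in your application.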

const onboardCustomer = workflow({
  name: 'customer-onboarding',
  execute: async (ctx, customer) => {
    // Each step is durably persisted
    const account = await step('create-account', () =>
      createAccount(customer)
    )

    // If this fails, workflow resumes from here on retry
    await step('send-welcome-email', () =>
      sendEmail(customer.email, welcomeTemplate)
    )

    // Human task - workflow pauses until completed
    const kycResult = await step('kyc-verification', () =>
      humanTask({
        type: 'kyc-review',
        assignTo: 'compliance-team',
        data: { customer, account },
      })
    )

    if (kycResult.approved) {
      await step('activate-account', () =>
        activateAccount(account.id)
      )
    }

    return { account, kycResult }
  },
})

Durability Guarantees

  • Exactly-once execution: Each completed step is recorded once and is never re-run, even across failures and retries
  • Automatic state persistence: Workflow state survives process restarts
  • Transparent recovery: Workflows resume from the last completed step
  • Long-running support: Workflows can run for hours, days, or weeks
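
The guarantees above can be pictured as a journal of completed steps that is consulted before any step runs. The following is a minimal sketch of that idea, not the ai-workflows implementation: `Journal`, `runStep`, and `onboard` are hypothetical names, the journal lives in memory rather than in durable storage, and the steps are synchronous for brevity.

```typescript
// Hypothetical in-memory journal; a real runtime would write to durable storage.
type Journal = Map<string, unknown>

// Run a step and record its result; on replay, return the recorded result instead.
function runStep<T>(journal: Journal, stepName: string, fn: () => T): T {
  if (journal.has(stepName)) {
    return journal.get(stepName) as T
  }
  const result = fn() // if this throws, nothing is recorded and the step runs again later
  journal.set(stepName, result)
  return result
}

const journal: Journal = new Map()
let accountsCreated = 0

function onboard(emailServiceUp: boolean): string {
  const accountId = runStep(journal, 'create-account', () => {
    accountsCreated += 1
    return `acct-${accountsCreated}`
  })
  runStep(journal, 'send-welcome-email', () => {
    if (!emailServiceUp) throw new Error('email service unavailable')
    return 'sent'
  })
  return accountId
}

// First attempt fails at the email step; the account step is already journaled.
let firstAttemptFailed = false
try {
  onboard(false)
} catch {
  firstAttemptFailed = true
}

// The second attempt replays 'create-account' from the journal and resumes at the email step.
const resumedAccountId = onboard(true)
console.log({ firstAttemptFailed, resumedAccountId, accountsCreated })
```

In this sketch the account is created only once even though the workflow function runs twice, which is the behavior the "transparent recovery" bullet describes.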

Waiting and Timers

const subscriptionWorkflow = workflow({
  name: 'subscription-lifecycle',
  execute: async (ctx, subscription) => {
    await step('activate', () => activateSubscription(subscription))

    // Wait for 30 days - workflow suspends, no resources consumed
    await ctx.sleep('30d')

    await step('send-renewal-reminder', () =>
      sendRenewalReminder(subscription)
    )

    // Wait for external event or timeout
    const renewed = await ctx.waitForEvent('subscription.renewed', {
      timeout: '7d',
      timeoutValue: false,
    })

    if (!renewed) {
      await step('expire-subscription', () =>
        expireSubscription(subscription)
      )
    }
  },
})
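
One way a runtime can sleep "for free" is to store a wake-up timestamp and let a scheduler resume the workflow once that time has passed, instead of holding a timer in memory. The sketch below illustrates that idea under stated assumptions: `parseDuration`, `wakeAt`, and `isDue` are hypothetical helpers, and the accepted duration format (a whole number followed by `s`, `m`, `h`, or `d`) is a guess based on the `'30d'` and `'7d'` strings used above, not the library's documented grammar.

```typescript
// Milliseconds per supported unit (assumed unit set).
const UNIT_MS = { s: 1000, m: 60 * 1000, h: 60 * 60 * 1000, d: 24 * 60 * 60 * 1000 } as const
type Unit = keyof typeof UNIT_MS

// Convert a string such as '30d' into milliseconds.
function parseDuration(text: string): number {
  const match = /^(\d+)([smhd])$/.exec(text)
  if (!match) throw new Error(`Unsupported duration: ${text}`)
  const amount = Number(match[1])
  const unit = match[2] as Unit
  return amount * UNIT_MS[unit]
}

// Compute the timestamp to persist when a workflow goes to sleep.
function wakeAt(now: Date, duration: string): Date {
  return new Date(now.getTime() + parseDuration(duration))
}

// A scheduler would call this periodically to decide whether to resume the workflow.
function isDue(wake: Date, now: Date): boolean {
  return now.getTime() >= wake.getTime()
}

const activatedAt = new Date('2024-01-01T00:00:00Z')
const reminderAt = wakeAt(activatedAt, '30d')
console.log(reminderAt.toISOString()) // 2024-01-31T00:00:00.000Z
```

Because only the timestamp is stored, the process that started the workflow can exit and a different process can pick it up when `isDue` returns true.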

State Machines

For complex business logic with explicit states and transitions, use state machines:

import { stateMachine, state, transition } from 'ai-workflows'

const orderMachine = stateMachine({
  name: 'order-lifecycle',
  initial: 'pending',
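  context: { orderId: '', items: [], total: 0, paidAt: null as Date | null },

  states: {
    pending: state({
      on: {
        PAYMENT_RECEIVED: transition('processing', {
          guard: (ctx, event) => event.amount >= ctx.total,
          // Record the payment time; the block body avoids returning the assigned value
          action: (ctx) => { ctx.paidAt = new Date() },
        }),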
        CANCEL: transition('cancelled'),
      },
    }),

    processing: state({
      onEnter: async (ctx) => {
        await notifyWarehouse(ctx.orderId)
      },
      on: {
        SHIPPED: transition('shipped'),
        OUT_OF_STOCK: transition('backorder'),
      },
    }),

    shipped: state({
      onEnter: async (ctx) => {
        await sendShippingNotification(ctx.orderId)
      },
      on: {
        DELIVERED: transition('completed'),
        RETURN_REQUESTED: transition('returning'),
      },
    }),

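    // The 'backorder' and 'returning' states referenced above are omitted here
    // for brevity; a complete machine must declare them as well.
    completed: state({ type: 'final' }),
    cancelled: state({ type: 'final' }),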
  },
})
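
To make the guard semantics concrete, here is a small self-contained interpreter for a subset of the order machine. It is a sketch under stated assumptions rather than the library's engine: `Rule`, `Rules`, and `send` are hypothetical names, and the choice to ignore unknown events and events whose guard fails is an assumption about how such a machine typically behaves.

```typescript
type OrderContext = { total: number; paidAt?: Date }
type OrderEvent = { type: string; amount?: number }
type Rule = {
  target: string
  guard?: (ctx: OrderContext, event: OrderEvent) => boolean
  action?: (ctx: OrderContext) => void
}
// state name -> event type -> transition rule
type Rules = Record<string, Record<string, Rule>>

const orderRules: Rules = {
  pending: {
    PAYMENT_RECEIVED: {
      target: 'processing',
      guard: (ctx, event) => (event.amount ?? 0) >= ctx.total,
      action: (ctx) => { ctx.paidAt = new Date() },
    },
    CANCEL: { target: 'cancelled' },
  },
  processing: { SHIPPED: { target: 'shipped' } },
  shipped: { DELIVERED: { target: 'completed' } },
  completed: {},
  cancelled: {},
}

// Return the next state, or the current state if the event is not handled or the guard fails.
function send(rules: Rules, current: string, ctx: OrderContext, event: OrderEvent): string {
  const rule = rules[current]?.[event.type]
  if (!rule) return current
  if (rule.guard && !rule.guard(ctx, event)) return current
  rule.action?.(ctx)
  return rule.target
}

const order: OrderContext = { total: 100 }
const afterPartial = send(orderRules, 'pending', order, { type: 'PAYMENT_RECEIVED', amount: 40 })
const afterFull = send(orderRules, afterPartial, order, { type: 'PAYMENT_RECEIVED', amount: 100 })
const afterLateCancel = send(orderRules, afterFull, order, { type: 'CANCEL' })
console.log(afterPartial, afterFull, afterLateCancel) // pending processing processing
```

The partial payment leaves the order in `pending` because the guard fails, and the late `CANCEL` is ignored because `processing` does not handle it.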

State Machine Benefits

  • Explicit states: All possible states are declared upfront
  • Guarded transitions: Conditions that must be met for transitions
  • Entry/exit actions: Code that runs when entering or leaving states
  • Visualizable: Generate diagrams from state machine definitions
  • Auditable: Complete history of states and transitions
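
Because the definition is plain data, it can be checked and rendered without running it. The sketch below assumes a simplified `Graph` shape (state to event to target) rather than the library's own types; `undeclaredTargets` and `toMermaid` are hypothetical helpers. The output uses Mermaid's `stateDiagram-v2` syntax. The sample graph mirrors the order machine above, including its transitions to `backorder` and `returning`, which the example does not declare.

```typescript
// state name -> event type -> target state
type Graph = Record<string, Record<string, string>>

const orderGraph: Graph = {
  pending: { PAYMENT_RECEIVED: 'processing', CANCEL: 'cancelled' },
  processing: { SHIPPED: 'shipped', OUT_OF_STOCK: 'backorder' },
  shipped: { DELIVERED: 'completed', RETURN_REQUESTED: 'returning' },
  completed: {},
  cancelled: {},
}

// List transition targets that are not declared as states.
function undeclaredTargets(graph: Graph): string[] {
  const declared = new Set(Object.keys(graph))
  const missing = new Set<string>()
  for (const transitions of Object.values(graph)) {
    for (const target of Object.values(transitions)) {
      if (!declared.has(target)) missing.add(target)
    }
  }
  return Array.from(missing).sort()
}

// Render the graph as a Mermaid state diagram.
function toMermaid(graph: Graph, initial: string): string {
  const lines = ['stateDiagram-v2', `  [*] --> ${initial}`]
  for (const [from, transitions] of Object.entries(graph)) {
    for (const [eventType, target] of Object.entries(transitions)) {
      lines.push(`  ${from} --> ${target}: ${eventType}`)
    }
  }
  return lines.join('\n')
}

const missingStates = undeclaredTargets(orderGraph)
const diagram = toMermaid(orderGraph, 'pending')
console.log(missingStates) // [ 'backorder', 'returning' ]
console.log(diagram)
```

A check like `undeclaredTargets` is one practical payoff of declaring states upfront: a transition to a missing state can be caught before the machine ever runs.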

Orchestrating AI and Human Work

Workflows coordinate every function type (code, generative, agentic, and human) within a single process:

const contentPipeline = workflow({
  name: 'content-creation',
  execute: async (ctx, brief) => {
    // AI generates initial draft
    const draft = await step('generate-draft', () =>
      generateContent({ type: 'agentic', brief })
    )

    // Human reviews and edits
    const reviewed = await step('human-review', () =>
      humanTask({
        type: 'content-review',
        assignTo: 'content-team',
        data: { draft, brief },
        ui: 'content-editor',
      })
    )

    // AI polishes based on human feedback
    const final = await step('polish', () =>
      polishContent({ type: 'generative', content: reviewed })
    )

    // Code function publishes
    await step('publish', () =>
      publishContent(final)
    )

    return final
  },
})

Parallel Execution

Run steps in parallel when they're independent:

const enrichCustomer = workflow({
  name: 'enrich-customer',
  execute: async (ctx, customer) => {
    // Run all enrichment in parallel
    const [credit, social, company] = await Promise.all([
      step('credit-check', () => checkCredit(customer)),
      step('social-lookup', () => lookupSocial(customer)),
      step('company-info', () => getCompanyInfo(customer.company)),
    ])

    return { ...customer, credit, social, company }
  },
})
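
`Promise.all` rejects as soon as any one of its promises rejects, so a single failed lookup fails the whole enrichment. When partial results are acceptable, `Promise.allSettled` is a standard alternative. The sketch below is independent of ai-workflows: `enrichTolerant`, `Enriched`, and the stub lookups are hypothetical, and whether `step` composes with `Promise.allSettled` in the same way is an assumption to verify against the library.

```typescript
type Enriched = { credit?: number; social?: string; errors: string[] }

// Turn an unknown rejection reason into a readable message.
function reasonText(reason: unknown): string {
  return reason instanceof Error ? reason.message : String(reason)
}

// Run both lookups concurrently and keep whatever succeeds.
async function enrichTolerant(
  checkCredit: () => Promise<number>,
  lookupSocial: () => Promise<string>,
): Promise<Enriched> {
  const [credit, social] = await Promise.allSettled([checkCredit(), lookupSocial()])
  const enriched: Enriched = { errors: [] }
  if (credit.status === 'fulfilled') enriched.credit = credit.value
  else enriched.errors.push(`credit-check: ${reasonText(credit.reason)}`)
  if (social.status === 'fulfilled') enriched.social = social.value
  else enriched.errors.push(`social-lookup: ${reasonText(social.reason)}`)
  return enriched
}

// Stub lookups: the credit check succeeds, the social lookup fails.
const enrichDemo = enrichTolerant(
  async () => 720,
  async () => { throw new Error('social API timeout') },
)
enrichDemo.then((result) => console.log(result))
```

The trade-off is that the caller must now inspect `errors`, whereas `Promise.all` surfaces the first failure as an exception.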

Error Handling

const robustWorkflow = workflow({
  name: 'robust-process',
  execute: async (ctx, input) => {
    try {
      await step('risky-operation', () => riskyOperation(input), {
        retry: { maxAttempts: 3, backoff: 'exponential' },
      })
    } catch (error) {
      // Compensating transaction: undo partial work, then rethrow so the workflow still fails
      await step('rollback', () => rollback(input))
      throw error
    }
  },
})
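
The `retry` option above names a policy without showing what it does. The following sketch shows one common reading of `maxAttempts: 3` with exponential backoff: up to three calls, with the wait doubling between them. It is an illustration, not the library's retry engine: `withRetry`, `backoffDelayMs`, the 1000 ms base delay, and the injected `sleep` function are all assumptions.

```typescript
type RetryOptions = {
  maxAttempts: number
  baseDelayMs: number
  sleep: (ms: number) => Promise<void> // injected so callers (and tests) control waiting
}

// Delay before the next attempt: base, 2x base, 4x base, ...
function backoffDelayMs(attempt: number, baseDelayMs: number): number {
  return baseDelayMs * 2 ** (attempt - 1)
}

// Call fn until it succeeds or maxAttempts is reached, then rethrow the last error.
async function withRetry<T>(fn: () => Promise<T>, options: RetryOptions): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      if (attempt < options.maxAttempts) {
        await options.sleep(backoffDelayMs(attempt, options.baseDelayMs))
      }
    }
  }
  throw lastError
}

// Demo: the operation fails twice, then succeeds on the third attempt.
const recordedWaits: number[] = []
let attemptsMade = 0
const retryDemo = withRetry(
  async () => {
    attemptsMade += 1
    if (attemptsMade < 3) throw new Error(`attempt ${attemptsMade} failed`)
    return 'ok'
  },
  { maxAttempts: 3, baseDelayMs: 1000, sleep: async (ms) => { recordedWaits.push(ms) } },
)
retryDemo.then((value) => console.log(value, recordedWaits))
```

Only when every attempt fails does the error reach the caller, which is the point at which a `catch` block like the one above would run its compensating step.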

Observability

Every workflow execution exposes its status, completed steps, received events, and current wait condition:

const execution = await onboardCustomer.start(customer)

console.log(execution.status)    // 'running' | 'completed' | 'failed' | 'waiting'
console.log(execution.steps)     // List of completed steps with timing
console.log(execution.events)    // Events received during execution
console.log(execution.waitingOn) // What the workflow is waiting for
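
The four status values lend themselves to an exhaustive `switch`, so that a newly added status becomes a compile-time error rather than a silent gap. The sketch below assumes a simplified snapshot shape: `ExecutionSnapshot`, `StepRecord`, the `durationMs` field, and `summarize` are hypothetical, and only the four status strings come from the example above.

```typescript
type StepRecord = { name: string; durationMs: number }
type ExecutionSnapshot = {
  status: 'running' | 'completed' | 'failed' | 'waiting'
  steps: StepRecord[]
  waitingOn?: string
}

// Produce a one-line summary suitable for logs or a dashboard.
function summarize(execution: ExecutionSnapshot): string {
  const totalMs = execution.steps.reduce((sum, s) => sum + s.durationMs, 0)
  const status = execution.status
  switch (status) {
    case 'running':
      return `running (${execution.steps.length} steps done, ${totalMs} ms so far)`
    case 'waiting':
      return `waiting on ${execution.waitingOn ?? 'unknown'}`
    case 'completed':
      return `completed in ${totalMs} ms`
    case 'failed':
      return `failed after ${execution.steps.length} steps`
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a status is unhandled.
      const unreachable: never = status
      return unreachable
    }
  }
}

const completedSteps: StepRecord[] = [
  { name: 'create-account', durationMs: 120 },
  { name: 'send-welcome-email', durationMs: 80 },
]
const waitingSummary = summarize({ status: 'waiting', steps: completedSteps, waitingOn: 'kyc-verification' })
const completedSummary = summarize({ status: 'completed', steps: completedSteps })
console.log(waitingSummary)   // waiting on kyc-verification
console.log(completedSummary) // completed in 200 ms
```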