Primitives.org.ai

Steps

Define and execute durable workflow steps

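Steps are the building blocks of durable workflows. Each step's result is persisted when the step completes, so a workflow that restarts resumes from its last completed step instead of re-running finished work. This persistence is what enables automatic recovery and exactly-once execution guarantees.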

Defining Steps

import { workflow, step } from 'ai-workflows'

const myWorkflow = workflow({
  name: 'process-order',
  execute: async (ctx, order) => {
    // Each step is durably persisted
    const validated = await step('validate', () =>
      validateOrder(order)
    )

    const payment = await step('charge', () =>
      chargePayment(order.paymentMethod, validated.total)
    )

    const shipped = await step('ship', () =>
      createShipment(order.items, order.address)
    )

    return { validated, payment, shipped }
  },
})

Step Options

Configure retry behavior, timeouts, and idempotency keys through the optional third argument:

await step('risky-operation', () => callExternalAPI(data), {
  // Retry configuration
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelay: 1000,
    maxDelay: 30000,
  },

  // Timeout
  timeout: '30s',

  // Idempotency key for deduplication
  idempotencyKey: `op-${data.id}`,
})
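To make the retry settings concrete, here is a minimal sketch of how an exponential backoff schedule could be derived from them. The `backoffDelay` helper and `RetryOptions` type are hypothetical and not part of `ai-workflows`; the sketch also assumes that delays are in milliseconds and that each retry doubles the previous delay, neither of which is confirmed by the options above.

```typescript
// Hypothetical types and helper for illustration; not the library's API.
interface RetryOptions {
  maxAttempts: number
  backoff: 'exponential' | 'fixed'
  initialDelay: number // assumed to be milliseconds
  maxDelay: number // assumed to be milliseconds
}

// Delay to wait after the given failed attempt (1-based), capped at maxDelay.
function backoffDelay(attempt: number, opts: RetryOptions): number {
  const raw =
    opts.backoff === 'exponential'
      ? opts.initialDelay * 2 ** (attempt - 1) // assumed doubling factor
      : opts.initialDelay
  return Math.min(raw, opts.maxDelay)
}

const retry: RetryOptions = {
  maxAttempts: 3,
  backoff: 'exponential',
  initialDelay: 1000,
  maxDelay: 30000,
}

// With 3 attempts there are at most 2 waits: after attempt 1 and after attempt 2.
const delays: number[] = []
for (let attempt = 1; attempt < retry.maxAttempts; attempt++) {
  delays.push(backoffDelay(attempt, retry))
}

console.log(delays) // [1000, 2000]
```

Under these assumptions, `maxDelay` only takes effect with a higher `maxAttempts`: the sixth wait would be 32000 ms uncapped and is clamped to 30000 ms.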

Step Results

Steps automatically cache their results:

const myWorkflow = workflow({
  execute: async (ctx, input) => {
    // First execution: runs the function and persists its result
    const result = await step('expensive', () => expensiveOperation())

    // If the workflow restarts after this point, step('expensive') returns
    // the cached result and the function does not run again
    return result
  },
})
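The caching behavior can be pictured as a journal keyed by step name. The sketch below is a simplified, synchronous model of that idea and not how `ai-workflows` is implemented: `runStep` and the in-memory `Journal` are hypothetical names, and a real engine would write to durable storage rather than a `Map`.

```typescript
// Hypothetical in-memory journal; a real engine would use durable storage.
type Journal = Map<string, unknown>

// Run fn once per step name; on replay, return the recorded result instead.
function runStep<T>(journal: Journal, name: string, fn: () => T): T {
  if (journal.has(name)) {
    return journal.get(name) as T
  }
  const result = fn()
  journal.set(name, result)
  return result
}

// Count invocations so the effect of the journal is observable.
let calls = 0
const expensiveOperation = (): number => {
  calls += 1
  return 42
}

const journal: Journal = new Map()

// First execution: runs the function and records the result
const first = runStep(journal, 'expensive', expensiveOperation)

// Simulated restart: the journal survives, so the recorded result is reused
const second = runStep(journal, 'expensive', expensiveOperation)

console.log(first, second, calls) // 42 42 1
```

Because the journal is keyed by name in this model, two steps sharing a name within one workflow would collide, which is one more reason to give steps distinct, descriptive names.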

Conditional Steps

Execute steps conditionally:

const result = await step('validate', () => validate(data))

if (result.needsReview) {
  // This step only runs when needed
  await step('human-review', () =>
    humanTask({ type: 'review', data })
  )
}

Step Metadata

Access step execution details:

const result = await step('process', async () => {
  return processData(data)
})

// Once the step completes, its metadata is available on the workflow context (ctx)
console.log(ctx.steps['process'].startedAt)
console.log(ctx.steps['process'].completedAt)
console.log(ctx.steps['process'].duration)
console.log(ctx.steps['process'].attempts)
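As a rough illustration of where these four fields could come from, the sketch below wraps a function, retries it on failure, and records timing and attempt counts. The `StepMetadata` shape is inferred from the property names above; the use of epoch milliseconds, the `runWithMetadata` helper, and the default of three attempts are all assumptions made for this example.

```typescript
// Assumed shape, inferred from the fields read from ctx.steps above.
interface StepMetadata {
  startedAt: number // epoch milliseconds (assumed representation)
  completedAt: number // epoch milliseconds (assumed representation)
  duration: number // completedAt - startedAt
  attempts: number // how many times the function was invoked
}

// Hypothetical wrapper: runs fn, retrying on error, and records metadata.
function runWithMetadata<T>(
  fn: () => T,
  maxAttempts = 3
): { result: T; meta: StepMetadata } {
  const startedAt = Date.now()
  let attempts = 0
  while (true) {
    attempts += 1
    try {
      const result = fn()
      const completedAt = Date.now()
      return {
        result,
        meta: { startedAt, completedAt, duration: completedAt - startedAt, attempts },
      }
    } catch (error) {
      // Give up once the attempt budget is spent; otherwise loop and retry
      if (attempts >= maxAttempts) throw error
    }
  }
}

// A function that fails once, then succeeds
let failuresLeft = 1
const flaky = (): string => {
  if (failuresLeft > 0) {
    failuresLeft -= 1
    throw new Error('transient failure')
  }
  return 'done'
}

const { result, meta } = runWithMetadata(flaky)
console.log(result, meta.attempts) // done 2
```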

Compensating Steps

Handle failures with compensating transactions:

const myWorkflow = workflow({
  execute: async (ctx, order) => {
    const payment = await step('charge', () =>
      chargePayment(order)
    )

    try {
      await step('fulfill', () => fulfillOrder(order))
    } catch (error) {
      // Compensate by refunding
      await step('refund', () =>
        refundPayment(payment.transactionId)
      )
      throw error
    }
  },
})
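When a workflow has several steps that each need an undo action, nesting try/catch blocks gets unwieldy. One common alternative is to pair each action with its compensation and unwind in reverse order on failure. The sketch below shows that pattern in isolation; `SagaStep` and `runSaga` are hypothetical names, the steps are synchronous for brevity, and nothing here is part of `ai-workflows`.

```typescript
// Hypothetical saga helper for illustration; not the library's API.
interface SagaStep {
  name: string
  run: () => void
  compensate?: () => void
}

// Run steps in order; on failure, undo completed steps in reverse, then rethrow.
function runSaga(steps: SagaStep[]): void {
  const completed: SagaStep[] = []
  try {
    for (const s of steps) {
      s.run()
      completed.push(s)
    }
  } catch (error) {
    for (const s of completed.reverse()) {
      if (s.compensate) s.compensate()
    }
    throw error
  }
}

const events: string[] = []
let sagaError = ''

try {
  runSaga([
    { name: 'charge', run: () => events.push('charge'), compensate: () => events.push('refund') },
    { name: 'reserve', run: () => events.push('reserve'), compensate: () => events.push('release') },
    { name: 'fulfill', run: () => { throw new Error('out of stock') } },
  ])
} catch (error) {
  sagaError = (error as Error).message
}

// events: charge, reserve, release, refund (compensations run newest first)
console.log(events.join(', '), '|', sagaError)
```

Rethrowing after compensation mirrors the example above: the workflow still fails, but it fails with its earlier side effects undone.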

Step Types

Sync Steps

await step('sync', () => {
  return computeValue(data)  // Synchronous
})

Async Steps

await step('async', async () => {
  return await fetchData(url)  // Asynchronous
})

Human Steps

await step('approval', () =>
  humanTask({
    type: 'approve',
    assignTo: 'manager',
    data: { request },
  })
)

AI Steps

await step('analyze', () =>
  generateContent({
    prompt: 'Analyze this data',
    data: input,
  })
)

Best Practices

  1. Keep steps atomic - Each step should do one thing
  2. Use descriptive names - Names appear in logs and UI
  3. Handle failures explicitly - Use try/catch for compensating actions
  4. Make retried operations idempotent - A step may run more than once before it succeeds, so repeated calls must not duplicate side effects
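The last practice is the easiest to get wrong, so here is a minimal sketch of what idempotency means for a retried charge. The `chargeOnce` function stands in for a payment service that deduplicates by key; it is hypothetical, keeps its state in memory, and is not the `chargePayment` function used elsewhere on this page.

```typescript
// Hypothetical payment client that deduplicates requests by idempotency key.
const processedCharges = new Map<string, string>()
let totalCharged = 0

function chargeOnce(idempotencyKey: string, amount: number): string {
  const existing = processedCharges.get(idempotencyKey)
  if (existing !== undefined) {
    // Retry of a request already processed: return the original transaction
    return existing
  }
  totalCharged += amount
  const transactionId = `txn-${processedCharges.size + 1}`
  processedCharges.set(idempotencyKey, transactionId)
  return transactionId
}

// Derive the key from stable input data, never from a timestamp or random value
const orderId = 'order-123'
const firstTry = chargeOnce(`charge-${orderId}`, 4999)
const retryTry = chargeOnce(`charge-${orderId}`, 4999)

console.log(firstTry === retryTry, totalCharged) // true 4999
```

A key built from the order's own identifier stays the same across retries and restarts, which is what lets the second call be recognized as a duplicate.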