Primitives.org.ai

Parallel Execution

Run steps concurrently for better performance

Parallel Execution

Execute multiple independent operations concurrently to improve workflow performance.

Basic Parallel Execution

import { workflow, step, parallel } from 'ai-workflows'

const orderWorkflow = workflow({
  name: 'order-processing',
  execute: async (ctx, order) => {
    // Run independent steps in parallel
    const [inventory, payment, shipping] = await parallel([
      step('check-inventory', () => checkInventory(order.items)),
      step('validate-payment', () => validatePayment(order.payment)),
      step('calculate-shipping', () => calculateShipping(order.address)),
    ])

    // Continue with results
    await step('create-order', () =>
      createOrder({ inventory, payment, shipping })
    )
  },
})

Named Parallel Steps

const results = await parallel({
  user: step('fetch-user', () => getUser(userId)),
  orders: step('fetch-orders', () => getOrders(userId)),
  preferences: step('fetch-preferences', () => getPreferences(userId)),
})

// Access by name
console.log(results.user)
console.log(results.orders)
console.log(results.preferences)

Parallel with Limits

Control concurrency to avoid overwhelming resources:

// Maximum 3 concurrent operations
const results = await parallel(
  items.map((item) => step(`process-${item.id}`, () => processItem(item))),
  { concurrency: 3 }
)

Error Handling in Parallel

Fail Fast (Default)

try {
  // If any step fails, all are cancelled
  await parallel([
    step('a', () => operationA()),
    step('b', () => operationB()),
    step('c', () => operationC()),
  ])
} catch (error) {
  // First error is thrown
  console.error('Parallel execution failed:', error)
}

Settle All

Continue even if some steps fail:

const results = await parallel(
  [
    step('a', () => operationA()),
    step('b', () => operationB()),
    step('c', () => operationC()),
  ],
  { settle: true }
)

// Check results individually
results.forEach((result, index) => {
  if (result.status === 'fulfilled') {
    console.log(`Step ${index} succeeded:`, result.value)
  } else {
    console.log(`Step ${index} failed:`, result.reason)
  }
})

Race Execution

Return as soon as one step completes:

import { race } from 'ai-workflows'

const fastest = await race([
  step('primary', () => fetchFromPrimary(data)),
  step('secondary', () => fetchFromSecondary(data)),
  step('cache', () => fetchFromCache(data)),
])

Parallel Workflows

Run entire workflows in parallel:

const processUsers = workflow({
  execute: async (ctx, userIds) => {
    // Process all users concurrently
    const results = await parallel(
      userIds.map((id) =>
        ctx.invoke(processUserWorkflow, { userId: id })
      ),
      { concurrency: 10 }
    )

    return { processed: results.length }
  },
})

Fan-Out / Fan-In Pattern

const mapReduceWorkflow = workflow({
  execute: async (ctx, data) => {
    // Fan-out: Split work across parallel steps
    const chunks = splitIntoChunks(data, 10)

    const processed = await parallel(
      chunks.map((chunk, i) =>
        step(`process-chunk-${i}`, () => processChunk(chunk))
      ),
      { concurrency: 5 }
    )

    // Fan-in: Combine results
    const combined = await step('combine', () =>
      combineResults(processed)
    )

    return combined
  },
})

Conditional Parallel

const conditionalWorkflow = workflow({
  execute: async (ctx, order) => {
    const tasks = []

    // Add tasks based on conditions
    if (order.requiresApproval) {
      tasks.push(step('approval', () => requestApproval(order)))
    }

    if (order.isInternational) {
      tasks.push(step('customs', () => prepareCustomsForms(order)))
    }

    tasks.push(step('inventory', () => reserveInventory(order)))

    // Execute whatever tasks were added
    await parallel(tasks)
  },
})

Timeout for Parallel

const results = await parallel(
  [
    step('slow', () => slowOperation()),
    step('fast', () => fastOperation()),
  ],
  {
    timeout: '30s',
    timeoutValue: 'partial', // Return completed results on timeout
  }
)

Nested Parallel

const complexWorkflow = workflow({
  execute: async (ctx, data) => {
    const results = await parallel([
      // First parallel group
      parallel([
        step('a1', () => opA1()),
        step('a2', () => opA2()),
      ]),

      // Second parallel group
      parallel([
        step('b1', () => opB1()),
        step('b2', () => opB2()),
        step('b3', () => opB3()),
      ]),
    ])

    return results
  },
})

Best Practices

  1. Independent operations only - Only parallelize steps with no dependencies
  2. Set concurrency limits - Avoid overwhelming external services
  3. Handle partial failures - Use settle: true when appropriate
  4. Consider resource usage - Parallel execution uses more memory
  5. Use timeouts - Prevent hanging on slow parallel operations
Was this page helpful?

On this page