Parallel Execution
Run steps concurrently for better performance
Execute multiple independent operations concurrently to improve workflow performance.
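To see why this helps, here is a minimal sketch using only built-in promises rather than the `ai-workflows` API. `fakeOperation` is a hypothetical stand-in for an I/O-bound call, and the `log` array exists only to make the start and end order visible.

```typescript
// Hypothetical stand-in for an I/O-bound operation; not part of ai-workflows.
function fakeOperation(name: string, ms: number, log: string[]): Promise<string> {
  log.push(`start:${name}`)
  return new Promise((resolve) =>
    setTimeout(() => {
      log.push(`end:${name}`)
      resolve(name)
    }, ms)
  )
}

// Sequential: the second operation starts only after the first one finishes,
// so the total time is roughly the sum of both durations.
async function runSequentially(log: string[]): Promise<string[]> {
  const a = await fakeOperation('a', 30, log)
  const b = await fakeOperation('b', 30, log)
  return [a, b]
}

// Concurrent: both operations start immediately,
// so the total time is roughly that of the slowest one.
async function runConcurrently(log: string[]): Promise<string[]> {
  return Promise.all([fakeOperation('a', 30, log), fakeOperation('b', 30, log)])
}
```

Both functions return the results in the same order; only the scheduling differs. The gain applies when the operations do not depend on each other's results.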
Basic Parallel Execution
import { workflow, step, parallel } from 'ai-workflows'
const orderWorkflow = workflow({
  name: 'order-processing',
  execute: async (ctx, order) => {
    // Run independent steps in parallel
    const [inventory, payment, shipping] = await parallel([
      step('check-inventory', () => checkInventory(order.items)),
      step('validate-payment', () => validatePayment(order.payment)),
      step('calculate-shipping', () => calculateShipping(order.address)),
    ])
    // Continue with results
    await step('create-order', () =>
      createOrder({ inventory, payment, shipping })
    )
  },
})
Named Parallel Steps
const results = await parallel({
  user: step('fetch-user', () => getUser(userId)),
  orders: step('fetch-orders', () => getOrders(userId)),
  preferences: step('fetch-preferences', () => getPreferences(userId)),
})
// Access by name
console.log(results.user)
console.log(results.orders)
console.log(results.preferences)
Parallel with Limits
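As a mental model for what a concurrency limit does, here is a minimal sketch built on plain promises. `mapWithConcurrency` and `demoLimit` are hypothetical names introduced for illustration; this is not how `ai-workflows` implements its `concurrency` option, only one common way to get the same effect.

```typescript
// Hypothetical helper, not the ai-workflows implementation.
// Runs `worker` over `items` with at most `limit` calls in flight at once.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let next = 0 // index of the next unclaimed item

  // Each runner repeatedly claims the next item until none are left.
  async function runner(): Promise<void> {
    while (next < items.length) {
      const index = next++
      results[index] = await worker(items[index], index)
    }
  }

  // Start at most `limit` runners, and never more than there are items.
  const runnerCount = Math.max(1, Math.min(limit, items.length))
  await Promise.all(Array.from({ length: runnerCount }, () => runner()))
  return results
}

// Usage: record the peak number of workers that were in flight together.
async function demoLimit(): Promise<{ doubled: number[]; peak: number }> {
  let inFlight = 0
  let peak = 0
  const doubled = await mapWithConcurrency([1, 2, 3, 4, 5], 2, async (n) => {
    inFlight++
    peak = Math.max(peak, inFlight)
    await new Promise((resolve) => setTimeout(resolve, 5))
    inFlight--
    return n * 2
  })
  return { doubled, peak }
}
```

Results keep the input order even though items may finish out of order. If one worker rejects, the returned promise rejects, but runners that are already active keep going in the background.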
Control concurrency to avoid overwhelming resources:
// Maximum 3 concurrent operations
const results = await parallel(
  items.map((item) => step(`process-${item.id}`, () => processItem(item))),
  { concurrency: 3 }
)
Error Handling in Parallel
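The two modes in this section behave much like the standard `Promise.all` and `Promise.allSettled`. The sketch below uses only those built-ins, with hypothetical `ok` and `fail` helpers standing in for steps. It is an analogy rather than the library's implementation; in particular, plain promises cannot cancel work that is already in flight.

```typescript
// Hypothetical operations for illustration only.
const ok = (value: string): Promise<string> => Promise.resolve(value)
const fail = (message: string): Promise<string> => Promise.reject(new Error(message))

// Fail-fast analogue: rejects with the first error; the other results are discarded.
async function failFast(): Promise<string> {
  try {
    await Promise.all([ok('a'), fail('b broke'), ok('c')])
    return 'all succeeded'
  } catch (error) {
    return `failed: ${(error as Error).message}`
  }
}

// Settle-all analogue: never rejects; each entry reports its own outcome
// as { status: 'fulfilled', value } or { status: 'rejected', reason }.
async function settleAll(): Promise<string[]> {
  const results = await Promise.allSettled([ok('a'), fail('b broke'), ok('c')])
  return results.map((result) =>
    result.status === 'fulfilled'
      ? `ok: ${result.value}`
      : `failed: ${(result.reason as Error).message}`
  )
}
```

Fail-fast suits cases where one failure makes the remaining results useless. Settling is the better fit when each result can be handled independently.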
Fail Fast (Default)
try {
  // If any step fails, all are cancelled
  await parallel([
    step('a', () => operationA()),
    step('b', () => operationB()),
    step('c', () => operationC()),
  ])
} catch (error) {
  // First error is thrown
  console.error('Parallel execution failed:', error)
}
Settle All
Continue even if some steps fail:
const results = await parallel(
  [
    step('a', () => operationA()),
    step('b', () => operationB()),
    step('c', () => operationC()),
  ],
  { settle: true }
)
// Check results individually
results.forEach((result, index) => {
  if (result.status === 'fulfilled') {
    console.log(`Step ${index} succeeded:`, result.value)
  } else {
    console.log(`Step ${index} failed:`, result.reason)
  }
})
Race Execution
Return as soon as one step completes:
import { race } from 'ai-workflows'
const fastest = await race([
  step('primary', () => fetchFromPrimary(data)),
  step('secondary', () => fetchFromSecondary(data)),
  step('cache', () => fetchFromCache(data)),
])
Parallel Workflows
Run entire workflows in parallel:
const processUsers = workflow({
  execute: async (ctx, userIds) => {
    // Process all users concurrently
    const results = await parallel(
      userIds.map((id) =>
        ctx.invoke(processUserWorkflow, { userId: id })
      ),
      { concurrency: 10 }
    )
    return { processed: results.length }
  },
})
Fan-Out / Fan-In Pattern
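The example in this section calls a `splitIntoChunks` helper that the page does not define. One possible implementation is sketched here, assuming the second argument is the maximum chunk size; the page does not say whether `10` means a chunk size or a chunk count, so treat that reading as an assumption.

```typescript
// Hypothetical helper: splits an array into consecutive chunks of at most `size` items.
// Assumes the second argument is a chunk size, not a chunk count.
function splitIntoChunks<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer')
  }
  const chunks: T[][] = []
  for (let start = 0; start < items.length; start += size) {
    // slice() clamps the end index, so the last chunk may be shorter.
    chunks.push(items.slice(start, start + size))
  }
  return chunks
}
```

Chunk size sets the granularity of the fan-out. Larger chunks mean fewer steps and less per-step overhead, while smaller chunks spread the work more evenly across the concurrency limit.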
const mapReduceWorkflow = workflow({
  execute: async (ctx, data) => {
    // Fan-out: Split work across parallel steps
    const chunks = splitIntoChunks(data, 10)
    const processed = await parallel(
      chunks.map((chunk, i) =>
        step(`process-chunk-${i}`, () => processChunk(chunk))
      ),
      { concurrency: 5 }
    )
    // Fan-in: Combine results
    const combined = await step('combine', () =>
      combineResults(processed)
    )
    return combined
  },
})
Conditional Parallel
const conditionalWorkflow = workflow({
  execute: async (ctx, order) => {
    const tasks = []
    // Add tasks based on conditions
    if (order.requiresApproval) {
      tasks.push(step('approval', () => requestApproval(order)))
    }
    if (order.isInternational) {
      tasks.push(step('customs', () => prepareCustomsForms(order)))
    }
    tasks.push(step('inventory', () => reserveInventory(order)))
    // Execute whatever tasks were added
    await parallel(tasks)
  },
})
Timeout for Parallel
const results = await parallel(
  [
    step('slow', () => slowOperation()),
    step('fast', () => fastOperation()),
  ],
  {
    timeout: '30s',
    timeoutValue: 'partial', // Return completed results on timeout
  }
)
Nested Parallel
const complexWorkflow = workflow({
  execute: async (ctx, data) => {
    const results = await parallel([
      // First parallel group
      parallel([
        step('a1', () => opA1()),
        step('a2', () => opA2()),
      ]),
      // Second parallel group
      parallel([
        step('b1', () => opB1()),
        step('b2', () => opB2()),
        step('b3', () => opB3()),
      ]),
    ])
    return results
  },
})
Best Practices
- Independent operations only - Only parallelize steps with no dependencies
- Set concurrency limits - Avoid overwhelming external services
- Handle partial failures - Use settle: true when appropriate
- Consider resource usage - Parallel execution uses more memory
- Use timeouts - Prevent hanging on slow parallel operations
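To illustrate the timeout advice outside the library, here is a minimal sketch that bounds any promise with `Promise.race`. `withTimeout` is a hypothetical helper, not part of `ai-workflows`, and it does not reproduce the `timeout` or `timeoutValue` options shown earlier.

```typescript
// Hypothetical helper, not part of ai-workflows.
// Rejects if `operation` has not settled within `ms` milliseconds.
function withTimeout<T>(operation: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms)
  })
  // Whichever settles first wins; always clear the timer afterwards
  // so a finished operation does not leave a pending timeout behind.
  return Promise.race([operation, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer)
  })
}
```

A timeout built this way only stops the caller from waiting. The underlying operation keeps running unless it supports cancellation itself.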