Primitives.org.ai

Observability

Monitor, trace, and debug workflow execution

Gain visibility into workflow execution with built-in logging, tracing, metrics, and debugging tools.

Logging

Built-in Logger

import { workflow, step } from 'ai-workflows'

const myWorkflow = workflow({
  name: 'observable-workflow',
  execute: async (ctx, input) => {
    ctx.log.info('Starting workflow', { input })

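    // processInput is a placeholder for your own function; the name avoids
    // shadowing the Node.js global `process`
    const result = await step('process', () => processInput(input))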
    ctx.log.debug('Processing complete', { result })

    if (result.warnings.length > 0) {
      ctx.log.warn('Completed with warnings', { warnings: result.warnings })
    }

    return result
  },
})

Log Levels

ctx.log.debug('Detailed debugging info')
ctx.log.info('General information')
ctx.log.warn('Warning conditions')
ctx.log.error('Error conditions')

Structured Logging

ctx.log.info('Order processed', {
  orderId: order.id,
  customerId: order.customerId,
  amount: order.total,
  items: order.items.length,
})
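
To illustrate why structured fields are easier to query than values interpolated into a message string, the sketch below serializes a log entry as a single JSON line. `formatLogRecord` and the record shape are hypothetical illustrations and are not part of `ai-workflows`; the library's actual output format may differ.

```typescript
// Hypothetical record shape; not the library's actual log format.
type LogLevel = 'debug' | 'info' | 'warn' | 'error'

interface LogRecord {
  level: LogLevel
  message: string
  timestamp: string
  fields: Record<string, unknown>
}

// Serialize one log entry as a single JSON line so that a log backend
// can filter on individual fields (for example, fields.orderId).
function formatLogRecord(
  level: LogLevel,
  message: string,
  fields: Record<string, unknown> = {},
  now: Date = new Date(),
): string {
  const record: LogRecord = {
    level,
    message,
    timestamp: now.toISOString(),
    fields,
  }
  return JSON.stringify(record)
}

const line = formatLogRecord(
  'info',
  'Order processed',
  { orderId: 'ord_1', customerId: 'cus_9', amount: 42.5, items: 3 },
  new Date(0),
)
console.log(line)
```

Because every value keeps its own key and type, a query such as "all entries where `fields.amount` exceeds 40" needs no string parsing.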

Tracing

Automatic Traces

Every workflow execution generates traces automatically:

const execution = await myWorkflow.run(input)

// Access trace
console.log(execution.traceId)  // 'trace_abc123'

Custom Spans

// Use a distinct name so the constant does not shadow the imported `workflow` function
const tracedWorkflow = workflow({
  execute: async (ctx, data) => {
    // Create custom span
    await ctx.trace.span('custom-operation', async (span) => {
      span.setAttribute('data.size', data.length)

      const result = await processData(data)

      span.setAttribute('result.count', result.count)
      return result
    })
  },
})

Trace Context

// Pass trace context to external services
await step('external-call', async () => {
  const response = await fetch(url, {
    headers: {
      'traceparent': ctx.trace.getTraceParent(),
    },
  })
  return response.json()
})
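
On the receiving side, the header has to be validated before it is used. The sketch below assumes that `ctx.trace.getTraceParent()` returns a value in the W3C Trace Context format (the header name suggests this, but the source does not confirm it). `parseTraceParent` is a hypothetical helper, not a library function, and it accepts only the four-field layout.

```typescript
interface TraceParent {
  version: string
  traceId: string
  parentId: string
  sampled: boolean
}

// W3C Trace Context layout: version-traceId-parentId-flags,
// all lowercase hex with lengths 2, 32, 16, and 2.
const TRACEPARENT_PATTERN =
  /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/

function parseTraceParent(header: string): TraceParent | null {
  const match = TRACEPARENT_PATTERN.exec(header.trim())
  if (match === null) return null

  const [, version, traceId, parentId, flags] = match
  // Version "ff" is invalid, as are all-zero trace and parent IDs.
  if (version === 'ff') return null
  if (/^0+$/.test(traceId) || /^0+$/.test(parentId)) return null

  // Bit 0 of the flags byte is the "sampled" flag.
  const sampled = (parseInt(flags, 16) & 1) === 1
  return { version, traceId, parentId, sampled }
}
```

A service that receives an unparseable header (a `null` result here) would typically start a new trace instead of failing the request.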

Metrics

Built-in Metrics

Workflows automatically emit metrics:

  • workflow.started - Workflow executions started
  • workflow.completed - Successful completions
  • workflow.failed - Failed executions
  • workflow.duration - Execution duration histogram
  • step.duration - Individual step durations
  • step.retries - Retry counts

Custom Metrics

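import { metrics, workflow } from 'ai-workflows'

// Use a distinct name so the constant does not shadow the imported `workflow` function
const orderWorkflow = workflow({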
  execute: async (ctx, order) => {
    // Counter
    metrics.increment('orders.processed')

    // Gauge
    metrics.gauge('orders.pending', await getPendingCount())

    // Histogram
    metrics.histogram('order.value', order.total)

    // With labels
    metrics.increment('orders.by_status', {
      status: order.status,
      region: order.region,
    })
  },
})

Workflow History

Execution History

// Get workflow execution
const execution = await workflow.get('wf_123')

// View all steps
const steps = execution.getSteps()
steps.forEach((step) => {
  console.log(`${step.name}: ${step.status}`)
  console.log(`  Started: ${step.startedAt}`)
  console.log(`  Completed: ${step.completedAt}`)
  console.log(`  Duration: ${step.duration}ms`)
  console.log(`  Attempts: ${step.attempts}`)
})

Step Details

const stepDetail = execution.getStep('payment')

console.log(stepDetail.input)    // Input to the step
console.log(stepDetail.output)   // Step result
console.log(stepDetail.error)    // Error if failed
console.log(stepDetail.logs)     // Logs from this step

Events

Workflow Events

import { events } from 'ai-workflows'

// Subscribe to workflow events
events.on('workflow.started', (event) => {
  console.log(`Workflow ${event.workflowName} started: ${event.id}`)
})

events.on('workflow.completed', (event) => {
  console.log(`Workflow ${event.id} completed in ${event.duration}ms`)
})

events.on('workflow.failed', (event) => {
  console.error(`Workflow ${event.id} failed:`, event.error)
})

Step Events

events.on('step.started', (event) => {
  console.log(`Step ${event.stepName} started`)
})

events.on('step.completed', (event) => {
  console.log(`Step ${event.stepName} completed: ${event.duration}ms`)
})

events.on('step.retrying', (event) => {
  console.warn(`Step ${event.stepName} retrying (attempt ${event.attempt})`)
})

Debugging

Dry Run

Test workflows without side effects:

const result = await myWorkflow.dryRun(input, {
  mockSteps: {
    'payment': { success: true, transactionId: 'mock_123' },
    'send-email': { sent: true },
  },
})

console.log(result.steps)  // All steps that would execute
console.log(result.output) // Expected output
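
One way to picture `mockSteps` is as a lookup that is consulted before a step's real function runs. The sketch below is a simplified model under that assumption. `createStepRunner` and `demo` are hypothetical names, and nothing here describes how `dryRun` is actually implemented.

```typescript
type StepFn = () => Promise<unknown>

interface StepRecord {
  stepName: string
  mocked: boolean
}

// Build a step runner that returns a canned value when the step name
// appears in mockSteps, and otherwise calls the real function.
function createStepRunner(mockSteps: Record<string, unknown> = {}) {
  const executed: StepRecord[] = []

  async function runStep(stepName: string, fn: StepFn): Promise<unknown> {
    if (Object.prototype.hasOwnProperty.call(mockSteps, stepName)) {
      executed.push({ stepName, mocked: true })
      return mockSteps[stepName]
    }
    executed.push({ stepName, mocked: false })
    return await fn()
  }

  return { runStep, executed }
}

async function demo() {
  let charged = false
  const { runStep, executed } = createStepRunner({
    payment: { success: true, transactionId: 'mock_123' },
  })

  // The real payment function is never called because the step is mocked.
  const payment = await runStep('payment', async () => {
    charged = true
    return { success: true, transactionId: 'real_456' }
  })
  // This step has no mock, so its function runs normally.
  const total = await runStep('compute-total', async () => 2 * 21)

  return { payment, total, charged, executed }
}
```

In this model the side effect (`charged = true`) never happens for a mocked step, while the record of executed steps still lists it.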

Step-by-Step Execution

// `debugger` is a reserved word in JavaScript, so use a different name
const session = await myWorkflow.debug(input)

// Execute step by step
await session.nextStep()
console.log(session.currentStep)

await session.nextStep()
console.log(session.state)

// Continue to end
await session.continue()

Replay

Replay a completed workflow:

const execution = await workflow.get('wf_123')

// Replay with same input
const replayed = await execution.replay()

// Replay with modified input
const modified = await execution.replay({
  input: { ...execution.input, amount: 200 },
})

Dashboard Integration

Export to OpenTelemetry

import { configureExporter } from 'ai-workflows'

configureExporter({
  type: 'otlp',
  endpoint: 'http://collector:4317',
  serviceName: 'my-workflows',
})

Export to DataDog

configureExporter({
  type: 'datadog',
  apiKey: process.env.DD_API_KEY,
  site: 'datadoghq.com',
})

Custom Exporter

configureExporter({
  type: 'custom',
  export: async (spans, metrics, logs) => {
    await myCustomBackend.send({ spans, metrics, logs })
  },
})

Health Checks

import { health } from 'ai-workflows'

// Check workflow engine health
const status = await health.check()

console.log(status.healthy)        // true/false
console.log(status.queueDepth)     // Pending workflows
console.log(status.activeWorkers)  // Running workers
console.log(status.latency)        // Processing latency
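
A health status like this is often turned into a readiness decision. The sketch below shows one possible way to do that. The field names follow the example above, but their types, the latency unit (assumed to be milliseconds), and the `ReadinessLimits` thresholds are all assumptions, and `readinessProblems` is a hypothetical helper.

```typescript
// Field names follow the health.check() example; the types are assumed.
interface HealthStatus {
  healthy: boolean
  queueDepth: number
  activeWorkers: number
  latency: number // assumed to be milliseconds
}

// Hypothetical thresholds chosen by the operator.
interface ReadinessLimits {
  maxQueueDepth: number
  maxLatencyMs: number
}

// Return the reasons the engine should be treated as not ready.
// An empty array means every check passed.
function readinessProblems(s: HealthStatus, limits: ReadinessLimits): string[] {
  const problems: string[] = []
  if (!s.healthy) problems.push('engine reports unhealthy')
  if (s.activeWorkers === 0) problems.push('no active workers')
  if (s.queueDepth > limits.maxQueueDepth) {
    problems.push(`queue depth ${s.queueDepth} exceeds ${limits.maxQueueDepth}`)
  }
  if (s.latency > limits.maxLatencyMs) {
    problems.push(`latency ${s.latency}ms exceeds ${limits.maxLatencyMs}ms`)
  }
  return problems
}
```

Returning the list of reasons, instead of a single boolean, keeps the readiness endpoint's response useful when something is wrong.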

Alerting

import { alerts } from 'ai-workflows'

// Alert on high failure rate
alerts.create({
  name: 'high-failure-rate',
  condition: 'workflow.failed.rate > 0.05',  // 5% failure rate
  window: '5m',
  notify: ['ops@company.com'],
})

// Alert on slow execution
alerts.create({
  name: 'slow-execution',
  condition: 'workflow.duration.p99 > 60000',  // P99 > 1 minute
  workflow: 'order-processing',
  notify: ['ops@company.com'],
})
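
The two conditions above depend on derived values: a failure rate and a 99th-percentile duration. The source does not say how these are computed, so the sketch below is only one plausible reading. It assumes the failure rate is failed executions divided by all finished executions in the window, and it uses the nearest-rank method for the percentile. `failureRate` and `percentile` are hypothetical helpers.

```typescript
// Share of finished executions that failed; 0 when nothing has finished.
function failureRate(completed: number, failed: number): number {
  const total = completed + failed
  return total === 0 ? 0 : failed / total
}

// Nearest-rank percentile: the smallest sample such that at least
// p percent of all samples are less than or equal to it.
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) throw new Error('no samples')
  if (p <= 0 || p > 100) throw new RangeError('p must be in (0, 100]')
  const sorted = [...samples].sort((a, b) => a - b)
  const rank = Math.ceil((p * sorted.length) / 100)
  return sorted[rank - 1]
}

// 94 completed and 6 failed executions in the window: rate 0.06
const rate = failureRate(94, 6)
const highFailureRate = rate > 0.05 // true

// 100 executions lasting 1 s, 2 s, ..., 100 s (in milliseconds)
const durations = Array.from({ length: 100 }, (_, i) => (i + 1) * 1000)
const p99 = percentile(durations, 99) // 99000
const slowExecution = p99 > 60000 // true
```

A percentile threshold reacts to the slowest executions, which an average would hide: in this data the mean duration is 50.5 s, below the 60 s limit, while the P99 is well above it.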

Best Practices

  1. Log meaningful context - Include IDs and relevant data
  2. Use structured logging - Enable querying and filtering
  3. Set up alerts early - Don't wait for production issues
  4. Trace across services - Propagate trace context
  5. Monitor key metrics - Track duration, throughput, and errors
  6. Review failed executions - Learn from failures