Primitives.org.ai

Task Lifecycle

Managing task states, progress, and completion

Task Lifecycle

Tasks progress through a well-defined lifecycle from creation to completion, with full tracking and event history.

Task States

type TaskStatus =
  | 'pending'      // Created but not queued
  | 'scheduled'    // Waiting for scheduled time
  | 'queued'       // Ready to be worked on
  | 'assigned'     // Assigned to a worker
  | 'in_progress'  // Actively being worked on
  | 'blocked'      // Waiting on dependencies
  | 'review'       // Awaiting review/approval
  | 'completed'    // Successfully finished
  | 'failed'       // Failed with error
  | 'cancelled'    // Manually cancelled

Status Flow

┌──────────┐    ┌──────────┐    ┌─────────────┐    ┌───────────┐
│  pending │ -> │  queued  │ -> │ in_progress │ -> │ completed │
└──────────┘    └──────────┘    └─────────────┘    └───────────┘
     │               │                 │                  │
     v               v                 v                  v
┌──────────┐    ┌─────────┐      ┌──────────┐      ┌──────────┐
│ scheduled│    │ blocked │      │  failed  │      │ cancelled│
└──────────┘    └─────────┘      └──────────┘      └──────────┘

Creating Tasks

import { createTask } from 'digital-tasks'

const task = await createTask({
  function: {
    type: 'generative',
    name: 'summarize',
    description: 'Summarize a document',
    args: { text: 'The text to summarize' },
    output: 'string',
    promptTemplate: 'Summarize: {{text}}',
  },
  input: { text: 'Long article content...' },
  priority: 'high',
})

console.log(task.id)      // task_1234567890_abc123
console.log(task.status)  // 'queued'

Starting Tasks

import { startTask } from 'digital-tasks'

// Start task with worker assignment
await startTask(task.id, {
  type: 'agent',
  id: 'agent_1',
  name: 'Worker Agent',
})

// Task is now in_progress
const updated = await getTask(task.id)
console.log(updated.status)  // 'in_progress'
console.log(updated.assignedTo)  // { type: 'agent', id: 'agent_1', ... }

Progress Updates

import { updateProgress } from 'digital-tasks'

// Update with percentage
await updateProgress(task.id, 25, 'Loading data')
await updateProgress(task.id, 50, 'Processing')
await updateProgress(task.id, 75, 'Generating output')

// Access progress info
const task = await getTask(task.id)
console.log(task.progress)
// {
//   percent: 75,
//   currentStep: 'Generating output',
//   estimatedTimeRemaining: 30000
// }

Completing Tasks

import { completeTask } from 'digital-tasks'

// Complete with result
await completeTask(task.id, {
  summary: 'The article discusses...',
  wordCount: 150,
})

const completed = await getTask(task.id)
console.log(completed.status)  // 'completed'
console.log(completed.result)  // { output: { summary: '...', wordCount: 150 } }

Failing Tasks

import { failTask } from 'digital-tasks'

// Fail with error message
await failTask(task.id, 'API rate limit exceeded')

// Fail with structured error
await failTask(task.id, {
  code: 'RATE_LIMIT',
  message: 'API rate limit exceeded',
  retryAfter: 60000,
})

const failed = await getTask(task.id)
console.log(failed.status)  // 'failed'
console.log(failed.result?.error)  // { code: 'RATE_LIMIT', ... }

Cancelling Tasks

import { cancelTask } from 'digital-tasks'

// Cancel with reason
await cancelTask(task.id, 'No longer needed')

const cancelled = await getTask(task.id)
console.log(cancelled.status)  // 'cancelled'

Retrying Tasks

import { retryTask } from 'digital-tasks'

// Retry a failed task
const retried = await retryTask(failedTask.id)
console.log(retried.status)  // 'queued'
console.log(retried.metadata.retryCount)  // 1

// Retry with modified input
const retried2 = await retryTask(failedTask.id, {
  input: { text: 'Modified input...' },
})

Event History

Tasks maintain a complete event history:

const task = await getTask(taskId)

console.log(task.events)
// [
//   { type: 'created', timestamp: '...', message: 'Task created' },
//   { type: 'queued', timestamp: '...' },
//   { type: 'assigned', timestamp: '...', actor: { type: 'agent', ... } },
//   { type: 'started', timestamp: '...', actor: { type: 'agent', ... } },
//   { type: 'progress', timestamp: '...', data: { percent: 50 } },
//   { type: 'completed', timestamp: '...' },
// ]

Adding Comments

import { addComment } from 'digital-tasks'

await addComment(task.id, 'Found an edge case, handling it', worker)

// Comment appears in events
const task = await getTask(task.id)
const comments = task.events.filter(e => e.type === 'comment')

Task Priority

type TaskPriority = 'low' | 'normal' | 'high' | 'urgent' | 'critical'

// Create with priority
const urgentTask = await createTask({
  function: func,
  priority: 'urgent',
})

// Update priority
import { updateTask } from 'digital-tasks'

await updateTask(task.id, { priority: 'critical' })

Priority Order

const priorityOrder = {
  critical: 5,  // Do immediately
  urgent: 4,    // Do next
  high: 3,      // Important
  normal: 2,    // Default
  low: 1,       // When time permits
}

Scheduled Tasks

// Schedule for future execution
const scheduled = await createTask({
  function: func,
  scheduledFor: new Date('2024-12-31T00:00:00Z'),
})

console.log(scheduled.status)  // 'scheduled'

// Task will automatically move to 'queued' at scheduled time

Task Metadata

const task = await getTask(taskId)

console.log(task.metadata)
// {
//   createdAt: '2024-01-15T10:00:00Z',
//   startedAt: '2024-01-15T10:05:00Z',
//   completedAt: '2024-01-15T10:10:00Z',
//   duration: 300000,  // 5 minutes in ms
//   retryCount: 0,
//   executionCount: 1,
// }

Waiting for Completion

import { waitForTask } from 'digital-tasks'

// Wait with timeout
const result = await waitForTask(task.id, {
  timeout: 5 * 60 * 1000,  // 5 minutes
  pollInterval: 1000,       // Check every second
})

if (result.success) {
  console.log('Output:', result.output)
} else {
  console.log('Error:', result.error)
}

Bulk Operations

import { bulkUpdateStatus, bulkCancel } from 'digital-tasks'

// Update multiple tasks
await bulkUpdateStatus(['task_1', 'task_2', 'task_3'], 'queued')

// Cancel multiple tasks
await bulkCancel(['task_4', 'task_5'], 'Sprint cancelled')

State Transitions

Valid state transitions are enforced:

// Valid transitions
pending -> queued, scheduled, cancelled
scheduled -> queued, cancelled
queued -> assigned, blocked, cancelled
assigned -> in_progress, queued, cancelled
in_progress -> completed, failed, review, cancelled
blocked -> queued, cancelled
review -> completed, in_progress, cancelled

// Invalid transitions throw errors
await completeTask(pendingTask.id, result)  // Error: Cannot complete pending task
Was this page helpful?

On this page