Primitives.org.ai

Task Queue

Priority-based task queuing and worker management

The task queue manages work distribution, prioritization, and worker assignment for efficient task execution.

Basic Queue Operations

import { taskQueue, createTaskQueue } from 'digital-tasks'

// Use global queue
await taskQueue.add(task)

// Or create isolated queue
const myQueue = createTaskQueue({ name: 'my-queue' })
await myQueue.add(task)

Adding Tasks

// Add single task
await taskQueue.add(task)

// Add multiple tasks
await taskQueue.addBatch([task1, task2, task3])

// Add with options
await taskQueue.add(task, {
  priority: 'high',
  delayUntil: new Date('2024-12-01'),
})

Getting Next Task

// Get next task for a worker
const worker = { type: 'agent', id: 'agent_1', name: 'Worker' }
const next = await taskQueue.getNextForWorker(worker)

if (next) {
  console.log(`Starting task: ${next.name}`)
  await startTask(next.id, worker)
}

Worker Capabilities

// Worker with specific capabilities
const worker = {
  type: 'agent',
  id: 'agent_1',
  name: 'Specialized Agent',
  capabilities: ['code', 'generative'],  // Can handle these function types
}

// Queue matches tasks to worker capabilities
const next = await taskQueue.getNextForWorker(worker)
// Only returns tasks the worker can handle
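
The matching rule can be pictured as a simple filter over queued tasks. This is a minimal sketch, not the library's implementation: the `SketchTask` and `SketchWorker` shapes, the use of `functionType` as the matching key, and the choice to let a worker without a `capabilities` list accept any task are all assumptions made for illustration.

```typescript
// Hypothetical shapes for illustration; not the digital-tasks types.
type FunctionType = 'code' | 'generative' | 'agentic' | 'human'

interface SketchTask {
  id: string
  functionType: FunctionType
}

interface SketchWorker {
  id: string
  capabilities?: FunctionType[]
}

// Assumption: a worker without a capabilities list accepts any task.
function canHandle(worker: SketchWorker, task: SketchTask): boolean {
  if (!worker.capabilities) return true
  return worker.capabilities.includes(task.functionType)
}

// Return the first queued task the worker can handle, or undefined if none match.
function nextFor(worker: SketchWorker, queued: SketchTask[]): SketchTask | undefined {
  return queued.find((task) => canHandle(worker, task))
}
```

With this rule, a worker whose capabilities are `['code', 'generative']` skips a queued `human` task and receives the next `generative` one.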

Claiming Tasks

// Claim a specific task
const claimed = await taskQueue.claim(taskId, worker)

if (claimed) {
  console.log('Task claimed successfully')
} else {
  console.log('Task already claimed by another worker')
}
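
The boolean result of a claim suggests first-writer-wins semantics. The sketch below shows that idea with an in-memory map; `createClaimRegistry` is a hypothetical helper, not part of `digital-tasks`, and how the library actually records ownership is not stated here.

```typescript
// Hypothetical in-memory stand-in for a queue's claim bookkeeping.
function createClaimRegistry() {
  const owners = new Map<string, string>() // taskId -> workerId

  return {
    // Returns true only for the first worker to claim a given task.
    claim(taskId: string, workerId: string): boolean {
      if (owners.has(taskId)) return false
      owners.set(taskId, workerId)
      return true
    },
    // Clears ownership so another worker can claim the task.
    release(taskId: string): void {
      owners.delete(taskId)
    },
  }
}
```

In a deployment with several processes, the check and the write would need to happen as one atomic operation in the backing store. A single-threaded map only illustrates the contract.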

Releasing Tasks

// Release task back to queue (e.g., worker going offline)
await taskQueue.release(taskId)

// Release with reason
await taskQueue.release(taskId, {
  reason: 'Worker shutting down',
  requeue: true,  // Put back in queue vs leave unassigned
})

Query Tasks

// Query with filters
const results = await taskQueue.query({
  status: ['queued', 'in_progress'],
  priority: ['urgent', 'critical'],
  functionType: ['generative'],
  assignedTo: { type: 'agent', id: 'agent_1' },
  createdAfter: new Date('2024-01-01'),
  createdBefore: new Date('2024-12-31'),
})

// Sort results
const sorted = await taskQueue.query({
  status: ['queued'],
  sortBy: 'priority',    // 'priority' | 'createdAt' | 'scheduledFor'
  sortOrder: 'desc',     // 'asc' | 'desc'
  limit: 10,
  offset: 0,
})
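
One way to read the filter object is sketched below over an in-memory array. The semantics are assumptions, not documented behavior: values inside one field are ORed, separate fields are ANDed, date bounds are exclusive, and `offset` and `limit` apply after filtering. The `QueryableTask` and `QueryFilter` types are hypothetical and cover only some of the fields shown above.

```typescript
// Hypothetical task and filter shapes; field names mirror the query example.
interface QueryableTask {
  id: string
  status: string
  priority: string
  createdAt: Date
}

interface QueryFilter {
  status?: string[]
  priority?: string[]
  createdAfter?: Date
  createdBefore?: Date
  limit?: number
  offset?: number
}

// Assumption: values within one field are ORed, separate fields are ANDed.
function matchesFilter(task: QueryableTask, filter: QueryFilter): boolean {
  const created = task.createdAt.getTime()
  if (filter.status && !filter.status.includes(task.status)) return false
  if (filter.priority && !filter.priority.includes(task.priority)) return false
  if (filter.createdAfter && created <= filter.createdAfter.getTime()) return false
  if (filter.createdBefore && created >= filter.createdBefore.getTime()) return false
  return true
}

// Filter first, then apply offset and limit to the matching tasks.
function runQuery(tasks: QueryableTask[], filter: QueryFilter): QueryableTask[] {
  const matched = tasks.filter((task) => matchesFilter(task, filter))
  const start = filter.offset ?? 0
  const end = filter.limit === undefined ? undefined : start + filter.limit
  return matched.slice(start, end)
}
```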

Queue Statistics

const stats = await taskQueue.stats()

console.log(stats)
// {
//   total: 100,
//   byStatus: {
//     queued: 50,
//     in_progress: 20,
//     completed: 25,
//     failed: 5,
//   },
//   byPriority: {
//     critical: 5,
//     urgent: 10,
//     high: 20,
//     normal: 50,
//     low: 15,
//   },
//   byFunctionType: {
//     code: 30,
//     generative: 40,
//     agentic: 20,
//     human: 10,
//   },
//   avgWaitTime: 30000,      // ms
//   avgProcessingTime: 60000, // ms
// }

Priority Queue

Tasks are processed in priority order, from critical down to low:

// Critical tasks processed first
await taskQueue.add(normalTask)
await taskQueue.add(criticalTask)
await taskQueue.add(urgentTask)

// getNextForWorker returns in priority order:
// 1. criticalTask
// 2. urgentTask
// 3. normalTask
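
The ordering can be expressed as a comparator. This is a sketch under assumptions: the five priority names come from the statistics example, while the numeric ranks and the oldest-first tie-break within one priority are illustrative choices, not documented behavior.

```typescript
// Priority names taken from the stats example; the numeric ranks are an assumption.
type Priority = 'critical' | 'urgent' | 'high' | 'normal' | 'low'

const PRIORITY_RANK: Record<Priority, number> = {
  critical: 0,
  urgent: 1,
  high: 2,
  normal: 3,
  low: 4,
}

interface PrioritizedTask {
  id: string
  priority: Priority
  createdAt: number // epoch ms
}

// Higher priority first; oldest first within the same priority (assumed tie-break).
function byPriorityThenAge(a: PrioritizedTask, b: PrioritizedTask): number {
  return PRIORITY_RANK[a.priority] - PRIORITY_RANK[b.priority] || a.createdAt - b.createdAt
}

// Sort a copy so the caller's array is left untouched.
function inProcessingOrder(tasks: PrioritizedTask[]): PrioritizedTask[] {
  return [...tasks].sort(byPriorityThenAge)
}
```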

Priority Escalation

// Automatically escalate priority based on wait time
const queue = createTaskQueue({
  name: 'escalating-queue',
  escalation: {
    enabled: true,
    rules: [
      { waitTime: 5 * 60 * 1000, escalateTo: 'high' },    // 5 min -> high
      { waitTime: 15 * 60 * 1000, escalateTo: 'urgent' }, // 15 min -> urgent
      { waitTime: 30 * 60 * 1000, escalateTo: 'critical' }, // 30 min -> critical
    ],
  },
})
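
How such rules might be evaluated for one task is sketched below. The `escalate` function is hypothetical, and it assumes that escalation never lowers a priority and that the highest applicable rule wins. The library's actual evaluation order is not stated here.

```typescript
type EscPriority = 'low' | 'normal' | 'high' | 'urgent' | 'critical'

interface EscalationRule {
  waitTime: number // ms a task must have waited before the rule applies
  escalateTo: EscPriority
}

// Lowest to highest; the index doubles as a rank.
const ESC_ORDER: EscPriority[] = ['low', 'normal', 'high', 'urgent', 'critical']

// Apply every rule whose wait threshold has passed; never lower a priority.
function escalate(current: EscPriority, waitedMs: number, rules: EscalationRule[]): EscPriority {
  let result = current
  for (const rule of rules) {
    const reached = waitedMs >= rule.waitTime
    const isHigher = ESC_ORDER.indexOf(rule.escalateTo) > ESC_ORDER.indexOf(result)
    if (reached && isHigher) result = rule.escalateTo
  }
  return result
}
```

With the three rules from the configuration above, a `normal` task that has waited 20 minutes would come out as `urgent` under these assumptions.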

Fair Scheduling

Fair scheduling weights how often each function type is selected, so that no task type is starved:

const queue = createTaskQueue({
  name: 'fair-queue',
  scheduling: {
    strategy: 'fair',
    weights: {
      code: 1,
      generative: 2,    // Process more generative tasks
      agentic: 1,
      human: 0.5,       // Human tasks less frequently auto-assigned
    },
  },
})
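
One common way to honor weights like these is to always serve the type with the lowest served-to-weight ratio. The sketch below shows that technique. It is an illustration, not the `'fair'` strategy's actual algorithm, and `createFairPicker` is a hypothetical helper.

```typescript
// Hypothetical weighted picker: tracks how many tasks of each type were served.
function createFairPicker(weights: Record<string, number>) {
  const served: Record<string, number> = {}

  // Choose the pending type with the lowest served/weight ratio, then count it as served.
  return function pick(pendingTypes: string[]): string | undefined {
    let best: string | undefined
    let bestRatio = Infinity
    for (const type of pendingTypes) {
      const weight = weights[type] ?? 0
      if (weight <= 0) continue // types without a positive weight are skipped in this sketch
      const ratio = (served[type] ?? 0) / weight
      if (ratio < bestRatio) {
        best = type
        bestRatio = ratio
      }
    }
    if (best !== undefined) served[best] = (served[best] ?? 0) + 1
    return best
  }
}
```

Over time the share of picks for each type approaches its weight divided by the sum of weights. A type with a small weight such as `human: 0.5` is still selected periodically, which is what prevents starvation.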

Dead Letter Queue

Handle tasks that repeatedly fail:

const queue = createTaskQueue({
  name: 'main-queue',
  deadLetter: {
    enabled: true,
    maxRetries: 3,
    destination: 'failed-tasks',  // Move to this queue after max retries
  },
})

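// Check dead letter queue
// Note: `deadLetterQueue` is not defined in this snippet; it stands for a handle to the 'failed-tasks' queue
const failed = await deadLetterQueue.query({ status: ['failed'] })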
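
The routing decision after a failure can be sketched as a small pure function. `routeAfterFailure` is hypothetical, and it assumes `maxRetries` counts retries after the first attempt, so `maxRetries: 3` would allow at most four attempts. The library may count differently.

```typescript
type FailureRoute = 'retry' | 'dead-letter'

// failedAttempts includes the attempt that just failed.
// Assumption: maxRetries counts retries after the initial attempt.
function routeAfterFailure(failedAttempts: number, maxRetries: number): FailureRoute {
  const retriesUsed = failedAttempts - 1
  return retriesUsed < maxRetries ? 'retry' : 'dead-letter'
}
```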

Worker Pool

Manage multiple workers:

import { createWorkerPool } from 'digital-tasks'

const pool = createWorkerPool({
  queue: taskQueue,
  workers: [
    { type: 'agent', id: 'agent_1', capabilities: ['code'] },
    { type: 'agent', id: 'agent_2', capabilities: ['generative'] },
    { type: 'agent', id: 'agent_3', capabilities: ['agentic'] },
  ],
  concurrency: 3,  // Max concurrent tasks per worker
})

// Start processing
await pool.start()

// Stop gracefully
await pool.stop({ waitForCurrent: true })

Rate Limiting

Control task processing rate:

const queue = createTaskQueue({
  name: 'rate-limited',
  rateLimit: {
    tasksPerSecond: 10,
    burstSize: 20,
  },
})
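
A `tasksPerSecond` setting paired with a `burstSize` resembles a token bucket. The sketch below implements that technique with an explicit clock so the behavior is easy to follow. Whether the library uses a token bucket internally is an assumption, and `createBucket` and `tryConsume` are hypothetical helpers.

```typescript
interface Bucket {
  tokens: number
  lastRefill: number // epoch ms of the last refill
}

// Start full so an initial burst of up to burstSize tasks is allowed.
function createBucket(burstSize: number, now: number): Bucket {
  return { tokens: burstSize, lastRefill: now }
}

// Refill according to elapsed time, then take one token if available.
function tryConsume(bucket: Bucket, tasksPerSecond: number, burstSize: number, now: number): boolean {
  const elapsedSeconds = (now - bucket.lastRefill) / 1000
  bucket.tokens = Math.min(burstSize, bucket.tokens + elapsedSeconds * tasksPerSecond)
  bucket.lastRefill = now
  if (bucket.tokens < 1) return false
  bucket.tokens -= 1
  return true
}
```

Under this model, `tasksPerSecond: 10` with `burstSize: 20` lets 20 tasks through immediately and then admits 10 more for each second that passes.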

Queue Events

taskQueue.on('task:added', (task) => {
  console.log(`Task added: ${task.id}`)
})

taskQueue.on('task:claimed', ({ task, worker }) => {
  console.log(`Task ${task.id} claimed by ${worker.name}`)
})

taskQueue.on('task:completed', (task) => {
  console.log(`Task ${task.id} completed`)
})

taskQueue.on('task:failed', ({ task, error }) => {
  console.log(`Task ${task.id} failed: ${error}`)
})

taskQueue.on('queue:empty', () => {
  console.log('Queue is empty')
})

Batch Processing

// Process tasks in batches
const batch = await taskQueue.getBatch({
  size: 10,
  worker,
  timeout: 5000,  // Wait up to 5s for batch to fill
})

// Process all tasks
const results = await Promise.all(
  batch.map(task => processTask(task))
)

// Complete all
await taskQueue.completeBatch(
  batch.map((task, i) => ({ taskId: task.id, result: results[i] }))
)

Isolation and Namespacing

// Create isolated queues for different purposes
const userQueue = createTaskQueue({ name: 'user-tasks' })
const systemQueue = createTaskQueue({ name: 'system-tasks' })
const batchQueue = createTaskQueue({ name: 'batch-jobs' })

// Each queue is independent
await userQueue.add(userTask)
await systemQueue.add(systemTask)

Best Practices

  1. Set appropriate priorities - Reserve critical/urgent for truly time-sensitive work
  2. Use worker capabilities - Match tasks to appropriate workers
  3. Monitor queue depth - Alert on growing backlogs
  4. Configure dead letter - Don't lose failed tasks
  5. Use rate limiting - Protect downstream services
  6. Implement fair scheduling - Prevent task type starvation