Task Queue
Priority-based task queuing and worker management
The task queue manages work distribution, prioritization, and worker assignment for efficient task execution.
Basic Queue Operations
import { taskQueue, createTaskQueue } from 'digital-tasks'
// Use global queue
await taskQueue.add(task)
// Or create isolated queue
const myQueue = createTaskQueue({ name: 'my-queue' })
await myQueue.add(task)
Adding Tasks
// Add single task
await taskQueue.add(task)
// Add multiple tasks
await taskQueue.addBatch([task1, task2, task3])
// Add with options
await taskQueue.add(task, {
  priority: 'high',
  delayUntil: new Date('2024-12-01'),
})
Getting Next Task
// Get next task for a worker
const worker = { type: 'agent', id: 'agent_1', name: 'Worker' }
const next = await taskQueue.getNextForWorker(worker)
if (next) {
  console.log(`Starting task: ${next.name}`)
  await startTask(next.id, worker)
}
Worker Capabilities
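As background for this section, here is a minimal sketch of what capability matching could look like. It is not the digital-tasks implementation: the `SketchTask` and `SketchWorker` types, the `functionType` field on tasks, and the rule that a worker without `capabilities` accepts any task are all assumptions made for illustration.

```typescript
// Hypothetical shapes for illustration; not the digital-tasks types.
type FunctionType = 'code' | 'generative' | 'agentic' | 'human'

interface SketchTask {
  id: string
  functionType: FunctionType
}

interface SketchWorker {
  id: string
  capabilities?: FunctionType[]
}

// Assumption: a worker with no declared capabilities accepts any task.
function canHandle(worker: SketchWorker, task: SketchTask): boolean {
  if (!worker.capabilities) return true
  return worker.capabilities.includes(task.functionType)
}

// Return the first queued task the worker can handle, or undefined if none match.
function nextForWorker(queued: SketchTask[], worker: SketchWorker): SketchTask | undefined {
  return queued.find((task) => canHandle(worker, task))
}

const queued: SketchTask[] = [
  { id: 't1', functionType: 'human' },
  { id: 't2', functionType: 'generative' },
]
const specialized: SketchWorker = { id: 'agent_1', capabilities: ['code', 'generative'] }

// The human task is skipped, so the match is t2.
const matched = nextForWorker(queued, specialized)
```

The point of the sketch is that matching is a filter applied before selection: a task that no available worker can handle stays queued rather than being handed to the wrong worker.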
// Worker with specific capabilities
const worker = {
  type: 'agent',
  id: 'agent_1',
  name: 'Specialized Agent',
  capabilities: ['code', 'generative'], // Can handle these function types
}
// Queue matches tasks to worker capabilities
const next = await taskQueue.getNextForWorker(worker)
// Only returns tasks the worker can handle
Claiming Tasks
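Claiming is essentially a check-and-set on task ownership. The sketch below shows that idea with an in-memory registry. `ClaimRegistry` is a hypothetical name, not part of digital-tasks, and treating a repeated claim by the same worker as a success is an assumption.

```typescript
// Hypothetical in-memory claim registry; not the digital-tasks implementation.
class ClaimRegistry {
  // Maps taskId to the id of the worker that currently owns it.
  private owners = new Map<string, string>()

  // Returns true if the claim succeeded, false if another worker holds the task.
  // Assumption: re-claiming a task you already own succeeds.
  claim(taskId: string, workerId: string): boolean {
    const owner = this.owners.get(taskId)
    if (owner !== undefined && owner !== workerId) return false
    this.owners.set(taskId, workerId)
    return true
  }

  // Drop ownership so another worker can claim the task.
  release(taskId: string): void {
    this.owners.delete(taskId)
  }
}

const registry = new ClaimRegistry()
const first = registry.claim('task_1', 'agent_1')  // true
const second = registry.claim('task_1', 'agent_2') // false: already claimed
registry.release('task_1')
const third = registry.claim('task_1', 'agent_2')  // true after release
```

Within a single JavaScript process the check and the write cannot be interleaved, because there is no `await` between them. A queue backed by a shared store would need an atomic compare-and-set to give the same guarantee.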
// Claim a specific task
const claimed = await taskQueue.claim(taskId, worker)
if (claimed) {
  console.log('Task claimed successfully')
} else {
  console.log('Task already claimed by another worker')
}
Releasing Tasks
// Release task back to queue (e.g., worker going offline)
await taskQueue.release(taskId)
// Release with reason
await taskQueue.release(taskId, {
  reason: 'Worker shutting down',
  requeue: true, // Put back in queue vs leave unassigned
})
Query Tasks
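To make the order of operations in a query concrete, here is a minimal in-memory sketch: filter first, then sort, then paginate. The `Row` and `SketchQuery` types and the `runQuery` function are hypothetical. The semantics are assumptions for illustration only: filters combine with AND, array filters mean "any of", and `createdAfter` is exclusive.

```typescript
// Hypothetical types for illustration; not the digital-tasks query API.
type Status = 'queued' | 'in_progress' | 'completed' | 'failed'

interface Row {
  id: string
  status: Status
  createdAt: Date
}

interface SketchQuery {
  status?: Status[]
  createdAfter?: Date
  sortOrder?: 'asc' | 'desc'
  limit?: number
  offset?: number
}

function runQuery(rows: Row[], q: SketchQuery): Row[] {
  // 1. Filter: every supplied filter must match (assumed AND semantics).
  const filtered = rows.filter(
    (r) =>
      (!q.status || q.status.includes(r.status)) &&
      (!q.createdAfter || r.createdAt.getTime() > q.createdAfter.getTime()),
  )
  // 2. Sort by creation time on a copy, leaving the input untouched.
  const direction = q.sortOrder === 'desc' ? -1 : 1
  const sorted = [...filtered].sort(
    (a, b) => direction * (a.createdAt.getTime() - b.createdAt.getTime()),
  )
  // 3. Paginate last, so limit and offset apply to the sorted result.
  const offset = q.offset ?? 0
  const end = q.limit === undefined ? undefined : offset + q.limit
  return sorted.slice(offset, end)
}

const rows: Row[] = [
  { id: 'a', status: 'queued', createdAt: new Date('2024-01-10') },
  { id: 'b', status: 'completed', createdAt: new Date('2024-02-10') },
  { id: 'c', status: 'queued', createdAt: new Date('2024-03-10') },
  { id: 'd', status: 'queued', createdAt: new Date('2024-04-10') },
]

// Newest two queued tasks: d, then c.
const newestQueued = runQuery(rows, { status: ['queued'], sortOrder: 'desc', limit: 2 })
```

Paginating before sorting would return an arbitrary page, which is why the slice comes last.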
// Query with filters
const results = await taskQueue.query({
  status: ['queued', 'in_progress'],
  priority: ['urgent', 'critical'],
  functionType: ['generative'],
  assignedTo: { type: 'agent', id: 'agent_1' },
  createdAfter: new Date('2024-01-01'),
  createdBefore: new Date('2024-12-31'),
})
// Sort results
const sorted = await taskQueue.query({
  status: ['queued'],
  sortBy: 'priority', // 'priority' | 'createdAt' | 'scheduledFor'
  sortOrder: 'desc', // 'asc' | 'desc'
  limit: 10,
  offset: 0,
})
Queue Statistics
const stats = await taskQueue.stats()
console.log(stats)
// {
//   total: 100,
//   byStatus: {
//     queued: 50,
//     in_progress: 20,
//     completed: 25,
//     failed: 5,
//   },
//   byPriority: {
//     critical: 5,
//     urgent: 10,
//     high: 20,
//     normal: 50,
//     low: 15,
//   },
//   byFunctionType: {
//     code: 30,
//     generative: 40,
//     agentic: 20,
//     human: 10,
//   },
//   avgWaitTime: 30000, // ms
//   avgProcessingTime: 60000, // ms
// }
Priority Queue
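A minimal sketch of priority-ordered dequeueing may help make the ordering rule concrete. `SketchPriorityQueue` is a hypothetical class, not the digital-tasks implementation. The rank order (critical, urgent, high, normal, low) is inferred from the priority names used on this page, and first-in-first-out tie-breaking is an assumption.

```typescript
type Priority = 'critical' | 'urgent' | 'high' | 'normal' | 'low'

// Lower rank is served first (ordering assumed from the priority names on this page).
const RANK: Record<Priority, number> = { critical: 0, urgent: 1, high: 2, normal: 3, low: 4 }

interface QueuedTask {
  id: string
  priority: Priority
}

// Hypothetical queue for illustration only.
class SketchPriorityQueue {
  private items: QueuedTask[] = []

  add(id: string, priority: Priority): void {
    this.items.push({ id, priority })
  }

  // Remove and return the highest-priority task.
  // The strict comparison keeps the earliest-added task when ranks tie.
  next(): QueuedTask | undefined {
    if (this.items.length === 0) return undefined
    let best = 0
    for (let i = 1; i < this.items.length; i++) {
      if (RANK[this.items[i].priority] < RANK[this.items[best].priority]) best = i
    }
    return this.items.splice(best, 1)[0]
  }
}

const pq = new SketchPriorityQueue()
pq.add('normalTask', 'normal')
pq.add('criticalTask', 'critical')
pq.add('urgentTask', 'urgent')

// Dequeue order: criticalTask, urgentTask, normalTask
const order = [pq.next()?.id, pq.next()?.id, pq.next()?.id]
```

The linear scan keeps the sketch short. A binary heap keyed on rank and insertion order would be the usual choice for large queues.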
Tasks are processed by priority:
// Critical tasks processed first
await taskQueue.add(normalTask)
await taskQueue.add(criticalTask)
await taskQueue.add(urgentTask)
// getNextForWorker returns in priority order:
// 1. criticalTask
// 2. urgentTask
// 3. normalTask
Priority Escalation
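Escalation rules of the kind configured in this section can be read as a pure function from wait time to priority. The sketch below is one plausible interpretation, not the library's algorithm. The `escalate` function is hypothetical, thresholds are treated as inclusive, and a rule is assumed never to lower a task's priority.

```typescript
type Priority = 'low' | 'normal' | 'high' | 'urgent' | 'critical'

// Priorities from lowest to highest (ordering assumed from this page).
const ORDER: Priority[] = ['low', 'normal', 'high', 'urgent', 'critical']

interface EscalationRule {
  waitTime: number // milliseconds a task must have waited
  escalateTo: Priority
}

// Return the priority a task should have after waiting `waitedMs`.
// Assumption: rules only ever raise priority, never lower it.
function escalate(current: Priority, waitedMs: number, rules: EscalationRule[]): Priority {
  let result = current
  for (const rule of rules) {
    const reached = waitedMs >= rule.waitTime
    const higher = ORDER.indexOf(rule.escalateTo) > ORDER.indexOf(result)
    if (reached && higher) result = rule.escalateTo
  }
  return result
}

const MINUTE = 60 * 1000
const rules: EscalationRule[] = [
  { waitTime: 5 * MINUTE, escalateTo: 'high' },
  { waitTime: 15 * MINUTE, escalateTo: 'urgent' },
  { waitTime: 30 * MINUTE, escalateTo: 'critical' },
]

const afterSix = escalate('normal', 6 * MINUTE, rules)     // 'high'
const afterTwenty = escalate('normal', 20 * MINUTE, rules) // 'urgent'
const alreadyTop = escalate('critical', 6 * MINUTE, rules) // stays 'critical'
```

Because the function depends only on the original priority and the elapsed wait, it can be re-evaluated whenever a worker asks for the next task, without a background timer.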
// Automatically escalate priority based on wait time
const queue = createTaskQueue({
  name: 'escalating-queue',
  escalation: {
    enabled: true,
    rules: [
      { waitTime: 5 * 60 * 1000, escalateTo: 'high' }, // 5 min -> high
      { waitTime: 15 * 60 * 1000, escalateTo: 'urgent' }, // 15 min -> urgent
      { waitTime: 30 * 60 * 1000, escalateTo: 'critical' }, // 30 min -> critical
    ],
  },
})
Fair Scheduling
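One common way to implement weighted fairness is to always serve the task type that has received the least service relative to its weight. The sketch below illustrates that technique. It is not necessarily how the `fair` strategy works internally, and `FairPicker` is a hypothetical name. Weights are assumed to be positive.

```typescript
type FunctionType = 'code' | 'generative' | 'agentic' | 'human'

// Hypothetical weighted-fair selector; weights are assumed to be positive.
class FairPicker {
  private served: Record<FunctionType, number> = { code: 0, generative: 0, agentic: 0, human: 0 }
  private weights: Record<FunctionType, number>

  constructor(weights: Record<FunctionType, number>) {
    this.weights = weights
  }

  // Pick the pending type with the lowest served/weight ratio, then record the pick.
  // Ties go to the type listed first in `pending`.
  pick(pending: FunctionType[]): FunctionType | undefined {
    let best: FunctionType | undefined
    let bestRatio = Infinity
    for (const type of pending) {
      const ratio = this.served[type] / this.weights[type]
      if (ratio < bestRatio) {
        best = type
        bestRatio = ratio
      }
    }
    if (best !== undefined) this.served[best] += 1
    return best
  }
}

const picker = new FairPicker({ code: 1, generative: 2, agentic: 1, human: 0.5 })
const counts: Record<FunctionType, number> = { code: 0, generative: 0, agentic: 0, human: 0 }

// With every type always pending, nine picks split in proportion to the weights.
for (let i = 0; i < 9; i++) {
  const type = picker.pick(['code', 'generative', 'agentic', 'human'])
  if (type) counts[type] += 1
}
// counts: { code: 2, generative: 4, agentic: 2, human: 1 }
```

No type is starved under this rule: a type that has not been served has a ratio of zero, so it wins the next pick as soon as it has pending work.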
Ensure all task types get processed:
const queue = createTaskQueue({
  name: 'fair-queue',
  scheduling: {
    strategy: 'fair',
    weights: {
      code: 1,
      generative: 2, // Process more generative tasks
      agentic: 1,
      human: 0.5, // Human tasks less frequently auto-assigned
    },
  },
})
Dead Letter Queue
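The retry bookkeeping behind a dead letter queue can be sketched as a failure counter per task. `RetryTracker` is a hypothetical class, not part of digital-tasks. It assumes `maxRetries` counts retries after the first attempt, so a task moves to the dead letter list on its fourth failure when `maxRetries` is 3.

```typescript
// Hypothetical retry bookkeeping; not the digital-tasks implementation.
class RetryTracker {
  // Maps taskId to the number of failures seen so far.
  private failures = new Map<string, number>()
  // Ids of tasks that exhausted their retries.
  readonly deadLetter: string[] = []
  private maxRetries: number

  constructor(maxRetries: number) {
    this.maxRetries = maxRetries
  }

  // Record a failure. Returns 'retry' while retries remain,
  // or 'dead-letter' once the task has failed more than maxRetries times.
  recordFailure(taskId: string): 'retry' | 'dead-letter' {
    const count = (this.failures.get(taskId) ?? 0) + 1
    if (count > this.maxRetries) {
      this.failures.delete(taskId)
      this.deadLetter.push(taskId)
      return 'dead-letter'
    }
    this.failures.set(taskId, count)
    return 'retry'
  }
}

const tracker = new RetryTracker(3)
const outcomes = [
  tracker.recordFailure('task_1'), // 'retry'
  tracker.recordFailure('task_1'), // 'retry'
  tracker.recordFailure('task_1'), // 'retry'
  tracker.recordFailure('task_1'), // 'dead-letter'
]
```

Keeping exhausted tasks in a separate list, rather than dropping them, is what lets them be inspected or replayed later.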
Handle tasks that repeatedly fail:
const queue = createTaskQueue({
  name: 'main-queue',
  deadLetter: {
    enabled: true,
    maxRetries: 3,
    destination: 'failed-tasks', // Move to this queue after max retries
  },
})
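// Check the dead letter queue. `deadLetterQueue` is not defined in this snippet;
// it is assumed to be a handle to the queue named by `destination` above.
const failed = await deadLetterQueue.query({ status: ['failed'] })
Worker Pool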
Manage multiple workers:
import { createWorkerPool } from 'digital-tasks'
const pool = createWorkerPool({
  queue: taskQueue,
  workers: [
    { type: 'agent', id: 'agent_1', capabilities: ['code'] },
    { type: 'agent', id: 'agent_2', capabilities: ['generative'] },
    { type: 'agent', id: 'agent_3', capabilities: ['agentic'] },
  ],
  concurrency: 3, // Max concurrent tasks per worker
})
// Start processing
await pool.start()
// Stop gracefully
await pool.stop({ waitForCurrent: true })
Rate Limiting
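A rate paired with a burst size is the classic parameterization of a token bucket, so that algorithm is sketched here to show how the two settings interact. Whether digital-tasks uses a token bucket internally is an assumption. `TokenBucket` is a hypothetical class, and time is passed in explicitly to keep the example deterministic.

```typescript
// Hypothetical token bucket for illustration only.
class TokenBucket {
  private tokens: number
  private lastRefillMs: number
  private ratePerSecond: number
  private burstSize: number

  constructor(ratePerSecond: number, burstSize: number, nowMs: number) {
    this.ratePerSecond = ratePerSecond
    this.burstSize = burstSize
    this.tokens = burstSize // start full, so an initial burst is allowed
    this.lastRefillMs = nowMs
  }

  // Try to take one token at time `nowMs`; true means a task may start now.
  tryAcquire(nowMs: number): boolean {
    // Refill in proportion to elapsed time, capped at the burst size.
    const elapsedSeconds = (nowMs - this.lastRefillMs) / 1000
    this.tokens = Math.min(this.burstSize, this.tokens + elapsedSeconds * this.ratePerSecond)
    this.lastRefillMs = nowMs
    if (this.tokens < 1) return false
    this.tokens -= 1
    return true
  }
}

// Count how many of `attempts` acquisitions succeed at a fixed instant.
function countAllowed(bucket: TokenBucket, attempts: number, nowMs: number): number {
  let allowed = 0
  for (let i = 0; i < attempts; i++) {
    if (bucket.tryAcquire(nowMs)) allowed += 1
  }
  return allowed
}

const bucket = new TokenBucket(10, 20, 0)
const burst = countAllowed(bucket, 25, 0)       // 20: the burst size caps the initial spike
const steady = countAllowed(bucket, 15, 1000)   // 10: one second refills ten tokens
```

In real use the timestamp would come from `Date.now()`. The burst size bounds how much work can start at once after an idle period, while the rate bounds sustained throughput.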
Control task processing rate:
const queue = createTaskQueue({
  name: 'rate-limited',
  rateLimit: {
    tasksPerSecond: 10,
    burstSize: 20,
  },
})
Queue Events
taskQueue.on('task:added', (task) => {
  console.log(`Task added: ${task.id}`)
})
taskQueue.on('task:claimed', ({ task, worker }) => {
  console.log(`Task ${task.id} claimed by ${worker.name}`)
})
taskQueue.on('task:completed', (task) => {
  console.log(`Task ${task.id} completed`)
})
taskQueue.on('task:failed', ({ task, error }) => {
  console.log(`Task ${task.id} failed: ${error}`)
})
taskQueue.on('queue:empty', () => {
  console.log('Queue is empty')
})
Batch Processing
// Process tasks in batches
const batch = await taskQueue.getBatch({
  size: 10,
  worker,
  timeout: 5000, // Wait up to 5s for batch to fill
})
// Process all tasks
const results = await Promise.all(
  batch.map(task => processTask(task))
)
// Complete all
await taskQueue.completeBatch(
  batch.map((task, i) => ({ taskId: task.id, result: results[i] }))
)
Isolation and Namespacing
// Create isolated queues for different purposes
const userQueue = createTaskQueue({ name: 'user-tasks' })
const systemQueue = createTaskQueue({ name: 'system-tasks' })
const batchQueue = createTaskQueue({ name: 'batch-jobs' })
// Each queue is independent
await userQueue.add(userTask)
await systemQueue.add(systemTask)
Best Practices
- Set appropriate priorities: reserve critical and urgent for truly time-sensitive work
- Use worker capabilities: match tasks to workers that can handle their function type
- Monitor queue depth: alert on growing backlogs
- Configure a dead letter queue: keep failed tasks instead of losing them
- Use rate limiting: protect downstream services
- Implement fair scheduling: prevent task type starvation
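For the queue depth practice above, a backlog check can be as simple as comparing successive samples of the queued count, for example `stats.byStatus.queued` read at a fixed interval. The helper below is a hypothetical sketch. The definition of "growing" (a strict increase across each of the last `window` intervals) is an assumption, and `window` is assumed to be at least 1.

```typescript
// Hypothetical helper: true when the queued count rose in each of the
// last `window` sampling intervals. Assumes window >= 1.
function isBacklogGrowing(queuedSamples: number[], window: number): boolean {
  // Need window + 1 samples to observe `window` consecutive changes.
  if (queuedSamples.length < window + 1) return false
  const recent = queuedSamples.slice(-(window + 1))
  return recent.every((value, i) => i === 0 || value > recent[i - 1])
}

const growing = isBacklogGrowing([50, 60, 75, 90], 3)    // true: three increases in a row
const recovering = isBacklogGrowing([50, 60, 55, 90], 3) // false: the count dipped once
```

Requiring several consecutive increases avoids alerting on a single spike that the workers absorb on their own.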