
Task Queue & Orchestration

Purpose

Accept, route, and manage the full lifecycle of activities across all agents. Decouple activity submission from agent execution so the system handles load spikes, prioritises urgent work, retries failures, and enforces concurrency limits per agent type — all without the API layer needing to know about agent availability.

Related: Workflow Model — how activities are created | LLM Providers — adapter pattern for dispatching to agents | Scalability — queue capacity design


Technology

BullMQ (v5) backed by Redis (v7).

  • Reliable job delivery with at-least-once semantics
  • Priority queues (higher priority = dequeued first)
  • Delayed / scheduled jobs (monthly period triggers)
  • Per-queue concurrency limits
  • Automatic retry with configurable backoff
  • Dead letter queue (failed jobs that exhausted retries)
  • Job events for real-time status tracking

Queue Architecture

Queues are shared per agent role across all tenants — one queue per agent type, not one per tenant. Tenant isolation is enforced via tenantId in the job payload; workers verify it before processing. This mirrors the rag__ingestion pattern already in use.

    Redis: shared queues (all tenants)
    ├── agent__client-researcher          concurrency: variable
    ├── agent__competitor-researcher      concurrency: variable
    ├── agent__context-file-writer        concurrency: variable
    ├── agent__strategy-writer            concurrency: variable
    ├── agent__deliverable-planner        concurrency: variable
    ├── agent__activity-planner           concurrency: variable
    ├── agent__keyword-researcher         concurrency: variable
    ├── agent__content-brief-writer       concurrency: variable
    ├── agent__site-auditor               concurrency: variable
    ├── agent__backlink-researcher        concurrency: variable
    ├── agent__backlink-outreach-writer   concurrency: variable
    ├── agent__blog-writer                concurrency: variable
    ├── agent__gbp-post-writer            concurrency: variable
    ├── agent__social-post-writer         concurrency: variable
    ├── agent__email-writer               concurrency: variable
    ├── agent__landing-page-writer        concurrency: variable
    ├── agent__google-ads-writer          concurrency: variable
    ├── agent__meta-ads-writer            concurrency: variable
    ├── agent__social-calendar-planner    concurrency: variable
    ├── agent__ads-analyst                concurrency: variable
    ├── agent__report-writer              concurrency: variable
    ├── agent__anomaly-detector           concurrency: variable
    ├── agent__topic-researcher           concurrency: variable
    ├── agent__research-note-writer       concurrency: variable
    ├── agent__review-response-writer     concurrency: variable
    └── dead-letter                       concurrency: 1 (global alerting)

Why shared queues?

  • Simpler worker lifecycle — one worker process per agent role handles all tenants; no per-tenant worker spawn/teardown.
  • Matches the existing rag__ingestion and notifications__{channel} patterns already in the codebase.
  • Tenant isolation is maintained by tenantId in the job payload (verified by the worker before execution) and by BullMQ priority so high-priority jobs from any tenant are dequeued first.
  • Per-tenant concurrency limits from agent_configs.max_concurrency are enforced at enqueue time via BullMQ’s rate-limiter keyed on tenantId, not at the queue level.

Note: Concurrency limits above are worker-level totals across all tenants. Per-tenant caps are applied via the rate limiter. See Agent Hierarchy for per-plan limits.
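The worker-side tenant check mentioned above could look roughly like the sketch below. The names `JobTenantInfo`, `ActivityRow` and `verifyTenant` are hypothetical; the source only states that workers verify tenantId from the payload before processing, not how.

```typescript
// Hypothetical shapes for illustration; only activityId and tenantId
// are taken from the documented job payload.
interface JobTenantInfo {
  activityId: string;
  tenantId: string;
}

interface ActivityRow {
  id: string;
  tenantId: string;
}

// Throws if the activity row is missing or belongs to a different tenant
// than the one named in the job payload.
function verifyTenant(job: JobTenantInfo, row: ActivityRow | undefined): void {
  if (!row || row.id !== job.activityId) {
    throw new Error(`Activity ${job.activityId} not found`);
  }
  if (row.tenantId !== job.tenantId) {
    throw new Error(`Tenant mismatch for activity ${job.activityId}`);
  }
}
```

Throwing here would surface the mismatch as a normal job failure, so it would go through the same retry and dead-letter path as any other error.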


Job Definition

All agent activity jobs share a single typed payload:

    interface ActivityJobData {
      activityId: string;       // PostgreSQL activities.id
      tenantId: string;         // For tenant isolation verification
      agentRole: AgentRole;     // Which agent queue this came from
      deliverableId: string;    // Parent deliverable
      pipelineStep: number;     // Which step in the activity template
      inputRef?: string;        // MongoDB ObjectId of previous step's output
      priority: number;         // BullMQ priority: lower number = higher priority (see Priority Queuing)

      // Wake reason: tells the agent why it was dispatched
      wakeReason:
        | 'new_task'
        | 'rejection'
        | 'scheduled'
        | 'unblocked'
        | 'subtask_completed'
        | 'review_approved'
        | 'review_feedback';

      // Populated only when wakeReason === 'rejection'
      rejectionRef?: string;        // MongoDB ObjectId of the output that was rejected
      reviewerNotes?: string;       // Human reviewer's written feedback

      // Populated only when wakeReason === 'unblocked'
      blockerResolution?: string;   // Human's note explaining how the blocker was resolved

      // Populated only when wakeReason === 'subtask_completed'
      subtaskOutputRef?: string;    // MongoDB ObjectId of the completed subtask's output
      subtaskName?: string;         // Name of the subtask that completed

      // Populated only when wakeReason === 'review_approved' | 'review_feedback'
      reviewApproved?: boolean;     // true = approved, false = feedback given
      reviewerFeedback?: string;    // Reviewer's comments (present for both approved + feedback)
    }

wakeReason is threaded into the task prompt (see Prompt Construction below) so the agent understands its context without needing to infer it from prior session history.
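Because the optional fields are only meaningful for particular wake reasons, an enqueue-time check can catch payloads that would produce an incomplete prompt. The sketch below is one possible shape; `WakeContext`, `expectedFields` and `missingWakeFields` are hypothetical names, and the per-reason field lists are an assumption derived from the "populated only when" comments rather than a documented contract.

```typescript
type WakeReason =
  | 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
  | 'subtask_completed' | 'review_approved' | 'review_feedback';

// Subset of ActivityJobData that carries wake-reason context.
interface WakeContext {
  wakeReason: WakeReason;
  rejectionRef?: string;
  reviewerNotes?: string;
  blockerResolution?: string;
  subtaskOutputRef?: string;
  subtaskName?: string;
  reviewApproved?: boolean;
  reviewerFeedback?: string;
}

// Assumed expectations per wake reason (not stated as hard requirements in the source).
const expectedFields: Record<WakeReason, (keyof WakeContext)[]> = {
  new_task: [],
  scheduled: [],
  rejection: ['rejectionRef', 'reviewerNotes'],
  unblocked: ['blockerResolution'],
  subtask_completed: ['subtaskOutputRef', 'subtaskName'],
  review_approved: ['reviewApproved'],
  review_feedback: ['reviewApproved', 'reviewerFeedback'],
};

// Returns the names of expected fields that are absent from the payload.
function missingWakeFields(data: WakeContext): string[] {
  return expectedFields[data.wakeReason].filter((key) => data[key] === undefined);
}
```

A caller could log or reject the enqueue when the returned list is non-empty; which of the two is appropriate depends on how strictly the fields are meant to be enforced.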


Activity State Machine

    created
      │  worker picks up job
      ▼
    assigned
      │  adapter dispatches
      ▼
    in_progress  ◄── re-enqueued (any wakeReason)
      ├──► awaiting_approval ── approved / rejected ──► next step / revision activity
      ├──► done
      ├──► blocked ── blocker resolved ──► re-enqueued (unblocked)
      ├──► awaiting_subtask ── subtask completes ──► re-enqueued (subtask_completed)
      ├──► awaiting_review ── approved / feedback ──► re-enqueued (review_approved / review_feedback)
      ├──► reassigned (terminal)
      └──► failed / timeout ── retries?

All activity statuses:

| Status | Set by | Meaning |
|---|---|---|
| created | Control plane | Activity exists, not yet dispatched |
| assigned | Worker | Job dequeued, about to dispatch to adapter |
| in_progress | Worker | Adapter running |
| awaiting_approval | Worker | Agent done; predefined pipeline approval gate |
| awaiting_subtask | create_subtask tool | Agent suspended; waiting for subtask to complete |
| awaiting_review | request_human_review tool | Agent suspended; waiting for ad-hoc human review |
| blocked | report_blocker tool | Suspended; external dependency needs human action |
| done | Worker | Agent task completed successfully |
| approved | Human | Human approval granted; pipeline advances |
| rejected | Human | Human rejected; revision activity created |
| reassigned | reassign_task tool | Activity handed off; new activity created for target agent/human |
| failed | Worker | Adapter error after all retries |

State transitions are written to activities.status after every relevant event.
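One way to keep writes to activities.status honest is to check each transition against an explicit table before updating. The sketch below is an assumption-laden reading of the diagram and handlers in this document (for example, suspended states returning to created before re-enqueue, and human activities going straight from created to in_progress); `allowedTransitions` and `canTransition` are hypothetical names, not existing code.

```typescript
type ActivityStatus =
  | 'created' | 'assigned' | 'in_progress' | 'awaiting_approval'
  | 'awaiting_subtask' | 'awaiting_review' | 'blocked' | 'done'
  | 'approved' | 'rejected' | 'reassigned' | 'failed';

// Assumed transition table; terminal states map to an empty list.
const allowedTransitions: Record<ActivityStatus, ActivityStatus[]> = {
  created: ['assigned', 'in_progress'], // in_progress directly for human activities
  assigned: ['in_progress'],
  in_progress: [
    'awaiting_approval', 'awaiting_subtask', 'awaiting_review',
    'blocked', 'done', 'approved', 'rejected', 'reassigned', 'failed',
  ],
  awaiting_approval: ['approved', 'rejected'],
  awaiting_subtask: ['created'], // reset before re-enqueue
  awaiting_review: ['created'],
  blocked: ['created'],
  done: [],
  approved: [],
  rejected: [],
  reassigned: [],
  failed: [],
};

// True if moving from `from` to `to` is permitted by the table above.
function canTransition(from: ActivityStatus, to: ActivityStatus): boolean {
  return allowedTransitions[from].includes(to);
}
```

A guard like this would typically sit in front of the status update so that an out-of-order event (such as a late callback for a reassigned activity) is rejected rather than silently overwriting a terminal state.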

For human approval activities (assignee_type = 'human', is_approval_gate = true):

  • Job is not dispatched to an agent queue
  • Activity is surfaced in the DM Portal Activities Inbox
  • Status transitions on human action: created → in_progress → approved | rejected

Worker Factory

Each agent role gets a single shared BullMQ Worker. Workers are created by a factory using the shared agent__{agentRole} queue name — one queue for all tenants:

    import { Worker, Job } from 'bullmq'; // Job is needed for the processor's parameter type

    function createAgentWorker(agentRole: AgentRole, concurrency: number) {
      const queueName = `agent__${agentRole}`; // shared across all tenants

      return new Worker(
        queueName,
        async (job: Job<ActivityJobData>) => {
          // tenantId is in the payload; the worker verifies it before processing
          await processActivity(job.data);
        },
        {
          connection: redisConnection,
          concurrency,
          limiter: {
            max: 10,          // max 10 jobs per minute per worker (runaway protection)
            duration: 60_000,
          },
        }
      );
    }

Job Deduplication

Recurring cron-triggered jobs use a deduplication key to prevent the same work being enqueued twice if a cron fires while the previous run is still in the queue:

    await queue.add(
      'activity',
      jobData,
      {
        // Deduplication key: same activity cannot be queued twice with the same wake reason
        deduplication: {
          id: `${jobData.tenantId}:${jobData.activityId}:${jobData.wakeReason}`,
        },
        priority: jobData.priority,
      }
    );

If a job with the same deduplication ID already exists in the waiting or active state, the new job is silently dropped. This prevents double-execution without blocking the enqueue call.

Deduplication key format: {tenantId}:{activityId}:{wakeReason}. The wakeReason is included because the same activity legitimately re-enters the queue with different wake reasons (e.g. new_task → review_feedback).

Dedup key persistence after failure: BullMQ’s dedup key (bull:{queue}:de:{id}) is NOT cleared when a job fails. This means re-enqueueing the same key after a failure is silently dropped. Two mitigations are in place for the activity-planner:

  1. The failed event handler deletes the dedup key from Redis immediately after final failure.
  2. A 2-hour TTL on the dedup key ensures auto-expiry even if the handler doesn’t run (hard kill).
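The interaction between the dedup key, the failed-event cleanup and the TTL can be illustrated with a small in-memory model. This is a sketch only: `DedupRegistry` is a hypothetical stand-in for the Redis keys, not BullMQ's implementation, and time is passed in explicitly so the behaviour is easy to follow.

```typescript
// Builds the ID in the documented {tenantId}:{activityId}:{wakeReason} format.
function dedupId(job: { tenantId: string; activityId: string; wakeReason: string }): string {
  return `${job.tenantId}:${job.activityId}:${job.wakeReason}`;
}

// In-memory model of dedup keys with a TTL (hypothetical, for illustration).
class DedupRegistry {
  private expiry = new Map<string, number>();
  private ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Returns true if the job may be enqueued, false if it would be dropped.
  tryAcquire(id: string, nowMs: number): boolean {
    const expiresAt = this.expiry.get(id);
    if (expiresAt !== undefined && expiresAt > nowMs) return false;
    this.expiry.set(id, nowMs + this.ttlMs);
    return true;
  }

  // Mitigation 1: the failed-event handler removes the key after final failure.
  release(id: string): void {
    this.expiry.delete(id);
  }
}
```

In this model, a second enqueue with the same ID is dropped until either `release` is called (the failed handler ran) or the TTL elapses (the handler never ran), which mirrors the two mitigations listed above.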

Worker Stall Handling

All agent workers set maxStalledCount: 2:

    const worker = new Worker(queueName, processor, {
      connection: getRedisConnection(),
      concurrency: 1,
      lockDuration: 660_000,
      maxStalledCount: 2, // allow 2 stall-retries before giving up
    });

Why maxStalledCount: 2 not 0: When a worker process crashes unexpectedly (e.g. Windows DLL cold-start, OOM kill), BullMQ marks the in-progress job as “stalled”. With maxStalledCount: 0 the job moves directly to failed — bypassing the attempts/backoff retry mechanism entirely (BullMQ only retries on thrown errors, not stalls). With maxStalledCount: 2, a stalled job is moved back to waiting and re-picked by the next healthy worker, up to twice before being considered permanently failed.
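The difference between the two settings can be modelled in a few lines. This is a simplified sketch of the behaviour described above, not BullMQ's internal logic; `stallOutcome` is a hypothetical helper and `stalledCount` is assumed to include the stall being evaluated.

```typescript
type StallOutcome = 'waiting' | 'failed';

// Decides what happens to a job that has just been detected as stalled.
// stalledCount: how many times the job has stalled so far, including this one.
function stallOutcome(stalledCount: number, maxStalledCount: number): StallOutcome {
  return stalledCount > maxStalledCount ? 'failed' : 'waiting';
}

// With maxStalledCount = 0 the first stall is already fatal.
const firstStallStrict = stallOutcome(1, 0);

// With maxStalledCount = 2 the first two stalls are recovered; the third is not.
const stallsLenient = [1, 2, 3].map((count) => stallOutcome(count, 2));
```

Under this model `firstStallStrict` is 'failed', while `stallsLenient` is ['waiting', 'waiting', 'failed'], matching the "up to twice" wording above.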

Workers are spawned once at API server startup — one worker process per agent role, shared across all tenants. No per-tenant lifecycle management is needed.


Activity Processing in a Worker

    async function processActivity(data: ActivityJobData): Promise<void> {
      // 1. Mark activity as in_progress
      await db.update(activities)
        .set({ status: 'in_progress', startedAt: new Date() })
        .where(and(eq(activities.id, data.activityId), eq(activities.tenantId, data.tenantId)));

      // 2. Create activity_run record
      const [run] = await db.insert(activityRuns).values({
        activityId: data.activityId,
        tenantId: data.tenantId,
        wakeReason: data.wakeReason,
        startedAt: new Date(),
      }).returning();

      // 3. Resolve adapter for this tenant + agent role
      const agentConfig = await getAgentConfig(data.tenantId, data.agentRole);
      const adapter = resolveAdapter(agentConfig);

      // 4. Resolve skills: returns client context file (full) + manifest (metadata-only)
      const { contextFile, manifest } = await resolveSkillsForActivity(data.tenantId, data.agentRole);
      const skillsDir = await injectSkills(contextFile, manifest);

      // 5. Build task prompt: threads wake reason and rejection context
      const prompt = await buildActivityPrompt(data);

      // 6. Resolve the existing session for this campaign + agent role (if any)
      const session = await resolveSession(data.deliverableId, data.agentRole);

      try {
        // 7. Dispatch to adapter
        const result = await adapter.dispatch({
          prompt,
          skillsDir,
          sessionId: session?.externalSessionId ?? undefined,
          activityRunId: run.id,
          wakeReason: data.wakeReason,
        });

        // 8. Persist session state for next heartbeat
        await persistSession(session, result, data.deliverableId, data.agentRole);

        // 9. Update run record
        // Note: if the agent called report_blocker, status is already 'blocked';
        // the tool handler sets it. The adapter returns normally but the activity
        // remains blocked until onBlockerResolved() re-enqueues it.
        const isBlocked = await isActivityBlocked(data.activityId);
        if (!isBlocked) {
          await db.update(activityRuns)
            .set({ status: 'completed', endedAt: new Date(), sessionId: result.sessionId })
            .where(eq(activityRuns.id, run.id));
        }
      } catch (err) {
        await db.update(activityRuns)
          .set({ status: 'failed', error: String(err), endedAt: new Date() })
          .where(eq(activityRuns.id, run.id));
        throw err; // BullMQ catches this and triggers retry
      } finally {
        await cleanupSkillsDir(skillsDir);
      }
    }

See also: LLM Providers for resolveAdapter() implementation and the phone-home callback handler.


Prompt Construction

buildActivityPrompt() assembles the full task prompt from the activity definition, the prior step’s output, and — critically — the wake reason context. This ensures the agent always knows why it was dispatched and what it is expected to do differently (if anything).

    async function buildActivityPrompt(data: ActivityJobData): Promise<string> {
      const activity = await getActivity(data.activityId, data.tenantId);
      const input = data.inputRef ? await getActivityOutput(data.inputRef) : null;

      // Base task description
      let prompt = `# Task: ${activity.name}\n\n${activity.description}`;

      // Thread the prior pipeline step's output as context
      if (input) {
        prompt += `\n\n## Input from previous step\n\n${input.content}`;
      }

      // Rejection path: agent is revising work a human reviewed and rejected
      if (data.wakeReason === 'rejection') {
        if (data.rejectionRef) {
          const rejected = await getActivityOutput(data.rejectionRef);
          prompt += `\n\n## Your previous output (rejected by reviewer)\n\n${rejected.content}`;
        }
        if (data.reviewerNotes) {
          prompt += `\n\n## Reviewer feedback\n\n${data.reviewerNotes}`;
        }
        prompt += `\n\nRevise your output to address the reviewer's feedback above. ` +
          `Do not restate the feedback; produce the corrected output directly.`;
      }

      return prompt;
    }

Wake reason behaviour by case

| wakeReason | What the agent receives | Expected behaviour |
|---|---|---|
| new_task | Task description + prior step output (if any) | Produce the deliverable from scratch |
| rejection | Task description + prior step output + rejected output + reviewer notes | Revise the rejected output per the feedback |
| scheduled | Task description only | Execute the recurring task with no prior context dependency |
| unblocked | Task description + prior step output + blocker resolution note | Resume; the external blocker has been resolved |
| subtask_completed | Task description + prior step output + subtask name + subtask output | Continue using the subtask result |
| review_approved | Task description + prior step output + reviewer feedback (if any) | Human approved; finalise and complete the task |
| review_feedback | Task description + prior step output + reviewer feedback | Revise based on the reviewer’s feedback and re-submit or complete |

Prompt additions for the new wake reasons:

    if (data.wakeReason === 'unblocked' && data.blockerResolution) {
      prompt += `\n\n## Blocker resolved\n\n${data.blockerResolution}`;
      prompt += `\n\nResume the task from where you left off.`;
    }

    if (data.wakeReason === 'subtask_completed' && data.subtaskOutputRef) {
      const subtaskOutput = await getActivityOutput(data.subtaskOutputRef);
      prompt += `\n\n## Subtask completed: ${data.subtaskName}\n\n${subtaskOutput.content}`;
      prompt += `\n\nContinue your task using the subtask result above.`;
    }

    if (data.wakeReason === 'review_approved') {
      prompt += `\n\n## Human review: Approved`;
      if (data.reviewerFeedback) prompt += `\n\nReviewer notes: ${data.reviewerFeedback}`;
      prompt += `\n\nYour work was approved. Finalise and complete the task.`;
    }

    if (data.wakeReason === 'review_feedback') {
      prompt += `\n\n## Human review: Feedback provided\n\n${data.reviewerFeedback}`;
      prompt += `\n\nRevise your work based on the feedback above, then complete the task.`;
    }

Pipeline Step Progression

On activity completion (approved / agent task done)

    async function onActivityCompleted(activityId: string, tenantId: string): Promise<void> {
      const activity = await getActivity(activityId, tenantId);
      const template = await getActivityTemplate(activity.templateId);

      // Find next step in pipeline
      const nextStep = template.steps[activity.pipelineStep + 1];
      if (!nextStep) {
        // Pipeline complete: mark deliverable unit done
        await incrementDeliverablePeriodCount(activity.deliverableId, tenantId);
        return;
      }

      // Create next activity
      const nextActivity = await db.insert(activities).values({
        tenantId,
        deliverableId: activity.deliverableId,
        deliverablePeriodId: activity.deliverablePeriodId,
        templateId: activity.templateId,
        pipelineStep: activity.pipelineStep + 1,
        name: nextStep.name,
        assigneeType: nextStep.assigneeType,
        assigneeAgentRole: nextStep.agentRole,
        isApprovalGate: nextStep.isApproval,
        inputRef: activity.outputRef, // Output of this step is input to next
        status: 'created',
      }).returning();

      // Enqueue if agent activity; surface in inbox if human activity
      if (nextStep.assigneeType === 'agent') {
        await enqueueActivity(nextActivity[0], tenantId, nextStep.agentRole, {
          wakeReason: 'new_task',
        });
      }
      // Human activities are surfaced automatically via the DM Portal inbox query
    }

On activity rejection (human rejects agent output)

When a human reviewer rejects an agent’s output, the same agent is re-dispatched with the rejected output and reviewer notes threaded into the new prompt via wakeReason: 'rejection'.

    async function onActivityRejected(
      activityId: string,
      tenantId: string,
      reviewerNotes: string,
    ): Promise<void> {
      const activity = await getActivity(activityId, tenantId);

      // Mark the current activity as rejected
      await db.update(activities)
        .set({ status: 'rejected', reviewerNotes, rejectedAt: new Date() })
        .where(eq(activities.id, activityId));

      // Create a revision activity: same step, same agent, same pipeline position
      const revisionActivity = await db.insert(activities).values({
        tenantId,
        deliverableId: activity.deliverableId,
        deliverablePeriodId: activity.deliverablePeriodId,
        templateId: activity.templateId,
        pipelineStep: activity.pipelineStep, // same step, not incremented
        name: `${activity.name} (revision)`,
        assigneeType: 'agent',
        assigneeAgentRole: activity.assigneeAgentRole,
        isApprovalGate: false,
        inputRef: activity.inputRef, // same input as original
        status: 'created',
      }).returning();

      // Enqueue with rejection context: the agent will receive both its rejected
      // output and the reviewer's notes in the prompt
      await enqueueActivity(revisionActivity[0], tenantId, activity.assigneeAgentRole, {
        wakeReason: 'rejection',
        rejectionRef: activity.outputRef, // MongoDB ObjectId of the rejected output
        reviewerNotes,
      });
    }

The revision activity re-uses the same session (via deliverableId + agentRole lookup) so the agent has full conversation history when revising.

On blocker reported (agent calls report_blocker tool)

When an agent cannot proceed due to an external dependency (expired OAuth token, missing integration, API outage), it calls the report_blocker tool during execution. The tool handler suspends the activity and creates a human task to resolve the issue.

    async function onActivityBlocked(
      activityId: string,
      tenantId: string,
      blockerType: BlockerType,
      blockerMessage: string,
      channelId?: string,
    ): Promise<void> {
      // 1. Mark the activity blocked
      await db.update(activities)
        .set({
          status: 'blocked',
          blockerType,
          blockerMessage,
          blockedAt: new Date(),
        })
        .where(eq(activities.id, activityId));

      // 2. Create a human-assigned resolution activity in the DM Portal inbox
      await db.insert(activities).values({
        tenantId,
        name: `Resolve blocker: ${blockerMessage}`,
        assigneeType: 'human',
        isApprovalGate: false,
        status: 'created',
        parentActivityId: activityId, // links back to the blocked activity
        metadata: {
          blockerType,
          blockedActivityId: activityId,
          channelId,
        },
      });

      // 3. If channel auth expired, flag the channel record
      if (blockerType === 'channel_auth_expired' && channelId) {
        await db.update(channels)
          .set({ status: 'expired' })
          .where(and(eq(channels.id, channelId), eq(channels.tenantId, tenantId)));
      }
    }

On blocker resolved (human marks resolution activity complete)

    async function onBlockerResolved(
      resolutionActivityId: string,
      tenantId: string,
      resolutionNote: string,
    ): Promise<void> {
      const resolution = await getActivity(resolutionActivityId, tenantId);
      const blockedActivityId = resolution.metadata.blockedActivityId;
      const blocked = await getActivity(blockedActivityId, tenantId);

      // Mark the blocked activity as ready to resume
      await db.update(activities)
        .set({ status: 'created', blockerType: null, blockerMessage: null })
        .where(eq(activities.id, blockedActivityId));

      // Re-enqueue with unblocked wake reason so the agent knows to resume
      await enqueueActivity(blocked, tenantId, blocked.assigneeAgentRole, {
        wakeReason: 'unblocked',
        blockerResolution: resolutionNote,
      });
    }

The re-enqueued activity resumes the same session, so the agent has full conversation history and picks up where it left off. The blockerResolution note is appended to the prompt under a "Blocker resolved" heading so the agent knows what changed.

On subtask completed (agent called create_subtask)

When an agent creates a blocking subtask, the parent activity waits in awaiting_subtask state. When the subtask finishes, the parent is re-enqueued with the subtask output.

    async function onSubtaskCompleted(
      subtaskActivityId: string,
      tenantId: string,
    ): Promise<void> {
      const subtask = await getActivity(subtaskActivityId, tenantId);
      if (!subtask.parentActivityId) return; // not a subtask

      const parent = await getActivity(subtask.parentActivityId, tenantId);
      if (parent.status !== 'awaiting_subtask') return; // parent not waiting

      await db.update(activities)
        .set({ status: 'created' })
        .where(eq(activities.id, parent.id));

      await enqueueActivity(parent, tenantId, parent.assigneeAgentRole, {
        wakeReason: 'subtask_completed',
        subtaskOutputRef: subtask.outputRef,
        subtaskName: subtask.name,
      });
    }

For non-blocking subtasks (blocking: false), the parent continues running — onSubtaskCompleted is still called but only stores the result on the subtask record; no parent re-enqueue.

On review response (agent called request_human_review)

    async function onReviewCompleted(
      reviewActivityId: string,
      tenantId: string,
      approved: boolean,
      reviewerFeedback: string,
    ): Promise<void> {
      const review = await getActivity(reviewActivityId, tenantId);
      const parentId = review.metadata.reviewedActivityId;
      const parent = await getActivity(parentId, tenantId);

      await db.update(activities)
        .set({ status: 'created' })
        .where(eq(activities.id, parentId));

      await enqueueActivity(parent, tenantId, parent.assigneeAgentRole, {
        wakeReason: approved ? 'review_approved' : 'review_feedback',
        reviewApproved: approved,
        reviewerFeedback: reviewerFeedback,
      });
    }

On approval resolved (human approves/rejects in DM Portal)

Approval resolution has the widest fan-out of any event in the system: a single reviewer action can simultaneously re-enqueue many activities across multiple pipelines.

    // Called from the approvals API route when reviewer submits their decision
    // Full implementation in governance-guardrails.md: onApprovalResolved()
    // The queue layer's responsibility: enqueue all linked activities correctly
    async function enqueueApprovalLinkedActivities(
      approvalId: string,
      tenantId: string,
      approved: boolean,
      reviewerNotes: string,
      chosenOption?: string,
    ): Promise<void> {
      const links = await db.query.approvalLinkedActivities.findMany({
        where: eq(approvalLinkedActivities.approvalId, approvalId),
        with: { activity: true },
      });

      const feedbackText = [reviewerNotes, chosenOption ? `Selected: ${chosenOption}` : null]
        .filter(Boolean).join('\n\n');

      // All linked activities re-enqueued in parallel
      await Promise.all(links.map(({ activity }) =>
        enqueueActivity(activity, tenantId, activity.assigneeAgentRole, {
          wakeReason: approved ? 'review_approved' : 'review_feedback',
          reviewApproved: approved,
          reviewerFeedback: feedbackText,
        })
      ));
    }

This is why create_approval is powerful for content_direction and brand_direction approvals: one reviewer action fans out to all blocked activities at once, with the decision injected into each agent’s prompt via reviewerFeedback.


On task reassigned (agent called reassign_task)

    async function onActivityReassigned(
      activityId: string,
      tenantId: string,
      assigneeType: 'agent' | 'human',
      agentRole: AgentRole | null,
      humanRole: string | null,
      reason: string,
    ): Promise<void> {
      const activity = await getActivity(activityId, tenantId);

      // Mark original as reassigned (terminal, no further processing)
      await db.update(activities)
        .set({ status: 'reassigned', reassignReason: reason })
        .where(eq(activities.id, activityId));

      // Create a new activity for the target assignee, same step and input
      const newActivity = await db.insert(activities).values({
        tenantId,
        deliverableId: activity.deliverableId,
        deliverablePeriodId: activity.deliverablePeriodId,
        templateId: activity.templateId,
        pipelineStep: activity.pipelineStep,
        name: activity.name,
        assigneeType,
        assigneeAgentRole: agentRole,
        assigneeHumanRole: humanRole,
        isApprovalGate: assigneeType === 'human',
        inputRef: activity.inputRef,
        parentActivityId: activity.id, // link back for audit trail
        status: 'created',
        metadata: { reassignedFrom: activityId, reassignReason: reason },
      }).returning();

      if (assigneeType === 'agent') {
        await enqueueActivity(newActivity[0], tenantId, agentRole, {
          wakeReason: 'new_task',
        });
      }
      // Human activities surface automatically in DM Portal inbox
    }

Monthly Period Scheduler

At the start of each calendar month, a cron job runs per active tenant:

    // Runs at 00:00 on 1st of every month (UTC)
    // One job per tenant in the system
    await deliverablePeriodQueue.add(
      'start-new-period',
      { tenantId },
      { repeat: { pattern: '0 0 1 * *' } }
    );

This triggers startNewDeliverablePeriods(tenantId) which:

  1. Creates a deliverable_periods record for each active deliverable
  2. Dispatches the Activity Planner to plan the activity pipeline for the period
  3. Activity Planner creates the first activity in each pipeline
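For dashboards or tests it can be useful to compute when the next period will start without involving the scheduler. The helper below is a hypothetical sketch (`nextPeriodStart` is not part of the package) that reproduces the `0 0 1 * *` schedule in UTC using plain Date arithmetic.

```typescript
// Returns the next 00:00 UTC on the 1st of a month strictly after `now`.
// Date.UTC normalises month overflow, so December rolls over to January of the next year.
function nextPeriodStart(now: Date): Date {
  return new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth() + 1, 1, 0, 0, 0, 0));
}

// Example inputs: mid-month, and a year boundary.
const midMonth = nextPeriodStart(new Date('2026-03-15T09:30:00Z'));
const yearEnd = nextPeriodStart(new Date('2026-12-31T23:59:59Z'));
```

Here `midMonth` is 2026-04-01T00:00:00.000Z and `yearEnd` is 2027-01-01T00:00:00.000Z.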

Priority Queuing

BullMQ priority: lower number = higher priority.

| Priority | Use Case |
|---|---|
| 1 | Urgent / overdue activities flagged by DM Portal |
| 5 | Approval-gated activities (unblocking downstream) |
| 10 | Normal scheduled activities |
| 20 | Background research / low-urgency tasks |
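To avoid magic numbers at call sites, the tiers above could be named and mapped to their numeric priorities in one place. The sketch below uses hypothetical names (`PriorityTier`, `bullPriority`, `dequeueOrder`); the sort only illustrates the lower-number-first ordering and is not how BullMQ stores jobs.

```typescript
type PriorityTier = 'urgent' | 'approval_gated' | 'normal' | 'background';

// Numeric values taken from the priority table; lower number = dequeued first.
const bullPriority: Record<PriorityTier, number> = {
  urgent: 1,
  approval_gated: 5,
  normal: 10,
  background: 20,
};

// Returns a copy of the jobs in the order a lower-number-first queue would serve them.
function dequeueOrder<T extends { tier: PriorityTier }>(jobs: T[]): T[] {
  return [...jobs].sort((a, b) => bullPriority[a.tier] - bullPriority[b.tier]);
}

const ordered = dequeueOrder([
  { id: 'research', tier: 'background' as PriorityTier },
  { id: 'overdue', tier: 'urgent' as PriorityTier },
  { id: 'monthly', tier: 'normal' as PriorityTier },
  { id: 'gate', tier: 'approval_gated' as PriorityTier },
]).map((job) => job.id);
```

With these inputs `ordered` is ['overdue', 'gate', 'monthly', 'research'].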

Concurrency Limits

Default concurrency per agent role per tenant:

| Agent | Default Concurrency | Reason |
|---|---|---|
| Client Researcher | 1 | Playwright scraping; CPU-intensive |
| Competitor Researcher | 1 | Playwright scraping; CPU-intensive |
| Context File Writer | 1 | Runs once per tenant; sequential by design |
| Strategy Writer | 1 | Sequential by design |
| Deliverable Planner | 1 | Sequential by design |
| Activity Planner | 2 | Heavy orchestration; creates many downstream activities |
| Keyword Researcher | 2 | External SEMrush API calls |
| Content Brief Writer | 3 | Pure LLM; parallelises freely |
| Site Auditor | 1 | Playwright + external audit APIs; CPU-intensive |
| Backlink Researcher | 2 | DataForSEO API rate limits |
| Backlink Outreach Writer | 3 | Pure LLM |
| Blog Writer | 3 | Heavy LLM output; concurrency 3 balances cost vs throughput |
| GBP Post Writer | 4 | Short output; parallelises freely |
| Social Post Writer | 4 | Short output; parallelises freely |
| Email Writer | 3 | Pure LLM |
| Landing Page Writer | 2 | Longer output; moderate concurrency |
| Google Ads Writer | 3 | Pure LLM; Google Ads API writes are gated at publish time |
| Meta Ads Writer | 3 | Pure LLM; Meta API writes are gated at publish time |
| Social Calendar Planner | 2 | Planning task; runs once per period |
| Ads Analyst | 2 | GA4/Google Ads/Meta API reads |
| Report Writer | 2 | Long output; runs once per reporting period |
| Anomaly Detector | 4 | Short classification; parallelises freely |
| Topic Researcher | 2 | Local Ollama; CPU on agent host |
| Research Note Writer | 3 | Local Ollama; short outputs |
| Review Response Writer | 4 | Short output; parallelises freely |

Concurrency is configured in agent_configs.max_concurrency per tenant and applied when the worker is created.


Dead Letter Queue

After all retries are exhausted, BullMQ moves the job to the failed state. The global dead-letter worker then:

  1. Marks activities.status = 'needs_review'
  2. Creates an escalation Activity assigned to the DM Portal reviewer
  3. Sends a Slack alert (internal team channel)
  4. Flags the deliverable_period if the failed activity was on the critical path
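The four steps above only apply to a job's final failure, so the handler first needs to tell a retryable failure from an exhausted one. The sketch below assumes the check is based on the job's `attemptsMade` and `opts.attempts` fields (both exist on BullMQ jobs); `FailedJobView`, `retriesExhausted` and `deadLetterActions` are hypothetical names, and the action labels simply restate the list.

```typescript
// Minimal view of a failed job; field names mirror BullMQ's Job.
interface FailedJobView {
  attemptsMade: number;
  opts: { attempts?: number };
}

// Assumption: when attempts is unset, the job gets a single attempt.
function retriesExhausted(job: FailedJobView): boolean {
  const maxAttempts = job.opts.attempts ?? 1;
  return job.attemptsMade >= maxAttempts;
}

type DeadLetterAction =
  | 'mark_needs_review'
  | 'create_escalation'
  | 'send_slack_alert'
  | 'flag_deliverable_period';

// Returns the dead-letter steps to run, or an empty list if retries remain.
function deadLetterActions(job: FailedJobView, onCriticalPath: boolean): DeadLetterAction[] {
  if (!retriesExhausted(job)) return [];
  const actions: DeadLetterAction[] = ['mark_needs_review', 'create_escalation', 'send_slack_alert'];
  if (onCriticalPath) actions.push('flag_deliverable_period');
  return actions;
}
```

Returning a plan rather than performing the side effects directly keeps the decision logic testable without Redis, PostgreSQL or Slack.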

Package Location

packages/queue/

    queue/
    ├── src/
    │   ├── queues.ts       # Queue factory: getQueue(tenantId, agentRole)
    │   ├── workers.ts      # Worker factory: createAgentWorker(), processActivity()
    │   ├── jobs.ts         # ActivityJobData type definition
    │   ├── prompt.ts       # buildActivityPrompt(): threads wake reason + rejection/blocker context
    │   ├── pipeline.ts     # onActivityCompleted(), onActivityRejected(), step progression
    │   ├── blocker.ts      # onActivityBlocked(), onBlockerResolved()
    │   ├── subtask.ts      # onSubtaskCompleted()
    │   ├── review.ts       # onReviewCompleted()
    │   ├── approval.ts     # enqueueApprovalLinkedActivities(): fan-out on approval resolve
    │   ├── reassign.ts     # onActivityReassigned()
    │   ├── scheduler.ts    # Monthly period cron registration
    │   └── deadletter.ts   # Dead letter handler
    └── package.json

Open questions: OQ-1 (worker lifecycle), OQ-2 (cross-tenant queue monitoring) in open-questions.md.

© 2026 Leadmetrics — Internal use only