Task Queue & Orchestration
Purpose
Accept, route, and manage the full lifecycle of activities across all agents. Decouple activity submission from agent execution so the system handles load spikes, prioritises urgent work, retries failures, and enforces concurrency limits per agent type — all without the API layer needing to know about agent availability.
Related: Workflow Model — how activities are created | LLM Providers — adapter pattern for dispatching to agents | Scalability — queue capacity design
Technology
BullMQ (v5) backed by Redis (v7).
- Reliable job delivery with at-least-once semantics
- Priority queues (higher priority = dequeued first)
- Delayed / scheduled jobs (monthly period triggers)
- Per-queue concurrency limits
- Automatic retry with configurable backoff
- Dead letter queue (failed jobs that exhausted retries)
- Job events for real-time status tracking
Queue Architecture
Queues are shared per agent role across all tenants — one queue per agent type, not one per tenant. Tenant isolation is enforced via tenantId in the job payload; workers verify it before processing. This mirrors the rag__ingestion pattern already in use.
Redis — shared queues (all tenants)
├── agent__client-researcher concurrency: variable
├── agent__competitor-researcher concurrency: variable
├── agent__context-file-writer concurrency: variable
├── agent__strategy-writer concurrency: variable
├── agent__deliverable-planner concurrency: variable
├── agent__activity-planner concurrency: variable
├── agent__keyword-researcher concurrency: variable
├── agent__content-brief-writer concurrency: variable
├── agent__site-auditor concurrency: variable
├── agent__backlink-researcher concurrency: variable
├── agent__backlink-outreach-writer concurrency: variable
├── agent__blog-writer concurrency: variable
├── agent__gbp-post-writer concurrency: variable
├── agent__social-post-writer concurrency: variable
├── agent__email-writer concurrency: variable
├── agent__landing-page-writer concurrency: variable
├── agent__google-ads-writer concurrency: variable
├── agent__meta-ads-writer concurrency: variable
├── agent__social-calendar-planner concurrency: variable
├── agent__ads-analyst concurrency: variable
├── agent__report-writer concurrency: variable
├── agent__anomaly-detector concurrency: variable
├── agent__topic-researcher concurrency: variable
├── agent__research-note-writer concurrency: variable
├── agent__review-response-writer concurrency: variable
└── dead-letter concurrency: 1 (global alerting)
Why shared queues?
- Simpler worker lifecycle: one worker process per agent role handles all tenants, with no per-tenant worker spawn/teardown.
- Matches the existing rag__ingestion and notifications__{channel} patterns already in the codebase.
- Tenant isolation is maintained by tenantId in the job payload, which the worker verifies before execution. BullMQ priority applies across the whole shared queue, so high-priority jobs from any tenant are dequeued first.
- Per-tenant concurrency limits from agent_configs.max_concurrency are enforced at enqueue time by a rate limiter keyed on tenantId, not at the queue level. The limiter option in open-source BullMQ v5 applies to a whole queue, so the per-tenant keying has to come from application code (or BullMQ Pro groups).
Note: Concurrency limits above are worker-level totals across all tenants. Per-tenant caps are applied via the rate limiter. See Agent Hierarchy for per-plan limits.
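The per-tenant cap amounts to a counter keyed on tenantId that sits in front of the shared worker. The sketch below is an in-memory illustration only: TenantConcurrencyGate and its methods are hypothetical names, and because the counters live in one process, a deployment with several worker processes would need them in shared storage such as Redis.

```typescript
// Hypothetical in-memory gate: caps concurrent jobs per tenant in front of a shared worker.
// Counters live in this process only; a multi-process deployment needs shared storage.
class TenantConcurrencyGate {
  private readonly running = new Map<string, number>();
  private readonly maxFor: (tenantId: string) => number;

  // maxFor returns the tenant's cap for this agent role (e.g. agent_configs.max_concurrency).
  constructor(maxFor: (tenantId: string) => number) {
    this.maxFor = maxFor;
  }

  // Reserves a slot and returns true if the tenant is below its cap; otherwise returns
  // false, and the caller should delay or re-queue the job instead of running it.
  tryAcquire(tenantId: string): boolean {
    const current = this.running.get(tenantId) ?? 0;
    if (current >= this.maxFor(tenantId)) return false;
    this.running.set(tenantId, current + 1);
    return true;
  }

  // Frees a slot when one of the tenant's jobs finishes, whether it succeeded or failed.
  release(tenantId: string): void {
    const current = this.running.get(tenantId) ?? 0;
    if (current <= 1) this.running.delete(tenantId);
    else this.running.set(tenantId, current - 1);
  }

  inFlight(tenantId: string): number {
    return this.running.get(tenantId) ?? 0;
  }
}

// Example: tenant "t1" may run 2 jobs of this role at once, every other tenant 1.
const gate = new TenantConcurrencyGate((tenantId) => (tenantId === 't1' ? 2 : 1));
```

Because one tenant hitting its cap only makes tryAcquire return false for that tenant, other tenants' jobs on the same shared queue keep flowing.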
Job Definition
All agent activity jobs share a single typed payload:
```typescript
// AgentRole is defined elsewhere in the codebase.
interface ActivityJobData {
  activityId: string;     // PostgreSQL activities.id
  tenantId: string;       // For tenant isolation verification
  agentRole: AgentRole;   // Which agent queue this came from
  deliverableId: string;  // Parent deliverable
  pipelineStep: number;   // Which step in the activity template
  inputRef?: string;      // MongoDB ObjectId of previous step's output
  priority: number;       // 0 = normal, 1 = high, 2 = urgent (higher = more urgent;
                          // BullMQ orders the opposite way, see Priority Queuing)

  // Wake reason: tells the agent why it was dispatched
  wakeReason:
    | 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
    | 'subtask_completed' | 'review_approved' | 'review_feedback';

  // Populated only when wakeReason === 'rejection'
  rejectionRef?: string;        // MongoDB ObjectId of the output that was rejected
  reviewerNotes?: string;       // Human reviewer's written feedback

  // Populated only when wakeReason === 'unblocked'
  blockerResolution?: string;   // Human's note explaining how the blocker was resolved

  // Populated only when wakeReason === 'subtask_completed'
  subtaskOutputRef?: string;    // MongoDB ObjectId of the completed subtask's output
  subtaskName?: string;         // Name of the subtask that completed

  // Populated only when wakeReason === 'review_approved' | 'review_feedback'
  reviewApproved?: boolean;     // true = approved, false = feedback given
  reviewerFeedback?: string;    // Reviewer's comments (present for both approved + feedback)
}
```

wakeReason is threaded into the task prompt (see Prompt Construction below) so the agent understands its context without needing to infer it from prior session history.
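Because the optional fields are only meaningful for particular wake reasons, a producer-side check can reject an inconsistent payload before it is enqueued. The sketch below is hypothetical (WakeContext, REQUIRED_BY_REASON and payloadProblems are illustrative names). Its requirements follow the field comments and the prompt rules: review_approved needs only reviewApproved because reviewer notes are optional on approval, and rejection requires both fields even though the interface marks them optional, so relax that entry if partial rejections are intended.

```typescript
type WakeReason =
  | 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
  | 'subtask_completed' | 'review_approved' | 'review_feedback';

// The wake-reason fields of ActivityJobData, copied from the interface above.
interface WakeContext {
  wakeReason: WakeReason;
  rejectionRef?: string;
  reviewerNotes?: string;
  blockerResolution?: string;
  subtaskOutputRef?: string;
  subtaskName?: string;
  reviewApproved?: boolean;
  reviewerFeedback?: string;
}

// Fields each wake reason must carry (hypothetical policy derived from the field comments).
const REQUIRED_BY_REASON: Record<WakeReason, ReadonlyArray<keyof WakeContext>> = {
  new_task: [],
  scheduled: [],
  rejection: ['rejectionRef', 'reviewerNotes'],
  unblocked: ['blockerResolution'],
  subtask_completed: ['subtaskOutputRef', 'subtaskName'],
  review_approved: ['reviewApproved'], // reviewer notes are optional on approval
  review_feedback: ['reviewApproved', 'reviewerFeedback'],
};

// Returns a list of problems; an empty list means the payload is consistent.
function payloadProblems(ctx: WakeContext): string[] {
  const problems = REQUIRED_BY_REASON[ctx.wakeReason]
    .filter((field) => ctx[field] === undefined)
    .map((field) => `missing ${field}`);
  // reviewApproved must agree with the wake reason it accompanies
  if (ctx.wakeReason === 'review_approved' && ctx.reviewApproved === false) {
    problems.push('reviewApproved contradicts wakeReason');
  }
  if (ctx.wakeReason === 'review_feedback' && ctx.reviewApproved === true) {
    problems.push('reviewApproved contradicts wakeReason');
  }
  return problems;
}
```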
Activity State Machine
```text
created
   │  worker picks up job
   ▼
assigned
   │  adapter dispatches
   ▼
in_progress ◄──────────────── re-enqueued (any wakeReason)
   │
   ├──► done
   ├──► awaiting_approval ── approved / rejected ──► next step / revision activity
   ├──► blocked ──────────── blocker resolved ────► re-enqueued (unblocked)
   ├──► awaiting_subtask ─── subtask completes ───► re-enqueued (subtask_completed)
   ├──► awaiting_review ──── approved / feedback ─► re-enqueued (review_approved / review_feedback)
   ├──► reassigned (terminal)
   └──► failed / timeout ─── retries?
```

All activity statuses:
| Status | Set by | Meaning |
|---|---|---|
| created | Control plane | Activity exists, not yet dispatched |
| assigned | Worker | Job dequeued, about to dispatch to adapter |
| in_progress | Worker | Adapter running |
| awaiting_approval | Worker | Agent done; predefined pipeline approval gate |
| awaiting_subtask | create_subtask tool | Agent suspended; waiting for subtask to complete |
| awaiting_review | request_human_review tool | Agent suspended; waiting for ad-hoc human review |
| blocked | report_blocker tool | Suspended; external dependency needs human action |
| done | Worker | Agent task completed successfully |
| approved | Human | Human approval granted; pipeline advances |
| rejected | Human | Human rejected; revision activity created |
| reassigned | reassign_task tool | Activity handed off; new activity created for target agent/human |
| failed | Worker | Adapter error after all retries |
| needs_review | Dead-letter worker | Retries exhausted; escalated to the DM Portal reviewer |
State transitions are written to activities.status after every relevant event.
For human approval activities (assignee_type = 'human', is_approval_gate = true):
- Job is not dispatched to an agent queue
- Activity is surfaced in the DM Portal Activities Inbox
- Status transitions on human action:
created → in_progress → approved | rejected
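The diagram and status table can be encoded as a transition map that a status-update helper checks before writing activities.status. This is a sketch derived from the diagram, not existing code: ACTIVITY_TRANSITIONS and canTransition are hypothetical names, needs_review (set by the dead-letter worker) is included, and created → in_progress is allowed directly because both processActivity() and human approval activities skip assigned.

```typescript
type ActivityStatus =
  | 'created' | 'assigned' | 'in_progress'
  | 'awaiting_approval' | 'awaiting_subtask' | 'awaiting_review' | 'blocked'
  | 'done' | 'approved' | 'rejected' | 'reassigned' | 'failed' | 'needs_review';

const ACTIVITY_TRANSITIONS: Record<ActivityStatus, readonly ActivityStatus[]> = {
  created: ['assigned', 'in_progress'],
  assigned: ['in_progress'],
  in_progress: [
    'awaiting_approval', 'awaiting_subtask', 'awaiting_review', 'blocked',
    'done', 'reassigned', 'failed', 'needs_review',
    'approved', 'rejected', // human approval activities resolve straight from in_progress
  ],
  awaiting_approval: ['approved', 'rejected'],
  // Suspended states return to created when their handler re-enqueues the activity
  awaiting_subtask: ['created'],
  awaiting_review: ['created'],
  blocked: ['created'],
  failed: ['needs_review'],
  // Terminal states: follow-up work is always a new activity row
  done: [],
  approved: [],
  rejected: [],
  reassigned: [],
  needs_review: [],
};

function canTransition(from: ActivityStatus, to: ActivityStatus): boolean {
  return ACTIVITY_TRANSITIONS[from].includes(to);
}
```

A retried job re-runs processActivity() while the row is still in_progress, so a helper built on this map would need to treat that same-status write as a no-op rather than an illegal transition.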
Worker Factory
Each agent role gets a single shared BullMQ Worker. Workers are created by a factory using the shared agent__{agentRole} queue name — one queue for all tenants:
```typescript
import { Worker, type Job } from 'bullmq';

// redisConnection and processActivity() are defined elsewhere in packages/queue.
function createAgentWorker(agentRole: AgentRole, concurrency: number): Worker<ActivityJobData> {
  const queueName = `agent__${agentRole}`; // shared across all tenants
  return new Worker<ActivityJobData>(
    queueName,
    async (job: Job<ActivityJobData>) => {
      // tenantId is in the payload; the worker verifies it before processing
      await processActivity(job.data);
    },
    {
      connection: redisConnection,
      concurrency,
      limiter: {
        max: 10,          // at most 10 jobs per 60 s window (runaway protection);
        duration: 60_000, // BullMQ's limiter is queue-wide, so this caps all workers on the queue, not each one
      },
    },
  );
}
```

Job Deduplication
Recurring cron-triggered jobs use a deduplication key to prevent the same work being enqueued twice if a cron fires while the previous run is still in the queue:
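```typescript
await queue.add(
  'activity',
  jobData,
  {
    // Deduplication key: the same activity cannot be queued twice with the same wake reason
    deduplication: {
      id: `${jobData.tenantId}:${jobData.activityId}:${jobData.wakeReason}`,
    },
    // NOTE: BullMQ dequeues lower numbers first, while jobData.priority counts upward
    // (0 = normal, 2 = urgent); see Priority Queuing before passing it through unchanged.
    priority: jobData.priority,
  },
);
```

If a job with the same deduplication ID already exists in the waiting or active state, the new job is not added: queue.add() does not throw, and BullMQ emits a deduplicated event instead, so from the caller's side the duplicate is silently dropped. This prevents double-execution without blocking the enqueue call.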
Deduplication key format: {tenantId}:{activityId}:{wakeReason} — the wakeReason is included because the same activity legitimately re-enters the queue with different wake reasons (e.g. new_task → review_feedback).
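The interaction between the key format and the drop-on-duplicate behaviour can be modelled in a few lines. This is an in-memory illustration, not BullMQ's implementation (which keeps the key in Redis); DedupRegistry and its release() method are hypothetical names.

```typescript
type WakeReason =
  | 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
  | 'subtask_completed' | 'review_approved' | 'review_feedback';

interface DedupInput {
  tenantId: string;
  activityId: string;
  wakeReason: WakeReason;
}

// Builds the documented key format {tenantId}:{activityId}:{wakeReason}.
function dedupId(job: DedupInput): string {
  return `${job.tenantId}:${job.activityId}:${job.wakeReason}`;
}

// Hypothetical in-memory stand-in for the queue's dedup keys. While a key is
// pending, further enqueues with the same key are dropped rather than rejected.
class DedupRegistry {
  private readonly pending = new Set<string>();

  // Returns true if the job was accepted, false if it was dropped as a duplicate.
  tryEnqueue(job: DedupInput): boolean {
    const id = dedupId(job);
    if (this.pending.has(id)) return false;
    this.pending.add(id);
    return true;
  }

  // Called when the job leaves the waiting/active states.
  release(job: DedupInput): void {
    this.pending.delete(dedupId(job));
  }
}
```

Including wakeReason in the key is what lets the same activity sit in the queue once as new_task and again as review_feedback without either enqueue being dropped.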
Dedup key persistence after failure: BullMQ's dedup key (bull:{queue}:de:{id}) is NOT cleared when a job fails, so a re-enqueue with the same key after a failure is silently dropped. Two mitigations are in place for the activity-planner:
- The failed event handler deletes the dedup key from Redis immediately after the final failure.
- A 2-hour TTL on the dedup key makes it expire automatically even if the handler doesn't run (e.g. after a hard kill).
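Both mitigations can be modelled together: a key store with a 2-hour expiry, and a failed handler that deletes the key only when no retries remain, so a job that BullMQ will still retry keeps its protection. Everything here is a hypothetical in-memory model (ExpiringDedupKeys, isFinalFailure and onJobFailed are illustrative names). It assumes attemptsMade already counts the attempt that just failed, which is how failed-event handlers commonly compare job.attemptsMade with job.opts.attempts; confirm that against the BullMQ version in use.

```typescript
const DEDUP_TTL_MS = 2 * 60 * 60 * 1000; // 2-hour safety-net TTL

// Final failure = no retry attempts left. maxAttempts defaults to 1 (no retries).
function isFinalFailure(attemptsMade: number, maxAttempts: number | undefined): boolean {
  return attemptsMade >= (maxAttempts ?? 1);
}

// Hypothetical dedup key store with expiry; `now` (ms) is injected for testability.
class ExpiringDedupKeys {
  private readonly expiresAt = new Map<string, number>();

  set(id: string, now: number): void {
    this.expiresAt.set(id, now + DEDUP_TTL_MS);
  }

  has(id: string, now: number): boolean {
    const expiry = this.expiresAt.get(id);
    if (expiry === undefined) return false;
    if (now >= expiry) {
      this.expiresAt.delete(id); // mitigation 2: the key lapses even if no handler ran
      return false;
    }
    return true;
  }

  delete(id: string): void {
    this.expiresAt.delete(id);
  }
}

// Mitigation 1: clear the key on the final failure only.
function onJobFailed(keys: ExpiringDedupKeys, id: string, attemptsMade: number, maxAttempts?: number): void {
  if (isFinalFailure(attemptsMade, maxAttempts)) keys.delete(id);
}
```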
Worker Stall Handling
All agent workers set maxStalledCount: 2:
```typescript
const worker = new Worker(queueName, processor, {
  connection: getRedisConnection(),
  concurrency: 1,
  lockDuration: 660_000, // 11 minutes; a job whose lock lapses without renewal is treated as stalled
  maxStalledCount: 2,    // allow 2 stall-recoveries before giving up
});
```

Why maxStalledCount: 2 not 0: When a worker process crashes unexpectedly (e.g. Windows DLL cold-start, OOM kill), BullMQ marks the in-progress job as “stalled”. With maxStalledCount: 0 the job moves directly to failed, bypassing the attempts/backoff retry mechanism entirely (BullMQ only retries on thrown errors, not stalls). With maxStalledCount: 2, a stalled job is moved back to waiting and picked up again by the next healthy worker, up to twice, before it is considered permanently failed.
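The counting rule described above is small enough to model directly. This is a simplified model of the described behaviour, not BullMQ source code; onStallDetected and replayStalls are hypothetical names. It shows why 0 fails on the first stall while 2 tolerates two stalls and fails on the third.

```typescript
type StallOutcome = 'moved_to_waiting' | 'failed';

// Each detected stall increments the job's stall counter; once the counter
// exceeds maxStalledCount the job fails instead of returning to waiting.
function onStallDetected(
  stallCounter: number,
  maxStalledCount: number,
): { counter: number; outcome: StallOutcome } {
  const counter = stallCounter + 1;
  return { counter, outcome: counter > maxStalledCount ? 'failed' : 'moved_to_waiting' };
}

// Replays n consecutive stalls of one job and returns the outcome of each.
function replayStalls(n: number, maxStalledCount: number): StallOutcome[] {
  const outcomes: StallOutcome[] = [];
  let counter = 0;
  for (let i = 0; i < n; i++) {
    const result = onStallDetected(counter, maxStalledCount);
    counter = result.counter;
    outcomes.push(result.outcome);
  }
  return outcomes;
}
```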
Workers are spawned once at API server startup — one worker process per agent role, shared across all tenants. No per-tenant lifecycle management is needed.
Activity Processing in a Worker
```typescript
async function processActivity(data: ActivityJobData): Promise<void> {
  // 1. Mark activity as in_progress (scoped to the job's tenant)
  await db.update(activities)
    .set({ status: 'in_progress', startedAt: new Date() })
    .where(and(eq(activities.id, data.activityId), eq(activities.tenantId, data.tenantId)));

  // 2. Create activity_run record
  const [run] = await db.insert(activityRuns).values({
    activityId: data.activityId,
    tenantId: data.tenantId,
    wakeReason: data.wakeReason,
    startedAt: new Date(),
  }).returning();

  // Steps 3-9 run inside try so that any failure, including adapter or skill
  // resolution, closes the run record instead of leaving it open.
  let skillsDir: string | undefined;
  try {
    // 3. Resolve adapter for this tenant + agent role
    const agentConfig = await getAgentConfig(data.tenantId, data.agentRole);
    const adapter = resolveAdapter(agentConfig);

    // 4. Resolve skills: returns client context file (full) + manifest (metadata-only)
    const { contextFile, manifest } = await resolveSkillsForActivity(data.tenantId, data.agentRole);
    skillsDir = await injectSkills(contextFile, manifest);

    // 5. Build task prompt: threads wake reason and rejection context
    const prompt = await buildActivityPrompt(data);

    // 6. Resolve the existing session for this deliverable + agent role (if any)
    const session = await resolveSession(data.deliverableId, data.agentRole);

    // 7. Dispatch to adapter
    const result = await adapter.dispatch({
      prompt,
      skillsDir,
      sessionId: session?.externalSessionId ?? undefined,
      activityRunId: run.id,
      wakeReason: data.wakeReason,
    });

    // 8. Persist session state for next heartbeat
    await persistSession(session, result, data.deliverableId, data.agentRole);

    // 9. Update run record
    // Note: if the agent called report_blocker, status is already 'blocked'
    // (the tool handler sets it). The adapter returns normally but the activity
    // remains blocked until onBlockerResolved() re-enqueues it.
    const isBlocked = await isActivityBlocked(data.activityId);
    if (!isBlocked) {
      await db.update(activityRuns)
        .set({ status: 'completed', endedAt: new Date(), sessionId: result.sessionId })
        .where(eq(activityRuns.id, run.id));
    }
  } catch (err) {
    await db.update(activityRuns)
      .set({ status: 'failed', error: String(err), endedAt: new Date() })
      .where(eq(activityRuns.id, run.id));
    throw err; // BullMQ catches this and triggers retry
  } finally {
    // skillsDir is unset if a step before injectSkills() failed
    if (skillsDir) await cleanupSkillsDir(skillsDir);
  }
}
```

See also: LLM Providers for resolveAdapter() implementation and the phone-home callback handler.
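processActivity() scopes its first UPDATE by tenantId, which keeps a job from touching another tenant's row but does not fail loudly when the IDs disagree. An explicit guard makes the worker-side verification described under Queue Architecture visible. The sketch below is hypothetical (assertTenantOwnership and TenantMismatchError are illustrative names) and takes an already-loaded row instead of querying PostgreSQL.

```typescript
interface ActivityRow {
  id: string;
  tenantId: string;
}

interface JobScope {
  activityId: string;
  tenantId: string;
}

class TenantMismatchError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'TenantMismatchError';
  }
}

// Returns the row only if it exists, is the activity the job names, and belongs to
// the job's tenant. The error message deliberately omits the row's real tenant.
function assertTenantOwnership(row: ActivityRow | undefined, job: JobScope): ActivityRow {
  if (!row || row.id !== job.activityId) {
    throw new TenantMismatchError(`activity ${job.activityId} not found`);
  }
  if (row.tenantId !== job.tenantId) {
    throw new TenantMismatchError(`activity ${job.activityId} is not owned by tenant ${job.tenantId}`);
  }
  return row;
}
```

Retrying a cross-tenant payload can never succeed, so in a BullMQ processor this case is a candidate for bullmq's UnrecoverableError, which fails the job without consuming its remaining attempts.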
Prompt Construction
buildActivityPrompt() assembles the full task prompt from the activity definition, the prior step’s output, and — critically — the wake reason context. This ensures the agent always knows why it was dispatched and what it is expected to do differently (if anything).
```typescript
async function buildActivityPrompt(data: ActivityJobData): Promise<string> {
  const activity = await getActivity(data.activityId, data.tenantId);
  const input = data.inputRef ? await getActivityOutput(data.inputRef) : null;

  // Base task description
  let prompt = `# Task: ${activity.name}\n\n${activity.description}`;

  // Thread the prior pipeline step's output as context
  if (input) {
    prompt += `\n\n## Input from previous step\n\n${input.content}`;
  }

  // Rejection path: the agent is revising work a human reviewed and rejected
  if (data.wakeReason === 'rejection') {
    if (data.rejectionRef) {
      const rejected = await getActivityOutput(data.rejectionRef);
      prompt += `\n\n## Your previous output (rejected by reviewer)\n\n${rejected.content}`;
    }
    if (data.reviewerNotes) {
      prompt += `\n\n## Reviewer feedback\n\n${data.reviewerNotes}`;
    }
    prompt += `\n\nRevise your output to address the reviewer's feedback above. `
      + `Do not restate the feedback — produce the corrected output directly.`;
  }

  // The unblocked, subtask_completed, review_approved and review_feedback
  // sections listed under "Prompt additions" are appended here.

  return prompt;
}
```

Wake reason behaviour by case
| wakeReason | What the agent receives | Expected behaviour |
|---|---|---|
| new_task | Task description + prior step output (if any) | Produce the deliverable from scratch |
| rejection | Task description + prior step output + rejected output + reviewer notes | Revise the rejected output per the feedback |
| scheduled | Task description only | Execute the recurring task with no prior context dependency |
| unblocked | Task description + prior step output + blocker resolution note | Resume; the external blocker has been resolved |
| subtask_completed | Task description + prior step output + subtask name + subtask output | Continue using the subtask result |
| review_approved | Task description + prior step output + reviewer feedback (if any) | Human approved: finalise and complete the task |
| review_feedback | Task description + prior step output + reviewer feedback | Revise based on the reviewer's feedback and re-submit or complete |
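The table can be read as one exhaustive dispatch over wakeReason. The sketch below is a restructuring suggestion, not the existing code: wakeSection is a hypothetical name, outputs come from an in-memory Map instead of getActivityOutput(), and the prompt wording is abbreviated. A switch with a never check makes the compiler flag any new wake reason that has no prompt section.

```typescript
type WakeReason =
  | 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
  | 'subtask_completed' | 'review_approved' | 'review_feedback';

interface WakeData {
  wakeReason: WakeReason;
  rejectionRef?: string;
  reviewerNotes?: string;
  blockerResolution?: string;
  subtaskOutputRef?: string;
  subtaskName?: string;
  reviewerFeedback?: string;
}

// Returns the wake-reason section appended after the task description and prior output.
function wakeSection(data: WakeData, outputs: ReadonlyMap<string, string>): string {
  // Stand-in for getActivityOutput(): resolves an output reference from the map.
  const content = (ref: string | undefined): string => (ref ? (outputs.get(ref) ?? '') : '');
  const parts: string[] = [];
  switch (data.wakeReason) {
    case 'new_task':
    case 'scheduled':
      break; // no context beyond the task description (and prior step output, if any)
    case 'rejection':
      if (data.rejectionRef) parts.push(`## Your previous output (rejected by reviewer)\n\n${content(data.rejectionRef)}`);
      if (data.reviewerNotes) parts.push(`## Reviewer feedback\n\n${data.reviewerNotes}`);
      parts.push("Revise your output to address the reviewer's feedback above.");
      break;
    case 'unblocked':
      if (data.blockerResolution) {
        parts.push(`## Blocker resolved\n\n${data.blockerResolution}`, 'Resume the task from where you left off.');
      }
      break;
    case 'subtask_completed':
      if (data.subtaskOutputRef) {
        parts.push(`## Subtask completed: ${data.subtaskName ?? ''}\n\n${content(data.subtaskOutputRef)}`,
          'Continue your task using the subtask result above.');
      }
      break;
    case 'review_approved':
      parts.push('## Human review: Approved');
      if (data.reviewerFeedback) parts.push(`Reviewer notes: ${data.reviewerFeedback}`);
      parts.push('Your work was approved. Finalise and complete the task.');
      break;
    case 'review_feedback':
      parts.push(`## Human review: Feedback provided\n\n${data.reviewerFeedback ?? ''}`,
        'Revise your work based on the feedback above, then complete the task.');
      break;
    default: {
      // Compile-time exhaustiveness check: a new WakeReason without a case fails here.
      const unhandled: never = data.wakeReason;
      throw new Error(`Unhandled wake reason: ${String(unhandled)}`);
    }
  }
  return parts.join('\n\n');
}
```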
Prompt additions for the remaining wake reasons (these statements belong inside buildActivityPrompt(), before return prompt):

```typescript
if (data.wakeReason === 'unblocked' && data.blockerResolution) {
  prompt += `\n\n## Blocker resolved\n\n${data.blockerResolution}`;
  prompt += `\n\nResume the task from where you left off.`;
}

if (data.wakeReason === 'subtask_completed' && data.subtaskOutputRef) {
  const subtaskOutput = await getActivityOutput(data.subtaskOutputRef);
  prompt += `\n\n## Subtask completed: ${data.subtaskName}\n\n${subtaskOutput.content}`;
  prompt += `\n\nContinue your task using the subtask result above.`;
}

if (data.wakeReason === 'review_approved') {
  prompt += `\n\n## Human review: Approved`;
  if (data.reviewerFeedback) prompt += `\n\nReviewer notes: ${data.reviewerFeedback}`;
  prompt += `\n\nYour work was approved. Finalise and complete the task.`;
}

if (data.wakeReason === 'review_feedback') {
  // reviewerFeedback is optional in ActivityJobData; avoid rendering "undefined"
  prompt += `\n\n## Human review: Feedback provided\n\n${data.reviewerFeedback ?? ''}`;
  prompt += `\n\nRevise your work based on the feedback above, then complete the task.`;
}
```

Pipeline Step Progression
On activity completion (approved / agent task done)
```typescript
async function onActivityCompleted(activityId: string, tenantId: string): Promise<void> {
  const activity = await getActivity(activityId, tenantId);
  const template = await getActivityTemplate(activity.templateId);

  // Find next step in pipeline
  const nextStep = template.steps[activity.pipelineStep + 1];
  if (!nextStep) {
    // Pipeline complete: mark deliverable unit done
    await incrementDeliverablePeriodCount(activity.deliverableId, tenantId);
    return;
  }

  // Create next activity
  const [nextActivity] = await db.insert(activities).values({
    tenantId,
    deliverableId: activity.deliverableId,
    deliverablePeriodId: activity.deliverablePeriodId,
    templateId: activity.templateId,
    pipelineStep: activity.pipelineStep + 1,
    name: nextStep.name,
    assigneeType: nextStep.assigneeType,
    assigneeAgentRole: nextStep.agentRole,
    isApprovalGate: nextStep.isApproval,
    inputRef: activity.outputRef, // Output of this step is input to next
    status: 'created',
  }).returning();

  // Enqueue if agent activity; surface in inbox if human activity
  if (nextStep.assigneeType === 'agent') {
    await enqueueActivity(nextActivity, tenantId, nextStep.agentRole, {
      wakeReason: 'new_task',
    });
  }
  // Human activities are surfaced automatically via the DM Portal inbox query
}
```

On activity rejection (human rejects agent output)
When a human reviewer rejects an agent’s output, the same agent is re-dispatched with the rejected output and reviewer notes threaded into the new prompt via wakeReason: 'rejection'.
```typescript
async function onActivityRejected(
  activityId: string,
  tenantId: string,
  reviewerNotes: string,
): Promise<void> {
  const activity = await getActivity(activityId, tenantId);

  // Mark the current activity as rejected
  await db.update(activities)
    .set({ status: 'rejected', reviewerNotes, rejectedAt: new Date() })
    .where(and(eq(activities.id, activityId), eq(activities.tenantId, tenantId)));

  // Create a revision activity: same step, same agent, same pipeline position
  const [revisionActivity] = await db.insert(activities).values({
    tenantId,
    deliverableId: activity.deliverableId,
    deliverablePeriodId: activity.deliverablePeriodId,
    templateId: activity.templateId,
    pipelineStep: activity.pipelineStep, // same step, not incremented
    name: `${activity.name} (revision)`,
    assigneeType: 'agent',
    assigneeAgentRole: activity.assigneeAgentRole,
    isApprovalGate: false,
    inputRef: activity.inputRef, // same input as original
    status: 'created',
  }).returning();

  // Enqueue with rejection context: the agent receives both its rejected output
  // and the reviewer's notes in the prompt
  await enqueueActivity(revisionActivity, tenantId, activity.assigneeAgentRole, {
    wakeReason: 'rejection',
    rejectionRef: activity.outputRef, // MongoDB ObjectId of the rejected output
    reviewerNotes,
  });
}
```

The revision activity reuses the same session (via deliverableId + agentRole lookup) so the agent has full conversation history when revising.
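The rules onActivityRejected() applies when building the revision (same step, same agent, same input, rejected output passed as rejectionRef) can be isolated in a pure function and unit-tested without a database. planRevision, ActivityLike and RevisionPlan are hypothetical names; the fields mirror the insert and enqueue calls above.

```typescript
interface ActivityLike {
  id: string;
  tenantId: string;
  deliverableId: string;
  pipelineStep: number;
  name: string;
  assigneeAgentRole: string;
  inputRef?: string;
  outputRef?: string;
}

interface RevisionPlan {
  activity: Omit<ActivityLike, 'id' | 'outputRef'> & { isApprovalGate: false; status: 'created' };
  job: { wakeReason: 'rejection'; rejectionRef?: string; reviewerNotes: string };
}

function planRevision(original: ActivityLike, reviewerNotes: string): RevisionPlan {
  return {
    activity: {
      tenantId: original.tenantId,
      deliverableId: original.deliverableId,
      pipelineStep: original.pipelineStep,           // a revision never advances the pipeline
      name: `${original.name} (revision)`,
      assigneeAgentRole: original.assigneeAgentRole, // same agent, so the session lookup matches
      inputRef: original.inputRef,                   // same input as the rejected attempt
      isApprovalGate: false,
      status: 'created',
    },
    job: { wakeReason: 'rejection', rejectionRef: original.outputRef, reviewerNotes },
  };
}
```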
On blocker reported (agent calls report_blocker tool)
When an agent cannot proceed due to an external dependency (expired OAuth token, missing integration, API outage), it calls the report_blocker tool during execution. The tool handler suspends the activity and creates a human task to resolve the issue.
```typescript
async function onActivityBlocked(
  activityId: string,
  tenantId: string,
  blockerType: BlockerType,
  blockerMessage: string,
  channelId?: string,
): Promise<void> {
  // 1. Mark the activity blocked
  await db.update(activities)
    .set({
      status: 'blocked',
      blockerType,
      blockerMessage,
      blockedAt: new Date(),
    })
    .where(and(eq(activities.id, activityId), eq(activities.tenantId, tenantId)));

  // 2. Create a human-assigned resolution activity in the DM Portal inbox
  await db.insert(activities).values({
    tenantId,
    name: `Resolve blocker: ${blockerMessage}`,
    assigneeType: 'human',
    isApprovalGate: false,
    status: 'created',
    parentActivityId: activityId, // links back to the blocked activity
    metadata: {
      blockerType,
      blockedActivityId: activityId,
      channelId,
    },
  });

  // 3. If channel auth expired, flag the channel record
  if (blockerType === 'channel_auth_expired' && channelId) {
    await db.update(channels)
      .set({ status: 'expired' })
      .where(and(eq(channels.id, channelId), eq(channels.tenantId, tenantId)));
  }
}
```

On blocker resolved (human marks resolution activity complete)
```typescript
async function onBlockerResolved(
  resolutionActivityId: string,
  tenantId: string,
  resolutionNote: string,
): Promise<void> {
  const resolution = await getActivity(resolutionActivityId, tenantId);
  const blockedActivityId = resolution.metadata.blockedActivityId;
  const blocked = await getActivity(blockedActivityId, tenantId);

  // Mark the blocked activity as ready to resume
  await db.update(activities)
    .set({ status: 'created', blockerType: null, blockerMessage: null })
    .where(and(eq(activities.id, blockedActivityId), eq(activities.tenantId, tenantId)));

  // Re-enqueue with unblocked wake reason so the agent knows to resume
  await enqueueActivity(blocked, tenantId, blocked.assigneeAgentRole, {
    wakeReason: 'unblocked',
    blockerResolution: resolutionNote,
  });
}
```

The re-enqueued activity resumes the same session: the agent has full conversation history and picks up where it left off. The blockerResolution note is appended to the prompt so the agent knows what changed.
On subtask completed (agent called create_subtask)
When an agent creates a blocking subtask, the parent activity waits in awaiting_subtask state. When the subtask finishes, the parent is re-enqueued with the subtask output.
```typescript
async function onSubtaskCompleted(
  subtaskActivityId: string,
  tenantId: string,
): Promise<void> {
  const subtask = await getActivity(subtaskActivityId, tenantId);
  if (!subtask.parentActivityId) return; // not a subtask

  const parent = await getActivity(subtask.parentActivityId, tenantId);
  if (parent.status !== 'awaiting_subtask') return; // parent not waiting

  await db.update(activities)
    .set({ status: 'created' })
    .where(and(eq(activities.id, parent.id), eq(activities.tenantId, tenantId)));

  await enqueueActivity(parent, tenantId, parent.assigneeAgentRole, {
    wakeReason: 'subtask_completed',
    subtaskOutputRef: subtask.outputRef,
    subtaskName: subtask.name,
  });
}
```

For non-blocking subtasks (blocking: false), the parent keeps running. onSubtaskCompleted is still called, but because the parent is not in awaiting_subtask the status guard returns early: the result stays on the subtask record (outputRef) and the parent is not re-enqueued.
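The two guards in onSubtaskCompleted() are what make the blocking/non-blocking distinction work without checking a separate flag. Pulled out as a pure decision function (a hypothetical refactor; decideOnSubtaskCompleted and SubtaskAction are illustrative names), the three outcomes become explicit and testable.

```typescript
interface SubtaskInfo {
  name: string;
  parentActivityId?: string | null;
  outputRef?: string;
}

interface ParentInfo {
  id: string;
  status: string;
}

type SubtaskAction =
  | { kind: 'ignore'; reason: 'not_a_subtask' | 'parent_not_waiting' }
  | { kind: 'resume_parent'; parentId: string; subtaskOutputRef?: string; subtaskName: string };

// Mirrors the guards in onSubtaskCompleted(): only a parent suspended in
// awaiting_subtask is resumed; everything else is left alone.
function decideOnSubtaskCompleted(subtask: SubtaskInfo, parent: ParentInfo | undefined): SubtaskAction {
  if (!subtask.parentActivityId) return { kind: 'ignore', reason: 'not_a_subtask' };
  if (!parent || parent.status !== 'awaiting_subtask') {
    // Non-blocking subtasks end up here: the parent kept running, so the
    // result stays on the subtask record and nothing is re-enqueued.
    return { kind: 'ignore', reason: 'parent_not_waiting' };
  }
  return {
    kind: 'resume_parent',
    parentId: parent.id,
    subtaskOutputRef: subtask.outputRef,
    subtaskName: subtask.name,
  };
}
```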
On review response (agent called request_human_review)
```typescript
async function onReviewCompleted(
  reviewActivityId: string,
  tenantId: string,
  approved: boolean,
  reviewerFeedback: string,
): Promise<void> {
  const review = await getActivity(reviewActivityId, tenantId);
  const parentId = review.metadata.reviewedActivityId;
  const parent = await getActivity(parentId, tenantId);

  await db.update(activities)
    .set({ status: 'created' })
    .where(and(eq(activities.id, parentId), eq(activities.tenantId, tenantId)));

  await enqueueActivity(parent, tenantId, parent.assigneeAgentRole, {
    wakeReason: approved ? 'review_approved' : 'review_feedback',
    reviewApproved: approved,
    reviewerFeedback,
  });
}
```

On approval resolved (human approves/rejects in DM Portal)
Approval resolution is the most powerful event in the system — a single reviewer action can simultaneously re-enqueue many activities across multiple pipelines.
```typescript
// Called from the approvals API route when the reviewer submits their decision.
// Full implementation in governance-guardrails.md: onApprovalResolved().
// The queue layer's responsibility: enqueue all linked activities correctly.
async function enqueueApprovalLinkedActivities(
  approvalId: string,
  tenantId: string,
  approved: boolean,
  reviewerNotes: string,
  chosenOption?: string,
): Promise<void> {
  const links = await db.query.approvalLinkedActivities.findMany({
    where: eq(approvalLinkedActivities.approvalId, approvalId),
    with: { activity: true },
  });

  const feedbackText = [reviewerNotes, chosenOption ? `Selected: ${chosenOption}` : null]
    .filter(Boolean)
    .join('\n\n');

  // All linked activities are re-enqueued in parallel
  await Promise.all(links.map(({ activity }) =>
    enqueueActivity(activity, tenantId, activity.assigneeAgentRole, {
      wakeReason: approved ? 'review_approved' : 'review_feedback',
      reviewApproved: approved,
      reviewerFeedback: feedbackText,
    }),
  ));
}
```

This is why create_approval is powerful for content_direction and brand_direction approvals: one reviewer action fans out to all blocked activities at once, with the decision injected into each agent's prompt via reviewerFeedback.
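The fan-out reduces to two pure steps: compose one feedback string, then build one job payload per linked activity. The sketch below is hypothetical (composeFeedback, planFanOut and the payload shape are illustrative) and leaves out the actual enqueueActivity() calls. One design point: Promise.all rejects as soon as any single enqueue fails, while other enqueues may already have succeeded; Promise.allSettled would let the caller see which linked activities were re-enqueued and retry only the rest.

```typescript
interface LinkedActivity {
  id: string;
  assigneeAgentRole: string;
}

interface FanOutJob {
  activityId: string;
  agentRole: string;
  wakeReason: 'review_approved' | 'review_feedback';
  reviewApproved: boolean;
  reviewerFeedback: string;
}

// Same rule as enqueueApprovalLinkedActivities(): notes, then the chosen option,
// separated by a blank line; empty parts are dropped.
function composeFeedback(reviewerNotes: string, chosenOption?: string): string {
  return [reviewerNotes, chosenOption ? `Selected: ${chosenOption}` : null]
    .filter(Boolean)
    .join('\n\n');
}

// One payload per linked activity, all carrying the same decision and feedback.
function planFanOut(
  links: readonly LinkedActivity[],
  approved: boolean,
  reviewerNotes: string,
  chosenOption?: string,
): FanOutJob[] {
  const reviewerFeedback = composeFeedback(reviewerNotes, chosenOption);
  return links.map((activity): FanOutJob => ({
    activityId: activity.id,
    agentRole: activity.assigneeAgentRole,
    wakeReason: approved ? 'review_approved' : 'review_feedback',
    reviewApproved: approved,
    reviewerFeedback,
  }));
}
```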
On task reassigned (agent called reassign_task)
```typescript
async function onActivityReassigned(
  activityId: string,
  tenantId: string,
  assigneeType: 'agent' | 'human',
  agentRole: AgentRole | null,
  humanRole: string | null,
  reason: string,
): Promise<void> {
  // Validate before any writes: an agent reassignment without a role cannot be queued
  if (assigneeType === 'agent' && !agentRole) {
    throw new Error('reassign_task to an agent requires agentRole');
  }

  const activity = await getActivity(activityId, tenantId);

  // Mark original as reassigned (terminal: no further processing)
  await db.update(activities)
    .set({ status: 'reassigned', reassignReason: reason })
    .where(and(eq(activities.id, activityId), eq(activities.tenantId, tenantId)));

  // Create a new activity for the target assignee, same step and input
  const [newActivity] = await db.insert(activities).values({
    tenantId,
    deliverableId: activity.deliverableId,
    deliverablePeriodId: activity.deliverablePeriodId,
    templateId: activity.templateId,
    pipelineStep: activity.pipelineStep,
    name: activity.name,
    assigneeType,
    assigneeAgentRole: agentRole,
    assigneeHumanRole: humanRole,
    isApprovalGate: assigneeType === 'human',
    inputRef: activity.inputRef,
    parentActivityId: activity.id, // link back for audit trail
    status: 'created',
    metadata: { reassignedFrom: activityId, reassignReason: reason },
  }).returning();

  if (assigneeType === 'agent' && agentRole) {
    await enqueueActivity(newActivity, tenantId, agentRole, {
      wakeReason: 'new_task',
    });
  }
  // Human activities surface automatically in DM Portal inbox
}
```

Monthly Period Scheduler
At the start of each calendar month, a cron job runs per active tenant:
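```typescript
// Runs at 00:00 UTC on the 1st of every month.
// One repeatable job per tenant. BullMQ derives the repeat key from the job name,
// jobId and repeat options (not from the job data), so the per-tenant jobId keeps
// each tenant's registration from collapsing into a single shared schedule.
await deliverablePeriodQueue.add(
  'start-new-period',
  { tenantId },
  {
    jobId: `start-new-period-${tenantId}`,
    repeat: { pattern: '0 0 1 * *', tz: 'UTC' }, // tz pins evaluation to UTC instead of the server's local zone
  },
);
```

This triggers startNewDeliverablePeriods(tenantId), which: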
- Creates a deliverable_periods record for each active deliverable
- Dispatches the Activity Planner to plan the activity pipeline for the period
- Activity Planner creates the first activity in each pipeline
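Assuming the schedule is evaluated in UTC, as the comment in the registration snippet states, the next firing time of 0 0 1 * * is always midnight on the 1st of the following month. A small helper like the hypothetical nextMonthlyRunUtc below can back scheduler tests or show when the next period opens; periodLabel is an equally hypothetical formatter for the period a run opens.

```typescript
// Next firing of '0 0 1 * *' in UTC strictly after `from`. The current month's run
// (1st, 00:00 UTC) is never after `from`, so the answer is always the 1st of the
// following month; Date.UTC rolls month index 12 over into January of the next year.
function nextMonthlyRunUtc(from: Date): Date {
  return new Date(Date.UTC(from.getUTCFullYear(), from.getUTCMonth() + 1, 1));
}

// "YYYY-MM" label for the period that a run at `runAt` opens.
function periodLabel(runAt: Date): string {
  const month = String(runAt.getUTCMonth() + 1).padStart(2, '0');
  return `${runAt.getUTCFullYear()}-${month}`;
}
```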
Priority Queuing
BullMQ priority: lower number = higher priority.
| Priority | Use Case |
|---|---|
| 1 | Urgent / overdue activities flagged by DM Portal |
| 5 | Approval-gated activities (unblocking downstream) |
| 10 | Normal scheduled activities |
| 20 | Background research / low-urgency tasks |
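ActivityJobData.priority counts upward (0 = normal, 1 = high, 2 = urgent) while BullMQ's priority counts downward, so passing jobData.priority straight through, as the deduplication example does, would dequeue a high job (1) ahead of an urgent one (2). BullMQ's documentation also treats 0 as no priority, and unprioritised jobs are processed before prioritised ones, so normal jobs would jump the queue. A mapping step avoids both problems. The sketch below is hypothetical: toBullmqPriority, the high → 5 choice and the background flag are assumptions, and dequeueOrder only models the ordering rule (lower number first, FIFO within a band) rather than calling BullMQ.

```typescript
// ActivityJobData.priority: 0 = normal, 1 = high, 2 = urgent (higher = more urgent).
type PayloadPriority = 0 | 1 | 2;

// Hypothetical mapping onto the bands in the table (lower = dequeued first).
// Mapping "high" to the approval-gated band (5) is an assumption.
const BULLMQ_PRIORITY: Record<PayloadPriority, number> = { 2: 1, 1: 5, 0: 10 };
const BACKGROUND_PRIORITY = 20;

function toBullmqPriority(priority: PayloadPriority, background = false): number {
  return background ? BACKGROUND_PRIORITY : BULLMQ_PRIORITY[priority];
}

interface PendingJob {
  id: string;
  priority: number; // BullMQ-style: lower number runs first
  seq: number;      // insertion order, used as the FIFO tie-breaker
}

// Models the dequeue order: ascending priority, then first-in-first-out within a band.
function dequeueOrder(jobs: readonly PendingJob[]): string[] {
  return [...jobs]
    .sort((a, b) => a.priority - b.priority || a.seq - b.seq)
    .map((job) => job.id);
}
```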
Concurrency Limits
Default concurrency per agent role per tenant:
| Agent | Default Concurrency | Reason |
|---|---|---|
| Client Researcher | 1 | Playwright scraping — CPU-intensive |
| Competitor Researcher | 1 | Playwright scraping — CPU-intensive |
| Context File Writer | 1 | Runs once per tenant — sequential by design |
| Strategy Writer | 1 | Sequential by design |
| Deliverable Planner | 1 | Sequential by design |
| Activity Planner | 2 | Heavy orchestration; creates many downstream activities |
| Keyword Researcher | 2 | External SEMrush API calls |
| Content Brief Writer | 3 | Pure LLM — parallelises freely |
| Site Auditor | 1 | Playwright + external audit APIs — CPU-intensive |
| Backlink Researcher | 2 | DataForSEO API rate limits |
| Backlink Outreach Writer | 3 | Pure LLM |
| Blog Writer | 3 | Heavy LLM output; concurrency 3 balances cost vs throughput |
| GBP Post Writer | 4 | Short output — parallelises freely |
| Social Post Writer | 4 | Short output — parallelises freely |
| Email Writer | 3 | Pure LLM |
| Landing Page Writer | 2 | Longer output — moderate concurrency |
| Google Ads Writer | 3 | Pure LLM; Google Ads API writes are gated at publish time |
| Meta Ads Writer | 3 | Pure LLM; Meta API writes are gated at publish time |
| Social Calendar Planner | 2 | Planning task — runs once per period |
| Ads Analyst | 2 | GA4/Google Ads/Meta API reads |
| Report Writer | 2 | Long output; runs once per reporting period |
| Anomaly Detector | 4 | Short classification — parallelises freely |
| Topic Researcher | 2 | Local Ollama — CPU on agent host |
| Research Note Writer | 3 | Local Ollama — short outputs |
| Review Response Writer | 4 | Short output — parallelises freely |
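Each tenant's cap is stored in agent_configs.max_concurrency and enforced by the per-tenant limiter at enqueue time; the shared worker's own concurrency, applied when the worker is created, is the total across all tenants (see the note under Queue Architecture).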
Dead Letter Queue
After all retries are exhausted, BullMQ moves the job to the failed state. The global dead-letter worker:
- Marks activities.status = 'needs_review'
- Creates an escalation Activity assigned to the DM Portal reviewer
- Sends a Slack alert (internal team channel)
- Flags the deliverable_period if the failed activity was on the critical path
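Keeping the dead-letter worker's decisions separate from its side effects makes the four steps easy to test. The sketch below is hypothetical: planDeadLetter, the action shapes and the onCriticalPath flag are illustrative (the text does not say how critical-path membership is determined, so it is passed in), and each returned action would be carried out by the real database, escalation and Slack code.

```typescript
interface FailedActivity {
  id: string;
  tenantId: string;
  deliverablePeriodId?: string;
  onCriticalPath: boolean; // how this is determined is not specified; passed in here
}

type DeadLetterAction =
  | { kind: 'set_status'; activityId: string; status: 'needs_review' }
  | { kind: 'create_escalation'; activityId: string; assignee: 'dm_portal_reviewer' }
  | { kind: 'slack_alert'; channel: 'internal'; text: string }
  | { kind: 'flag_period'; deliverablePeriodId: string };

// Decides what the dead-letter worker should do; executing the actions is left to the caller.
function planDeadLetter(activity: FailedActivity, error: string): DeadLetterAction[] {
  const actions: DeadLetterAction[] = [
    { kind: 'set_status', activityId: activity.id, status: 'needs_review' },
    { kind: 'create_escalation', activityId: activity.id, assignee: 'dm_portal_reviewer' },
    {
      kind: 'slack_alert',
      channel: 'internal',
      text: `Activity ${activity.id} (tenant ${activity.tenantId}) exhausted its retries: ${error}`,
    },
  ];
  // Only flag the period when the failure can hold up the period's deliverables
  if (activity.onCriticalPath && activity.deliverablePeriodId) {
    actions.push({ kind: 'flag_period', deliverablePeriodId: activity.deliverablePeriodId });
  }
  return actions;
}
```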
Package Location
packages/queue/
queue/
├── src/
│ ├── queues.ts # Queue factory: getQueue(agentRole), one shared queue per agent role
│ ├── workers.ts # Worker factory: createAgentWorker(), processActivity()
│ ├── jobs.ts # ActivityJobData type definition
│ ├── prompt.ts # buildActivityPrompt() — threads wake reason + rejection/blocker context
│ ├── pipeline.ts # onActivityCompleted(), onActivityRejected(), step progression
│ ├── blocker.ts # onActivityBlocked(), onBlockerResolved()
│ ├── subtask.ts # onSubtaskCompleted()
│ ├── review.ts # onReviewCompleted()
│ ├── approval.ts # enqueueApprovalLinkedActivities() — fan-out on approval resolve
│ ├── reassign.ts # onActivityReassigned()
│ ├── scheduler.ts # Monthly period cron registration
│ └── deadletter.ts # Dead letter handler
└── package.json
Open questions: OQ-1 (worker lifecycle), OQ-2 (cross-tenant queue monitoring) in open-questions.md.