Gap 7: No Priority Queue Differentiation
Problem
All BullMQ jobs are enqueued with identical priority. A rejection re-run (a human is blocked, waiting for content) competes equally with a background weekly insight refresh. A new tenant on the setup chain (waiting to complete onboarding) has no advantage over a routine activity planning job for an established tenant.
From packages/queue/src/queues.ts:

    defaultJobOptions: {
      attempts: 4,
      backoff: { type: "exponential", delay: 5_000 },
    }

No priority field is set. BullMQ supports integer priorities (a lower number means higher priority, and jobs with no priority set are processed ahead of all prioritised jobs), but priority is not used anywhere in the codebase.
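To make the ordering rule concrete, the sketch below models which waiting job a worker would pick next within a single queue. It is a simplified in-memory model, not BullMQ code: it assumes no delayed jobs and no LIFO option, and the names WaitingJob and processingOrder are hypothetical.

```typescript
// Simplified model of ordering inside ONE queue (assumption: no delayed jobs,
// no LIFO option). This is an illustration, not BullMQ's implementation.
interface WaitingJob {
  id: string;
  priority?: number; // undefined or 0 means "no priority set"
  seq: number;       // insertion order, used to break ties (FIFO)
}

function processingOrder(jobs: WaitingJob[]): string[] {
  // Jobs without a priority rank as 0, so they sort ahead of every prioritised job.
  const rank = (j: WaitingJob): number => j.priority ?? 0;
  return [...jobs]
    .sort((a, b) => rank(a) - rank(b) || a.seq - b.seq)
    .map((j) => j.id);
}

const order = processingOrder([
  { id: "weekly-insight", priority: 100, seq: 1 },
  { id: "rejection-rerun", priority: 1, seq: 2 },
  { id: "legacy-no-priority", seq: 3 },
  { id: "setup", priority: 10, seq: 4 },
]);
console.log(order);
// [ 'legacy-no-priority', 'rejection-rerun', 'setup', 'weekly-insight' ]
```

Note that the job enqueued without any priority runs first. Once priorities are introduced, an enqueue call that is left untouched effectively outranks the most urgent tier.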
Concrete impact
| Job type | User impact | Current priority |
|---|---|---|
| Rejection re-run (DM is waiting) | DM blocked, can’t approve | Same as everything else |
| Setup chain (new tenant onboarding) | User can’t proceed past onboarding | Same as everything else |
| Executor-triggered job (user in chat) | User waiting for interactive response | Same as everything else |
| Weekly insight refresh | Background, no one waiting | Same as everything else |
| Goal tracker sync | Background, scheduled | Same as everything else |
HuggingGPT’s architecture explicitly prioritises user-facing, interactive tasks over background batch tasks. This is the same principle.
What to Build
1. Define a priority scale

    // packages/queue/src/priority.ts
    export const JobPriority = {
      CRITICAL: 1,     // rejection re-runs, executor-triggered, user actively waiting
      HIGH: 10,        // new tenant setup chain, onboarding-blocking tasks
      NORMAL: 50,      // regular content generation (blog, social, landing page)
      LOW: 100,        // background insight refreshes, goal tracking, analytics sync
      BACKGROUND: 200, // search indexer syncs, web crawls, non-urgent batch jobs
    } as const;

2. Apply priority to all enqueue calls
Audit every queue.add() call and add the appropriate priority:

    // Rejection re-run: user is waiting
    await blogWriterQueue.add("blog-writer", jobData, {
      priority: JobPriority.CRITICAL,
      jobId: `blog-writer-${blogPostId}-${Date.now()}`,
    });

    // Setup chain (new tenant)
    await setupQueue.add("setup", jobData, {
      priority: JobPriority.HIGH,
      jobId: `setup-${tenantId}`,
    });

    // Regular blog generation
    await blogWriterQueue.add("blog-writer", jobData, {
      priority: JobPriority.NORMAL,
      jobId: `blog-writer-${activityId}`,
    });

    // Background insight refresh
    await insightQueue.add("insight-refresh", jobData, {
      priority: JobPriority.LOW,
      jobId: `insight-${tenantId}-${channelId}`,
    });

3. Per-queue worker concurrency tuning
BullMQ priority only works within a single queue. Since Leadmetrics uses one queue per agent role, priority helps within a role but not across roles. For cross-role prioritisation, set worker concurrency to reflect urgency:

    // High-priority worker roles get more concurrency
    const CONCURRENCY = {
      "blog-writer": 5,     // user-facing content
      "setup": 3,           // onboarding critical
      "strategy-writer": 3, // user-facing
      "gsc-insights": 2,    // background
      "goal-tracker": 1,    // background, scheduled
      "search-indexer": 2,  // background
    };

4. Priority escalation for stale high-priority jobs
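If a CRITICAL or HIGH priority job hasn’t started within 2 minutes of being enqueued (a sign of queue backlog), escalate by sending a Slack/in-app alert to the engineering team:

    // In the worker's active handler
    // job.timestamp is the job's creation time (ms), so retried or delayed
    // jobs also accumulate wait time when measured this way.
    const waitTime = Date.now() - job.timestamp;
    const priority = job.opts.priority;
    // Only jobs explicitly tagged CRITICAL or HIGH; skip jobs with no priority set.
    if (priority !== undefined && priority > 0 &&
        priority <= JobPriority.HIGH && waitTime > 120_000) {
      await sendAdminAlert({
        type: "queue_backlog",
        message: `High priority job ${job.name} waited ${waitTime}ms before starting`,
        jobId: job.id,
      });
    }

5. Expose queue depth by priority in the Execution Queue dashboard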
The /dashboards/execution-queue page should show queue depth broken down by priority tier, not just total count. This lets admins see at a glance if CRITICAL/HIGH jobs are backed up.
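One way to produce that breakdown is to bucket each waiting job's numeric priority into the first tier whose value it does not exceed. The sketch below is a pure function over priority values. It redeclares the JobPriority scale so it runs standalone, the helper names tierFor and depthByTier are hypothetical, and how the priorities are read from BullMQ (for example through its queue getters) is deliberately left out.

```typescript
// Tier names in ascending numeric order (most urgent first).
const TIER_NAMES = ["CRITICAL", "HIGH", "NORMAL", "LOW", "BACKGROUND"] as const;
type TierName = (typeof TIER_NAMES)[number];
type Tier = TierName | "UNPRIORITIZED";

// Same scale as packages/queue/src/priority.ts, repeated to keep the sketch standalone.
const JobPriority: Record<TierName, number> = {
  CRITICAL: 1,
  HIGH: 10,
  NORMAL: 50,
  LOW: 100,
  BACKGROUND: 200,
};

// Map a raw priority to the first tier whose threshold it does not exceed.
function tierFor(priority: number | undefined): Tier {
  if (!priority) return "UNPRIORITIZED"; // 0 or undefined: no priority was set
  for (const name of TIER_NAMES) {
    if (priority <= JobPriority[name]) return name;
  }
  return "BACKGROUND"; // values above 200 are treated as background
}

// Count how many waiting jobs fall into each tier.
function depthByTier(priorities: (number | undefined)[]): Record<Tier, number> {
  const depth: Record<Tier, number> = {
    UNPRIORITIZED: 0, CRITICAL: 0, HIGH: 0, NORMAL: 0, LOW: 0, BACKGROUND: 0,
  };
  for (const p of priorities) depth[tierFor(p)] += 1;
  return depth;
}

console.log(depthByTier([1, 1, 10, 50, 50, 50, 200, undefined]));
```

Keeping an UNPRIORITIZED bucket is a design choice made here: it makes any enqueue call that still lacks a priority visible on the dashboard rather than silently folding it into another tier.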
Files to Change
- New file: packages/queue/src/priority.ts
- packages/queue/src/queues.ts: add priority to defaultJobOptions per queue
- All worker files that call queue.add(): pass priority from JobPriority
- apps/servers/agents/src/index.ts: set concurrency per worker role
- apps/dashboard/src/app/(dashboard)/dashboards/execution-queue/: add priority breakdown
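As a rough sketch of the queues.ts change, each queue could carry a default priority in its defaultJobOptions, so that an enqueue call missed by the audit still lands in a sensible tier instead of having no priority at all. The queue-to-tier mapping and the helper name defaultJobOptionsFor are assumptions for illustration; the queue names come from the concurrency map in step 3 and the retry settings from the existing defaults.

```typescript
const JobPriority = {
  CRITICAL: 1,
  HIGH: 10,
  NORMAL: 50,
  LOW: 100,
  BACKGROUND: 200,
} as const;
type Priority = (typeof JobPriority)[keyof typeof JobPriority];

// Hypothetical per-queue defaults; adjust to the real role list.
const DEFAULT_PRIORITY: Record<string, Priority> = {
  "blog-writer": JobPriority.NORMAL,
  "setup": JobPriority.HIGH,
  "strategy-writer": JobPriority.NORMAL,
  "gsc-insights": JobPriority.LOW,
  "goal-tracker": JobPriority.LOW,
  "search-indexer": JobPriority.BACKGROUND,
};

function defaultJobOptionsFor(queueName: string) {
  return {
    attempts: 4,
    backoff: { type: "exponential", delay: 5_000 },
    // Unknown queues fall back to NORMAL rather than "no priority".
    priority: DEFAULT_PRIORITY[queueName] ?? JobPriority.NORMAL,
  };
}

// Modelled with an object spread: options passed at enqueue time take
// precedence over the queue default, e.g. for a rejection re-run.
const rerunOpts = {
  ...defaultJobOptionsFor("blog-writer"),
  priority: JobPriority.CRITICAL,
};

console.log(defaultJobOptionsFor("setup").priority, rerunOpts.priority); // 10 1
```

With defaults in place, only the exceptional cases (rejection re-runs, executor-triggered jobs) need an explicit priority at the call site.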
Related
- Gap 4: BullMQ ↔ LangGraph bridge (executor-triggered jobs need CRITICAL priority)
- Gap 13: Cost circuit breaker (priority and concurrency limits work together to cap costs)