Skip to Content
FeaturesActivity Prompt Assembly

Activity Prompt Assembly

How the control plane takes a BullMQ job payload and turns it into a complete LLM prompt — covering skill injection, tenant context, upstream output threading, and RAG pre-fetch.


The Problem

An agent doc defines inputs like this:

interface BlogWriterInput { tenantId: string; contentBrief: { workingTitle, primaryKeyword, recommendedOutline, ... }; researchNotes?: string; targetWordCount: number; }

But the LLM doesn’t receive a TypeScript object. It receives a text prompt. This doc explains everything that happens between “BullMQ job dequeued” and “LLM call made”.


Overview — What Gets Assembled

Every agent prompt is built from five layers, assembled in this order:

┌─────────────────────────────────────────────────────┐ │ 1. System prompt template (from agent config) │ │ — The agent's personality, rules, output format │ ├─────────────────────────────────────────────────────┤ │ 2. Skill files (from MongoDB) │ │ — client-context-file.md + agent-specific skills │ │ — Injected via --add-dir (Claude) or prepended │ ├─────────────────────────────────────────────────────┤ │ 3. Tenant settings (from PostgreSQL) │ │ — brandVoice, prohibitedWords[], industry, etc. │ ├─────────────────────────────────────────────────────┤ │ 4. Job payload / upstream outputs (from BullMQ job) │ │ — The actual task: brief, research notes, etc. │ ├─────────────────────────────────────────────────────┤ │ 5. RAG pre-fetch results (from Qdrant) │ │ — Control plane runs initial queries before LLM │ │ — Agent can also call rag_search during execution │ └─────────────────────────────────────────────────────┘

Step-by-Step: Blog Writer Example

1. BullMQ job dequeued

The worker receives a Job<BlogWriterInput>:

// Delivered by BullMQ to the agent__blog-writer worker const job = { id: 'job_7f3a', data: { tenantId: 'tenant_abc', campaignId: 'camp_456', activityId: 'act_789', // the activity record this job fulfils upstreamOutputs: { contentBriefActivityId: 'act_712', // Content Brief Writer output researchNotesActivityId: 'act_714', // Research Note Writer output (optional) }, targetWordCount: 1800, publishTarget: 'wordpress', } }

Note: The Activity Planner does not copy the full brief text into the job payload. It stores the activityId of the upstream task. The worker fetches the output itself. This keeps job payloads small and ensures the worker always reads the latest approved output.


2. Worker fetches upstream outputs

// Inside the agent__blog-writer worker async function blogWriterWorker(job: Job<BlogWriterInput>) { const { tenantId, activityId, upstreamOutputs } = job.data; // Fetch Content Brief Writer's output from activity_runs const contentBrief = await db.query.activityRuns.findFirst({ where: eq(activityRuns.activityId, upstreamOutputs.contentBriefActivityId), orderBy: desc(activityRuns.completedAt), }); // contentBrief.output is the JSON from ContentBriefWriterOutput // Fetch Research Note Writer's output (if that activity ran) let researchNotes: string | undefined; if (upstreamOutputs.researchNotesActivityId) { const notesRun = await db.query.activityRuns.findFirst({ where: eq(activityRuns.activityId, upstreamOutputs.researchNotesActivityId), }); researchNotes = notesRun?.output?.notes; }

3. Fetch tenant settings

const tenant = await db.query.tenants.findFirst({ where: eq(tenants.id, tenantId), columns: { brandVoice: true, prohibitedWords: true, targetAudience: true, industry: true, companyName: true, plan: true, } }); const tenantSettings = ` Company: ${tenant.companyName} Industry: ${tenant.industry} Brand voice: ${tenant.brandVoice} Target audience: ${tenant.targetAudience} Prohibited words: ${tenant.prohibitedWords.join(', ')} `.trim();

4. Fetch and inject skill files

// Fetch the skill files assigned to this agent for this tenant const skills = await fetchSkillsForAgent({ tenantId, agentRole: 'blog-writer', }); // Returns: [client-context-file.md, long-form-writing-guide.md, seo-content-rules.md] // For Claude Code CLI: write skills to a temp dir, pass via --add-dir const skillsDir = await writeSkillsToTempDir(skills); // Claude reads these as context documents — not injected into the prompt text itself // For OpenAI / Ollama: prepend skill content to the system prompt const skillsContext = skills.map(s => s.content).join('\n\n---\n\n');

Claude Code CLI skill injection (preferred):

claude \ --model claude-sonnet-4-6 \ --add-dir /tmp/skills/tenant_abc/blog-writer \ --print \ --output-format stream-json

Skills in --add-dir are available as readable files — Claude can cat them when needed. They don’t consume prompt tokens until referenced.

OpenAI / Ollama fallback: Skills are prepended to the system prompt as text. Consumes tokens upfront.


5. RAG pre-fetch

The control plane runs a set of standard queries before dispatching to the LLM. This gives the agent a baseline context without relying on it to make tool calls for the most predictable queries.

const ragContext = await preRunRagQueries({ tenantId, queries: [ // Duplicate check { dataset: 'published_content', query: `blog posts about ${contentBrief.output.primaryKeyword}`, topK: 3, purpose: 'duplicate_check', }, // Internal link targets { dataset: 'website_content', query: `service pages related to ${contentBrief.output.angle}`, topK: 5, purpose: 'internal_links', }, // Brand facts { dataset: 'client_documents', query: `product details case studies ${contentBrief.output.angle}`, topK: 4, purpose: 'brand_facts', }, // Competitor differentiation { dataset: 'competitor_research', query: `competitor content on ${contentBrief.output.primaryKeyword}`, topK: 3, purpose: 'competitor_angle', }, ] }); // ragContext is formatted as readable chunks: // "=== Published Content (duplicate check) ===\n[chunks]\n\n=== Website Content (internal links) ===\n..."

The agent can also call rag_search as a tool during execution for follow-up queries not covered by the pre-fetch.


6. Build the final prompt

The system prompt is loaded directly from the agent_config table — there is no in-code fallback. If the row is missing or systemPrompt is null, the worker throws immediately and the job fails.

const agentConfig = await db.agentConfig.findUnique({ where: { role: 'blog-writer' }, select: { model: true, systemPrompt: true }, }); if (!agentConfig?.systemPrompt) { throw new Error('No systemPrompt configured for agent role "blog-writer". Seed the database.'); } // The system prompt from the DB is the static instruction set for this agent role. // Runtime context (client context, RAG results, brief, etc.) is appended below it. const prompt = [ agentConfig.systemPrompt, ``, `CLIENT: ${tenantName}`, `BLOG POST TOPIC: ${activityLabel}`, ``, clientContext ? `CLIENT CONTEXT:\n${clientContext}\n` : null, ragContext ? `KNOWLEDGE BASE CONTEXT:\n${ragContext}\n` : null, keywordCluster ? `KEYWORD CLUSTER:\n${keywordCluster}\n` : null, contentBrief ? `CONTENT BRIEF:\n${contentBrief}\n` : null, reviewerFeedback ? `REVISION INSTRUCTIONS:\n${reviewerFeedback}\n` : null, ].filter(Boolean).join('\n'); // Example final prompt passed to the adapter: // "You are the Blog Writer agent for Leadmetrics... // [full static instructions from agent_config.systemPrompt] // // CLIENT: Acme Plumbing Services // BLOG POST TOPIC: Why Your Hot Water System Keeps Tripping // // CLIENT CONTEXT: // ## Business Overview // Family-owned plumbing business in Melbourne... // // KNOWLEDGE BASE CONTEXT: // [1] (published_content/blog-posts) // Emergency plumber tips for Melbourne homeowners... // // CONTENT BRIEF: // Working title: Why Your Hot Water System Keeps Tripping..."

Where prompts are stored: All agent system prompts live in the agent_config table (systemPrompt column). They are seeded via pnpm --filter @leadmetrics/db db:seed. To update a prompt, update the seed and re-run it, or edit the row directly in the database.


7. Dispatch to adapter

const result = await adapter.dispatch({ taskId: job.id, tenantId, agentRole: 'blog-writer', systemPrompt, skillsDir, // --add-dir path (Claude only) tools: ['rag_search'], // tools available during execution callbackUrl: `${API_BASE}/api/agent-callback/${job.id}`, timeoutMs: 10 * 60 * 1000, // 10 minutes });

8. Store output and create approval

// Parse BlogWriterOutput from result.text const output = parseBlogWriterOutput(result.text); // Store in activity_runs await db.insert(activityRuns).values({ activityId, tenantId, output: output, inputTokens: result.usage.inputTokens, outputTokens: result.usage.outputTokens, cost: result.cost, status: 'completed', }); // Store deliverable content in MongoDB (large text blob) await DeliverableContent.create({ activityRunId: activityRunId, tenantId, type: 'blog_post', content: output.bodyMarkdown, }); // Update activity status await db.update(activities) .set({ status: 'awaiting_approval' }) .where(eq(activities.id, activityId)); // Create approval record → triggers HITL await db.insert(approvals).values({ tenantId, activityId, type: 'content_review', riskLevel: 'medium', status: 'pending', }); // Broadcast to DM Portal via SSE await publish('approval_created', { tenantId, activityId }); }

How the Activity Planner Threads Outputs to Inputs

The Activity Planner creates all activities upfront at the start of the period. For dependent activities, it stores the upstream activityId in the dependsOn field:

// Activity Planner output — activities it creates for a blog post pipeline [ { id: 'act_712', agentQueue: 'agent__content-brief-writer', input: { tenantId, keywordCluster: {...} }, dependsOn: [], // runs first status: 'pending', }, { id: 'act_714', agentQueue: 'agent__research-note-writer', input: { tenantId, topic: 'hot water system faults' }, dependsOn: ['act_712'], // waits for brief status: 'pending', }, { id: 'act_789', agentQueue: 'agent__blog-writer', input: { tenantId, upstreamOutputs: { contentBriefActivityId: 'act_712', // ← Blog Writer fetches this itself researchNotesActivityId: 'act_714', }, targetWordCount: 1800, publishTarget: 'wordpress', }, dependsOn: ['act_712', 'act_714'], // waits for both status: 'pending', } ]

When act_712 completes, BullMQ’s dependsOn gate releases act_714. When both act_712 and act_714 complete, act_789 (Blog Writer) is released. The Blog Writer worker fetches the outputs of both by their activityId at runtime.


Summary — Where Each Input Field Comes From

For BlogWriterInput specifically:

FieldSourceHow it gets there
tenantIdBullMQ job dataSet by Activity Planner at enqueue time
activityIdBullMQ job dataSet by Activity Planner
upstreamOutputs.contentBriefActivityIdBullMQ job dataActivity Planner stores the upstream activityId
contentBrief (resolved)activity_runs tableWorker fetches by activityId at runtime
researchNotes (resolved)activity_runs tableWorker fetches by activityId at runtime
targetWordCountBullMQ job dataSet by Activity Planner from deliverable plan
publishTargetBullMQ job dataSet by Activity Planner from tenant’s channel config
CLIENT_CONTEXTMongoDB (skills)Worker fetches skill by tenantId + skill name
TENANT_SETTINGSPostgreSQL (tenants)Worker fetches tenant record by tenantId
RAG_CONTEXTQdrant (via rag_search)Pre-fetched by worker + agent tool calls during execution
Skills (Claude —add-dir)MongoDB (skills)Worker writes to temp dir before dispatch

Human On-Demand Trigger

When a DM reviewer or tenant admin triggers a Blog Writer job manually (not via Activity Planner), the UI collects the inputs directly:

DM Portal → "Write post" button → slide-over form: - Content brief (paste or select from existing brief activity) - Research notes (optional, paste or skip) - Word count target (number input) - Publish target (dropdown) → POST /api/activities { agentQueue: 'agent__blog-writer', input: { tenantId, contentBrief: {...}, ... } } → API creates activity record + enqueues BullMQ job immediately → Same worker runs — no difference in execution path

The only difference from a Activity Planner dispatch: there are no upstreamOutputs.activityIds — the human pastes the brief text directly into the job payload, so the worker skips the upstream-fetch step.


© 2026 Leadmetrics — Internal use only