Activity Prompt Assembly
How the control plane takes a BullMQ job payload and turns it into a complete LLM prompt — covering skill injection, tenant context, upstream output threading, and RAG pre-fetch.
The Problem
An agent doc defines inputs like this:
interface BlogWriterInput {
tenantId: string;
contentBrief: { workingTitle: string; primaryKeyword: string; recommendedOutline: string[]; /* ... */ };
researchNotes?: string;
targetWordCount: number;
}
But the LLM doesn’t receive a TypeScript object. It receives a text prompt. This doc explains everything that happens between “BullMQ job dequeued” and “LLM call made”.
Overview — What Gets Assembled
Every agent prompt is built from five layers, assembled in this order:
┌─────────────────────────────────────────────────────┐
│ 1. System prompt template (from agent config) │
│ — The agent's personality, rules, output format │
├─────────────────────────────────────────────────────┤
│ 2. Skill files (from MongoDB) │
│ — client-context-file.md + agent-specific skills │
│ — Injected via --add-dir (Claude) or prepended │
├─────────────────────────────────────────────────────┤
│ 3. Tenant settings (from PostgreSQL) │
│ — brandVoice, prohibitedWords[], industry, etc. │
├─────────────────────────────────────────────────────┤
│ 4. Job payload / upstream outputs (from BullMQ job) │
│ — The actual task: brief, research notes, etc. │
├─────────────────────────────────────────────────────┤
│ 5. RAG pre-fetch results (from Qdrant) │
│ — Control plane runs initial queries before LLM │
│ — Agent can also call rag_search during execution │
└─────────────────────────────────────────────────────┘
Step-by-Step: Blog Writer Example
1. BullMQ job dequeued
The worker receives a Job<BlogWriterInput>:
// Delivered by BullMQ to the agent__blog-writer worker
const job = {
id: 'job_7f3a',
data: {
tenantId: 'tenant_abc',
campaignId: 'camp_456',
activityId: 'act_789', // the activity record this job fulfils
upstreamOutputs: {
contentBriefActivityId: 'act_712', // Content Brief Writer output
researchNotesActivityId: 'act_714', // Research Note Writer output (optional)
},
targetWordCount: 1800,
publishTarget: 'wordpress',
}
}
Note: The Activity Planner does not copy the full brief text into the job payload. It stores the activityId of the upstream task. The worker fetches the output itself. This keeps job payloads small and ensures the worker always reads the latest approved output.
2. Worker fetches upstream outputs
// Inside the agent__blog-writer worker
async function blogWriterWorker(job: Job<BlogWriterInput>) {
const { tenantId, activityId, upstreamOutputs } = job.data;
// Fetch Content Brief Writer's output from activity_runs
const contentBrief = await db.query.activityRuns.findFirst({
where: eq(activityRuns.activityId, upstreamOutputs.contentBriefActivityId),
orderBy: desc(activityRuns.completedAt),
});
// contentBrief.output is the JSON from ContentBriefWriterOutput
// Fetch Research Note Writer's output (if that activity ran)
let researchNotes: string | undefined;
if (upstreamOutputs.researchNotesActivityId) {
const notesRun = await db.query.activityRuns.findFirst({
where: eq(activityRuns.activityId, upstreamOutputs.researchNotesActivityId),
});
researchNotes = notesRun?.output?.notes;
}
3. Fetch tenant settings
const tenant = await db.query.tenants.findFirst({
where: eq(tenants.id, tenantId),
columns: {
brandVoice: true,
prohibitedWords: true,
targetAudience: true,
industry: true,
companyName: true,
plan: true,
}
});
const tenantSettings = `
Company: ${tenant.companyName}
Industry: ${tenant.industry}
Brand voice: ${tenant.brandVoice}
Target audience: ${tenant.targetAudience}
Prohibited words: ${tenant.prohibitedWords.join(', ')}
`.trim();
4. Fetch and inject skill files
// Fetch the skill files assigned to this agent for this tenant
const skills = await fetchSkillsForAgent({
tenantId,
agentRole: 'blog-writer',
});
// Returns: [client-context-file.md, long-form-writing-guide.md, seo-content-rules.md]
// For Claude Code CLI: write skills to a temp dir, pass via --add-dir
const skillsDir = await writeSkillsToTempDir(skills);
// Claude reads these as context documents — not injected into the prompt text itself
// For OpenAI / Ollama: prepend skill content to the system prompt
const skillsContext = skills.map(s => s.content).join('\n\n---\n\n');
Claude Code CLI skill injection (preferred):
claude \
--model claude-sonnet-4-6 \
--add-dir /tmp/skills/tenant_abc/blog-writer \
--print \
--output-format stream-json
Skills in --add-dir are available as readable files — Claude can cat them when needed. They don’t consume prompt tokens until referenced.
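The writeSkillsToTempDir helper is referenced above but not shown. A minimal sketch, assuming skills arrive as { name, content } records fetched from MongoDB (the real helper presumably namespaces the directory by tenant and agent role, as in the /tmp/skills/tenant_abc/blog-writer path above):

```typescript
import { mkdtemp, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

interface SkillFile {
  name: string;    // e.g. 'client-context-file.md'
  content: string; // markdown body fetched from MongoDB
}

// Hypothetical implementation: materialise each skill as a file in a fresh
// temp directory so the Claude CLI can read it via --add-dir.
async function writeSkillsToTempDir(skills: SkillFile[]): Promise<string> {
  const dir = await mkdtemp(join(tmpdir(), 'skills-'));
  for (const skill of skills) {
    await writeFile(join(dir, skill.name), skill.content, 'utf8');
  }
  return dir;
}
```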
OpenAI / Ollama fallback: Skills are prepended to the system prompt as text. Consumes tokens upfront.
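A sketch of that fallback, assuming a chat-completions-style messages array; the separator matches the join used in step 4, but the exact layout is an assumption:

```typescript
interface ChatMessage {
  role: 'system' | 'user';
  content: string;
}

// Skills are prepended ahead of the agent's system prompt, so the model sees
// them before its instructions. Unlike --add-dir, this costs tokens on every call.
function buildFallbackMessages(
  systemPrompt: string,
  skillsContext: string,
  userPrompt: string,
): ChatMessage[] {
  const system = skillsContext
    ? `${skillsContext}\n\n---\n\n${systemPrompt}`
    : systemPrompt;
  return [
    { role: 'system', content: system },
    { role: 'user', content: userPrompt },
  ];
}
```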
5. RAG pre-fetch
The control plane runs a set of standard queries before dispatching to the LLM. This gives the agent a baseline context without relying on it to make tool calls for the most predictable queries.
const ragContext = await preRunRagQueries({
tenantId,
queries: [
// Duplicate check
{
dataset: 'published_content',
query: `blog posts about ${contentBrief.output.primaryKeyword}`,
topK: 3,
purpose: 'duplicate_check',
},
// Internal link targets
{
dataset: 'website_content',
query: `service pages related to ${contentBrief.output.angle}`,
topK: 5,
purpose: 'internal_links',
},
// Brand facts
{
dataset: 'client_documents',
query: `product details case studies ${contentBrief.output.angle}`,
topK: 4,
purpose: 'brand_facts',
},
// Competitor differentiation
{
dataset: 'competitor_research',
query: `competitor content on ${contentBrief.output.primaryKeyword}`,
topK: 3,
purpose: 'competitor_angle',
},
]
});
// ragContext is formatted as readable chunks:
// "=== Published Content (duplicate check) ===\n[chunks]\n\n=== Website Content (internal links) ===\n..."The agent can also call rag_search as a tool during execution for follow-up queries not covered by the pre-fetch.
6. Build the final prompt
The system prompt is loaded directly from the agent_config table — there is no in-code fallback. If the row is missing or systemPrompt is null, the worker throws immediately and the job fails.
// Same Drizzle query style as the other lookups in this worker
const agentConfig = await db.query.agentConfigs.findFirst({
  where: eq(agentConfigs.role, 'blog-writer'),
  columns: { model: true, systemPrompt: true },
});
if (!agentConfig?.systemPrompt) {
throw new Error('No systemPrompt configured for agent role "blog-writer". Seed the database.');
}
// The system prompt from the DB is the static instruction set for this agent role.
// Runtime context (client context, RAG results, brief, etc.) is appended below it.
const prompt = [
  agentConfig.systemPrompt,
  ``,
  `CLIENT: ${tenant.companyName}`,
  `BLOG POST TOPIC: ${activityLabel}`,
  ``,
  clientContext ? `CLIENT CONTEXT:\n${clientContext}\n` : null, // client-context-file.md skill content
  tenantSettings ? `TENANT SETTINGS:\n${tenantSettings}\n` : null, // built in step 3
  ragContext ? `KNOWLEDGE BASE CONTEXT:\n${ragContext}\n` : null,
  keywordCluster ? `KEYWORD CLUSTER:\n${keywordCluster}\n` : null,
  contentBrief ? `CONTENT BRIEF:\n${contentBrief}\n` : null, // serialized from the brief fetched in step 2
  reviewerFeedback ? `REVISION INSTRUCTIONS:\n${reviewerFeedback}\n` : null,
].filter(Boolean).join('\n');
// Example final prompt passed to the adapter:
// "You are the Blog Writer agent for Leadmetrics...
// [full static instructions from agent_config.systemPrompt]
//
// CLIENT: Acme Plumbing Services
// BLOG POST TOPIC: Why Your Hot Water System Keeps Tripping
//
// CLIENT CONTEXT:
// ## Business Overview
// Family-owned plumbing business in Melbourne...
//
// KNOWLEDGE BASE CONTEXT:
// [1] (published_content/blog-posts)
// Emergency plumber tips for Melbourne homeowners...
//
// CONTENT BRIEF:
// Working title: Why Your Hot Water System Keeps Tripping..."
Where prompts are stored: All agent system prompts live in the agent_config table (systemPrompt column). They are seeded via pnpm --filter @leadmetrics/db db:seed. To update a prompt, update the seed and re-run it, or edit the row directly in the database.
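As a sketch, a seed entry for this row might look like the following; the table object and column names are assumed from the description above, and the real seed file may differ:

```typescript
// Hypothetical seed fragment: upsert the Blog Writer prompt so re-running
// the seed updates the row in place rather than duplicating it.
await db
  .insert(agentConfigs)
  .values({
    role: 'blog-writer',
    model: 'claude-sonnet-4-6',
    systemPrompt: BLOG_WRITER_SYSTEM_PROMPT, // long template string kept in the seed file
  })
  .onConflictDoUpdate({
    target: agentConfigs.role,
    set: { systemPrompt: BLOG_WRITER_SYSTEM_PROMPT },
  });
```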
7. Dispatch to adapter
const result = await adapter.dispatch({
taskId: job.id,
tenantId,
agentRole: 'blog-writer',
systemPrompt: prompt, // the assembled prompt from step 6
skillsDir, // --add-dir path (Claude only)
tools: ['rag_search'], // tools available during execution
callbackUrl: `${API_BASE}/api/agent-callback/${job.id}`,
timeoutMs: 10 * 60 * 1000, // 10 minutes
});
8. Store output and create approval
// Parse BlogWriterOutput from result.text
const output = parseBlogWriterOutput(result.text);
// Store in activity_runs, capturing the new run id for the MongoDB record
const [run] = await db.insert(activityRuns).values({
  activityId,
  tenantId,
  output: output,
  inputTokens: result.usage.inputTokens,
  outputTokens: result.usage.outputTokens,
  cost: result.cost,
  status: 'completed',
}).returning({ id: activityRuns.id });
// Store deliverable content in MongoDB (large text blob)
await DeliverableContent.create({
  activityRunId: run.id,
  tenantId,
  type: 'blog_post',
  content: output.bodyMarkdown,
});
// Update activity status
await db.update(activities)
.set({ status: 'awaiting_approval' })
.where(eq(activities.id, activityId));
// Create approval record → triggers HITL
await db.insert(approvals).values({
tenantId,
activityId,
type: 'content_review',
riskLevel: 'medium',
status: 'pending',
});
// Broadcast to DM Portal via SSE
await publish('approval_created', { tenantId, activityId });
}
How the Activity Planner Threads Outputs to Inputs
The Activity Planner creates all activities upfront at the start of the period. For dependent activities, it records the upstream activityIds both in the dependsOn field (to gate execution order) and in the input's upstreamOutputs (so the worker knows what to fetch):
// Activity Planner output — activities it creates for a blog post pipeline
[
{
id: 'act_712',
agentQueue: 'agent__content-brief-writer',
input: { tenantId, keywordCluster: {...} },
dependsOn: [], // runs first
status: 'pending',
},
{
id: 'act_714',
agentQueue: 'agent__research-note-writer',
input: { tenantId, topic: 'hot water system faults' },
dependsOn: ['act_712'], // waits for brief
status: 'pending',
},
{
id: 'act_789',
agentQueue: 'agent__blog-writer',
input: {
tenantId,
upstreamOutputs: {
contentBriefActivityId: 'act_712', // ← Blog Writer fetches this itself
researchNotesActivityId: 'act_714',
},
targetWordCount: 1800,
publishTarget: 'wordpress',
},
dependsOn: ['act_712', 'act_714'], // waits for both
status: 'pending',
}
]
When act_712 completes, BullMQ’s dependsOn gate releases act_714. When both act_712 and act_714 complete, act_789 (Blog Writer) is released. The Blog Writer worker fetches the outputs of both by their activityId at runtime.
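The release rule itself is simple. A pure sketch of the semantics (the real gate is enforced by BullMQ, not recomputed like this):

```typescript
interface PlannedActivity {
  id: string;
  dependsOn: string[];
  status: 'pending' | 'running' | 'completed';
}

// An activity becomes dispatchable once every id in its dependsOn list
// has completed.
function releasable(activities: PlannedActivity[]): string[] {
  const done = new Set(
    activities.filter((a) => a.status === 'completed').map((a) => a.id),
  );
  return activities
    .filter((a) => a.status === 'pending')
    .filter((a) => a.dependsOn.every((dep) => done.has(dep)))
    .map((a) => a.id);
}
```

With only act_712 completed, releasable returns act_714 alone; act_789 stays pending until act_714 also completes.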
Summary — Where Each Input Field Comes From
For BlogWriterInput specifically:
| Field | Source | How it gets there |
|---|---|---|
| tenantId | BullMQ job data | Set by Activity Planner at enqueue time |
| activityId | BullMQ job data | Set by Activity Planner |
| upstreamOutputs.contentBriefActivityId | BullMQ job data | Activity Planner stores the upstream activityId |
| contentBrief (resolved) | activity_runs table | Worker fetches by activityId at runtime |
| researchNotes (resolved) | activity_runs table | Worker fetches by activityId at runtime |
| targetWordCount | BullMQ job data | Set by Activity Planner from deliverable plan |
| publishTarget | BullMQ job data | Set by Activity Planner from tenant’s channel config |
| CLIENT_CONTEXT | MongoDB (skills) | Worker fetches skill by tenantId + skill name |
| TENANT_SETTINGS | PostgreSQL (tenants) | Worker fetches tenant record by tenantId |
| RAG_CONTEXT | Qdrant (via rag_search) | Pre-fetched by worker + agent tool calls during execution |
| Skills (Claude --add-dir) | MongoDB (skills) | Worker writes to temp dir before dispatch |
Human On-Demand Trigger
When a DM reviewer or tenant admin triggers a Blog Writer job manually (not via Activity Planner), the UI collects the inputs directly:
DM Portal → "Write post" button → slide-over form:
- Content brief (paste or select from existing brief activity)
- Research notes (optional, paste or skip)
- Word count target (number input)
- Publish target (dropdown)
→ POST /api/activities
{ agentQueue: 'agent__blog-writer', input: { tenantId, contentBrief: {...}, ... } }
→ API creates activity record + enqueues BullMQ job immediately
→ Same worker runs — no difference in execution path
The only difference from an Activity Planner dispatch: there are no upstreamOutputs.activityIds — the human pastes the brief text directly into the job payload, so the worker skips the upstream-fetch step.
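That branch can be summarised as a small predicate; the input type here is a hypothetical narrowing of the job payload, not the confirmed interface:

```typescript
type BriefSource = 'fetch_upstream' | 'inline';

// If the planner supplied an upstream activity id, the worker fetches the
// brief from activity_runs; otherwise it uses the brief pasted by the human.
function resolveBriefSource(input: {
  upstreamOutputs?: { contentBriefActivityId?: string };
  contentBrief?: unknown;
}): BriefSource {
  return input.upstreamOutputs?.contentBriefActivityId ? 'fetch_upstream' : 'inline';
}
```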
Related Docs
- Task Queue & Orchestration — BullMQ setup, queue namespacing, dependsOn gates
- Agent Execution Engine — adapter implementations (ClaudeAdapter, OpenAIAdapter, OllamaAdapter)
- Skills System — how skill files are stored, versioned, and fetched
- RAG Integration — rag_search tool, dataset structure, Qdrant queries