Agent Execution Engine
Purpose
The foundational layer of the platform. It abstracts away the differences between LLM backends (Claude Code CLI, Ollama REST, OpenAI REST) behind a single unified adapter interface and handles streaming output, session continuity, retries, cost extraction, and structured result delivery.
Related: Adapters — full adapter pattern, phone-home callbacks, resolveAdapter() factory | Task Queue — how activities are dispatched | Skills System — skill injection before execution
Important: The control plane does not run agents. It dispatches tasks to external agent runtimes via adapters. Agents “phone home” with results via callback. This document covers the adapter implementations themselves.
Heartbeat Model
Leadmetrics agents follow a heartbeat pattern — they wake up, do work, and go back to sleep. The key architectural choice is a push model rather than a pull model: the control plane (BullMQ) decides what an agent works on and pushes it. Agents never fetch their own work queue or update their own status.
This table maps each step of a standard agent heartbeat to how Leadmetrics handles it:
| Step | Pull-model approach | Leadmetrics (push model) |
|---|---|---|
| 1. Get identity | GET /api/agents/me at run start | tenantId, agentRole, runId injected into prompt / POST body at dispatch time |
| 2. Handle approvals | Check env var for pending approval ID | wakeReason: 'rejection' + reviewerNotes threaded directly into the task prompt |
| 3. Fetch work | GET /api/issues?assigneeAgentId=... | Not needed — BullMQ dequeues the job and passes ActivityJobData to the worker |
| 4. Prioritize | Agent selects highest-priority item | Priority set on the BullMQ job at enqueue time; worker processes in priority order |
| 5. Checkout | POST /api/issues/:id/checkout (409 on conflict) | BullMQ job locking — only one worker dequeues a job; mutual exclusion is guaranteed |
| 6. Review context | Agent fetches issue comments and linked documents | buildActivityPrompt() assembles all context (prior step output, rejection notes) before dispatch |
| 7. Execute work | Agent calls tools, produces output | Same — adapter dispatches to LLM, streams output |
| 8. Update status | Agent PATCH /api/issues/:id with run ID | Worker updates activities.status and activity_runs after the adapter completes |
| 9. Create subtasks | Agent POST /api/issues to delegate to another agent | Control plane only — see below |
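For orientation, here is a minimal sketch of what the dequeued payload and worker hookup might look like. Field names beyond tenantId, agentRole, runId/activityRunId, wakeReason, and reviewerNotes (all referenced elsewhere in this document) are assumptions, not the canonical ActivityJobData schema; the queue name and Redis connection are illustrative.

// Sketch only — assumed shape, not the canonical ActivityJobData schema.
import { Worker } from 'bullmq';

type WakeReason =
  | 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
  | 'subtask_completed' | 'review_approved' | 'review_feedback';

interface ActivityJobData {
  tenantId: string;
  agentRole: string;        // e.g. 'copywriter'
  activityRunId: string;    // exposed to the adapter as runId
  wakeReason: WakeReason;
  reviewerNotes?: string;   // threaded into the prompt when wakeReason is 'rejection'
}

// The worker never fetches work — BullMQ hands it the payload directly.
const worker = new Worker<ActivityJobData>(
  'activities',
  async (job) => {
    const { tenantId, agentRole, activityRunId, wakeReason } = job.data;
    // buildActivityPrompt() and resolveAdapter() (described below) run from here.
  },
  { connection: { host: 'localhost', port: 6379 } }, // illustrative connection
);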
Push model trade-offs
| Aspect | Push model (Leadmetrics) | Pull model |
|---|---|---|
| Agent complexity | Low — agent only processes a prompt | Higher — agent must call API, handle conflicts |
| Concurrency safety | BullMQ guarantees exclusive delivery | Agent must handle 409 checkout conflicts |
| Audit trail | All state changes by control plane — single source of truth | Agent and control plane both mutate state |
| Dynamic task creation | Activity Planner only (predefined pipelines) | Any agent can create subtasks |
| Testability | Adapter is a pure function: prompt in → text out | Agent behaviour depends on API state |
Agent-initiated activity creation (step 9)
The one capability worker agents lack is the ability to dynamically create activities for other agents during execution. In a pull-model platform, any agent can POST /api/issues to delegate a subtask. In Leadmetrics, only the Activity Planner creates activities — all other agents work within predefined pipeline templates.
Implication: If a worker agent (e.g. the Copywriter) discovers mid-task that it needs additional research it doesn’t have, it cannot request a Content Researcher run directly. Options available to it:
- Signal the gap explicitly in its output text — the human reviewer can act on this, or the Activity Planner can pick it up when planning the next period
- Use the rag_search tool to retrieve existing knowledge from the knowledge base
- Use load_skill to retrieve additional skill content if relevant
Design rationale: Predefined pipelines make the system auditable, predictable, and easier for non-technical operators to reason about. Dynamic subtask creation would require the current activity to block while awaiting the subtask result, complicating the state machine significantly. This is a deliberate MVP constraint.
Future consideration: A request_activity tool that allows agents to signal a need for additional specialist work — creating a blocked activity that the Activity Planner must explicitly unblock. Deferred post-MVP.
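Purely for illustration, a hypothetical shape such a tool could take — nothing here is a committed design, and the names and fields are invented for the sketch:

// Hypothetical only — this tool does not exist; names and fields are illustrative.
interface RequestActivityInput {
  targetAgentRole: string;       // e.g. 'content_researcher'
  reason: string;                // why the extra specialist run is needed
  blockingActivityRunId: string; // the current run, which would block until resolved
}

interface RequestActivityResult {
  createdActivityId: string;     // the new activity, created in a blocked state
  status: 'blocked';             // the Activity Planner must explicitly unblock it
}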
Responsibilities
- Dispatch tasks to the correct LLM backend based on agent_configs.adapter_type
- Inject skills context (temp dir for Claude; system prompt prepend for others)
- Stream and parse LLM output in real time (for sync adapters)
- Handle phone-home callbacks (for async adapters)
- Return a structured, typed result: { text, sessionId, usage, cost, error }
- Enforce per-call timeouts
- Retry failed calls with exponential backoff
- Log every call to the llm_calls table
Adapters
The control plane dispatches to four supported backends via the unified AgentAdapter interface. See the Adapters documentation for full I/O mechanics per backend:
| Adapter | Doc | Backend |
|---|---|---|
| ClaudeAdapter | claude.md | Claude Code CLI subprocess (NDJSON on stdout) |
| OpenAIAdapter | openai.md | OpenAI REST API (SSE streaming) |
| OllamaAdapter | ollama.md | Local Ollama REST API (NDJSON streaming) |
| WebhookAdapter | webhook.md | Async HTTP POST + phone-home callback |
Unified Adapter Interface
All backends implement the same TypeScript interface. The rest of the system never needs to know which LLM is running.
interface AgentAdapter {
dispatch(request: DispatchRequest): Promise<DispatchResult>;
}
interface DispatchRequest {
prompt: string;
systemPrompt?: string;
skillsDir?: string; // path to temp dir (Claude: --add-dir; Ollama: read + prepend)
sessionId?: string; // resume prior session (Claude only)
activityRunId: string; // for phone-home callback validation
model: string;
timeoutMs: number; // derived from ClaudeAdapterConfig.timeoutSec * 1000
graceMs?: number; // derived from ClaudeAdapterConfig.graceSec * 1000 (Claude only)
maxTurns?: number; // derived from ClaudeAdapterConfig.maxTurnsPerRun (Claude only)
maxTokens?: number;
// Run context — passed to WebhookAdapter as POST body fields so external agents
// know why they were woken and can authenticate their callback
wakeReason: 'new_task' | 'rejection' | 'scheduled' | 'unblocked'
| 'subtask_completed' | 'review_approved' | 'review_feedback';
tenantId: string;
agentRole: AgentRole;
runId: string; // same as activityRunId — exposed for external adapters
}
interface DispatchResult {
// For sync adapters (Claude CLI, Ollama): result is available immediately
text?: string;
sessionId?: string | null;
usage?: { inputTokens: number; outputTokens: number };
cost?: number;
durationMs?: number;
// For async adapters (Webhook): result arrives via phone-home callback
callbackUrl?: string;
taskToken?: string; // short-lived JWT scoped to this activityRunId
}
type AgentEvent =
| { type: 'text_delta'; delta: string }
| { type: 'tool_call'; name: string; input: unknown }
| { type: 'tool_result'; name: string; output: unknown }
| { type: 'completed'; result: DispatchResult }
| { type: 'error'; error: AgentError };

WebhookAdapter — run context passed to external agent
For external agents using the WebhookAdapter, the full run context is included in the POST body so the agent knows its identity and how to authenticate its callback:
// POST to agentConfig.webhookUrl
{
prompt: "...",
wakeReason: "rejection",
agentRole: "copywriter",
tenantId: "ten_abc123",
runId: "run_xyz789",
callbackUrl: "https://api.leadmetrics.io/api/agent-callback/run_xyz789",
taskToken: "<JWT signed with run-scoped secret, expires in 1h>"
}

The external agent uses callbackUrl + taskToken to POST its result back. The JWT is scoped to the single runId and expires after the activity’s configured timeout. See OQ-16 for key rotation design.
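For completeness, a sketch of the callback the external agent might send back. The payload fields mirror DispatchResult, but the exact body contract and the use of a Bearer Authorization header are assumptions rather than a confirmed schema:

// Sketch only — callback body fields and the Authorization header are assumptions.
const callbackUrl = 'https://api.leadmetrics.io/api/agent-callback/run_xyz789';
const taskToken = '<JWT from the dispatch POST body>';

await fetch(callbackUrl, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: `Bearer ${taskToken}`, // run-scoped token; expires per the note above
  },
  body: JSON.stringify({
    runId: 'run_xyz789',
    text: '...final agent output...',
    usage: { inputTokens: 1200, outputTokens: 800 },
    error: null,
  }),
});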
Adapter Factory
See Adapters — Index for the full resolveAdapter() implementation including the data-privacy override. Summary:
function resolveAdapter(config: AgentConfig): AgentAdapter {
switch (config.adapterType) {
case 'claude': return new ClaudeAdapter(config);
case 'openai': return new OpenAIAdapter(config);
case 'ollama': return new OllamaAdapter(config);
case 'webhook': return new WebhookAdapter(config);
}
}

Timeout Handling
Each dispatch() call enforces a timeout: a two-stage graceful shutdown for the Claude adapter and a single-stage abort for the HTTP adapters (Ollama, OpenAI, Webhook).
Claude adapter — graceful shutdown:
- At timeoutMs: send SIGTERM to the child process — gives Claude a chance to flush its current output and exit cleanly
- If the process is still alive after graceMs (default 10 s): send SIGKILL — hard kill
HTTP adapters: abort the fetch request immediately at timeoutMs.
In all cases the activity run is marked failed with error: 'timeout' and BullMQ retry policy picks it up.
import type { ChildProcess } from 'node:child_process';

// Small helper assumed here: resolves once the child process has exited.
function onProcessExit(child: ChildProcess): Promise<void> {
  return new Promise((resolve, reject) => {
    child.once('exit', () => resolve());
    child.once('error', reject);
  });
}

async function withGracefulTimeout(
  child: ChildProcess,
  timeoutMs: number,
  graceMs: number
): Promise<void> {
  // Stage 1 at timeoutMs: SIGTERM. Stage 2 at timeoutMs + graceMs: SIGKILL.
  const sigterm = setTimeout(() => child.kill('SIGTERM'), timeoutMs);
  const sigkill = setTimeout(() => child.kill('SIGKILL'), timeoutMs + graceMs);
  try {
    await onProcessExit(child);
  } finally {
    clearTimeout(sigterm);
    clearTimeout(sigkill);
  }
}

Adapter Health Checks
Before dispatching a task, the worker optionally runs a lightweight testAdapter() call to verify the adapter is operational. This prevents queueing work to a broken backend and gives operators early, actionable diagnostics.
interface AdapterTestResult {
ok: boolean;
message: string; // human-readable status
checks: AdapterCheck[]; // individual check results
}
interface AdapterCheck {
name: string;
ok: boolean;
detail?: string;
}

See each adapter doc for provider-specific check details.
When health checks run:
- On agent creation — validate before the agent is set to active
- On manual “Test Connection” from the Manage App agent config screen
- Optionally on worker startup — configurable per deployment; skipped if ADAPTER_HEALTH_CHECK_ON_START=false
- Health check failures do not block existing queued work — they surface as a warning on the agent config record (see the sketch below)
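To make the pre-dispatch gating concrete, here is a minimal sketch. The placement of testAdapter() on the adapter instance and the markAgentWarning() helper are assumptions about wiring, not the confirmed implementation; a failed check surfaces a warning rather than blocking queued work, per the list above.

// Sketch under assumptions — testAdapter() placement and markAgentWarning() are illustrative.
declare function markAgentWarning(
  tenantId: string,
  agentRole: string,
  message: string,
): Promise<void>; // hypothetical helper: surfaces the warning on the agent config record

async function dispatchWithHealthCheck(
  adapter: AgentAdapter & { testAdapter(): Promise<AdapterTestResult> },
  request: DispatchRequest,
): Promise<DispatchResult> {
  const health = await adapter.testAdapter();
  if (!health.ok) {
    // Does not block already-queued work — record a warning and proceed; retries handle failures.
    await markAgentWarning(request.tenantId, request.agentRole, health.message);
  }
  return adapter.dispatch(request);
}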
Agent Status Lifecycle
Each agent role has a unified status per tenant, maintained on agent_configs.status. This gives operators a single field to query for the health and activity of each agent — rather than inferring state from BullMQ worker internals or activity run counts.
┌────────────┐
│ active │◄──────────────────────────┐
└──────┬─────┘ │
│ job picked up │ error cleared (manual)
▼ │ or budget reset (auto)
┌────────────┐ all retries │
│ running │──── exhausted ────────►┌──┴──────┐
└──────┬─────┘ │ error │
│ job completes └─────────┘
▼
┌────────────┐
│ idle │ (active but no queued tasks — display alias for active)
└──────┬─────┘
│ budget exceeded / manual pause
▼
┌────────────┐
│ paused │
└──────┬─────┘
│ permanently disabled
▼
┌─────────────┐
│ terminated │
└─────────────┘

| Status | Meaning | Set by |
|---|---|---|
| active | Enabled; ready to receive and process tasks | Default on agent creation; restored on resume |
| idle | Active but no tasks currently queued (UI display alias — not stored separately) | Derived from queue depth |
| running | At least one task currently in progress | Worker increments on job start; decrements on job end |
| error | Most recent task exhausted all retries without completing | Dead-letter handler |
| paused | Suspended — either budget exceeded or manually paused by operator | Budget enforcement or Manage App action |
| terminated | Permanently disabled for this tenant | Manage App action |
Status transitions
// Worker sets running on job start
await db.update(agentConfigs)
.set({ status: 'running', runningCount: sql`running_count + 1` })
.where(and(eq(agentConfigs.tenantId, tenantId), eq(agentConfigs.role, agentRole)));
// Worker restores active when running count reaches zero
await db.update(agentConfigs)
.set({
status: sql`CASE WHEN running_count - 1 <= 0 THEN 'active' ELSE 'running' END`,
runningCount: sql`GREATEST(running_count - 1, 0)`,
})
.where(and(eq(agentConfigs.tenantId, tenantId), eq(agentConfigs.role, agentRole)));
// Dead-letter handler sets error
await db.update(agentConfigs)
.set({ status: 'error', lastErrorAt: new Date(), lastErrorMessage: errorMessage })
.where(and(eq(agentConfigs.tenantId, tenantId), eq(agentConfigs.role, agentRole)));

runningCount on agent_configs tracks concurrent running tasks — status returns to active only when the count reaches zero. This handles the multi-concurrency case correctly (e.g. Copywriter with concurrency 4 stays running until all 4 slots are idle).
Retry Policy
Retries are managed by BullMQ at the job level, not inside the executor. The executor throws on failure; BullMQ catches it and reschedules.
| Attempt | Delay |
|---|---|
| 1st retry | 5 seconds |
| 2nd retry | 30 seconds |
| 3rd retry | 2 minutes |
| 4th+ retry | Dead letter queue |
Retry count and max retries are stored on the activity_runs record so the Activity Planner can observe escalation state.
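As an illustration of how this policy could be expressed in BullMQ — the queue name, Redis connection, and processor body below are assumptions; only the delay values come from the table above:

// Sketch only — wiring details are assumed; delays match the retry table.
import { Queue, Worker } from 'bullmq';

const BACKOFF_MS = [5_000, 30_000, 120_000]; // 1st, 2nd, 3rd retry delays
const connection = { host: 'localhost', port: 6379 };

const activityQueue = new Queue('activities', {
  connection,
  defaultJobOptions: {
    attempts: 4,                  // initial attempt + 3 retries, then dead-letter handling
    backoff: { type: 'custom' },  // delegate delay calculation to the worker below
  },
});

const worker = new Worker(
  'activities',
  async (job) => {
    // Dispatch via the adapter; throw on failure so BullMQ reschedules the job.
  },
  {
    connection,
    settings: {
      // attemptsMade is 1 after the first failure, 2 after the second, ...
      backoffStrategy: (attemptsMade: number) =>
        BACKOFF_MS[attemptsMade - 1] ?? BACKOFF_MS[BACKOFF_MS.length - 1],
    },
  },
);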
Cost Calculation
Model pricing is maintained as a static table in the shared package. Cost is calculated after every completed call:
const MODEL_PRICING: Record<string, { inputPer1M: number; outputPer1M: number }> = {
'claude-sonnet-4-6': { inputPer1M: 3.00, outputPer1M: 15.00 },
'claude-haiku-4-5': { inputPer1M: 0.25, outputPer1M: 1.25 },
'gemma3:4b': { inputPer1M: 0, outputPer1M: 0 }, // local = free
};
function calculateCost(model: string, inputTokens: number, outputTokens: number): number {
const pricing = MODEL_PRICING[model] ?? { inputPer1M: 0, outputPer1M: 0 };
return (inputTokens / 1_000_000) * pricing.inputPer1M
+ (outputTokens / 1_000_000) * pricing.outputPer1M;
}

LLM Call Logging
Every completed or failed call is written to llm_calls immediately after the stream closes (or when the phone-home callback is received):
await db.insert(llmCalls).values({
activityRunId,
tenantId,
sessionId,
model,
promptHash: sha256(prompt),
responseHash: sha256(resultText),
inputTokens,
outputTokens,
costUsd: calculateCost(model, inputTokens, outputTokens),
durationMs,
createdAt: new Date(),
});

The promptHash and responseHash allow audit without storing raw prompt content in the call log.
Streaming to the UI
The API exposes GET /api/activities/:id/stream (Next.js API Route, Server-Sent Events). For Claude/Ollama sync adapters, the AgentEvent stream is piped directly. For async adapters, the SSE stream polls activity_streams (MongoDB, TTL 24h) which the phone-home callback populates.
Claude/Ollama Adapter (AsyncIterable<AgentEvent>)
│ text_delta events
▼
Next.js API Route (SSE, text/event-stream)
│
▼
usePlatformEvents() hook (React, SSE client)
│
▼
Activity Detail screen — live output pane (W5)

Screen reference: W5 (Activity Detail) in workflow-screens.md
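A minimal sketch of the SSE route for the sync-adapter path — the file location, the import path, and the getAgentEventStream() helper are assumptions; only the SSE piping pattern itself is the point:

// Sketch only — route location, import path, and getAgentEventStream() are assumptions.
// Forwards AgentEvent objects (including text_delta) from a sync adapter as SSE frames.
import type { NextRequest } from 'next/server';
import type { AgentEvent } from '@leadmetrics/agent-engine'; // assumed package export

// Assumed helper that looks up the live AgentEvent stream for a running activity.
declare function getAgentEventStream(activityId: string): Promise<AsyncIterable<AgentEvent>>;

export async function GET(_req: NextRequest, { params }: { params: { id: string } }) {
  const events = await getAgentEventStream(params.id);
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      for await (const event of events) {
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`));
        if (event.type === 'completed' || event.type === 'error') break;
      }
      controller.close();
    },
  });

  return new Response(stream, {
    headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache' },
  });
}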
POC Status
The following POC scripts exist in claude-poc/:
| File | What it tests |
|---|---|
| run.js | Claude Code CLI child process spawn, NDJSON streaming, session ID extraction |
| run-ollama.js | Ollama REST streaming via fetch |
| run-ollama-cli.js | Ollama via CLI subprocess |
| test-tmpdir.mjs | Temp dir creation, file copy, --add-dir injection pattern |
These are the reference implementations to formalise into the agent-engine package.
Package Location
packages/agent-engine/
agent-engine/
├── src/
│ ├── adapters/
│ │ ├── claude.ts # ClaudeAdapter — CLI child process, NDJSON, graceful shutdown
│ │ ├── ollama.ts # OllamaAdapter — local REST, streaming
│ │ ├── openai.ts # OpenAIAdapter — REST API, streaming SSE
│ │ └── webhook.ts # WebhookAdapter — POST + phone-home callback
│ ├── config/
│ │ ├── claude.ts # ClaudeAdapterConfig schema + renderPromptTemplate()
│ │ ├── ollama.ts # OllamaAdapterConfig schema
│ │ ├── openai.ts # OpenAIAdapterConfig schema
│ │ └── webhook.ts # WebhookAdapterConfig schema
│ ├── health/
│ │ ├── claude.ts # ClaudeAdapter health checks (CLI, auth, cwd, probe)
│ │ ├── ollama.ts # OllamaAdapter health checks (server, model)
│ │ └── types.ts # AdapterTestResult, AdapterCheck
│ ├── factory.ts # resolveAdapter()
│ ├── callback.ts # Phone-home callback handler (validates JWT, stores result)
│ ├── status.ts # Agent status transitions — setRunning(), setIdle(), setError()
│ ├── pricing.ts # MODEL_PRICING + calculateCost()
│ ├── types.ts # AgentAdapter, DispatchRequest, AgentEvent, DispatchResult
│ └── index.ts
└── package.json