
Agent Execution Engine

Purpose

The foundational layer of the platform. It abstracts away differences between LLM backends (Claude Code CLI, Ollama REST, OpenAI REST) behind a single unified adapter interface, and handles streaming output, session continuity, retries, cost extraction, and structured result delivery.

Related: Adapters — full adapter pattern, phone-home callbacks, resolveAdapter() factory | Task Queue — how activities are dispatched | Skills System — skill injection before execution

Important: The control plane does not run agents. It dispatches tasks to external agent runtimes via adapters. Agents “phone home” with results via callback. This document covers the adapter implementations themselves.


Heartbeat Model

Leadmetrics agents follow a heartbeat pattern — they wake up, do work, and go back to sleep. The key architectural choice is a push model rather than a pull model: the control plane (BullMQ) decides what an agent works on and pushes it. Agents never fetch their own work queue or update their own status.

This table maps each step of a standard agent heartbeat to how Leadmetrics handles it:

| Step | Pull-model approach | Leadmetrics (push model) |
|---|---|---|
| 1. Get identity | GET /api/agents/me at run start | tenantId, agentRole, runId injected into prompt / POST body at dispatch time |
| 2. Handle approvals | Check env var for pending approval ID | wakeReason: 'rejection' + reviewerNotes threaded directly into the task prompt |
| 3. Fetch work | GET /api/issues?assigneeAgentId=... | Not needed — BullMQ dequeues the job and passes ActivityJobData to the worker |
| 4. Prioritize | Agent selects highest-priority item | Priority set on the BullMQ job at enqueue time; worker processes in priority order |
| 5. Checkout | POST /api/issues/:id/checkout (409 on conflict) | BullMQ job locking — only one worker dequeues a job; mutual exclusion is guaranteed |
| 6. Review context | Agent fetches issue comments and linked documents | buildActivityPrompt() assembles all context (prior step output, rejection notes) before dispatch |
| 7. Execute work | Agent calls tools, produces output | Same — adapter dispatches to LLM, streams output |
| 8. Update status | Agent PATCH /api/issues/:id with run ID | Worker updates activities.status and activity_runs after the adapter completes |
| 9. Create subtasks | Agent POST /api/issues to delegate to another agent | Control plane only — see below |
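The dispatch-time identity injection in steps 1–4 can be sketched in TypeScript. This is a hedged illustration only: the ActivityJobData field list and the buildJobData() helper are assumptions for this sketch, not the actual control-plane code.

```typescript
// Hedged sketch — ActivityJobData and buildJobData() are illustrative names.
// The control plane assembles identity and context at enqueue time, so the
// agent never calls back to ask "who am I?" or "what should I work on?".
interface ActivityJobData {
  tenantId: string;
  agentRole: string;
  runId: string;
  wakeReason: 'new_task' | 'rejection' | 'scheduled';
  prompt: string;
  priority: number; // lower value = dequeued first (BullMQ's priority convention)
}

function buildJobData(
  tenantId: string,
  agentRole: string,
  runId: string,
  wakeReason: ActivityJobData['wakeReason'],
  prompt: string,
  priority = 10 // assumed default priority
): ActivityJobData {
  return { tenantId, agentRole, runId, wakeReason, prompt, priority };
}

const job = buildJobData('ten_abc123', 'copywriter', 'run_xyz789', 'new_task', 'Write a post');
```

The worker then receives this payload from BullMQ fully formed, which is why steps 1–4 of the pull model disappear entirely on the agent side.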

Push model trade-offs

| Aspect | Push model (Leadmetrics) | Pull model |
|---|---|---|
| Agent complexity | Low — agent only processes a prompt | Higher — agent must call API, handle conflicts |
| Concurrency safety | BullMQ guarantees exclusive delivery | Agent must handle 409 checkout conflicts |
| Audit trail | All state changes by control plane — single source of truth | Agent and control plane both mutate state |
| Dynamic task creation | Activity Planner only (predefined pipelines) | Any agent can create subtasks |
| Testability | Adapter is a pure function: prompt in → text out | Agent behaviour depends on API state |

Agent-initiated activity creation (step 9)

The one capability worker agents lack is the ability to dynamically create activities for other agents during execution. In a pull-model platform, any agent can POST /api/issues to delegate a subtask. In Leadmetrics, only the Activity Planner creates activities — all other agents work within predefined pipeline templates.

Implication: If a worker agent (e.g. the Copywriter) discovers mid-task that it needs additional research it doesn’t have, it cannot request a Content Researcher run directly. Options available to it:

  • Signal the gap explicitly in its output text — the human reviewer can act on this, or the Activity Planner can pick it up when planning the next period
  • Use the rag_search tool to retrieve existing knowledge from the knowledge base
  • Use load_skill to retrieve additional skill content if relevant

Design rationale: Predefined pipelines make the system auditable, predictable, and easier for non-technical operators to reason about. Dynamic subtask creation would require the current activity to block while awaiting the subtask result, complicating the state machine significantly. This is a deliberate MVP constraint.

Future consideration: A request_activity tool that allows agents to signal a need for additional specialist work — creating a blocked activity that the Activity Planner must explicitly unblock. Deferred post-MVP.


Responsibilities

  • Dispatch tasks to the correct LLM backend based on agent_configs.adapter_type
  • Inject skills context (temp dir for Claude; system prompt prepend for others)
  • Stream and parse LLM output in real time (for sync adapters)
  • Handle phone-home callbacks (for async adapters)
  • Return a structured, typed result: { text, sessionId, usage, cost, error }
  • Enforce per-call timeouts
  • Retry failed calls with exponential backoff
  • Log every call to llm_calls table

Adapters

The control plane dispatches to four supported backends via the unified AgentAdapter interface. See the Adapters documentation for full I/O mechanics per backend:

| Adapter | Doc | Backend |
|---|---|---|
| ClaudeAdapter | claude.md | Claude Code CLI subprocess (NDJSON on stdout) |
| OpenAIAdapter | openai.md | OpenAI REST API (SSE streaming) |
| OllamaAdapter | ollama.md | Local Ollama REST API (NDJSON streaming) |
| WebhookAdapter | webhook.md | Async HTTP POST + phone-home callback |

Unified Adapter Interface

All backends implement the same TypeScript interface. The rest of the system never needs to know which LLM is running.

```typescript
interface AgentAdapter {
  dispatch(request: DispatchRequest): Promise<DispatchResult>;
}

interface DispatchRequest {
  prompt: string;
  systemPrompt?: string;
  skillsDir?: string;     // path to temp dir (Claude: --add-dir; Ollama: read + prepend)
  sessionId?: string;     // resume prior session (Claude only)
  activityRunId: string;  // for phone-home callback validation
  model: string;
  timeoutMs: number;      // derived from ClaudeAdapterConfig.timeoutSec * 1000
  graceMs?: number;       // derived from ClaudeAdapterConfig.graceSec * 1000 (Claude only)
  maxTurns?: number;      // derived from ClaudeAdapterConfig.maxTurnsPerRun (Claude only)
  maxTokens?: number;

  // Run context — passed to WebhookAdapter as POST body fields so external agents
  // know why they were woken and can authenticate their callback
  wakeReason: 'new_task' | 'rejection' | 'scheduled' | 'unblocked' | 'subtask_completed'
            | 'review_approved' | 'review_feedback';
  tenantId: string;
  agentRole: AgentRole;
  runId: string;          // same as activityRunId — exposed for external adapters
}

interface DispatchResult {
  // For sync adapters (Claude CLI, Ollama): result is available immediately
  text?: string;
  sessionId?: string | null;
  usage?: { inputTokens: number; outputTokens: number };
  cost?: number;
  durationMs?: number;

  // For async adapters (Webhook): result arrives via phone-home callback
  callbackUrl?: string;
  taskToken?: string;     // short-lived JWT scoped to this activityRunId
}

type AgentEvent =
  | { type: 'text_delta'; delta: string }
  | { type: 'tool_call'; name: string; input: unknown }
  | { type: 'tool_result'; name: string; output: unknown }
  | { type: 'completed'; result: DispatchResult }
  | { type: 'error'; error: AgentError };
```

WebhookAdapter — run context passed to external agent

For external agents using the WebhookAdapter, the full run context is included in the POST body so the agent knows its identity and how to authenticate its callback:

```jsonc
// POST to agentConfig.webhookUrl
{
  "prompt": "...",
  "wakeReason": "rejection",
  "agentRole": "copywriter",
  "tenantId": "ten_abc123",
  "runId": "run_xyz789",
  "callbackUrl": "https://api.leadmetrics.io/api/agent-callback/run_xyz789",
  "taskToken": "<JWT signed with run-scoped secret, expires in 1h>"
}
```

The external agent uses callbackUrl + taskToken to POST its result back. The JWT is scoped to the single runId and expires after the activity’s configured timeout. See OQ-16 for key rotation design.
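The run-scoped token check on the callback side can be sketched as follows. This is a minimal illustration assuming a plain HMAC-signed token; mintTaskToken() and verifyTaskToken() are hypothetical names, and the actual callback.ts implementation may use a JWT library instead.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hedged sketch — token = base64url(payload) + "." + HMAC signature,
// scoped to a single runId with an expiry claim.
function mintTaskToken(runId: string, expiresAtMs: number, secret: string): string {
  const payload = Buffer.from(JSON.stringify({ runId, exp: expiresAtMs })).toString('base64url');
  const sig = createHmac('sha256', secret).update(payload).digest('base64url');
  return `${payload}.${sig}`;
}

function verifyTaskToken(token: string, runId: string, secret: string, nowMs: number): boolean {
  const [payload, sig] = token.split('.');
  if (!payload || !sig) return false;
  const expected = createHmac('sha256', secret).update(payload).digest('base64url');
  const a = Buffer.from(sig);
  const b = Buffer.from(expected);
  // constant-time comparison to avoid timing side channels
  if (a.length !== b.length || !timingSafeEqual(a, b)) return false;
  const claims = JSON.parse(Buffer.from(payload, 'base64url').toString()) as
    { runId: string; exp: number };
  // reject tokens for a different run, and expired tokens
  return claims.runId === runId && claims.exp > nowMs;
}
```

The key property is that a leaked token is useless for any other run and goes stale after the configured timeout, matching the scoping described above.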


Adapter Factory

See Adapters — Index for the full resolveAdapter() implementation including the data-privacy override. Summary:

```typescript
function resolveAdapter(config: AgentConfig): AgentAdapter {
  switch (config.adapterType) {
    case 'claude':  return new ClaudeAdapter(config);
    case 'openai':  return new OpenAIAdapter(config);
    case 'ollama':  return new OllamaAdapter(config);
    case 'webhook': return new WebhookAdapter(config);
  }
}
```

Timeout Handling

Each dispatch() call enforces a two-stage shutdown for Claude (graceful) and a single-stage abort for HTTP adapters (Ollama, OpenAI, Webhook).

Claude adapter — graceful shutdown:

  1. At timeoutMs: send SIGTERM to the child process — gives Claude a chance to flush its current output and exit cleanly
  2. If the process is still alive after graceMs (default 10 s): send SIGKILL — hard kill

HTTP adapters: abort the fetch request immediately at timeoutMs.
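The single-stage HTTP abort can be sketched with an AbortController. This is an illustrative helper, not the adapters' actual code; the name fetchWithTimeout is an assumption.

```typescript
// Hedged sketch — abort an in-flight fetch at timeoutMs, mirroring the
// single-stage behaviour of the HTTP adapters described above.
async function fetchWithTimeout(
  url: string,
  init: RequestInit,
  timeoutMs: number
): Promise<Response> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs); // hard abort at timeoutMs
  try {
    // fetch rejects with an AbortError once controller.abort() fires
    return await fetch(url, { ...init, signal: controller.signal });
  } finally {
    clearTimeout(timer); // don't leak the timer when the request completes in time
  }
}
```

Unlike the Claude path, there is nothing to flush: the HTTP request is simply cancelled and the run is marked failed.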

In all cases the activity run is marked failed with error: 'timeout' and BullMQ retry policy picks it up.

```typescript
async function withGracefulTimeout(
  child: ChildProcess,
  timeoutMs: number,
  graceMs: number
): Promise<void> {
  const sigterm = setTimeout(() => child.kill('SIGTERM'), timeoutMs);
  const sigkill = setTimeout(() => child.kill('SIGKILL'), timeoutMs + graceMs);
  try {
    await onProcessExit(child);
  } finally {
    clearTimeout(sigterm);
    clearTimeout(sigkill);
  }
}
```

Adapter Health Checks

Before dispatching a task, the worker optionally runs a lightweight testAdapter() call to verify the adapter is operational. This prevents queueing work to a broken backend and gives operators early, actionable diagnostics.

```typescript
interface AdapterTestResult {
  ok: boolean;
  message: string;         // human-readable status
  checks: AdapterCheck[];  // individual check results
}

interface AdapterCheck {
  name: string;
  ok: boolean;
  detail?: string;
}
```

See each adapter doc (listed in the Adapters table above) for provider-specific check details.

When health checks run:

  • On agent creation — validate before the agent is set to active
  • On manual “Test Connection” from the Manage App agent config screen
  • Optionally on worker startup — configurable per deployment; skipped if ADAPTER_HEALTH_CHECK_ON_START=false
  • Health check failures do not block existing queued work — they surface as a warning on the agent config record
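A minimal sketch of an Ollama-side health probe, assuming Ollama's /api/tags model-list endpoint. The checkOllama() helper is illustrative only; the shipped health/ollama.ts may structure its checks differently.

```typescript
// Hedged sketch — two checks: server reachable, model pulled.
interface AdapterCheck { name: string; ok: boolean; detail?: string; }

async function checkOllama(baseUrl: string, model: string): Promise<AdapterCheck[]> {
  const checks: AdapterCheck[] = [];
  try {
    // GET /api/tags lists locally available models on an Ollama server
    const res = await fetch(`${baseUrl}/api/tags`);
    checks.push({ name: 'server', ok: res.ok });
    const body = (await res.json()) as { models?: { name: string }[] };
    const found = (body.models ?? []).some((m) => m.name === model);
    checks.push({
      name: 'model',
      ok: found,
      detail: found ? undefined : `${model} not pulled`,
    });
  } catch (err) {
    // server unreachable — report it and skip the model check
    checks.push({ name: 'server', ok: false, detail: String(err) });
  }
  return checks;
}
```

Each check maps directly onto an AdapterCheck row, so a failed probe surfaces as an actionable per-check message rather than a single opaque error.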

Agent Status Lifecycle

Each agent role has a unified status per tenant, maintained on agent_configs.status. This gives operators a single field to query for the health and activity of each agent — rather than inferring state from BullMQ worker internals or activity run counts.

```
┌────────────┐
│   active   │◄──────────────────────────┐
└──────┬─────┘                           │
       │ job picked up                   │ error cleared (manual)
       ▼                                 │ or budget reset (auto)
┌────────────┐      all retries          │
│  running   │──── exhausted ────────►┌──┴──────┐
└──────┬─────┘                        │  error  │
       │ job completes                └─────────┘
       ▼
┌────────────┐
│    idle    │  (active but no queued tasks — display alias for active)
└──────┬─────┘
       │ budget exceeded / manual pause
       ▼
┌────────────┐
│   paused   │
└──────┬─────┘
       │ permanently disabled
       ▼
┌─────────────┐
│ terminated  │
└─────────────┘
```

| Status | Meaning | Set by |
|---|---|---|
| active | Enabled; ready to receive and process tasks | Default on agent creation; restored on resume |
| idle | Active but no tasks currently queued (UI display alias — not stored separately) | Derived from queue depth |
| running | At least one task currently in progress | Worker increments on job start; decrements on job end |
| error | Most recent task exhausted all retries without completing | Dead-letter handler |
| paused | Suspended — either budget exceeded or manually paused by operator | Budget enforcement or Manage App action |
| terminated | Permanently disabled for this tenant | Manage App action |

Status transitions

```typescript
// Worker sets running on job start
await db.update(agentConfigs)
  .set({ status: 'running', runningCount: sql`running_count + 1` })
  .where(and(eq(agentConfigs.tenantId, tenantId), eq(agentConfigs.role, agentRole)));

// Worker restores active when running count reaches zero
await db.update(agentConfigs)
  .set({
    status: sql`CASE WHEN running_count - 1 <= 0 THEN 'active' ELSE 'running' END`,
    runningCount: sql`GREATEST(running_count - 1, 0)`,
  })
  .where(and(eq(agentConfigs.tenantId, tenantId), eq(agentConfigs.role, agentRole)));

// Dead-letter handler sets error
await db.update(agentConfigs)
  .set({ status: 'error', lastErrorAt: new Date(), lastErrorMessage: errorMessage })
  .where(and(eq(agentConfigs.tenantId, tenantId), eq(agentConfigs.role, agentRole)));
```

runningCount on agent_configs tracks concurrent running tasks — status returns to active only when the count reaches zero. This handles the multi-concurrency case correctly (e.g. Copywriter with concurrency 4 stays running until all 4 slots are idle).


Retry Policy

Retries are managed by BullMQ at the job level, not inside the executor. The executor throws on failure; BullMQ catches it and reschedules.

| Attempt | Delay |
|---|---|
| 1st retry | 5 seconds |
| 2nd retry | 30 seconds |
| 3rd retry | 2 minutes |
| 4th+ retry | Dead letter queue |

Retry count and max retries are stored on the activity_runs record so the Activity Planner can observe escalation state.
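Because the schedule above (5 s → 30 s → 2 min) is not a fixed-multiplier exponential curve, it maps most naturally onto a custom backoff strategy rather than BullMQ's built-in exponential type. The sketch below shows only the delay function; how it is wired into BullMQ (job attempts and a worker-side custom backoff strategy, noted in the comments) is an assumption.

```typescript
// Hedged sketch — delay table mirrors the retry schedule above.
// Assumed wiring: job opts { attempts: 4, backoff: { type: 'custom' } } plus a
// worker-side custom backoff strategy that delegates to retryDelayMs().
const RETRY_DELAYS_MS = [5_000, 30_000, 120_000]; // 5 s, 30 s, 2 min

function retryDelayMs(attemptsMade: number): number {
  // attemptsMade is 1 after the first failure; with attempts capped at 4,
  // the 4th failure is dead-lettered instead of scheduling another retry
  return RETRY_DELAYS_MS[Math.min(attemptsMade, RETRY_DELAYS_MS.length) - 1];
}
```

Keeping the schedule as data makes it trivial to assert in tests and to surface on the activity_runs record for the Activity Planner.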


Cost Calculation

Model pricing is maintained as a static table in the shared package. Cost is calculated after every completed call:

```typescript
const MODEL_PRICING: Record<string, { inputPer1M: number; outputPer1M: number }> = {
  'claude-sonnet-4-6': { inputPer1M: 3.00, outputPer1M: 15.00 },
  'claude-haiku-4-5':  { inputPer1M: 0.25, outputPer1M: 1.25 },
  'gemma3:4b':         { inputPer1M: 0,    outputPer1M: 0 },  // local = free
};

function calculateCost(model: string, inputTokens: number, outputTokens: number): number {
  const pricing = MODEL_PRICING[model] ?? { inputPer1M: 0, outputPer1M: 0 };
  return (inputTokens / 1_000_000) * pricing.inputPer1M
       + (outputTokens / 1_000_000) * pricing.outputPer1M;
}
```

LLM Call Logging

Every completed or failed call is written to llm_calls immediately after the stream closes (or when the phone-home callback is received):

```typescript
await db.insert(llmCalls).values({
  activityRunId,
  tenantId,
  sessionId,
  model,
  promptHash: sha256(prompt),
  responseHash: sha256(resultText),
  inputTokens,
  outputTokens,
  costUsd: calculateCost(model, inputTokens, outputTokens),
  durationMs,
  createdAt: new Date(),
});
```

The promptHash and responseHash allow audit without storing raw prompt content in the call log.
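For reference, a sha256() helper over Node's crypto module would look like the sketch below. The real helper may live elsewhere in the shared package; this is only the shape it would take.

```typescript
import { createHash } from 'node:crypto';

// Hedged sketch of the sha256() helper used for promptHash / responseHash —
// a hex digest lets auditors match records without storing raw prompt content.
function sha256(text: string): string {
  return createHash('sha256').update(text, 'utf8').digest('hex');
}
```

Two identical prompts always hash to the same value, so duplicate dispatches remain detectable in llm_calls even though the prompt text itself is never stored there.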


Streaming to the UI

The API exposes GET /api/activities/:id/stream (Next.js API Route, Server-Sent Events). For Claude/Ollama sync adapters, the AgentEvent stream is piped directly. For async adapters, the SSE stream polls activity_streams (MongoDB, TTL 24h) which the phone-home callback populates.

```
Claude/Ollama Adapter (AsyncIterable<AgentEvent>)
        │ text_delta events
        ▼
Next.js API Route (SSE, text/event-stream)
        │
        ▼
usePlatformEvents() hook (React, SSE client)
        │
        ▼
Activity Detail screen — live output pane (W5)
```

Screen reference: W5 (Activity Detail) in workflow-screens.md
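The piping of an AsyncIterable<AgentEvent> into an SSE response can be sketched as follows. This is a simplified illustration: formatSse() and sseStream() are hypothetical helpers, and AgentEvent is reduced to a minimal shape for the sketch.

```typescript
// Hedged sketch — serialize each AgentEvent as an SSE frame and pipe it
// through a ReadableStream that a route handler can return directly.
type AgentEvent = { type: string } & Record<string, unknown>; // simplified

function formatSse(event: AgentEvent): string {
  // SSE frame: "event:" line, "data:" payload, blank-line terminator
  return `event: ${event.type}\ndata: ${JSON.stringify(event)}\n\n`;
}

function sseStream(events: AsyncIterable<AgentEvent>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream({
    async start(controller) {
      for await (const ev of events) {
        controller.enqueue(encoder.encode(formatSse(ev)));
      }
      controller.close();
    },
  });
}

// A route handler would then return something like:
// new Response(sseStream(adapterEvents), {
//   headers: { 'Content-Type': 'text/event-stream' },
// });
```

The same frame format works for both paths: sync adapters feed the stream directly, while the async path replays frames read from activity_streams.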


POC Status

Four POC scripts exist in claude-poc/:

| File | What it tests |
|---|---|
| run.js | Claude Code CLI child process spawn, NDJSON streaming, session ID extraction |
| run-ollama.js | Ollama REST streaming via fetch |
| run-ollama-cli.js | Ollama via CLI subprocess |
| test-tmpdir.mjs | Temp dir creation, file copy, --add-dir injection pattern |

These are the reference implementations to formalise into the agent-engine package.


Package Location

packages/agent-engine/

```
agent-engine/
├── src/
│   ├── adapters/
│   │   ├── claude.ts    # ClaudeAdapter — CLI child process, NDJSON, graceful shutdown
│   │   ├── ollama.ts    # OllamaAdapter — local REST, streaming
│   │   ├── openai.ts    # OpenAIAdapter — REST API, streaming SSE
│   │   └── webhook.ts   # WebhookAdapter — POST + phone-home callback
│   ├── config/
│   │   ├── claude.ts    # ClaudeAdapterConfig schema + renderPromptTemplate()
│   │   ├── ollama.ts    # OllamaAdapterConfig schema
│   │   ├── openai.ts    # OpenAIAdapterConfig schema
│   │   └── webhook.ts   # WebhookAdapterConfig schema
│   ├── health/
│   │   ├── claude.ts    # ClaudeAdapter health checks (CLI, auth, cwd, probe)
│   │   ├── ollama.ts    # OllamaAdapter health checks (server, model)
│   │   └── types.ts     # AdapterTestResult, AdapterCheck
│   ├── factory.ts       # resolveAdapter()
│   ├── callback.ts      # Phone-home callback handler (validates JWT, stores result)
│   ├── status.ts        # Agent status transitions — setRunning(), setIdle(), setError()
│   ├── pricing.ts       # MODEL_PRICING + calculateCost()
│   ├── types.ts         # AgentAdapter, DispatchRequest, AgentEvent, DispatchResult
│   └── index.ts
└── package.json
```

© 2026 Leadmetrics — Internal use only