
Architecture — Leadmetrics v3

Core Concept: The Control Plane

This system is a control plane for humans and AI agents to collaborate on digital marketing work.

The control plane does not execute LLM calls directly in the API layer. It orchestrates them — enqueuing work, routing jobs to the right agent worker, enforcing approval gates, and aggregating results.

Agent workers run inside dedicated server processes (apps/servers/agents), consuming BullMQ jobs from shared queues. The API layer enqueues jobs; workers process them and write results back to PostgreSQL. This keeps the API stateless and lets the agent layer scale independently.


Apps & Portals

| App | Port | Audience | Tech |
|---|---|---|---|
| apps/dashboard | 3000 | Client/tenant users | Next.js 15.3, React 19, TailwindCSS |
| apps/manage | 3001 | Super-admins (platform ops) | Next.js 15.3, React 19, TailwindCSS |
| apps/dm | 3002 | Digital marketing team (internal) | Next.js 15.3, React 19, TailwindCSS |
| apps/api | 3003 | All portals + mobile | Fastify, TypeScript, Socket.IO |
| apps/knowledgebase | 3004 | Docs site | Nextra 4.0 (Next.js) |
| apps/dashboard-mobile | n/a | Client/tenant users (iOS/Android) | React Native 0.83, Expo 55 |

High-Level Architecture

  • Clients and DM team: Dashboard (:3000, Next.js 15), DM Portal (:3002, Next.js 15), and Mobile (iOS/Android, Expo).
  • Platform admins: Manage (:3001, Next.js 15).
  • All clients reach the API over HTTPS with JWT auth, using REST, SSE, and Socket.IO.
  • API (Fastify :3003): route groups /auth/v1, /tenant/v1, /dm/v1, /admin/v1, /aichat/v1; Swagger docs; rate limiting; Socket.IO for real-time events. The API reads and writes PostgreSQL and enqueues jobs in Redis.
  • PostgreSQL (Prisma ORM): all app data, 100+ models.
  • Redis: BullMQ job queues, rate limiting, Socket.IO rooms.
  • Worker servers: dequeue jobs from Redis and write results to PostgreSQL. The agents server runs 40+ AI agent workers (Claude, OpenAI, Gemini); alongside it run ragengine, notifications, billing, search-indexer, reporting, and scheduler.
  • MongoDB (audit logs only): immutable audit trail; audit events are also broadcast over Redis pub/sub.

Server Processes

Seven long-running Node.js processes handle all async work. Each connects directly to PostgreSQL and Redis.

| Server | Purpose | Queue(s) |
|---|---|---|
| apps/servers/agents | 40+ AI agent workers: blog-writer, social-post-writer, strategy-writer, insights workers, SEO, campaign, RAG, etc. | agent__{role} (one per role) |
| apps/servers/billing | Monthly invoicing, overdue lockout, credit resets | billing__invoice, billing__overdue, billing__credits-reset |
| apps/servers/notifications | Multi-channel notification dispatch (email, SMS, WhatsApp, Telegram, web push) | notifications__{channel} |
| apps/servers/ragengine | RAG document ingestion: file parsing, text extraction, embedding, vector DB indexing | rag__ingestion, agent__tenant-web-crawler |
| apps/servers/reporting | Daily automated reports sent at 22:00 local time (tenant + admin summaries) | None (uses node-cron → enqueueNotification()) |
| apps/servers/scheduler | Polls the ScheduledTask table every 15 min; executes one-off tasks (signup reminders, alerts) | None (DB-poll pattern) |
| apps/servers/search-indexer | Syncs content to Typesense for full-text search across 13 collections | search__sync |
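The scheduler's DB-poll pattern boils down to "select tasks that are due and not yet run, run each once, mark it done". The sketch below is a minimal in-memory illustration, not the apps/servers/scheduler code: the ScheduledTask fields (kind, runAt, executedAt) and the pickDueTasks/pollOnce helpers are hypothetical, and the real server reads the ScheduledTask table through Prisma.

```typescript
// Hypothetical shape of a ScheduledTask row; field names are assumptions.
interface ScheduledTask {
  id: string;
  kind: string;            // e.g. "signup_reminder" (illustrative)
  runAt: Date;             // when the one-off task becomes due
  executedAt: Date | null; // null until the task has run
}

const POLL_INTERVAL_MS = 15 * 60 * 1000; // "every 15 min" from the table above

// Returns tasks that are due and have not yet run, oldest first.
function pickDueTasks(tasks: ScheduledTask[], now: Date): ScheduledTask[] {
  return tasks
    .filter((t) => t.executedAt === null && t.runAt.getTime() <= now.getTime())
    .sort((a, b) => a.runAt.getTime() - b.runAt.getTime());
}

// One poll cycle: execute each due task, then mark it done so the next
// poll does not pick it up again.
async function pollOnce(
  tasks: ScheduledTask[],
  now: Date,
  execute: (task: ScheduledTask) => Promise<void>,
): Promise<number> {
  const due = pickDueTasks(tasks, now);
  for (const task of due) {
    await execute(task);
    task.executedAt = now;
  }
  return due.length;
}

// In a long-running process this would be driven by a timer, e.g.
// setInterval(() => pollOnce(loadTasks(), new Date(), run), POLL_INTERVAL_MS)
```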

Queue System (BullMQ)

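Queued async work flows through BullMQ, backed by Redis (the reporting and scheduler servers are triggered by node-cron and database polling instead). Agent queues are shared across all tenants, with one queue per agent role; tenant isolation is enforced by the tenantId carried in each job payload.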

How a job flows from API to completion:

1. API enqueues
  • POST /tenant/v1/blog/generate calls enqueueBlogWriter({ tenantId, activityId, ... }).
  • The job is added to the Redis queue "agent__blog-writer".
  • The API responds 202 immediately (fire-and-forget).

2. Worker dequeues
  • The agents server listens on "agent__blog-writer", and BullMQ delivers the job to the next free worker.
  • BullMQ hands each job to one worker at a time, but delivery is effectively at-least-once, not exactly-once: a job whose worker stalls or crashes can be returned to the queue, and failed jobs are retried, so handlers must be safe to run more than once.

3. Worker prepares
  • The worker loads from PostgreSQL: the Activity record (brief, topic, prior output), the AgentConfig (which LLM adapter to use), the client context file and applicable skills, and any reviewer feedback from previous rejections.
  • It builds the full prompt via buildActivityPrompt().

4. LLM call (via adapter)
  • claude-local: spawns a claude CLI subprocess and streams NDJSON output.
  • codex-local: calls the OpenAI REST API and streams SSE chunks.
  • gemini-local: calls the Gemini REST API and streams the response.

5. Worker saves and advances
  • Writes the result to PostgreSQL: BlogPost.content updated, Activity.status → dm_review, AgentRun logged (tokens, cost, duration).
  • Enqueues follow-up jobs: enqueueSearchSync() → search-indexer, and enqueueNotification() → notifications server (notifies the DM reviewer: "new draft ready").
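In practice BullMQ delivery is at-least-once (stalled jobs can be redelivered and failed jobs are retried), so a worker handler should tolerate seeing the same job twice. The sketch below is a hypothetical, in-memory model of the worker side of this flow with no BullMQ or Prisma: BlogWriterJob, ActivityRow, Store, handleBlogWriterJob and the injected generateDraft function are illustrative names, not the real @leadmetrics/agents API, and recording completion per job id is an assumption. It shows two guards: the payload tenantId must own the activity, and a job that already completed is skipped instead of regenerating the draft.

```typescript
// Hypothetical job shape; the real payload carries at least tenantId and activityId.
interface BlogWriterJob {
  id: string; // queue-assigned job id (stays the same across retries)
  data: { tenantId: string; activityId: string };
}

interface ActivityRow {
  id: string;
  tenantId: string;
  status: string;
  content: string | null;
}

// In-memory stand-in for PostgreSQL state.
interface Store {
  activities: Map<string, ActivityRow>;
  completedJobIds: Set<string>; // assumption: completion is recorded per job id
}

type HandleResult = "done" | "skipped_duplicate";

async function handleBlogWriterJob(
  job: BlogWriterJob,
  store: Store,
  generateDraft: (activity: ActivityRow) => Promise<string>,
): Promise<HandleResult> {
  // Redelivery of a job that already finished: do nothing.
  if (store.completedJobIds.has(job.id)) return "skipped_duplicate";

  const activity = store.activities.get(job.data.activityId);
  // Tenant check: the payload tenantId must own the activity. Throwing fails the job.
  if (!activity || activity.tenantId !== job.data.tenantId) {
    throw new Error(`activity ${job.data.activityId} not found for tenant ${job.data.tenantId}`);
  }

  const draft = await generateDraft(activity); // LLM call via adapter
  activity.content = draft;                    // save the result
  activity.status = "dm_review";               // advance to DM review
  store.completedJobIds.add(job.id);           // only mark done after success
  return "done";
}
```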

Queue naming: agent__{agentRole} — e.g. agent__blog-writer, agent__strategy-writer, agent__setup.

Common enqueue functions (all from @leadmetrics/queue):

| Function | Queue | Trigger |
|---|---|---|
| enqueueSetupChain() | agent__setup | Onboarding wizard completion |
| enqueueBlogWriter() | agent__blog-writer | New blog activity or brief generation |
| enqueueSocialPostWriter() | agent__social-post-writer | Social post creation |
| enqueueStrategyWriter() | agent__strategy-writer | Strategy revision |
| enqueueNotification() | notifications__{channel} | Any event requiring user notification |
| enqueueRagFile() | rag__ingestion | Document upload |
| enqueueSearchSync() | search__sync | Content create/update |

Job config defaults: 4 retry attempts, exponential backoff (5s initial), completed jobs pruned after 100, failed after 50.
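These defaults map onto BullMQ's job options. The sketch below assumes that "4 retry attempts" corresponds to BullMQ's attempts option, which counts total runs including the first (so attempts: 4 means at most 3 retries); check the @leadmetrics/queue source before relying on that. The option interface is declared locally so the sketch runs without dependencies, and retryDelaysMs is a hypothetical helper that reproduces BullMQ's documented exponential strategy, delay × 2^(attemptsMade − 1), without jitter.

```typescript
// Mirrors the BullMQ job option names; declared locally for a dependency-free sketch.
// In real code this object would be passed as `defaultJobOptions` to a Queue.
interface DefaultJobOptions {
  attempts: number;
  backoff: { type: "exponential" | "fixed"; delay: number };
  removeOnComplete: number; // keep at most this many completed jobs in Redis
  removeOnFail: number;     // keep at most this many failed jobs in Redis
}

const defaultJobOptions: DefaultJobOptions = {
  attempts: 4, // assumption: total attempts (first run + up to 3 retries)
  backoff: { type: "exponential", delay: 5000 },
  removeOnComplete: 100,
  removeOnFail: 50,
};

// Delay before each retry. attemptsMade is the number of failed runs so far
// (1 after the first failure); no retry follows the final attempt.
function retryDelaysMs(opts: DefaultJobOptions): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < opts.attempts; attemptsMade++) {
    delays.push(
      opts.backoff.type === "exponential"
        ? opts.backoff.delay * 2 ** (attemptsMade - 1)
        : opts.backoff.delay,
    );
  }
  return delays;
}

console.log(retryDelaysMs(defaultJobOptions)); // [ 5000, 10000, 20000 ]
```

Under this reading, a job that keeps failing is retried after 5 s, 10 s, and 20 s before it lands in the failed set.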


Adapter Layer (LLM Backends)

Three LLM adapters ship as independent packages. Each exports a main module, a ./server integration, and a ./ui component.

How adapter selection works:

  • An agent worker starts processing a job and reads AgentConfig from PostgreSQL (one row per agentRole per tenant, e.g. blog-writer for Acme Corp).
  • AgentConfig.adapterType = "claude_local" → packages/adapters/claude-local: spawns a claude CLI subprocess, streams NDJSON on stdout, supports --resume (session continuity) and --add-dir (skills injection).
  • AgentConfig.adapterType = "codex_local" → packages/adapters/codex-local: calls the OpenAI REST API, streams SSE chunks, and injects skills into the system prompt.
  • AgentConfig.adapterType = "gemini_local" → packages/adapters/gemini-local: calls the Gemini REST API; used for AI visibility monitoring.

All three adapters return the same structure: { text, usage: { inputTokens, outputTokens }, cost, durationMs }. The worker never knows or cares which LLM ran; it just gets text back.
| Package | Backend | Used for |
|---|---|---|
| packages/adapters/claude-local | Anthropic Claude (via @anthropic-ai/sdk) | Strategy, blog writing, brand-sensitive content |
| packages/adapters/codex-local | OpenAI Codex / GPT (via openai SDK) | Research, classification, data-heavy tasks |
| packages/adapters/gemini-local | Google Gemini | AI visibility monitoring, supplementary tasks |

Adapter selection is config-driven per agent role and tenant (stored in AgentConfig.adapterType in PostgreSQL). The packages/agents package handles dynamic dispatch: because one worker process consumes a queue shared by every tenant, it selects the adapter for each job from that tenant's AgentConfig record, not once at worker startup.
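Since every adapter returns the same result shape, dispatch reduces to a lookup keyed on AgentConfig.adapterType, resolved per job. The sketch below is illustrative only: AdapterResult follows the structure given above, but the LlmAdapter interface, its run(prompt) signature, runAgentPrompt, and the stub adapters are hypothetical stand-ins for the real packages/adapters/* exports.

```typescript
type AdapterType = "claude_local" | "codex_local" | "gemini_local";

// Common result shape returned by all three adapters.
interface AdapterResult {
  text: string;
  usage: { inputTokens: number; outputTokens: number };
  cost: number;
  durationMs: number;
}

// Hypothetical adapter contract; the real packages' exports may differ.
interface LlmAdapter {
  run(prompt: string): Promise<AdapterResult>;
}

interface AgentConfigRow {
  tenantId: string;
  agentRole: string; // e.g. "blog-writer"
  adapterType: AdapterType;
}

// Stub standing in for a real adapter so the sketch runs offline.
function stubAdapter(name: string): LlmAdapter {
  return {
    async run(prompt: string): Promise<AdapterResult> {
      const started = Date.now();
      return {
        text: `[${name}] ${prompt}`,
        usage: { inputTokens: 0, outputTokens: 0 }, // stub: no real token accounting
        cost: 0,
        durationMs: Date.now() - started,
      };
    },
  };
}

// Record<AdapterType, ...> makes the compiler flag a missing adapter.
const adapters: Record<AdapterType, LlmAdapter> = {
  claude_local: stubAdapter("claude"),
  codex_local: stubAdapter("codex"),
  gemini_local: stubAdapter("gemini"),
};

// Called once per job with that job's tenant AgentConfig row.
async function runAgentPrompt(config: AgentConfigRow, prompt: string): Promise<AdapterResult> {
  const adapter = adapters[config.adapterType];
  // Runtime guard for unexpected values coming from the database.
  if (!adapter) throw new Error(`Unknown adapterType: ${config.adapterType}`);
  return adapter.run(prompt);
}
```

The typed map catches a missing adapter at compile time, while the runtime guard covers a bad adapterType value stored in the database.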

Note: Agents are not external processes that “phone home”. They are BullMQ worker functions that run inside apps/servers/agents, write results directly to PostgreSQL, and enqueue follow-up jobs as needed.


AI Chat (LangGraph)

The /chat page in the dashboard runs a stateful LLM conversation engine built on LangGraph TS.

  • Package: packages/ai-chat
  • Deps: @langchain/langgraph, @langchain/anthropic, @langchain/langgraph-checkpoint-postgres
  • Router: POST /aichat/v1/* (registered in apps/api/src/index.ts)
  • State persistence: LangGraph checkpoints stored in PostgreSQL (ai_chat schema, auto-created on startup)
  • Thread model: Each conversation is a thread (AiChatThread model in Prisma); the LangGraph checkpoint key is the thread ID

Persistence

PostgreSQL (Primary — Prisma ORM)

All application data lives in PostgreSQL. The schema at packages/db/prisma/schema.prisma is the single source of truth (100+ models, 3,500+ lines).

| Data category | Examples |
|---|---|
| Identity & auth | Tenant, User, TenantMember, Session, Account |
| Content pipeline | Activity, BlogPost, SocialPost, EmailNewsletter, LandingPage, ContentBrief |
| Approvals & plans | DeliverablePlan, DeliverablePeriod, DeliverablePeriodLog |
| Strategy | Strategy, StrategyVersion, ClientContext, ClientContextVersion |
| Agent infrastructure | AgentConfig, AgentRun, Skill, AgentSkill |
| Channels & integrations | ConnectedChannel, ChannelInsight, ChannelMaster |
| Search terms / SEO | SearchTermReport, SearchTermClassification, GSCKeywordSnapshot, AIVisibilitySnapshot |
| Campaigns | Campaign, CampaignSequence, ReviewCampaign, MetaAd, LinkedInAd |
| Billing & credits | Subscription, Invoice, CreditBalance, CreditLedger, Plan, Offering |
| RAG / knowledge | RagDataset, RagFile, RagCrawlJob, WebPage, WebCrawlJob |
| Notifications | Notification, NotificationPreference, PushDeviceToken |
| Goals & reports | Goal, GoalSnapshot, Report, TenantBaseline |
| Contacts & leads | Contact, Lead, LeadActivity, ContactList |
| Platform config | PlatformSetting, BacklinkDirectory, HelpPageRating |
| Tenant lifecycle | TenantDeletion, TenantDeletionStep |

Env: DATABASE_URL (PostgreSQL connection string)

MongoDB (Audit Logs only — Mongoose)

MongoDB is used exclusively for the immutable audit trail via packages/nosqldb.

  • Model: AuditLog (append-only, no updates)
  • Distribution: Redis pub/sub broadcasts audit events to all connected subscribers in real time via startAuditSubscriber()
  • Env: MONGO_URL (default: mongodb://localhost:27017/leadmetrics)

Redis

  • BullMQ backend — all job queues
  • Rate limiting — API rate limiter (300 req/min default, per-IP)
  • Socket.IO adapter — cross-server presence and room broadcasting
  • Env: REDIS_URL
  • Connection variants: persistent (maxRetriesPerRequest: null) for workers; transient (maxRetriesPerRequest: 2) for short-lived contexts
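The two connection variants differ only in maxRetriesPerRequest. A minimal sketch, assuming a hypothetical redisOptionsFor helper; the option interface is declared locally but mirrors the ioredis option of the same name, and in real code the result would be passed to the ioredis constructor.

```typescript
// Subset of ioredis connection options, declared locally so the sketch has
// no dependency.
interface RedisConnectionOptions {
  maxRetriesPerRequest: number | null;
}

type ConnectionKind = "persistent" | "transient";

// Hypothetical helper: picks connection options by kind.
function redisOptionsFor(kind: ConnectionKind): RedisConnectionOptions {
  return kind === "persistent"
    ? // BullMQ workers issue blocking commands and require null here, so a
      // command waits for reconnection instead of failing after N retries.
      { maxRetriesPerRequest: null }
    : // Short-lived callers (e.g. enqueueing from a request) fail fast.
      { maxRetriesPerRequest: 2 };
}
```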

Auth

How Auth Works

Login flow:

  • The user submits email + password to POST /auth/v1/login (Fastify API).
  • The API verifies the credentials against PostgreSQL.
  • It issues an access token (JWT, 15 min, signed with JWT_SECRET) with payload { sub: userId, tenantId, role, appAccess[] }.
  • It issues a refresh token (JWT, 7 days, signed with JWT_REFRESH_SECRET).
  • Both tokens are set as httpOnly cookies (cookie names differ per portal; see the table below).

Request flow (every API call from a portal):

  • The browser/app sends a request; Next.js middleware (@leadmetrics/middleware) reads the access-token cookie and verifies it with jose (edge-compatible).
  • ✅ Token valid: the payload is attached to the request headers and the request is forwarded.
  • ❌ Token expired: the middleware silently calls POST /auth/v1/refresh (the user never sees this).
      ✅ Refresh token valid: a new access token is issued and the request is forwarded.
      ❌ Refresh token expired: redirect to /login.

Fastify API validation: every protected route runs

  • requireAuth(): verifies the JWT and attaches the user to the request
  • requireTenantUser(): ensures the tenantId in the JWT matches the route param
  • requireSuperAdmin(): allows role === 'super_admin' only
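The three Fastify guards reduce to simple checks on the verified JWT payload. The sketch below models them as pure functions over a hypothetical AuthedRequest type: assertTenantUser and assertSuperAdmin are illustrative stand-ins for requireTenantUser() and requireSuperAdmin() (the real guards are Fastify hooks whose signatures are not shown here), and the route-param name tenantId is an assumption.

```typescript
// Access-token payload, as listed in the token table.
interface AccessTokenPayload {
  sub: string; // userId
  tenantId: string;
  role: string;
  appAccess: string[];
}

// Hypothetical request shape after requireAuth() has verified the JWT.
interface AuthedRequest {
  user?: AccessTokenPayload;
  params: Record<string, string>;
}

class HttpError extends Error {
  statusCode: number;
  constructor(statusCode: number, message: string) {
    super(message);
    this.statusCode = statusCode;
  }
}

// requireAuth(): no verified user means 401.
function assertAuthenticated(req: AuthedRequest): AccessTokenPayload {
  if (!req.user) throw new HttpError(401, "Unauthorized");
  return req.user;
}

// requireTenantUser(): the route's tenantId must match the JWT's tenantId.
function assertTenantUser(req: AuthedRequest): void {
  const user = assertAuthenticated(req);
  if (req.params.tenantId !== user.tenantId) throw new HttpError(403, "Tenant mismatch");
}

// requireSuperAdmin(): only role === 'super_admin' passes.
function assertSuperAdmin(req: AuthedRequest): void {
  const user = assertAuthenticated(req);
  if (user.role !== "super_admin") throw new HttpError(403, "Super admin only");
}
```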

API (Fastify — apps/api)

JWT-based via jsonwebtoken. All auth logic in apps/api/src/jwt.ts.

| Token | Payload | Expiry |
|---|---|---|
| Access token | sub (userId), tenantId, role, appAccess[] | 15 minutes |
| Refresh token | sub (userId), type: "refresh" | 7 days |
| Registration session | sub (sessionId), type: "reg_session" | 1 hour |

Secrets: JWT_SECRET (access + reg session), JWT_REFRESH_SECRET (refresh only).

Key routes: POST /auth/v1/login, POST /auth/v1/refresh, POST /auth/v1/register, POST /auth/v1/forgot-password, POST /auth/v1/reset-password.
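The token table translates into three claim sets that differ in subject, type marker, lifetime, and signing secret. A dependency-free sketch: TOKEN_SPECS and claimsFor are hypothetical names, and the real code in apps/api/src/jwt.ts signs with jsonwebtoken, where the lifetime would typically be supplied through the expiresIn option rather than computed by hand.

```typescript
type TokenKind = "access" | "refresh" | "reg_session";

interface TokenSpec {
  ttlSeconds: number;
  secretEnv: "JWT_SECRET" | "JWT_REFRESH_SECRET"; // which secret signs it
}

const TOKEN_SPECS: Record<TokenKind, TokenSpec> = {
  access: { ttlSeconds: 15 * 60, secretEnv: "JWT_SECRET" },
  refresh: { ttlSeconds: 7 * 24 * 60 * 60, secretEnv: "JWT_REFRESH_SECRET" },
  reg_session: { ttlSeconds: 60 * 60, secretEnv: "JWT_SECRET" },
};

// Builds the claims for a token. iat/exp are standard JWT claims in seconds.
// Base claims are spread last so `extra` cannot override sub, exp, or type.
function claimsFor(
  kind: TokenKind,
  sub: string,
  nowSeconds: number,
  extra: Record<string, unknown> = {},
): Record<string, unknown> {
  const base: Record<string, unknown> = {
    sub,
    iat: nowSeconds,
    exp: nowSeconds + TOKEN_SPECS[kind].ttlSeconds,
  };
  // Refresh and registration-session tokens carry an explicit type marker.
  if (kind !== "access") base.type = kind;
  return { ...extra, ...base };
}
```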

Next.js Portals — Shared Middleware (packages/middleware)

All three portals share createJwtAuthMiddleware from @leadmetrics/middleware. Verification uses jose (not jsonwebtoken) for edge-compatible JWT verification.

| Portal | Access cookie | Refresh cookie |
|---|---|---|
| Dashboard | dashboard_access_token | dashboard_refresh_token |
| DM | dm_access_token | dm_refresh_token |
| Manage | manage_access_token | manage_refresh_token |

Silent refresh: if the access token is expired, middleware transparently calls POST {API_URL}/auth/v1/refresh before forwarding the request. No re-login required unless the refresh token also expires.

Helper functions: setPortalAuthCookies(), clearPortalAuthCookies() — used in server actions and API route handlers.
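The silent-refresh behaviour is a three-way decision per request. The sketch below isolates that decision as a pure function; authCookieNames, nextAuthStep, and the expiry inputs are hypothetical, and the real createJwtAuthMiddleware verifies token signatures with jose rather than only comparing expiry times.

```typescript
type Portal = "dashboard" | "dm" | "manage";

// Cookie names follow the per-portal pattern in the table above.
function authCookieNames(portal: Portal): { access: string; refresh: string } {
  return { access: `${portal}_access_token`, refresh: `${portal}_refresh_token` };
}

type AuthStep = "forward" | "refresh_then_forward" | "redirect_to_login";

// Inputs are JWT `exp` values in seconds, or undefined when the cookie is
// missing or the token failed verification.
function nextAuthStep(
  accessExp: number | undefined,
  refreshExp: number | undefined,
  nowSeconds: number,
): AuthStep {
  if (accessExp !== undefined && accessExp > nowSeconds) return "forward";
  // Assumption: if the refresh call itself fails, the middleware would fall
  // back to the login redirect.
  if (refreshExp !== undefined && refreshExp > nowSeconds) return "refresh_then_forward";
  return "redirect_to_login";
}
```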


Tenant Architecture

All data is scoped per tenant in PostgreSQL. The tenantId field is present on every content model and enforced in every query by API middleware.

PostgreSQL
├── Tenant: Acme Digital (SaaS)
│   ├── TenantMember records: source of truth for membership
│   ├── AgentConfig: per-tenant agent + adapter settings
│   ├── Subscription + Plan: billing tier and feature flags
│   └── All content (activities, blog posts, channels, ...) scoped by tenantId
└── Tenant: Globex Marketing (SaaS)
    └── (fully isolated: separate rows, same schema)

Tenant isolation rules:

  • Every API handler extracts tenantId from the JWT payload
  • All Prisma queries include where: { tenantId } — no cross-tenant reads
  • BullMQ jobs carry tenantId in the payload; workers verify it before processing
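The where: { tenantId } rule is easiest to keep when filters are built through a helper that always injects the caller's tenant. A minimal sketch, assuming a hypothetical tenantScoped helper (not an existing @leadmetrics/db export) and an in-memory findMany standing in for a Prisma model; with Prisma, the returned object would be passed as the where argument.

```typescript
// Generic filter type standing in for a Prisma `where` input.
type Where = Record<string, unknown>;

// tenantId is spread last, so a caller-supplied tenantId cannot override it.
function tenantScoped(tenantId: string, where: Where = {}): Where & { tenantId: string } {
  return { ...where, tenantId };
}

type Row = { id: string; tenantId: string; status: string };

// In-memory stand-in for prisma.activity.findMany({ where }): exact-match filter.
function findMany<T extends Record<string, unknown>>(rows: T[], where: Where): T[] {
  return rows.filter((row) => Object.entries(where).every(([key, value]) => row[key] === value));
}
```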

Data Flow — Activity Pipeline

The core content production loop. Every piece of content follows this flow.

Full lifecycle: DeliverablePlan → published content

1. Plan approved
  • The client approves the DeliverablePlan on the dashboard; DeliverablePlan.status → approved.

2. Activity planner runs
  • An agent__activity-planner job is dequeued; the AI reads the plan and creates Activity records in PostgreSQL.
  • Each activity is one unit of content (one blog, one social post…), and one agent job is enqueued per activity.

3. Content agent runs (e.g. Blog Writer)
  • Prompt assembly (buildActivityPrompt): activity brief (topic, target keyword, format) + Client Context File (brand voice, audience, products) + skills (SEO guidelines, tone rules, platform specs) + prior step output (if a revision chain) + wake reason (new_task / rejection / review_approved) + reviewer feedback (if this is a retry after rejection).
  • The LLM call writes the draft; the result is saved (BlogPost.content updated) and Activity.status → dm_review.
  • A notification goes to the DM reviewer: "new draft ready for review".

4. DM portal review
  • ✅ Approve: Activity.status → active (or client_review).
  • ❌ Reject: rejectionFeedback is saved to the activity and the agent job is re-enqueued with wakeReason: "rejection". The AI retries with the feedback threaded into the prompt, and the new draft goes back to dm_review (max 3 retries).
  • ✏️ Edit + approve: the edited content is saved as final, with no AI retry.

5. Client approval (dashboard)
  • The client sees the approved draft on the dashboard.
  • ✅ Approve: Activity.status → client_approved.
  • ❌ Reject: feedback is sent back and the AI retries.

6. Publish to channel
  • The publisher agent retrieves the stored OAuth token for the target channel and calls the channel API (WordPress / LinkedIn / Instagram / GBP…).
  • BlogPost.publishedUrl is set; Activity.status → published.
  • DeliverablePeriod.completedCount is incremented and the GoalSnapshot is updated.
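The review loop in steps 3 to 6 behaves like a small state machine. The sketch below encodes the transitions named above; the event names, the Activity shape, the extra "revising" status, counting client rejections toward the same 3-retry cap, routing a client-rejected redraft back through DM review, and throwing once the cap is reached are all assumptions for illustration, not the actual Activity model. It also takes the client_review branch of a DM approval and omits the active alternative.

```typescript
// Statuses named in the pipeline, plus a hypothetical "revising" state for
// the period while the agent regenerates a rejected draft.
type Status = "dm_review" | "revising" | "client_review" | "client_approved" | "published";

type ReviewEvent =
  | { kind: "dm_approve" }
  | { kind: "dm_reject"; feedback: string }
  | { kind: "dm_edit_approve"; content: string }
  | { kind: "draft_ready"; content: string }
  | { kind: "client_approve" }
  | { kind: "client_reject"; feedback: string }
  | { kind: "publish"; url: string };

interface Activity {
  status: Status;
  content: string;
  retries: number; // AI regenerations triggered by rejections so far
  rejectionFeedback?: string;
  publishedUrl?: string;
}

const MAX_RETRIES = 3;

function transition(a: Activity, e: ReviewEvent): Activity {
  const fail = (): never => {
    throw new Error(`${e.kind} not allowed in status ${a.status}`);
  };
  switch (e.kind) {
    case "dm_approve":
      return a.status === "dm_review" ? { ...a, status: "client_review" } : fail();
    case "dm_edit_approve":
      // Edited content becomes final; no AI retry.
      return a.status === "dm_review" ? { ...a, content: e.content, status: "client_review" } : fail();
    case "dm_reject":
    case "client_reject": {
      const from: Status = e.kind === "dm_reject" ? "dm_review" : "client_review";
      if (a.status !== from) return fail();
      // Assumption: behaviour at the cap is not specified; here it throws.
      if (a.retries >= MAX_RETRIES) throw new Error("retry limit reached");
      // Caller re-enqueues the agent job with wakeReason: "rejection".
      return { ...a, status: "revising", retries: a.retries + 1, rejectionFeedback: e.feedback };
    }
    case "draft_ready":
      return a.status === "revising" ? { ...a, content: e.content, status: "dm_review" } : fail();
    case "client_approve":
      return a.status === "client_review" ? { ...a, status: "client_approved" } : fail();
    case "publish":
      // Only client-approved content can be published.
      return a.status === "client_approved" ? { ...a, status: "published", publishedUrl: e.url } : fail();
  }
}
```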

Real-Time (Socket.IO)

Socket.IO is registered as a Fastify plugin (apps/api/src/socket/index.ts) with a Redis adapter for cross-server broadcasting.

  • Dashboard: receives live activity status updates, chat messages, notifications
  • Mobile: uses SSE (Server-Sent Events) for streaming, not Socket.IO
  • Rooms: tenant-scoped rooms; presence tracked per connection

Concurrency & Scaling

  • One BullMQ queue per agent role — shared across all tenants
  • Worker concurrency is configurable per process via environment variables (e.g. SYNC_WORKER_CONCURRENCY for the search-indexer; the other servers have their own equivalents)
  • Horizontal scaling: add more apps/servers/agents processes — BullMQ distributes jobs automatically
  • Playwright-based agents (web crawler, directory submitter) run at low concurrency to avoid CPU contention
  • PostgreSQL: Prisma connection pool; MongoDB: Mongoose connection pool
  • Redis: two connection types — persistent (workers) and transient (enqueue-only callers)
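Per-process concurrency comes from the environment, so the value should be parsed defensively. A minimal sketch: readConcurrency and the fallback of 5 are hypothetical (only SYNC_WORKER_CONCURRENCY is named above); with BullMQ, the result would be passed as the Worker's concurrency option.

```typescript
// Reads a positive integer from an env-style record, falling back when the
// value is missing, non-numeric, fractional, or not positive.
function readConcurrency(
  env: Record<string, string | undefined>,
  varName: string,
  fallback: number,
): number {
  const raw = env[varName];
  if (raw === undefined || raw.trim() === "") return fallback;
  const n = Number(raw);
  return Number.isInteger(n) && n > 0 ? n : fallback;
}

// In a server process this would be called with process.env, e.g.
// readConcurrency(process.env, "SYNC_WORKER_CONCURRENCY", 5)
```

Playwright-based agents would simply be given a low value here.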

Security

| Concern | Approach |
|---|---|
| Tenant isolation | tenantId extracted from JWT on every request; every Prisma query scoped |
| JWT validation | Access token verified in Fastify middleware and Next.js edge middleware |
| Token rotation | Refresh tokens rotated on use; old token invalidated |
| External API keys | Encrypted at rest in PostgreSQL (packages/crypto); decrypted per-job in adapter layer |
| HITL gate | No publish action fires without DeliverablePlan.status = 'approved' |
| File uploads | Fastify multipart: 10 MB per file, 32 files max |
| Rate limiting | @fastify/rate-limit: 300 req/min default, Redis-backed, per-IP |
| Secrets | Doppler per-environment, per-service; never committed |
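The upload and rate-limit rows correspond to plugin options. A sketch of those option objects, declared locally so it runs without dependencies: limits.fileSize and limits.files are @fastify/multipart options, and max and timeWindow are @fastify/rate-limit options; uploadWithinLimits is a hypothetical helper, and the actual registration code in apps/api may differ.

```typescript
// Mirrors @fastify/multipart's `limits` option (sizes in bytes).
const multipartOptions = {
  limits: {
    fileSize: 10 * 1024 * 1024, // 10 MB per file
    files: 32,                  // at most 32 files per request
  },
};

// Mirrors @fastify/rate-limit options. The real registration would also pass
// a Redis client so counters are shared across API instances; by default the
// plugin keys requests by client IP.
const rateLimitOptions = {
  max: 300,              // requests allowed per window, per key
  timeWindow: 60 * 1000, // 1 minute, in milliseconds
};

// Hypothetical pre-check mirroring the multipart limits.
function uploadWithinLimits(fileSizes: number[]): boolean {
  return (
    fileSizes.length <= multipartOptions.limits.files &&
    fileSizes.every((size) => size <= multipartOptions.limits.fileSize)
  );
}
```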

Packages

| Package | Purpose |
|---|---|
| @leadmetrics/db | Prisma client + schema; the only place that calls new PrismaClient() |
| @leadmetrics/queue | BullMQ queue abstraction; all enqueue* functions |
| @leadmetrics/middleware | JWT auth middleware + cookie helpers for all Next.js portals |
| @leadmetrics/agents | Agent worker definitions, prompt builders, adapter dispatch |
| @leadmetrics/ai-chat | LangGraph chat orchestration |
| @leadmetrics/billing | Credit balance, subscription logic |
| @leadmetrics/nosqldb | MongoDB connection, AuditLog model, Redis pub/sub subscriber |
| @leadmetrics/common | Shared types and constants |
| @leadmetrics/crypto | Encryption/decryption utilities |
| @leadmetrics/logger | Pino + Loki structured logging |
| @leadmetrics/storage | File/blob storage (DigitalOcean Spaces) |
| @leadmetrics/ui | Shared React component library |
| @leadmetrics/feature-knowledge | Knowledge base / RAG utilities |
| @leadmetrics/feature-search | Typesense search utilities |
| packages/adapters/claude-local | Claude Code CLI / Anthropic SDK adapter |
| packages/adapters/codex-local | OpenAI Codex adapter |
| packages/adapters/gemini-local | Google Gemini adapter |
| packages/providers/* | 22 integration providers (email, OAuth, images, search, payments, messaging) |

Deployment

SaaS (Cloud)

Single Coolify installation. All tenants share infrastructure; data is isolated at the application layer. Each server process runs as a separate container.

Dev

See dev-reset.ps1 — kills all node processes, starts Docker (Postgres + MongoDB + Redis), recreates 4 databases (dev, api_test, manage_test, dashboard_test), runs seed, flushes Redis, and starts all 8 services.

Dev ports:

  • Dashboard: 3000 · Manage: 3001 · DM: 3002 · API: 3003 · Knowledgebase: 3004
  • agents, billing, notifications, ragengine, reporting, scheduler, search-indexer run as background processes

© 2026 Leadmetrics — Internal use only