Architecture — Leadmetrics v3
Core Concept: The Control Plane
This system is a control plane for humans and AI agents to collaborate on digital marketing work.
The control plane does not execute LLM calls directly in the API layer. It orchestrates them — enqueuing work, routing jobs to the right agent worker, enforcing approval gates, and aggregating results.
Agent workers run inside dedicated server processes (apps/servers/agents), consuming BullMQ jobs from shared queues. The API layer enqueues jobs; workers process them and write results back to PostgreSQL. This keeps the API stateless and lets the agent layer scale independently.
Apps & Portals
| App | Port | Audience | Tech |
|---|---|---|---|
| apps/dashboard | 3000 | Client/tenant users | Next.js 15.3, React 19, TailwindCSS |
| apps/manage | 3001 | Super-admins (platform ops) | Next.js 15.3, React 19, TailwindCSS |
| apps/dm | 3002 | Digital marketing team (internal) | Next.js 15.3, React 19, TailwindCSS |
| apps/api | 3003 | All portals + mobile | Fastify, TypeScript, Socket.IO |
| apps/knowledgebase | 3004 | Docs site | Nextra 4.0 (Next.js) |
| apps/dashboard-mobile | — | Client/tenant users (iOS/Android) | React Native 0.83, Expo 55 |
High-Level Architecture
CLIENTS & DM TEAM PLATFORM ADMINS
───────────────────────────────────────────── ───────────────
┌──────────────┐ ┌──────────────┐ ┌─────────┐ ┌────────────┐
│ Dashboard │ │ DM Portal │ │ Mobile │ │ Manage │
│ :3000 │ │ :3002 │ │ iOS/And │ │ :3001 │
│ Next.js 15 │ │ Next.js 15 │ │ Expo │ │ Next.js 15 │
└──────┬───────┘ └──────┬───────┘ └────┬────┘ └─────┬──────┘
│ │ │ │
└────────────────┴──────────────┴───────────────┘
│
HTTPS · JWT auth
REST · SSE · Socket.IO
│
┌─────────────▼──────────────────┐
│ API (Fastify :3003) │
│ │
│ /auth/v1 /tenant/v1 /dm/v1 │
│ /admin/v1 /aichat/v1 │
│ Swagger docs · Rate limiting │
│ Socket.IO (real-time events) │
└───┬──────────────────────────┬──┘
│ reads/writes │ enqueues jobs
▼ ▼
┌─────────────────────┐ ┌─────────────────────┐
│ PostgreSQL │ │ Redis │
│ (Prisma ORM) │ │ │
│ │◄────►│ BullMQ job queues │
│ All app data │ │ Rate limiting │
│ 100+ models │ │ Socket.IO rooms │
└─────────────────────┘ └──────────┬──────────┘
▲ │ dequeue jobs
│ write results │
│ ┌──────────────▼──────────────────┐
│ │ WORKER SERVERS │
│ │ │
│ │ ┌─────────────────────────┐ │
└──────────────┤ │ agents server │ │
│ │ 40+ AI agent workers │ │
│ │ Claude · OpenAI · Gem. │ │
│ └─────────────────────────┘ │
│ ┌──────────┐ ┌─────────────┐ │
│ │ ragengine│ │notifications│ │
│ └──────────┘ └─────────────┘ │
│ ┌──────────┐ ┌─────────────┐ │
│ │ billing │ │search-index │ │
│ └──────────┘ └─────────────┘ │
│ ┌──────────┐ ┌─────────────┐ │
│ │reporting │ │ scheduler │ │
│ └──────────┘ └─────────────┘ │
└──────────────────────────────────┘
│
│ (audit logs only)
▼
┌─────────────────────┐
│ MongoDB │
│ Immutable audit │
│ trail + Redis │
│ pub/sub events │
└─────────────────────┘

Server Processes
Seven long-running Node.js processes handle all async work. Each connects directly to PostgreSQL and Redis.
| Server | Purpose | Queue(s) |
|---|---|---|
| apps/servers/agents | 40+ AI agent workers — blog-writer, social-post-writer, strategy-writer, insights workers, SEO, campaign, RAG, etc. | agent__{role} (one per role) |
| apps/servers/billing | Monthly invoicing, overdue lockout, credit resets | billing__invoice, billing__overdue, billing__credits-reset |
| apps/servers/notifications | Multi-channel notification dispatch (email, SMS, WhatsApp, Telegram, web push) | notifications__{channel} |
| apps/servers/ragengine | RAG document ingestion — file parsing, text extraction, embedding, vector DB indexing | rag__ingestion, agent__tenant-web-crawler |
| apps/servers/reporting | Daily automated reports sent at 22:00 local time (tenant + admin summaries) | None (uses node-cron → enqueueNotification()) |
| apps/servers/scheduler | Polls ScheduledTask table every 15 min; executes one-off tasks (signup reminders, alerts) | None (DB-poll pattern) |
| apps/servers/search-indexer | Syncs content to Typesense for full-text search across 13 collections | search__sync |
Queue System (BullMQ)
All async work flows through BullMQ backed by Redis. Queues are shared across all tenants — one queue per agent role. Tenant isolation is enforced by tenantId in the job payload.
HOW A JOB FLOWS FROM API TO COMPLETION
───────────────────────────────────────────────────────────────
1. API ENQUEUES
┌───────────────────────────────────────────────────────┐
│ POST /tenant/v1/blog/generate │
│ → enqueueBlogWriter({ tenantId, activityId, ... }) │
│ → Job added to Redis queue: "agent__blog-writer" │
│ → API responds 202 immediately (fire-and-forget) │
└───────────────────────────┬───────────────────────────┘
│
2. WORKER DEQUEUES
┌───────────────────────────▼───────────────────────────┐
│ agents server is listening on "agent__blog-writer" │
│ BullMQ delivers the job to the next free worker │
│ (BullMQ guarantees at-least-once delivery) │
└───────────────────────────┬───────────────────────────┘
│
3. WORKER PREPARES
┌───────────────────────────▼───────────────────────────┐
│ Worker loads from PostgreSQL: │
│ • Activity record (brief, topic, prior output) │
│ • AgentConfig (which LLM adapter to use) │
│ • Client context file + applicable skills │
│ • Any reviewer feedback from previous rejections │
│ Builds the full prompt via buildActivityPrompt() │
└───────────────────────────┬───────────────────────────┘
│
4. LLM CALL (via adapter)
┌───────────────────────────▼───────────────────────────┐
│ Dispatches to the configured LLM: │
│ │
│ claude-local → spawns claude CLI subprocess │
│ streams NDJSON output │
│ │
│ codex-local → calls OpenAI REST API │
│ streams SSE chunks │
│ │
│ gemini-local → calls Gemini REST API │
│ streams response │
└───────────────────────────┬───────────────────────────┘
│
5. WORKER SAVES & ADVANCES
┌───────────────────────────▼───────────────────────────┐
│ Writes result to PostgreSQL: │
│ • BlogPost.content updated │
│ • Activity.status → dm_review │
│ • AgentRun logged (tokens, cost, duration) │
│ │
│ Enqueues follow-up jobs: │
│ • enqueueSearchSync() → search-indexer │
│ • enqueueNotification() → notifications server │
│ (notifies DM reviewer: "new draft ready") │
└───────────────────────────────────────────────────────┘Queue naming: agent__{agentRole} — e.g. agent__blog-writer, agent__strategy-writer, agent__setup.
Common enqueue functions (all from @leadmetrics/queue):
| Function | Queue | Trigger |
|---|---|---|
| enqueueSetupChain() | agent__setup | Onboarding wizard completion |
| enqueueBlogWriter() | agent__blog-writer | New blog activity or brief generation |
| enqueueSocialPostWriter() | agent__social-post-writer | Social post creation |
| enqueueStrategyWriter() | agent__strategy-writer | Strategy revision |
| enqueueNotification() | notifications__{channel} | Any event requiring user notification |
| enqueueRagFile() | rag__ingestion | Document upload |
| enqueueSearchSync() | search__sync | Content create/update |
Job config defaults: 4 retry attempts, exponential backoff (5s initial), completed jobs pruned after 100, failed after 50.
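Under those defaults, the retry delay doubles from the 5s initial value on each failed attempt. A sketch of the schedule, using BullMQ's documented exponential formula (delay × 2^(attemptsMade − 1)):

```typescript
// Retry delay per failed attempt under the documented defaults:
// 4 attempts total, exponential backoff, 5s initial delay.
const INITIAL_DELAY_MS = 5_000;
const MAX_ATTEMPTS = 4;

// BullMQ's built-in exponential strategy: delay * 2^(attemptsMade - 1)
function backoffDelayMs(attemptsMade: number, initial = INITIAL_DELAY_MS): number {
  return initial * 2 ** (attemptsMade - 1);
}

for (let attempt = 1; attempt < MAX_ATTEMPTS; attempt++) {
  console.log(`after failure ${attempt}: retry in ${backoffDelayMs(attempt)} ms`);
}
// after failure 1: retry in 5000 ms
// after failure 2: retry in 10000 ms
// after failure 3: retry in 20000 ms
```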
Adapter Layer (LLM Backends)
Three LLM adapters ship as independent packages. Each exports a main module, a ./server integration, and a ./ui component.
HOW ADAPTER SELECTION WORKS
──────────────────────────────────────────────────────────────────────
Agent worker starts processing a job
│
▼
Reads AgentConfig from PostgreSQL
(one row per agentRole per tenant — e.g. blog-writer for Acme Corp)
│
├─ AgentConfig.adapterType = "claude_local"
│ │
│ └──► packages/adapters/claude-local
│ Spawns claude CLI subprocess
│ Streams NDJSON on stdout
│ Supports --resume (session continuity)
│ Supports --add-dir (skills injection)
│
├─ AgentConfig.adapterType = "codex_local"
│ │
│ └──► packages/adapters/codex-local
│ Calls OpenAI REST API
│ Streams SSE chunks
│ Injects skills into system prompt
│
└─ AgentConfig.adapterType = "gemini_local"
│
└──► packages/adapters/gemini-local
Calls Gemini REST API
Used for AI visibility monitoring
All three adapters return the same structure:
{ text, usage: { inputTokens, outputTokens }, cost, durationMs }
The worker never knows or cares which LLM ran — it just gets text back.

| Package | Backend | Used for |
|---|---|---|
packages/adapters/claude-local | Anthropic Claude (via @anthropic-ai/sdk) | Strategy, blog writing, brand-sensitive content |
packages/adapters/codex-local | OpenAI Codex / GPT (via openai SDK) | Research, classification, data-heavy tasks |
packages/adapters/gemini-local | Google Gemini | AI visibility monitoring, supplementary tasks |
Adapter selection is config-driven per agent role (stored in AgentConfig.adapterType in PostgreSQL). The packages/agents package handles dynamic dispatch — it selects the right adapter at worker startup based on the AgentConfig record for the tenant.
Note: Agents are not external processes that “phone home”. They are BullMQ worker functions that run inside apps/servers/agents, write results directly to PostgreSQL, and enqueue follow-up jobs as needed.
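The uniform return shape and config-driven dispatch can be sketched as follows. The interface mirrors the structure quoted above; the adapters here are stubs standing in for packages/adapters/*, not the real implementations:

```typescript
// The shape every adapter returns, per the contract quoted above.
interface AdapterResult {
  text: string;
  usage: { inputTokens: number; outputTokens: number };
  cost: number;
  durationMs: number;
}

type AdapterType = "claude_local" | "codex_local" | "gemini_local";
type Adapter = (prompt: string) => Promise<AdapterResult>;

// Stub adapters for illustration only; the real packages spawn the claude CLI
// or call the OpenAI / Gemini REST APIs.
const stub = (tag: string): Adapter => async (p) => ({
  text: `[${tag}] ${p}`,
  usage: { inputTokens: 10, outputTokens: 20 },
  cost: 0.001,
  durationMs: 50,
});

const adapters: Record<AdapterType, Adapter> = {
  claude_local: stub("claude"),
  codex_local: stub("codex"),
  gemini_local: stub("gemini"),
};

// Dispatch: the worker only knows the AgentConfig.adapterType value.
async function runWithConfiguredAdapter(
  adapterType: AdapterType,
  prompt: string,
): Promise<AdapterResult> {
  return adapters[adapterType](prompt);
}
```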
AI Chat (LangGraph)
The /chat page in the dashboard runs a stateful LLM conversation engine built on LangGraph TS.
- Package: packages/ai-chat
- Deps: @langchain/langgraph, @langchain/anthropic, @langchain/langgraph-checkpoint-postgres
- Router: POST /aichat/v1/* (registered in apps/api/src/index.ts)
- State persistence: LangGraph checkpoints stored in PostgreSQL (ai_chat schema, auto-created on startup)
- Thread model: Each conversation is a thread (AiChatThread model in Prisma); the LangGraph checkpoint key is the thread ID
Persistence
PostgreSQL (Primary — Prisma ORM)
All application data lives in PostgreSQL. The schema is the single source of truth at packages/db/prisma/schema.prisma (100+ models, 3500+ lines).
| Data category | Examples |
|---|---|
| Identity & auth | Tenant, User, TenantMember, Session, Account |
| Content pipeline | Activity, BlogPost, SocialPost, EmailNewsletter, LandingPage, ContentBrief |
| Approvals & plans | DeliverablePlan, DeliverablePeriod, DeliverablePeriodLog |
| Strategy | Strategy, StrategyVersion, ClientContext, ClientContextVersion |
| Agent infrastructure | AgentConfig, AgentRun, Skill, AgentSkill |
| Channels & integrations | ConnectedChannel, ChannelInsight, ChannelMaster |
| Search terms / SEO | SearchTermReport, SearchTermClassification, GSCKeywordSnapshot, AIVisibilitySnapshot |
| Campaigns | Campaign, CampaignSequence, ReviewCampaign, MetaAd, LinkedInAd |
| Billing & credits | Subscription, Invoice, CreditBalance, CreditLedger, Plan, Offering |
| RAG / knowledge | RagDataset, RagFile, RagCrawlJob, WebPage, WebCrawlJob |
| Notifications | Notification, NotificationPreference, PushDeviceToken |
| Goals & reports | Goal, GoalSnapshot, Report, TenantBaseline |
| Contacts & leads | Contact, Lead, LeadActivity, ContactList |
| Platform config | PlatformSetting, BacklinkDirectory, HelpPageRating |
| Tenant lifecycle | TenantDeletion, TenantDeletionStep |
Env: DATABASE_URL (PostgreSQL connection string)
MongoDB (Audit Logs only — Mongoose)
MongoDB is used exclusively for the immutable audit trail via packages/nosqldb.
- Model: AuditLog (append-only, no updates)
- Distribution: Redis pub/sub broadcasts audit events to all connected subscribers in real time via startAuditSubscriber()
- Env: MONGO_URL (default: mongodb://localhost:27017/leadmetrics)
Redis
- BullMQ backend — all job queues
- Rate limiting — API rate limiter (300 req/min default, per-IP)
- Socket.IO adapter — cross-server presence and room broadcasting
- Env: REDIS_URL
- Connection variants: persistent (maxRetriesPerRequest: null) for workers; transient (maxRetriesPerRequest: 2) for short-lived contexts
Auth
How Auth Works
LOGIN FLOW
──────────────────────────────────────────────────────────────────────
User submits email + password
│
▼
POST /auth/v1/login (Fastify API)
│
├─ Verifies credentials against PostgreSQL
│
├─ Issues access token (JWT, 15 min, signed with JWT_SECRET)
│ payload: { sub: userId, tenantId, role, appAccess[] }
│
└─ Issues refresh token (JWT, 7 days, signed with JWT_REFRESH_SECRET)
│
▼
Both tokens set as httpOnly cookies
(cookie names differ per portal — see table below)
REQUEST FLOW (every API call from a portal)
──────────────────────────────────────────────────────────────────────
Browser/app sends request
│
▼
Next.js middleware (@leadmetrics/middleware)
│
├─ Reads access token cookie
│
├─ Verifies with jose (edge-compatible)
│
├─ ✅ Token valid → attach payload to request headers → forward
│
└─ ❌ Token expired?
│
▼
POST API /auth/v1/refresh (silent, user never sees this)
│
├─ ✅ Refresh token valid → new access token issued
│ → request forwarded
│
└─ ❌ Refresh token expired → redirect to /login
FASTIFY API VALIDATION
──────────────────────────────────────────────────────────────────────
Every protected route runs:
requireAuth() → verifies JWT, attaches user to request
requireTenantUser() → ensures tenantId in JWT matches route param
requireSuperAdmin() → role === 'super_admin' only

API (Fastify — apps/api)
JWT-based via jsonwebtoken. All auth logic in apps/api/src/jwt.ts.
| Token | Payload | Expiry |
|---|---|---|
| Access token | sub (userId), tenantId, role, appAccess[] | 15 minutes |
| Refresh token | sub (userId), type: "refresh" | 7 days |
| Registration session | sub (sessionId), type: "reg_session" | 1 hour |
Secrets: JWT_SECRET (access + reg session), JWT_REFRESH_SECRET (refresh only).
Key routes: POST /auth/v1/login, POST /auth/v1/refresh, POST /auth/v1/register, POST /auth/v1/forgot-password, POST /auth/v1/reset-password.
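The access-token mechanics can be sketched with a minimal HS256 sign/verify pair using only node:crypto. This is an illustration of the token shape described above, not the real implementation (which lives in apps/api/src/jwt.ts and uses jsonwebtoken):

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Access-token payload shape from the table above; exp is unix seconds.
interface AccessPayload {
  sub: string;        // userId
  tenantId: string;
  role: string;
  appAccess: string[];
  exp: number;        // access tokens expire in 15 minutes
}

const b64url = (b: Buffer) => b.toString("base64url");

function sign(payload: AccessPayload, secret: string): string {
  const header = b64url(Buffer.from(JSON.stringify({ alg: "HS256", typ: "JWT" })));
  const body = b64url(Buffer.from(JSON.stringify(payload)));
  const sig = b64url(createHmac("sha256", secret).update(`${header}.${body}`).digest());
  return `${header}.${body}.${sig}`;
}

function verify(token: string, secret: string): AccessPayload | null {
  const [header, body, sig] = token.split(".");
  const expected = b64url(createHmac("sha256", secret).update(`${header}.${body}`).digest());
  const a = Buffer.from(sig), b = Buffer.from(expected);
  // Constant-time comparison; HMAC-SHA256 signatures are always the same length.
  if (a.length !== b.length || !timingSafeEqual(a, b)) return null;
  const payload: AccessPayload = JSON.parse(Buffer.from(body, "base64url").toString());
  return payload.exp > Math.floor(Date.now() / 1000) ? payload : null;
}

const token = sign(
  {
    sub: "u1",
    tenantId: "t1",
    role: "member",
    appAccess: ["dashboard"],
    exp: Math.floor(Date.now() / 1000) + 15 * 60,
  },
  "example-secret", // stands in for JWT_SECRET
);
```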
Next.js Portals — Shared Middleware (packages/middleware)
All three portals share createJwtAuthMiddleware from @leadmetrics/middleware. Verification uses jose (not jsonwebtoken) for edge-compatible JWT verification.
| Portal | Access cookie | Refresh cookie |
|---|---|---|
| Dashboard | dashboard_access_token | dashboard_refresh_token |
| DM | dm_access_token | dm_refresh_token |
| Manage | manage_access_token | manage_refresh_token |
Silent refresh: if the access token is expired, middleware transparently calls POST {API_URL}/auth/v1/refresh before forwarding the request. No re-login required unless the refresh token also expires.
Helper functions: setPortalAuthCookies(), clearPortalAuthCookies() — used in server actions and API route handlers.
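The middleware's decision tree reduces to a small pure function. This is a sketch of the flow described above; the real logic lives in createJwtAuthMiddleware and verifies actual JWTs with jose:

```typescript
// Decision sketch for the silent-refresh flow. Token states are simplified
// to booleans for illustration.
type AuthAction = "forward" | "silent_refresh" | "redirect_login";

function decide(accessValid: boolean, refreshValid: boolean): AuthAction {
  if (accessValid) return "forward";          // attach payload, pass through
  if (refreshValid) return "silent_refresh";  // POST /auth/v1/refresh, then forward
  return "redirect_login";                    // both expired: back to /login
}
```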
Tenant Architecture
All data is scoped per tenant in PostgreSQL. The tenantId field is present on every content model and enforced in every query by API middleware.
PostgreSQL
├── Tenant: Acme Digital (SaaS)
│ ├── TenantMember records — source of truth for membership
│ ├── AgentConfig — per-tenant agent + adapter settings
│ ├── Subscription + Plan — billing tier and feature flags
│ └── All content (activities, blog posts, channels, ...) scoped by tenantId
│
└── Tenant: Globex Marketing (SaaS)
└── (fully isolated — separate rows, same schema)

Tenant isolation rules:
- Every API handler extracts tenantId from the JWT payload
- All Prisma queries include where: { tenantId } — no cross-tenant reads
- BullMQ jobs carry tenantId in the payload; workers verify it before processing
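The query-scoping rule can be sketched as a helper that forces tenantId into every where clause. The names are illustrative, not the codebase's actual API:

```typescript
// Merge the JWT's tenantId into a Prisma-style where clause so a query can
// never run unscoped. Illustrative sketch; not from @leadmetrics/db.
type Where = Record<string, unknown>;

function tenantScoped(tenantId: string, where: Where = {}): Where {
  // Spread first, tenantId last: a caller-supplied tenantId can never
  // override the one taken from the verified JWT.
  return { ...where, tenantId };
}

// e.g. prisma.activity.findMany({ where: tenantScoped(jwt.tenantId, { status: "dm_review" }) })
```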
Data Flow — Activity Pipeline
The core content production loop. Every piece of content follows this flow.
FULL LIFECYCLE: DELIVERABLE PLAN → PUBLISHED CONTENT
─────────────────────────────────────────────────────────────────────────
┌───────────────────────────────────────────────────────────────────────┐
│ 1. PLAN APPROVED │
│ Client approves DeliverablePlan on dashboard │
│ DeliverablePlan.status → approved │
└────────────────────────────────┬──────────────────────────────────────┘
│
┌────────────────────────────────▼──────────────────────────────────────┐
│ 2. ACTIVITY PLANNER RUNS │
│ agent__activity-planner job dequeued │
│ AI reads the plan → creates Activity records in PostgreSQL │
│ Each activity = one unit of content (one blog, one social post…) │
└────────────────────────────────┬──────────────────────────────────────┘
│ one job per activity
┌────────────────────────────────▼──────────────────────────────────────┐
│ 3. CONTENT AGENT RUNS (e.g. Blog Writer) │
│ │
│ Prompt assembly (buildActivityPrompt): │
│ ┌────────────────────────────────────────────────────────────┐ │
│ │ Activity brief (topic, target keyword, format) │ │
│ │ + Client Context File (brand voice, audience, products) │ │
│ │ + Skills (SEO guidelines, tone rules, platform specs) │ │
│ │ + Prior step output (if revision chain) │ │
│ │ + Wake reason (new_task / rejection / review_approved) │ │
│ │ + Reviewer feedback (if this is a retry after rejection) │ │
│ └────────────────────────────────────────────────────────────┘ │
│ │
│ LLM call → draft content written │
│ Result saved → BlogPost.content updated │
│ Activity.status → dm_review │
│ Notification → DM reviewer: "new draft ready for review" │
└────────────────────────────────┬──────────────────────────────────────┘
│
┌────────────────────────────────▼──────────────────────────────────────┐
│ 4. DM PORTAL REVIEW │
│ │
│ ✅ Approve → Activity.status → active (or client_review) │
│ │
│ ❌ Reject → rejectionFeedback saved to activity │
│ → agent job re-enqueued with wakeReason: "rejection" │
│ → AI retries with feedback threaded into prompt │
│ → new draft → back to dm_review (max 3 retries) │
│ │
│ ✏️ Edit + approve → edited content saved as final, no AI retry │
└────────────────────────────────┬──────────────────────────────────────┘
│ approved
┌────────────────────────────────▼──────────────────────────────────────┐
│ 5. CLIENT APPROVAL (dashboard) │
│ Client sees draft on dashboard │
│ ✅ Approve → Activity.status → client_approved │
│ ❌ Reject → feedback sent back → AI retries │
└────────────────────────────────┬──────────────────────────────────────┘
│ client_approved
┌────────────────────────────────▼──────────────────────────────────────┐
│ 6. PUBLISH TO CHANNEL │
│ Publisher agent retrieves stored OAuth token for target channel │
│ Calls channel API (WordPress / LinkedIn / Instagram / GBP…) │
│ BlogPost.publishedUrl set; Activity.status → published │
│ DeliverablePeriod.completedCount incremented │
│ GoalSnapshot updated │
└────────────────────────────────────────────────────────────────────────┘Real-Time (Socket.IO)
Socket.IO is registered as a Fastify plugin (apps/api/src/socket/index.ts) with a Redis adapter for cross-server broadcasting.
- Dashboard: receives live activity status updates, chat messages, notifications
- Mobile: uses SSE (Server-Sent Events) for streaming, not Socket.IO
- Rooms: tenant-scoped rooms; presence tracked per connection
Concurrency & Scaling
- One BullMQ queue per agent role — shared across all tenants
- Worker concurrency is configurable per process (env var SYNC_WORKER_CONCURRENCY for search-indexer; similar per server)
- Horizontal scaling: add more apps/servers/agents processes — BullMQ distributes jobs automatically
- Playwright-based agents (web crawler, directory submitter) run at low concurrency to avoid CPU contention
- PostgreSQL: Prisma connection pool; MongoDB: Mongoose connection pool
- Redis: two connection types — persistent (workers) and transient (enqueue-only callers)
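Reading a concurrency setting like SYNC_WORKER_CONCURRENCY can be sketched as below. The fallback value of 1 is an assumption for illustration, not the codebase's actual default:

```typescript
// Parse a worker-concurrency env var with a safe fallback.
// The default of 1 is an assumption, not the real default.
function concurrencyFrom(
  env: Record<string, string | undefined>,
  name = "SYNC_WORKER_CONCURRENCY",
): number {
  const n = Number(env[name]);
  return Number.isInteger(n) && n > 0 ? n : 1;
}

console.log(concurrencyFrom({ SYNC_WORKER_CONCURRENCY: "4" })); // 4
```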
Security
| Concern | Approach |
|---|---|
| Tenant isolation | tenantId extracted from JWT on every request; every Prisma query scoped |
| JWT validation | Access token verified in Fastify middleware and Next.js edge middleware |
| Token rotation | Refresh tokens rotated on use; old token invalidated |
| External API keys | Encrypted at rest in PostgreSQL (packages/crypto); decrypted per-job in adapter layer |
| HITL gate | No publish action fires without DeliverablePlan.status = 'approved' |
| File uploads | Fastify multipart: 10 MB per file, 32 files max |
| Rate limiting | @fastify/rate-limit — 300 req/min default, Redis-backed, per-IP |
| Secrets | Doppler per-environment, per-service — never committed |
Packages
| Package | Purpose |
|---|---|
| @leadmetrics/db | Prisma client + schema; the only place to call new PrismaClient() |
| @leadmetrics/queue | BullMQ queue abstraction; all enqueue* functions |
| @leadmetrics/middleware | JWT auth middleware + cookie helpers for all Next.js portals |
| @leadmetrics/agents | Agent worker definitions, prompt builders, adapter dispatch |
| @leadmetrics/ai-chat | LangGraph chat orchestration |
| @leadmetrics/billing | Credit balance, subscription logic |
| @leadmetrics/nosqldb | MongoDB connection, AuditLog model, Redis pub/sub subscriber |
| @leadmetrics/common | Shared types and constants |
| @leadmetrics/crypto | Encryption/decryption utilities |
| @leadmetrics/logger | Pino + Loki structured logging |
| @leadmetrics/storage | File/blob storage (DigitalOcean Spaces) |
| @leadmetrics/ui | Shared React component library |
| @leadmetrics/feature-knowledge | Knowledge base / RAG utilities |
| @leadmetrics/feature-search | Typesense search utilities |
| packages/adapters/claude-local | Claude Code CLI / Anthropic SDK adapter |
| packages/adapters/codex-local | OpenAI Codex adapter |
| packages/adapters/gemini-local | Google Gemini adapter |
| packages/providers/* | 22 integration providers (email, OAuth, images, search, payments, messaging) |
Deployment
SaaS (Cloud)
Single Coolify installation. All tenants share infrastructure; data is isolated at the application layer. Each server process runs as a separate container.
Dev
See dev-reset.ps1 — kills all node processes, starts Docker (Postgres + MongoDB + Redis), recreates 4 databases (dev, api_test, manage_test, dashboard_test), runs seed, flushes Redis, and starts all 8 services.
Dev ports:
- Dashboard: 3000 · Manage: 3001 · DM: 3002 · API: 3003 · Knowledgebase: 3004
- agents, billing, notifications, ragengine, reporting, scheduler, search-indexer run as background processes