
RAG Architecture

Overview

This document covers the technical design of the RAG subsystem. For the functional requirements and integration spec, see rag-integration.md. For the UI screens, see screens-knowledge-base.md.

The RAG subsystem is implemented natively inside the Leadmetrics monorepo — not as a separate service. It reuses the existing Fastify API, Prisma ORM, BullMQ workers, and Redis infrastructure. The only new infrastructure dependency is Qdrant (vector store), added to Docker Compose.

Core design patterns (adopted from the RAG Manager reference implementation):

  • One Qdrant collection per dataset, named ds_{refId} (the dataset's refId, stored in qdrantCollection)
  • Hybrid search: vector similarity + keyword (BM25) + Reciprocal Rank Fusion
  • BullMQ async ingestion pipeline with three job types
  • provider-qdrant package pattern (mirrors provider-db)
  • feature-knowledge and feature-search feature packages

What Gets Added vs What Already Exists

| RAG Manager component | Leadmetrics equivalent | Status |
| --- | --- | --- |
| Express.js API | Fastify API (apps/api) | Exists — add routes |
| Prisma ORM | Prisma ORM (providers/provider-db) | Exists — add tables |
| BullMQ workers | BullMQ workers (apps/api workers) | Exists — add new queue |
| Redis | Redis (already in Docker Compose) | Exists |
| Qdrant | Qdrant | New — add to Docker Compose |
| provider-db | providers/provider-db | Exists |
| provider-logger | providers/provider-logging | Exists |
| provider-ui-controls | packages/ui (TBD) | Exists |
| feature-auth | features/feature-users | Exists |
| feature-tenants | features/feature-tenants | Exists |
| feature-datasets | features/feature-knowledge | New |
| feature-search | features/feature-search | New |
| apps/web RAG settings screens | apps/dashboard settings section | New screens |
| Docling sidecar (optional) | Same — optional Docling sidecar | New (optional) |

New Monorepo Additions

```
leadmetrics-v3/
├── providers/
│   └── provider-qdrant/                # NEW — Qdrant client singleton + collection helpers
│       ├── src/
│       │   ├── index.ts                # Export: qdrantClient, createDatasetCollection(), deleteDatasetCollection()
│       │   ├── client.ts               # QdrantClient instantiation
│       │   ├── collections.ts          # createCollection(), deleteCollection(), collectionExists()
│       │   └── types.ts                # QdrantPayload interface
│       └── package.json
├── features/
│   ├── feature-knowledge/              # NEW — Datasets + file management + ingestion queue
│   │   ├── src/
│   │   │   ├── index.ts
│   │   │   ├── datasets.service.ts     # CRUD for rag_datasets; manages Qdrant collections
│   │   │   ├── files.service.ts        # CRUD for rag_files; enqueues ingestion jobs
│   │   │   ├── crawl.service.ts        # Enqueue + track website crawl jobs
│   │   │   └── knowledge.types.ts
│   │   └── package.json
│   │
│   └── feature-search/                 # NEW — Hybrid search + RRF + reranking
│       ├── src/
│       │   ├── index.ts
│       │   ├── search.service.ts       # Orchestrate vector + keyword + RRF + rerank
│       │   ├── embedder.ts             # Embed query with dataset's configured model
│       │   ├── reranker.service.ts     # Cross-encoder reranking (optional)
│       │   ├── rrf.ts                  # Reciprocal Rank Fusion implementation
│       │   └── search.types.ts
│       └── package.json
├── apps/
│   ├── api/
│   │   └── src/
│   │       ├── routes/
│   │       │   ├── knowledge.routes.ts      # NEW — datasets + files endpoints
│   │       │   └── search.routes.ts         # NEW — search endpoint
│   │       └── workers/
│   │           └── rag-ingestion.worker.ts  # NEW — BullMQ processor for RAG ingestion
│   │
│   └── dashboard/
│       └── app/
│           └── (app)/
│               └── settings/
│                   └── knowledge-base/      # NEW — RAG management screens
│                       ├── page.tsx         # Dataset list + file management
│                       └── sandbox/
│                           └── page.tsx     # Retrieval sandbox
└── docker-compose.yml                       # Add Qdrant service
```

Database Schema — New Tables (Prisma)

Added to prisma/schema.prisma:

rag_datasets

```prisma
model RagDataset {
  id                String    @id @default(cuid())
  refId             String    @unique
  tenantId          String
  name              String
  description       String?

  // Embedding model — immutable after creation (changing would invalidate all vectors)
  embeddingProvider String    // 'openai' | 'ollama' | 'azure_openai' | ...
  embeddingModel    String    // 'text-embedding-3-small' | 'nomic-embed-text' | ...
  vectorSize        Int       // Derived from model at creation time

  // Chunking defaults for this dataset
  chunkSize         Int       @default(512)
  chunkOverlap      Int       @default(64)
  parseType         String    @default("NAIVE")       // 'NAIVE' | 'MARKDOWN' | 'MANUAL'
  parserEngine      String    @default("NODE_NATIVE") // 'NODE_NATIVE' | 'DOCLING'

  // Role access — which agent roles can query this dataset
  allowedAgentRoles String[]  // e.g. ['copywriter', 'seo_specialist', 'content_researcher']

  // Qdrant collection name (ds_{refId})
  qdrantCollection  String

  status            String    @default("active") // 'active' | 'building' | 'error'
  totalFiles        Int       @default(0)
  totalChunks       Int       @default(0)

  createdAt         DateTime  @default(now())
  updatedAt         DateTime  @updatedAt
  deletedAt         DateTime?
  createdBy         String?

  files             RagFile[]
  crawlJobs         RagCrawlJob[]

  @@index([tenantId])
  @@map("rag_datasets")
}
```
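vectorSize must match the embedding model exactly, or Qdrant will reject upserts. A minimal lookup sketch — the dimensions below are the published defaults for these models; the helper name is illustrative:

```ts
// Illustrative helper — maps supported models to their published vector sizes.
const EMBEDDING_DIMENSIONS: Record<string, number> = {
  'text-embedding-3-small': 1536, // OpenAI
  'text-embedding-3-large': 3072, // OpenAI
  'nomic-embed-text': 768,        // Ollama
};

export function resolveVectorSize(embeddingModel: string): number {
  const size = EMBEDDING_DIMENSIONS[embeddingModel];
  if (size === undefined) throw new Error(`Unknown embedding model: ${embeddingModel}`);
  return size;
}
```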

rag_files

```prisma
model RagFile {
  id            String    @id @default(cuid())
  refId         String    @unique
  tenantId      String
  datasetId     String

  fileName      String
  mimeType      String
  fileSizeBytes Int
  storagePath   String    // Path on disk / object storage key

  // Override dataset defaults per file (optional)
  chunkSize     Int?
  chunkOverlap  Int?
  parseType     String?
  parserEngine  String?

  // Ingestion status
  status        String    @default("pending") // 'pending' | 'parsing' | 'embedding' | 'indexed' | 'error' | 'disabled'
  errorMessage  String?
  chunksCount   Int?      @default(0)
  enabled       Boolean   @default(true)

  // BullMQ job reference (for status polling)
  bullJobId     String?

  source        String    @default("upload") // 'upload' | 'website_crawl' | 'published_content' | 'competitor_research'

  createdAt     DateTime  @default(now())
  updatedAt     DateTime  @updatedAt
  deletedAt     DateTime?
  createdBy     String?

  dataset       RagDataset @relation(fields: [datasetId], references: [id])

  @@index([datasetId])
  @@index([tenantId])
  @@map("rag_files")
}
```

rag_crawl_jobs

```prisma
model RagCrawlJob {
  id            String    @id @default(cuid())
  tenantId      String
  datasetId     String

  startUrl      String
  maxPages      Int       @default(200)
  maxDepth      Int       @default(3)
  urlPathFilter String?   // e.g. '/blog' — only crawl this path prefix

  status        String    @default("queued") // 'queued' | 'running' | 'completed' | 'failed'
  pagesCrawled  Int       @default(0)
  pagesIndexed  Int       @default(0)
  pagesFailed   Int       @default(0)
  errorMessage  String?

  scheduledFor  DateTime? // null = immediate
  completedAt   DateTime?
  createdAt     DateTime  @default(now())

  dataset       RagDataset @relation(fields: [datasetId], references: [id])

  @@index([datasetId])
  @@map("rag_crawl_jobs")
}
```

Qdrant Collection Design

Identical to RAG Manager. One Qdrant collection per dataset, named ds_{dataset.refId}.

```
Collection: ds_01HV8MZABC...
├── Vectors: float32[vectorSize], distance: Cosine, on_disk: true
│     HNSW index: m=16, ef_construct=100, ef=128
│     (m=16 balances recall vs. index size; ef_construct=100 gives good build quality;
│      ef=128 at query time achieves >95% recall@10 at <10ms p99)
├── Payload index: content  (text, tokenizer: word) — for keyword search
├── Payload index: tenantId (keyword) — always filtered first (tenant isolation)
└── Payload index: enabled  (bool) — for filtering disabled files
```
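A sketch of how provider-qdrant's createDatasetCollection() could apply these settings, using the official @qdrant/js-client-rest client (the helper shape is illustrative; option names follow the Qdrant REST API):

```ts
import { QdrantClient } from '@qdrant/js-client-rest';

const qdrantClient = new QdrantClient({ url: process.env.QDRANT_URL });

export async function createDatasetCollection(refId: string, vectorSize: number) {
  const name = `ds_${refId}`;

  await qdrantClient.createCollection(name, {
    vectors: { size: vectorSize, distance: 'Cosine', on_disk: true },
    hnsw_config: { m: 16, ef_construct: 100 }, // ef is set per query, not here
  });

  // Full-text index on content for keyword search
  await qdrantClient.createPayloadIndex(name, {
    field_name: 'content',
    field_schema: { type: 'text', tokenizer: 'word', lowercase: true },
  });
  // Keyword index on tenantId for the mandatory isolation filter
  await qdrantClient.createPayloadIndex(name, {
    field_name: 'tenantId',
    field_schema: 'keyword',
  });
  // Bool index on enabled for filtering disabled files
  await qdrantClient.createPayloadIndex(name, {
    field_name: 'enabled',
    field_schema: 'bool',
  });

  return name;
}
```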

HNSW parameter guidance:

| Parameter | Value | When to increase |
| --- | --- | --- |
| m | 16 | Raise to 32 if recall < 90% at high collection sizes (> 500k vectors) |
| ef_construct | 100 | Raise for better index quality (at cost of slower build time) |
| ef (search) | 128 | Raise for higher recall; lower for faster queries with acceptable recall tradeoff |

Tenant isolation filter: the tenantId payload filter is attached to every query before the vector search runs. Qdrant applies payload filters while traversing the HNSW graph, so a correctly filtered query can never return another tenant's points; the filter is added unconditionally inside feature-search (see the mandatory filter below) rather than left to individual callers.

Point payload schema

```ts
interface QdrantPoint {
  id: string;       // UUID — unique per chunk
  vector: number[];
  payload: {
    fileId: string;      // rag_files.id
    datasetId: string;   // rag_datasets.id
    tenantId: string;    // defense-in-depth isolation filter
    chunkIndex: number;
    content: string;     // Full chunk text (for keyword search + reranking)
    fileName: string;
    enabled: boolean;    // Mirrors rag_files.enabled — always filter to true
    source: string;      // 'upload' | 'website_crawl' | 'published_content' | 'competitor_research'
    metadata: {
      pageNumber?: number;
      heading?: string;
      url?: string;         // For crawled pages
      publishedAt?: string; // For published_content
    };
  };
}
```
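For the write path, a hedged sketch of a batch upsert helper used during ingestion — the 32-point batch size matches the pipeline below, and the import path and helper name are illustrative:

```ts
// Sketch — upserts points in batches of 32, matching the ingestion pipeline.
import { qdrantClient } from '@leadmetrics/provider-qdrant'; // hypothetical import path

export async function upsertChunks(collection: string, points: QdrantPoint[]) {
  const BATCH_SIZE = 32;
  for (let i = 0; i < points.length; i += BATCH_SIZE) {
    const batch = points.slice(i, i + BATCH_SIZE);
    await qdrantClient.upsert(collection, {
      wait: true, // block until the batch is durably applied
      points: batch,
    });
  }
}
```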

Every Qdrant query in feature-search adds a mandatory filter:

{ "must": [ { "key": "tenantId", "match": { "value": "<tenantId>" } }, { "key": "enabled", "match": { "value": true } } ] }

This is never optional.


Ingestion Pipeline

BullMQ queue

New queue added to the existing BullMQ setup in apps/api:

```ts
import { Queue } from 'bullmq';

// Queue name: 'rag:ingestion'
const ragIngestionQueue = new Queue('rag:ingestion', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: { count: 100 },
    removeOnFail: { count: 200 },
    // Max 10 minutes per file — enforced inside the processor, since BullMQ
    // (unlike legacy Bull) has no built-in per-job timeout option.
  },
});
```

Worker concurrency: 5 (configurable via RAG_WORKER_CONCURRENCY env var).
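A sketch of the worker registration in apps/api/src/workers/rag-ingestion.worker.ts, assuming the shared redis connection from the existing BullMQ setup; processRagIngestionJob is a placeholder for the processor described below:

```ts
import { Worker } from 'bullmq';

export const ragIngestionWorker = new Worker<RagIngestionJobData>(
  'rag:ingestion',
  async (job) => processRagIngestionJob(job.data), // processor — see flow below
  {
    connection: redis,
    concurrency: Number(process.env.RAG_WORKER_CONCURRENCY ?? 5),
  },
);
```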

Job types

```ts
type RagIngestionJobData =
  | { type: 'file'; fileId: string; tenantId: string; datasetId: string }
  | { type: 'crawl'; crawlJobId: string; tenantId: string; datasetId: string }
  | { type: 'content'; content: string; fileId: string; tenantId: string; datasetId: string };
//  ^ for the published_content feedback loop (content already extracted, skip parse step)
```
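Enqueueing follows the standard BullMQ pattern. For example, files.service.ts might add a job after the upload row exists and store the job id for status polling (a sketch; variable names are illustrative):

```ts
// Sketch — enqueue ingestion for a freshly uploaded file, then persist the
// BullMQ job id on rag_files so the UI can poll status.
const job = await ragIngestionQueue.add('file', {
  type: 'file',
  fileId: file.id,
  tenantId: file.tenantId,
  datasetId: file.datasetId,
} satisfies RagIngestionJobData);

await prisma.ragFile.update({
  where: { id: file.id },
  data: { bullJobId: job.id },
});
```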

Processor flow

```
Worker picks up job
│
├─ type: 'file'
│    Load file from storage
│    Parse based on parserEngine:
│      NODE_NATIVE: pdf-parse (PDF), fs.readFile (TXT/MD)
│      DOCLING:     POST to Docling sidecar HTTP API
│    → raw text string
│
├─ type: 'content'
│    Skip parse step (text already provided)
│
│  (both 'file' and 'content' continue here)
│    Chunk text based on parseType:
│      NAIVE:    fixed-size char splitter (chunkSize, overlap)
│      MARKDOWN: heading-aware splitter (H1/H2 boundaries)
│      MANUAL:   split on '---' delimiter
│    → string[]
│    For each chunk:
│      Embed using dataset.embeddingModel
│        OpenAI: POST /embeddings
│        Ollama: POST /api/embeddings
│    Batch upsert to Qdrant (batch size: 32 points)
│    Update rag_files:    status='indexed', chunksCount=N
│    Update rag_datasets: totalChunks += N
│
└─ type: 'crawl'
     Load ragCrawlJob from DB
     For each page (breadth-first, maxDepth, maxPages):
       Launch Playwright browser
       Fetch page → extract body text (strip nav/footer)
       Extract links (same-domain only)
       Create rag_files record (source: 'website_crawl')
       Enqueue type: 'content' job for the page text
       Update rag_crawl_jobs: pagesCrawled++
     Update rag_crawl_jobs: status='completed'
```
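As a concrete piece of the chunking step, a minimal sketch of the NAIVE fixed-size splitter (function name is illustrative):

```ts
// NAIVE chunking: fixed-size character windows with overlap.
// chunkSize/chunkOverlap come from the dataset (or the per-file override).
export function naiveChunk(text: string, chunkSize = 512, chunkOverlap = 64): string[] {
  if (chunkOverlap >= chunkSize) throw new Error('chunkOverlap must be smaller than chunkSize');
  const chunks: string[] = [];
  const step = chunkSize - chunkOverlap;
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break; // last window reached the end
  }
  return chunks;
}
```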

Status updates (SSE)

File ingestion status is streamed to the Dashboard UI via SSE. The worker emits events to Redis pub/sub:

```
Worker emits:   PUBLISH rag:status:<tenantId> '{"fileId":"...","status":"embedding","progress":45}'
SSE endpoint:   subscribes to the Redis channel for the tenant
Dashboard UI:   receives event → updates the file row status bar
```
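A sketch of how the SSE endpoint could bridge the Redis channel to the browser, assuming ioredis (which requires a dedicated connection for pub/sub) and the tenantId request decoration described under API routes below:

```ts
// Sketch — streams Redis pub/sub events for the tenant as SSE.
fastify.get('/api/knowledge/sse/status', (request, reply) => {
  reply.hijack(); // take over the raw socket for long-lived streaming
  reply.raw.writeHead(200, {
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  });

  const subscriber = redis.duplicate(); // pub/sub needs its own connection
  subscriber.subscribe(`rag:status:${request.tenantId}`);
  subscriber.on('message', (_channel, message) => {
    reply.raw.write(`data: ${message}\n\n`);
  });

  request.raw.on('close', () => subscriber.quit());
});
```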

Hybrid Search Implementation

Identical to RAG Manager’s design. Implemented in features/feature-search/src/search.service.ts.

Search request

```ts
interface RagSearchRequest {
  tenantId: string;
  datasetId?: string;     // Single dataset — null means all tenant datasets
  datasetIds?: string[];  // Explicit list of datasets to search
  query: string;
  topK?: number;          // Default: 5, max: 20
  vectorWeight?: number;  // 0.0–1.0, default: 0.6. Keyword weight = 1 - vectorWeight
  rerankerModel?: string; // Optional — enables cross-encoder reranking
  agentRole?: string;     // Used to filter datasets by allowedAgentRoles
}
```

Search algorithm

```
1. Resolve target datasets
   - If agentRole provided: filter datasets where allowedAgentRoles includes agentRole
   - If datasetId provided: use that dataset only
   - Otherwise: all active tenant datasets

2. For each dataset (in parallel — Promise.all):
   a. Embed the query using dataset.embeddingModel
   b. Vector search: Qdrant query_points
        using_vector: <embedded_query>, limit: topK * 2
        with_payload: true, score_threshold: 0.1
        filter: { tenantId: <id>, enabled: true }
   c. Keyword search: Qdrant query_points
        using: sparse (text index on 'content' field)
        query: { text: queryString }
        limit: topK * 2
   d. Normalize scores: score / max(scores) → [0, 1]
   e. Apply RRF fusion: for rank r in a result list, score = 1 / (60 + r)
   f. Merge vector + keyword RRF scores with vectorWeight:
        finalScore = (vectorWeight * vectorRRF) + ((1 - vectorWeight) * keywordRRF)
   g. Sort by finalScore DESC → top topK * 2 candidates per dataset

3. Merge results across all datasets
   Apply global RRF across all dataset result lists

4. If rerankerModel provided:
   POST to reranker service with { query, candidates: top topK*2 chunks }
   Reranker scores each chunk → re-sort → take top topK

5. Return top topK results with:
   { text, score, fileId, fileName, datasetId, datasetName, source, metadata }
```

RRF implementation

```ts
// features/feature-search/src/rrf.ts
export function reciprocalRankFusion<T extends { id: string }>(
  rankings: T[][],
  k = 60,
): Array<T & { rrfScore: number }> {
  const scores = new Map<string, number>();
  const items = new Map<string, T>();

  for (const ranking of rankings) {
    ranking.forEach((item, rank) => {
      scores.set(item.id, (scores.get(item.id) ?? 0) + 1 / (k + rank + 1));
      items.set(item.id, item);
    });
  }

  return [...scores.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([id, rrfScore]) => ({ ...items.get(id)!, rrfScore }));
}
```
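The weighted merge in step 2f sits outside the plain RRF helper. One way the two rankings could be combined per dataset — a sketch, with illustrative names:

```ts
// Weighted fusion of vector and keyword rankings (step 2f above).
// Each input list is already sorted best-first; ids identify chunks.
function fuseWeighted<T extends { id: string }>(
  vectorRanking: T[],
  keywordRanking: T[],
  vectorWeight = 0.6,
  k = 60,
): Array<T & { finalScore: number }> {
  const rrf = (rank: number) => 1 / (k + rank + 1);
  const scores = new Map<string, { item: T; finalScore: number }>();

  const accumulate = (ranking: T[], weight: number) => {
    ranking.forEach((item, rank) => {
      const prev = scores.get(item.id)?.finalScore ?? 0;
      scores.set(item.id, { item, finalScore: prev + weight * rrf(rank) });
    });
  };

  accumulate(vectorRanking, vectorWeight);
  accumulate(keywordRanking, 1 - vectorWeight);

  return [...scores.values()]
    .sort((a, b) => b.finalScore - a.finalScore)
    .map(({ item, finalScore }) => ({ ...item, finalScore }));
}
```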

API Routes (Fastify)

New routes added to apps/api/src/routes/:

knowledge.routes.ts

```
GET    /api/knowledge/datasets                    List all datasets for tenant
POST   /api/knowledge/datasets                    Create dataset
GET    /api/knowledge/datasets/:id                Get dataset
PATCH  /api/knowledge/datasets/:id                Update dataset (name, description only)
DELETE /api/knowledge/datasets/:id                Delete dataset + Qdrant collection
GET    /api/knowledge/datasets/:id/files          List files in dataset
POST   /api/knowledge/datasets/:id/files          Upload file(s) (multipart, max 32 files)
PATCH  /api/knowledge/datasets/:id/files/:fileId  Toggle enabled/disabled
DELETE /api/knowledge/datasets/:id/files/:fileId  Delete file + Qdrant vectors
POST   /api/knowledge/datasets/:id/crawl          Trigger website crawl
GET    /api/knowledge/datasets/:id/crawl/status   Get active crawl job status
GET    /api/knowledge/sse/status                  SSE stream — file ingestion status updates
```

search.routes.ts

```
POST /api/search                         Search across datasets (org-wide or multi-dataset)
POST /api/knowledge/datasets/:id/search  Search within a single dataset
```

All routes require a tenantId from the JWT session. The auth middleware injects it, and every DB and Qdrant query is scoped by it.
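A sketch of that injection using @fastify/jwt's request.jwtVerify(); the request.tenantId decoration is illustrative:

```ts
// Sketch — assumes @fastify/jwt is registered by the existing auth setup.
fastify.addHook('preHandler', async (request, reply) => {
  const payload = await request.jwtVerify<{ tenantId: string }>();
  if (!payload.tenantId) {
    return reply.code(401).send({ error: 'Missing tenant context' });
  }
  request.tenantId = payload.tenantId; // every handler and service reads this
});
```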


rag_search Tool (Agent Integration)

The rag_search tool in features/feature-search/src/tool.ts calls the search service directly (in-process — not an HTTP hop):

```ts
// Registered in packages/integrations/src/tool-dispatcher.ts alongside other tools
export const ragSearchTool: ToolDefinition = {
  name: 'rag_search',
  description: `Search the client's knowledge base for specific information.
Returns the top matching text chunks. Use when you need specific facts from the
client's documents, website, published content, or competitor research that are
not in your injected context.`,
  inputSchema: {
    type: 'object',
    properties: {
      query: { type: 'string', description: 'Natural language query.' },
      dataset: {
        type: 'string',
        enum: ['client_docs', 'website_content', 'published_content', 'competitor_content'],
      },
      topK: { type: 'number', default: 5, description: 'Chunks to return (max 20).' },
    },
    required: ['query', 'dataset'],
  },
  execute: async ({ query, dataset, topK = 5 }, context) => {
    // context.tenantId, context.agentRole injected by tool-dispatcher
    const datasetRecord = await prisma.ragDataset.findFirst({
      where: {
        tenantId: context.tenantId,
        name: dataset,
        deletedAt: null,
      },
    });
    if (!datasetRecord) {
      return { error: `Dataset "${dataset}" not found for this tenant.` };
    }

    // Privacy guard: competitor_content must only be called from content_researcher
    if (dataset === 'competitor_content' && context.agentRole !== 'content_researcher') {
      return { error: 'competitor_content dataset is only accessible to the Content Researcher agent.' };
    }

    const results = await searchService.search({
      tenantId: context.tenantId,
      datasetId: datasetRecord.id,
      query,
      topK,
      // Assumes an optional rerankerModel column on rag_datasets (not in the schema above)
      rerankerModel: datasetRecord.rerankerModel ?? undefined,
      agentRole: context.agentRole,
    });

    return {
      chunks: results.map((r) => ({
        text: r.text,
        score: r.score,
        source: r.fileName,
        dataset,
      })),
    };
  },
};
```

Docker Compose Addition

Add to docker-compose.yml:

```yaml
qdrant:
  image: qdrant/qdrant:latest
  container_name: leadmetrics-qdrant
  restart: unless-stopped
  ports:
    - "6333:6333"  # REST API
    - "6334:6334"  # gRPC
  volumes:
    - qdrant_data:/qdrant/storage
  environment:
    QDRANT__SERVICE__GRPC_PORT: "6334"

# Optional — high-accuracy document parsing
docling:
  image: ghcr.io/ds4sd/docling-serve:latest
  container_name: leadmetrics-docling
  restart: unless-stopped
  ports:
    - "5001:5001"
  profiles:
    - docling  # Only starts if --profile docling is passed
```

Add to volumes: section:

qdrant_data:

New Environment Variables

```bash
# === RAG / Qdrant ===
QDRANT_URL=http://qdrant:6333
QDRANT_API_KEY=                               # empty for local Docker

# === RAG Worker ===
RAG_WORKER_CONCURRENCY=5
RAG_UPLOAD_DIR=./uploads/rag                  # relative to apps/api

# === Embedding models ===
RAG_DEFAULT_EMBEDDING_PROVIDER=openai
RAG_DEFAULT_EMBEDDING_MODEL=text-embedding-3-small
RAG_LOCAL_EMBEDDING_MODEL=nomic-embed-text    # Ollama — for privacy-sensitive datasets

# === Reranking (optional) ===
RAG_RERANKER_MODEL=BAAI/bge-reranker-v2-m3
RAG_RERANKER_URL=http://ollama:11434          # Run reranker locally via Ollama

# === Docling (optional high-accuracy parser) ===
DOCLING_URL=                                  # empty = disabled; set to http://docling:5001 to enable
```

Standard Datasets Per Tenant

When a tenant is created (feature-tenants provisioning), four standard datasets are automatically created:

```ts
// Source of truth: features/feature-knowledge/src/knowledge.types.ts
const STANDARD_DATASETS = [
  {
    name: 'client_docs',
    allowedAgentRoles: [
      // Content workers
      'keyword-researcher', 'content-brief-writer', 'gbp-post-writer',
      'social-post-writer', 'social-calendar-planner', 'email-writer',
      'landing-page-writer', 'google-ads-writer', 'meta-ads-writer',
      'report-writer', 'review-response-writer', 'backlink-outreach-writer',
      // Dedicated workers
      'blog-writer', 'strategy-writer',
      // Legacy role names (backwards compat)
      'activity_planner', 'seo_specialist', 'copywriter',
      'social_media_manager', 'paid_ads_manager', 'strategy',
    ],
    useLocalEmbedding: false,
  },
  {
    name: 'website_content',
    allowedAgentRoles: [
      'keyword-researcher', 'content-brief-writer', 'landing-page-writer',
      'topic-researcher', 'research-note-writer', 'site-auditor',
      'backlink-researcher', 'blog-writer', 'strategy-writer',
      // Legacy
      'activity_planner', 'seo_specialist', 'copywriter',
      'paid_ads_manager', 'strategy',
    ],
    useLocalEmbedding: false,
  },
  {
    name: 'published_content',
    allowedAgentRoles: [
      'social-post-writer', 'social-calendar-planner', 'blog-writer',
      'report-writer', 'ads-analyst', 'anomaly-detector', 'strategy-writer',
      // Legacy
      'activity_planner', 'seo_specialist', 'copywriter',
      'social_media_manager', 'data_analyst',
    ],
    useLocalEmbedding: false,
  },
  {
    name: 'competitor_content',
    allowedAgentRoles: [
      'topic-researcher', 'research-note-writer', 'strategy-writer',
      'content_researcher',
    ],
    useLocalEmbedding: true, // ALWAYS local — never cloud embedding
  },
];
```
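A sketch of the provisioning loop in feature-tenants, reusing resolveVectorSize from earlier; datasetsService.createDataset() is an assumed helper matching the feature-knowledge service described above:

```ts
// Sketch — runs during tenant provisioning (feature-tenants).
export async function provisionStandardDatasets(tenantId: string) {
  for (const spec of STANDARD_DATASETS) {
    // competitor_content always embeds locally; everything else uses the defaults
    const provider = spec.useLocalEmbedding
      ? 'ollama'
      : process.env.RAG_DEFAULT_EMBEDDING_PROVIDER ?? 'openai';
    const model = spec.useLocalEmbedding
      ? process.env.RAG_LOCAL_EMBEDDING_MODEL ?? 'nomic-embed-text'
      : process.env.RAG_DEFAULT_EMBEDDING_MODEL ?? 'text-embedding-3-small';

    await datasetsService.createDataset({
      tenantId,
      name: spec.name,
      allowedAgentRoles: spec.allowedAgentRoles,
      embeddingProvider: provider,
      embeddingModel: model,
      vectorSize: resolveVectorSize(model),
    });
  }
}
```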

File Type Support

Same as RAG Manager:

| Extension | MIME Type | NODE_NATIVE | DOCLING |
| --- | --- | --- | --- |
| .pdf | application/pdf | pdf-parse | |
| .txt | text/plain | fs.readFile | |
| .md | text/markdown | fs.readFile | |
| .docx | application/vnd.openxmlformats-officedocument… | mammoth | |
| .csv | text/csv | csv-parse | |

Max file size: 10 MB. Max files per upload: 32.
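These limits can be enforced at the Fastify layer via @fastify/multipart (a sketch; the registration could equally be route-scoped):

```ts
import multipart from '@fastify/multipart';

// Sketch — rejects oversized files and over-long batches before handlers run.
await fastify.register(multipart, {
  limits: {
    fileSize: 10 * 1024 * 1024, // 10 MB per file
    files: 32,                  // max files per upload request
  },
});
```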


UI Screens

Knowledge Base screens live in apps/dashboard under /settings/knowledge-base/. Full screen specifications are in screens-knowledge-base.md (KB1–KB6).

© 2026 Leadmetrics — Internal use only