RAG Architecture
Overview
This document covers the technical design of the RAG subsystem. For the functional requirements and integration spec, see rag-integration.md. For the UI screens, see screens-knowledge-base.md.
The RAG subsystem is implemented natively inside the Leadmetrics monorepo — not as a separate service. It reuses the existing Fastify API, Prisma ORM, BullMQ workers, and Redis infrastructure. The only required new infrastructure dependency is Qdrant (vector store), added to Docker Compose; the Docling parsing sidecar is optional.
Core design patterns (adopted from the RAG Manager reference implementation):
- One Qdrant collection per dataset, named ds_{refId}
- Hybrid search: vector similarity + keyword (BM25) + Reciprocal Rank Fusion
- BullMQ async ingestion pipeline with three job types
- provider-qdrant package pattern (mirrors provider-db)
- feature-knowledge and feature-search feature packages
What Gets Added vs What Already Exists
| RAG Manager component | Leadmetrics equivalent | Status |
|---|---|---|
| Express.js API | Fastify API (apps/api) | Exists — add routes |
| Prisma ORM | Prisma ORM (providers/provider-db) | Exists — add tables |
| BullMQ workers | BullMQ workers (apps/api workers) | Exists — add new queue |
| Redis | Redis (already in Docker Compose) | Exists |
| Qdrant | Qdrant | New — add to Docker Compose |
| provider-db | providers/provider-db | Exists |
| provider-logger | providers/provider-logging | Exists |
| provider-ui-controls | packages/ui (TBD) | Exists |
| feature-auth | features/feature-users | Exists |
| feature-tenants | features/feature-tenants | Exists |
| feature-datasets | features/feature-knowledge | New |
| feature-search | features/feature-search | New |
| apps/web RAG settings screens | apps/dashboard settings section | New screens |
| Docling sidecar (optional) | Same — optional Docling sidecar | New (optional) |
New Monorepo Additions
leadmetrics-v3/
│
├── providers/
│ └── provider-qdrant/ # NEW — Qdrant client singleton + collection helpers
│ ├── src/
│ │ ├── index.ts # Export: qdrantClient, createDatasetCollection(), deleteDatasetCollection()
│ │ ├── client.ts # QdrantClient instantiation
│ │ ├── collections.ts # createCollection(), deleteCollection(), collectionExists()
│ │ └── types.ts # QdrantPayload interface
│ └── package.json
│
├── features/
│ ├── feature-knowledge/ # NEW — Datasets + file management + ingestion queue
│ │ ├── src/
│ │ │ ├── index.ts
│ │ │ ├── datasets.service.ts # CRUD for rag_datasets; manages Qdrant collections
│ │ │ ├── files.service.ts # CRUD for rag_files; enqueues ingestion jobs
│ │ │ ├── crawl.service.ts # Enqueue + track website crawl jobs
│ │ │ └── knowledge.types.ts
│ │ └── package.json
│ │
│ └── feature-search/ # NEW — Hybrid search + RRF + reranking
│ ├── src/
│ │ ├── index.ts
│ │ ├── search.service.ts # Orchestrate vector + keyword + RRF + rerank
│ │ ├── embedder.ts # Embed query with dataset's configured model
│ │ ├── reranker.service.ts # Cross-encoder reranking (optional)
│ │ ├── rrf.ts # Reciprocal Rank Fusion implementation
│ │ └── search.types.ts
│ └── package.json
│
├── apps/
│ ├── api/
│ │ └── src/
│ │ ├── routes/
│ │ │ ├── knowledge.routes.ts # NEW — datasets + files endpoints
│ │ │ └── search.routes.ts # NEW — search endpoint
│ │ └── workers/
│ │ └── rag-ingestion.worker.ts # NEW — BullMQ processor for RAG ingestion
│ │
│ └── dashboard/
│ └── app/
│ └── (app)/
│ └── settings/
│ └── knowledge-base/ # NEW — RAG management screens
│ ├── page.tsx # Dataset list + file management
│ └── sandbox/
│ └── page.tsx # Retrieval sandbox
│
└── docker-compose.yml # Add Qdrant service

Database Schema — New Tables (Prisma)
Added to prisma/schema.prisma:
rag_datasets
model RagDataset {
id String @id @default(cuid())
refId String @unique
tenantId String
name String
description String?
// Embedding model — immutable after creation (changing would invalidate all vectors)
embeddingProvider String // 'openai' | 'ollama' | 'azure_openai' | ...
embeddingModel String // 'text-embedding-3-small' | 'nomic-embed-text' | ...
vectorSize Int // Derived from model at creation time
// Chunking defaults for this dataset
chunkSize Int @default(512)
chunkOverlap Int @default(64)
parseType String @default("NAIVE") // 'NAIVE' | 'MARKDOWN' | 'MANUAL'
parserEngine String @default("NODE_NATIVE") // 'NODE_NATIVE' | 'DOCLING'
// Role access — which agent roles can query this dataset
allowedAgentRoles String[] // e.g. ['copywriter', 'seo_specialist', 'content_researcher']
// Qdrant collection name (ds_{refId})
qdrantCollection String
status String @default("active") // 'active' | 'building' | 'error'
totalFiles Int @default(0)
totalChunks Int @default(0)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
deletedAt DateTime?
createdBy String?
files RagFile[]
crawlJobs RagCrawlJob[]
@@index([tenantId])
@@map("rag_datasets")
}

rag_files
model RagFile {
id String @id @default(cuid())
refId String @unique
tenantId String
datasetId String
fileName String
mimeType String
fileSizeBytes Int
storagePath String // Path on disk / object storage key
// Override dataset defaults per file (optional)
chunkSize Int?
chunkOverlap Int?
parseType String?
parserEngine String?
// Ingestion status
status String @default("pending")
// 'pending' | 'parsing' | 'embedding' | 'indexed' | 'error' | 'disabled'
errorMessage String?
chunksCount Int? @default(0)
enabled Boolean @default(true)
// BullMQ job reference (for status polling)
bullJobId String?
source String @default("upload")
// 'upload' | 'website_crawl' | 'published_content' | 'competitor_research'
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
deletedAt DateTime?
createdBy String?
dataset RagDataset @relation(fields: [datasetId], references: [id])
@@index([datasetId])
@@index([tenantId])
@@map("rag_files")
}

rag_crawl_jobs
model RagCrawlJob {
id String @id @default(cuid())
tenantId String
datasetId String
startUrl String
maxPages Int @default(200)
maxDepth Int @default(3)
urlPathFilter String? // e.g. '/blog' — only crawl this path prefix
status String @default("queued")
// 'queued' | 'running' | 'completed' | 'failed'
pagesCrawled Int @default(0)
pagesIndexed Int @default(0)
pagesFailed Int @default(0)
errorMessage String?
scheduledFor DateTime? // null = immediate
completedAt DateTime?
createdAt DateTime @default(now())
dataset RagDataset @relation(fields: [datasetId], references: [id])
@@index([datasetId])
@@map("rag_crawl_jobs")
}

Qdrant Collection Design
Identical to RAG Manager. One Qdrant collection per dataset, named ds_{dataset.refId}.
Collection: ds_01HV8MZABC...
├── Vectors: float32[vectorSize], distance: Cosine, on_disk: true
│ HNSW index: m=16, ef_construct=100, ef=128
│ (m=16 balances recall vs. index size; ef_construct=100 gives good build quality;
│ ef=128 at query time achieves >95% recall@10 at <10ms p99)
├── Payload index: content (text, tokenizer: word) — for keyword search
├── Payload index: tenantId (keyword) — always filtered first (tenant isolation)
└── Payload index: enabled (bool) — for filtering disabled files

HNSW parameter guidance:
| Parameter | Value | When to increase |
|---|---|---|
| m | 16 | Raise to 32 if recall < 90% at high collection sizes (> 500k vectors) |
| ef_construct | 100 | Raise for better index quality (at cost of slower build time) |
| ef (search) | 128 | Raise for higher recall; lower for faster queries with acceptable recall tradeoff |
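As a sketch of how provider-qdrant might assemble these parameters (helper names and return shapes here are illustrative, not the final API), the collection config and payload indexes can be built as plain objects and passed to the @qdrant/js-client-rest client's createCollection and createPayloadIndex calls:

```typescript
// Hypothetical helpers for provider-qdrant/src/collections.ts (a sketch, not the final API).

export function collectionName(refId: string): string {
  return `ds_${refId}`;
}

// Body for QdrantClient.createCollection(name, body)
export function collectionConfig(vectorSize: number) {
  return {
    vectors: { size: vectorSize, distance: 'Cosine' as const, on_disk: true },
    // ef=128 is a query-time parameter (hnsw_ef in search params), not set here
    hnsw_config: { m: 16, ef_construct: 100 },
  };
}

// Bodies for QdrantClient.createPayloadIndex(name, body): one call per index
export function payloadIndexes() {
  return [
    { field_name: 'content', field_schema: { type: 'text' as const, tokenizer: 'word' as const } },
    { field_name: 'tenantId', field_schema: 'keyword' as const },
    { field_name: 'enabled', field_schema: 'bool' as const },
  ];
}
```

Keeping these as pure builders makes the collection layout unit-testable without a running Qdrant instance.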
Tenant isolation filter: the tenantId payload filter is always attached to every vector search. Qdrant evaluates payload filters during HNSW graph traversal (filterable HNSW), so as long as the filter is applied, results cannot cross tenant boundaries at the query layer.
Point payload schema
interface QdrantPoint {
id: string; // UUID — unique per chunk
vector: number[];
payload: {
fileId: string; // rag_files.id
datasetId: string; // rag_datasets.id
tenantId: string; // defense-in-depth isolation filter
chunkIndex: number;
content: string; // Full chunk text (for keyword search + reranking)
fileName: string;
enabled: boolean; // Mirrors rag_files.enabled — always filter to true
source: string; // 'upload' | 'website_crawl' | 'published_content' | 'competitor_research'
metadata: {
pageNumber?: number;
heading?: string;
url?: string; // For crawled pages
publishedAt?: string; // For published_content
};
};
}

Every Qdrant query in feature-search adds a mandatory filter:
{
"must": [
{ "key": "tenantId", "match": { "value": "<tenantId>" } },
{ "key": "enabled", "match": { "value": true } }
]
}

This is never optional.
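One way to make the filter structurally unavoidable is to build every Qdrant query through a single helper (the name and shape below are illustrative):

```typescript
// Sketch: all Qdrant queries in feature-search are built through this helper,
// so the tenant/enabled filter cannot be forgotten at any call site.
type Condition = { key: string; match: { value: string | number | boolean } };

export function mandatoryFilter(tenantId: string, extra: Condition[] = []) {
  return {
    must: [
      { key: 'tenantId', match: { value: tenantId } },
      { key: 'enabled', match: { value: true } },
      // Optional extra conditions, e.g. { key: 'source', match: { value: 'website_crawl' } }
      ...extra,
    ],
  };
}
```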
Ingestion Pipeline
BullMQ queue
New queue added to the existing BullMQ setup in apps/api:
// Queue name: 'rag:ingestion'
const ragIngestionQueue = new Queue('rag:ingestion', {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: { count: 100 },
removeOnFail: { count: 200 },
    // Note: BullMQ does not support a per-job timeout option; enforce a 10-minute cap inside the processor
},
});

Worker concurrency: 5 (configurable via RAG_WORKER_CONCURRENCY env var).
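The matching worker wiring could look roughly like this (the Worker call is shown as a comment since it needs a live Redis connection; the env-parsing helper guards against unset or invalid values):

```typescript
// Sketch: resolve worker concurrency from the environment, falling back to the
// documented default of 5 when the variable is unset or not a positive integer.
export function workerConcurrency(env: Record<string, string | undefined>): number {
  const n = Number(env.RAG_WORKER_CONCURRENCY);
  return Number.isInteger(n) && n > 0 ? n : 5;
}

// import { Worker } from 'bullmq';
// const worker = new Worker('rag:ingestion', processRagJob, {
//   connection: redis,
//   concurrency: workerConcurrency(process.env),
// });
```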
Job types
type RagIngestionJobData =
| { type: 'file'; fileId: string; tenantId: string; datasetId: string }
| { type: 'crawl'; crawlJobId: string; tenantId: string; datasetId: string }
| { type: 'content'; content: string; fileId: string; tenantId: string; datasetId: string };
// ^ for published_content feedback loop (content already extracted, skip parse step)

Processor flow
Worker picks up job
│
├─ type: 'file' ────────────────────────────────────────┐
│ Load file from storage │
│ Parse based on parserEngine: │
│ NODE_NATIVE: pdf-parse (PDF), fs.readFile (TXT/MD)│
│ DOCLING: POST to Docling sidecar HTTP API │
│ → raw text string │
│ │
├─ type: 'content' ─────────────────────────────────────┤
│ Skip parse step (text already provided) │
│ │
│ Chunk text based on parseType: │
│ NAIVE: fixed-size char splitter (chunkSize, overlap)
│ MARKDOWN: heading-aware splitter (H1/H2 boundaries)
│ MANUAL: split on '---' delimiter │
│ → string[] │
│ │
│ For each chunk: │
│ Embed using dataset.embeddingModel │
│ OpenAI: POST /embeddings │
│ Ollama: POST /api/embeddings │
│ Batch upsert to Qdrant (batch size: 32 points) │
│ │
│ Update rag_files: status='indexed', chunksCount=N │
│ Update rag_datasets: totalChunks += N │
└───────────────────────────────────────────────────────┘
├─ type: 'crawl' ───────────────────────────────────────┐
Load ragCrawlJob from DB │
For each page (breadth-first, maxDepth, maxPages): │
Launch Playwright browser │
Fetch page → extract body text (strip nav/footer) │
Extract links (same-domain only) │
Create rag_files record (source: 'website_crawl') │
Enqueue type: 'content' job for the page text │
Update rag_crawl_jobs: pagesCrawled++ │
Update rag_crawl_jobs: status='completed' │
   ────────────────────────────────────────────────────┘

Status updates (SSE)
File ingestion status is streamed to the Dashboard UI via SSE. The worker emits events to Redis pub/sub:
Worker emits: PUBLISH rag:status:<tenantId> '{"fileId":"...","status":"embedding","progress":45}'
│
▼
SSE endpoint subscribes to Redis channel for the tenant
│
▼
Dashboard UI receives event → updates file row status bar

Hybrid Search Implementation
Identical to RAG Manager’s design. Implemented in features/feature-search/src/search.service.ts.
Search request
interface RagSearchRequest {
tenantId: string;
datasetId?: string; // Single dataset — null means all tenant datasets
datasetIds?: string[]; // Explicit list of datasets to search
query: string;
topK?: number; // Default: 5, max: 20
vectorWeight?: number; // 0.0–1.0, default: 0.6. Keyword weight = 1 - vectorWeight
rerankerModel?: string; // Optional — enables cross-encoder reranking
agentRole?: string; // Used to filter datasets by allowedAgentRoles
}

Search algorithm
1. Resolve target datasets
- If agentRole provided: filter datasets where allowedAgentRoles includes agentRole
- If datasetId provided: use that dataset only
- Otherwise: all active tenant datasets
2. For each dataset (in parallel — Promise.all):
a. Embed the query using dataset.embeddingModel
b. Vector search:
Qdrant query_points, using_vector: <embedded_query>, limit: topK * 2
with_payload: true, score_threshold: 0.1
filter: { tenantId: <id>, enabled: true }
c. Keyword search:
Qdrant query_points, using: sparse (text index on 'content' field)
query: { text: queryString }
limit: topK * 2
d. Normalize scores: score / max(scores) → [0, 1]
e. Apply RRF fusion: for rank r in result list, score = 1 / (60 + r)
f. Merge vector + keyword RRF scores with vectorWeight:
finalScore = (vectorWeight * vectorRRF) + ((1 - vectorWeight) * keywordRRF)
g. Sort by finalScore DESC → top topK * 2 candidates per dataset
3. Merge results across all datasets
Apply global RRF across all dataset result lists
4. If rerankerModel provided:
POST to reranker service with { query, candidates: top topK*2 chunks }
Reranker scores each chunk → re-sort → take top topK
5. Return top topK results with:
{ text, score, fileId, fileName, datasetId, datasetName, source, metadata }

RRF implementation
// features/feature-search/src/rrf.ts
export function reciprocalRankFusion<T extends { id: string }>(
rankings: T[][],
k = 60,
): Array<T & { rrfScore: number }> {
const scores = new Map<string, number>();
const items = new Map<string, T>();
for (const ranking of rankings) {
ranking.forEach((item, rank) => {
scores.set(item.id, (scores.get(item.id) ?? 0) + 1 / (k + rank + 1));
items.set(item.id, item);
});
}
return [...scores.entries()]
.sort((a, b) => b[1] - a[1])
.map(([id, rrfScore]) => ({ ...items.get(id)!, rrfScore }));
}

API Routes (Fastify)
New routes added to apps/api/src/routes/:
knowledge.routes.ts
GET /api/knowledge/datasets List all datasets for tenant
POST /api/knowledge/datasets Create dataset
GET /api/knowledge/datasets/:id Get dataset
PATCH /api/knowledge/datasets/:id Update dataset (name, description only)
DELETE /api/knowledge/datasets/:id Delete dataset + Qdrant collection
GET /api/knowledge/datasets/:id/files List files in dataset
POST /api/knowledge/datasets/:id/files Upload file(s) (multipart, max 32 files)
PATCH /api/knowledge/datasets/:id/files/:fileId Toggle enabled/disabled
DELETE /api/knowledge/datasets/:id/files/:fileId Delete file + Qdrant vectors
POST /api/knowledge/datasets/:id/crawl Trigger website crawl
GET /api/knowledge/datasets/:id/crawl/status Get active crawl job status
GET /api/knowledge/sse/status SSE stream — file ingestion status updates

search.routes.ts
POST /api/search Search across datasets (org-wide or multi-dataset)
POST /api/knowledge/datasets/:id/search Search within a single dataset

All routes require a tenantId from the JWT session. The tenantId is injected by the auth middleware and applied to every DB and Qdrant query.
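Inside a search route handler, the request body should be normalized against the RagSearchRequest defaults and bounds before the search service is called. A sketch (the helper name is illustrative; the clamping rules follow the interface comments above):

```typescript
// Sketch: apply RagSearchRequest defaults and limits (topK default 5, max 20;
// vectorWeight default 0.6, clamped to [0, 1]). tenantId comes from the JWT
// session injected by the auth middleware, never from the request body.
export function normalizeSearchBody(
  body: { query: string; datasetId?: string; topK?: number; vectorWeight?: number },
  tenantId: string,
) {
  const topK = Math.min(Math.max(Math.trunc(body.topK ?? 5), 1), 20);
  const vectorWeight = Math.min(Math.max(body.vectorWeight ?? 0.6, 0), 1);
  return { tenantId, datasetId: body.datasetId, query: body.query, topK, vectorWeight };
}
```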
rag_search Tool (Agent Integration)
The rag_search tool in features/feature-search/src/tool.ts calls the search service directly (in-process — not an HTTP hop):
// Registered in packages/integrations/src/tool-dispatcher.ts alongside other tools
export const ragSearchTool: ToolDefinition = {
name: 'rag_search',
description: `Search the client's knowledge base for specific information.
Returns the top matching text chunks. Use when you need specific facts from the client's
documents, website, published content, or competitor research that are not in your injected context.`,
inputSchema: {
type: 'object',
properties: {
query: { type: 'string', description: 'Natural language query.' },
dataset: { type: 'string', enum: ['client_docs', 'website_content', 'published_content', 'competitor_content'] },
topK: { type: 'number', default: 5, description: 'Chunks to return (max 20).' },
},
required: ['query', 'dataset'],
},
execute: async ({ query, dataset, topK = 5 }, context) => {
// context.tenantId, context.agentRole injected by tool-dispatcher
    const datasetRecord = await db.ragDataset.findFirst({
      where: {
        tenantId: context.tenantId,
        name: dataset,
        deletedAt: null, // schema field is deletedAt
      },
    });
if (!datasetRecord) return { error: `Dataset "${dataset}" not found for this tenant.` };
// Privacy guard: competitor_content must only be called from content_researcher
if (dataset === 'competitor_content' && context.agentRole !== 'content_researcher') {
return { error: 'competitor_content dataset is only accessible to the Content Researcher agent.' };
}
const results = await searchService.search({
tenantId: context.tenantId,
datasetId: datasetRecord.id,
query,
topK,
      rerankerModel: process.env.RAG_RERANKER_MODEL || undefined, // rag_datasets has no rerankerModel column; use the env default
agentRole: context.agentRole,
});
return {
chunks: results.map(r => ({
text: r.text,
score: r.score,
source: r.fileName,
dataset: dataset,
})),
};
},
};

Docker Compose Addition
Add to docker-compose.yml:
qdrant:
image: qdrant/qdrant:latest
container_name: leadmetrics-qdrant
restart: unless-stopped
ports:
- "6333:6333" # REST API
- "6334:6334" # gRPC
volumes:
- qdrant_data:/qdrant/storage
environment:
QDRANT__SERVICE__GRPC_PORT: "6334"
# Optional — high-accuracy document parsing
docling:
image: ghcr.io/ds4sd/docling-serve:latest
container_name: leadmetrics-docling
restart: unless-stopped
ports:
- "5001:5001"
profiles:
    - docling # Only starts if --profile docling is passed

Add to volumes: section:
qdrant_data:

New Environment Variables
# === RAG / Qdrant ===
QDRANT_URL=http://qdrant:6333
QDRANT_API_KEY= # empty for local Docker
# === RAG Worker ===
RAG_WORKER_CONCURRENCY=5
RAG_UPLOAD_DIR=./uploads/rag # relative to apps/api
# === Embedding models ===
RAG_DEFAULT_EMBEDDING_PROVIDER=openai
RAG_DEFAULT_EMBEDDING_MODEL=text-embedding-3-small
RAG_LOCAL_EMBEDDING_MODEL=nomic-embed-text # Ollama — for privacy-sensitive datasets
# === Reranking (optional) ===
RAG_RERANKER_MODEL=BAAI/bge-reranker-v2-m3
RAG_RERANKER_URL=http://ollama:11434 # Run reranker locally via Ollama
# === Docling (optional high-accuracy parser) ===
DOCLING_URL= # empty = disabled; set to http://docling:5001 to enable

Standard Datasets Per Tenant
When a tenant is created (feature-tenants provisioning), four standard datasets are automatically created:
// Source of truth: features/feature-knowledge/src/knowledge.types.ts
const STANDARD_DATASETS = [
{
name: 'client_docs',
allowedAgentRoles: [
// Content workers
'keyword-researcher', 'content-brief-writer', 'gbp-post-writer',
'social-post-writer', 'social-calendar-planner', 'email-writer',
'landing-page-writer', 'google-ads-writer', 'meta-ads-writer',
'report-writer', 'review-response-writer', 'backlink-outreach-writer',
// Dedicated workers
'blog-writer', 'strategy-writer',
// Legacy role names (backwards compat)
'activity_planner', 'seo_specialist', 'copywriter',
'social_media_manager', 'paid_ads_manager', 'strategy',
],
useLocalEmbedding: false,
},
{
name: 'website_content',
allowedAgentRoles: [
'keyword-researcher', 'content-brief-writer', 'landing-page-writer',
'topic-researcher', 'research-note-writer', 'site-auditor',
'backlink-researcher', 'blog-writer', 'strategy-writer',
// Legacy
'activity_planner', 'seo_specialist', 'copywriter', 'paid_ads_manager', 'strategy',
],
useLocalEmbedding: false,
},
{
name: 'published_content',
allowedAgentRoles: [
'social-post-writer', 'social-calendar-planner', 'blog-writer',
'report-writer', 'ads-analyst', 'anomaly-detector', 'strategy-writer',
// Legacy
'activity_planner', 'seo_specialist', 'copywriter',
'social_media_manager', 'data_analyst',
],
useLocalEmbedding: false,
},
{
name: 'competitor_content',
allowedAgentRoles: ['topic-researcher', 'research-note-writer', 'strategy-writer', 'content_researcher'],
useLocalEmbedding: true, // ALWAYS local — never cloud embedding
},
];

File Type Support
Same as RAG Manager:
| Extension | MIME Type | NODE_NATIVE | DOCLING |
|---|---|---|---|
| .pdf | application/pdf | pdf-parse | ✅ |
| .txt | text/plain | fs.readFile | ✅ |
| .md | text/markdown | fs.readFile | ✅ |
| .docx | application/vnd.openxmlformats-officedocument… | mammoth | ✅ |
| .csv | text/csv | csv-parse | ✅ |
Max file size: 10 MB. Max files per upload: 32.
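These limits would be enforced in the upload route before any ingestion job is enqueued; a sketch (the helper and error strings are illustrative):

```typescript
// Sketch: validate an upload batch against the documented limits
// (5 supported extensions, 10 MB per file, 32 files per upload).
const ALLOWED_EXTENSIONS = new Set(['.pdf', '.txt', '.md', '.docx', '.csv']);
const MAX_FILE_BYTES = 10 * 1024 * 1024; // 10 MB
const MAX_FILES_PER_UPLOAD = 32;

export function validateUpload(files: { fileName: string; sizeBytes: number }[]): string[] {
  const errors: string[] = [];
  if (files.length > MAX_FILES_PER_UPLOAD) {
    errors.push(`Too many files: ${files.length} > ${MAX_FILES_PER_UPLOAD}`);
  }
  for (const f of files) {
    const dot = f.fileName.lastIndexOf('.');
    const ext = dot >= 0 ? f.fileName.slice(dot).toLowerCase() : '';
    if (!ALLOWED_EXTENSIONS.has(ext)) errors.push(`${f.fileName}: unsupported file type`);
    if (f.sizeBytes > MAX_FILE_BYTES) errors.push(`${f.fileName}: exceeds 10 MB limit`);
  }
  return errors;
}
```

Returning a list of errors (rather than throwing on the first) lets the UI report every rejected file in one response.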
UI Screens
Knowledge Base screens live in apps/dashboard under /settings/knowledge-base/. Full screen specifications are in screens-knowledge-base.md (KB1–KB6).