Global Search — Implementation
Overview
This document is the build guide. It lists every new file and package to create, shows the code patterns to follow, and identifies every existing file that needs a sync call added. For the collection schemas and API contract, see architecture.md. For the UI component, see ui.md.
Monorepo Additions
leadmetrics-v3/
├── packages/
│ └── provider-typesense/ NEW — Typesense client + collection bootstrap
│ ├── src/
│ │ ├── index.ts exports: typesenseClient, bootstrapCollections, SearchCollection
│ │ ├── client.ts singleton TypesenseClient from env vars
│ │ ├── collections.ts createCollection(), collectionExists(), COLLECTION_SCHEMAS
│ │ └── types.ts SearchDocument, SearchCollection, SearchResult
│ └── package.json
│
├── packages/queue/src/
│ ├── types.ts ADD: SearchSyncJob, SearchCollection types
│ └── index.ts ADD: enqueueSearchSync()
│
├── apps/servers/
│ └── search-indexer/ NEW — BullMQ worker server
│ ├── src/
│ │ ├── index.ts starts worker, graceful shutdown
│ │ ├── config.ts Zod config (TYPESENSE_URL, TYPESENSE_ADMIN_API_KEY, REDIS_URL, DATABASE_URL)
│ │ └── workers/
│ │ └── sync.worker.ts reads Postgres, upserts/deletes in Typesense
│ ├── package.json
│ └── tsconfig.json
│
├── apps/api/src/routers/
│ ├── search.ts NEW — POST /v1/search (tenant-scoped)
│ └── admin/search.ts NEW — POST /v1/admin/search (cross-tenant)
│
└── packages/ui/src/components/
├── GlobalSearch.tsx NEW — Ctrl+K modal (see ui.md)
└── GlobalSearchResult.tsx NEW — individual result rowStep 1 — packages/provider-typesense
package.json
{
"name": "@leadmetrics/provider-typesense",
"version": "0.0.1",
"main": "./src/index.ts",
"dependencies": {
"typesense": "^1.8.2"
}
}src/client.ts
import Typesense from "typesense"
let _client: Typesense.Client | null = null
export function getTypesenseClient(): Typesense.Client {
if (_client) return _client
_client = new Typesense.Client({
nodes: [{ url: process.env.TYPESENSE_URL! }],
apiKey: process.env.TYPESENSE_ADMIN_API_KEY!,
connectionTimeoutSeconds: 5,
})
return _client
}src/collections.ts
import { getTypesenseClient } from "./client"
import { COLLECTION_SCHEMAS } from "./schemas" // the JSON schemas from architecture.md
export async function collectionExists(name: string): Promise<boolean> {
try {
await getTypesenseClient().collections(name).retrieve()
return true
} catch {
return false
}
}
export async function bootstrapCollections(): Promise<void> {
const client = getTypesenseClient()
for (const schema of COLLECTION_SCHEMAS) {
if (!(await collectionExists(schema.name))) {
await client.collections().create(schema)
}
}
}src/index.ts
export { getTypesenseClient } from "./client"
export { bootstrapCollections, collectionExists } from "./collections"
export { COLLECTION_SCHEMAS } from "./schemas"
export type { SearchDocument, SearchCollection, SearchResult } from "./types"src/types.ts
export type SearchCollection =
| "blogs" | "social_posts" | "landing_pages" | "newsletters"
| "activities" | "campaigns" | "content_briefs" | "contacts"
| "leads" | "keywords" | "reports" | "backlinks" | "tenants"
export interface SearchDocument {
id: string
tenantId: string
updatedAt: number // Unix timestamp (ms) — Typesense requires int64
[key: string]: unknown
}
export interface SearchResult {
type: SearchCollection
id: string
title: string
subtitle?: string
href: string
status?: string
tenantId?: string
tenantName?: string
updatedAt: string
}Step 2 — Queue additions in packages/queue
Add to src/types.ts (or wherever job types live)
export type SearchCollection =
| "blogs" | "social_posts" | "landing_pages" | "newsletters"
| "activities" | "campaigns" | "content_briefs" | "contacts"
| "leads" | "keywords" | "reports" | "backlinks" | "tenants"
export type SearchSyncJobData = {
collection: SearchCollection
operation: "upsert" | "delete"
recordId: string
tenantId: string
}Add to src/index.ts
const SEARCH_SYNC_QUEUE = "search__sync"
export async function enqueueSearchSync(data: SearchSyncJobData): Promise<void> {
const queue = getQueue<SearchSyncJobData>(SEARCH_SYNC_QUEUE)
await queue.add("sync", data, {
attempts: 5,
backoff: { type: "exponential", delay: 2000 },
removeOnComplete: { count: 100 },
removeOnFail: { count: 50 },
})
}No deduplication key — every write must produce a sync job, even rapid successive updates.
Step 3 — apps/servers/search-indexer
src/config.ts
import { z } from "zod"
const schema = z.object({
TYPESENSE_URL: z.string().url(),
TYPESENSE_ADMIN_API_KEY: z.string().min(1),
REDIS_URL: z.string().url(),
DATABASE_URL: z.string().url(),
NODE_ENV: z.enum(["development", "production", "test"]).default("development"),
})
export const config = schema.parse(process.env)src/workers/sync.worker.ts
The worker maps each collection to a fetch function and a document shaper:
import { Worker } from "bullmq"
import type IORedis from "ioredis"
import { db } from "@leadmetrics/db"
import { getTypesenseClient } from "@leadmetrics/provider-typesense"
import type { SearchSyncJobData } from "@leadmetrics/queue"
const FETCHERS: Record<string, (id: string) => Promise<unknown>> = {
blogs: (id) => db.blogPost.findUnique({ where: { id } }),
social_posts: (id) => db.socialPost.findUnique({ where: { id } }),
landing_pages: (id) => db.landingPage.findUnique({ where: { id } }),
newsletters: (id) => db.emailNewsletter.findUnique({ where: { id } }),
activities: (id) => db.activity.findUnique({ where: { id } }),
campaigns: (id) => db.campaign.findUnique({ where: { id } }),
content_briefs: (id) => db.contentBrief.findUnique({ where: { id } }),
contacts: (id) => db.contact.findUnique({ where: { id } }),
leads: (id) => db.lead.findUnique({ where: { id } }),
keywords: (id) => db.keyword.findUnique({ where: { id } }),
reports: (id) => db.report.findUnique({ where: { id } }),
backlinks: (id) => db.backlink.findUnique({ where: { id } }),
tenants: (id) => db.tenant.findUnique({ where: { id } }),
}
// Maps a Postgres record to a flat Typesense document.
// Only include fields declared in the collection schema.
function toDocument(collection: string, record: Record<string, unknown>) {
const base = {
id: record.id as string,
tenantId: (record.tenantId ?? "") as string,
updatedAt: new Date(record.updatedAt as string).getTime(),
}
const FIELD_MAP: Record<string, string[]> = {
blogs: ["title", "metaDescription", "status"],
social_posts: ["bodyText", "engagementHook", "platform", "status"],
landing_pages: ["title", "metaDescription", "status"],
newsletters: ["subject", "previewText", "status"],
activities: ["label", "notes", "type", "status"],
campaigns: ["name", "status"],
content_briefs: ["title", "topic", "angle", "status"],
contacts: ["name", "email", "company", "stage"],
leads: ["name", "company", "jobTitle", "status"],
keywords: ["keyword", "source"],
reports: ["label"],
backlinks: ["sourceDomain", "anchorText", "status"],
tenants: ["name", "website", "pocName", "industry", "status"],
}
const doc: Record<string, unknown> = { ...base }
for (const field of FIELD_MAP[collection] ?? []) {
if (record[field] !== undefined && record[field] !== null) {
doc[field] = record[field]
}
}
return doc
}
export function startSyncWorker(redis: IORedis): Worker<SearchSyncJobData> {
const client = getTypesenseClient()
return new Worker<SearchSyncJobData>(
"search__sync",
async (job) => {
const { collection, operation, recordId } = job.data
if (operation === "delete") {
try {
await client.collections(collection).documents(recordId).delete()
} catch (err: unknown) {
// 404 means already deleted — not an error
if ((err as { httpStatus?: number }).httpStatus !== 404) throw err
}
return
}
const fetcher = FETCHERS[collection]
if (!fetcher) throw new Error(`Unknown collection: ${collection}`)
const record = await fetcher(recordId)
if (!record) return // deleted before job ran
const document = toDocument(collection, record as Record<string, unknown>)
await client.collections(collection).documents().upsert(document)
},
{ connection: redis, concurrency: 10 }
)
}src/index.ts
import IORedis from "ioredis"
import { bootstrapCollections } from "@leadmetrics/provider-typesense"
import { startSyncWorker } from "./workers/sync.worker"
import { config } from "./config"
async function main() {
await bootstrapCollections()
const redis = new IORedis(config.REDIS_URL, { maxRetriesPerRequest: null })
const worker = startSyncWorker(redis)
const shutdown = async () => {
await worker.close()
await redis.quit()
process.exit(0)
}
process.on("SIGTERM", shutdown)
process.on("SIGINT", shutdown)
}
main().catch((err) => {
console.error(err)
process.exit(1)
})Step 4 — Fastify Search Router
apps/api/src/routers/search.ts
import type { FastifyInstance } from "fastify"
import { requireAuth } from "../lib/auth"
import { getTypesenseClient, type SearchCollection } from "@leadmetrics/provider-typesense"
import type { SearchResult } from "@leadmetrics/provider-typesense"
const QUERY_FIELDS: Record<SearchCollection, string> = {
blogs: "title,metaDescription",
social_posts: "bodyText,engagementHook",
landing_pages: "title,metaDescription",
newsletters: "subject,previewText",
activities: "label,notes",
campaigns: "name",
content_briefs: "title,topic,angle",
contacts: "name,email,company",
leads: "name,company,jobTitle",
keywords: "keyword",
reports: "label",
backlinks: "sourceDomain,anchorText",
tenants: "name,website,pocName", // included for admin route reuse
}
const TENANT_COLLECTIONS: SearchCollection[] = [
"blogs", "social_posts", "landing_pages", "newsletters",
"activities", "campaigns", "content_briefs", "contacts",
"leads", "keywords", "reports", "backlinks",
]
function toHref(type: SearchCollection, id: string): string {
const MAP: Record<SearchCollection, string> = {
blogs: `/blog/${id}`,
social_posts: `/social/${id}`,
landing_pages: `/landing-pages/${id}`,
newsletters: `/newsletters/${id}`,
activities: `/activities/${id}`,
campaigns: `/campaigns/${id}`,
content_briefs: `/content-briefs/${id}`,
contacts: `/contacts/${id}`,
leads: `/leads/${id}`,
keywords: `/seo/keywords`,
reports: `/reports/${id}`,
backlinks: `/seo/backlinks/${id}`,
tenants: `/tenants/${id}`,
}
return MAP[type]
}
function toTitle(type: SearchCollection, doc: Record<string, unknown>): string {
const titleField: Record<SearchCollection, string> = {
blogs: "title", social_posts: "bodyText", landing_pages: "title",
newsletters: "subject", activities: "label", campaigns: "name",
content_briefs: "title", contacts: "name", leads: "name",
keywords: "keyword", reports: "label", backlinks: "sourceDomain",
tenants: "name",
}
const val = doc[titleField[type]]
return typeof val === "string" ? val.slice(0, 120) : "Untitled"
}
export async function searchRouter(fastify: FastifyInstance) {
fastify.post<{ Body: { query: string; entityTypes?: string[]; limit?: number; offset?: number } }>(
"/",
{
schema: {
body: {
type: "object",
required: ["query"],
properties: {
query: { type: "string", minLength: 2 },
entityTypes: { type: "array", items: { type: "string" } },
limit: { type: "number", minimum: 1, maximum: 50, default: 20 },
offset: { type: "number", minimum: 0, default: 0 },
},
},
response: { 200: { type: "object", additionalProperties: true } },
},
},
async (request, reply) => {
const { tenantId } = await requireAuth(request, reply)
const { query, entityTypes, limit = 20, offset = 0 } = request.body
const collections = entityTypes
? TENANT_COLLECTIONS.filter((c) => entityTypes.includes(c))
: TENANT_COLLECTIONS
const client = getTypesenseClient()
const perCollection = Math.max(5, Math.ceil(limit / collections.length))
const searches = collections.map((collection) => ({
collection,
q: query,
query_by: QUERY_FIELDS[collection],
filter_by: `tenantId:=${tenantId}`,
per_page: perCollection,
offset,
sort_by: "_text_match:desc,updatedAt:desc",
}))
let multiSearchResults
try {
multiSearchResults = await client.multiSearch.perform({ searches })
} catch {
return reply.status(503).send({ error: "search_unavailable" })
}
const results: SearchResult[] = []
for (let i = 0; i < collections.length; i++) {
const collection = collections[i]
const hits = multiSearchResults.results[i]?.hits ?? []
for (const hit of hits) {
const doc = hit.document as Record<string, unknown>
results.push({
type: collection,
id: doc.id as string,
title: toTitle(collection, doc),
subtitle: typeof doc.status === "string" ? doc.status : undefined,
href: toHref(collection, doc.id as string),
status: doc.status as string | undefined,
updatedAt: new Date(doc.updatedAt as number).toISOString(),
})
}
}
results.sort((a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime())
return reply.send({ results: results.slice(0, limit), total: results.length, hasMore: results.length > limit })
}
)
}Register in both apps/api/src/app.ts and apps/api/src/index.ts:
import { searchRouter } from "./routers/search"
await fastify.register(searchRouter, { prefix: "/v1/search" })apps/api/src/routers/admin/search.ts
Same structure as above, but:
- Requires
requireSuperAdmin()instead ofrequireAuth() - Includes
tenantsin the collections list - Accepts optional
tenantIdbody param to narrow scope - Each result in the response includes
tenantIdandtenantName - No
filter_bywhen notenantIdis provided (cross-tenant)
Register in both entry files:
import { adminSearchRouter } from "./routers/admin/search"
await fastify.register(adminSearchRouter, { prefix: "/v1/admin/search" })Step 5 — Adding Sync Calls to Existing Code
Pattern (API router example)
import { enqueueSearchSync } from "@leadmetrics/queue"
// Inside a PUT /blog/:id handler, after the Prisma write:
const updated = await db.blogPost.update({ where: { id }, data })
await enqueueSearchSync({
collection: "blogs",
operation: "upsert",
recordId: updated.id,
tenantId: updated.tenantId,
})Pattern (delete handler)
await db.blogPost.delete({ where: { id } })
await enqueueSearchSync({
collection: "blogs",
operation: "delete",
recordId: id,
tenantId, // from JWT — record is already gone from DB
})Pattern (agent worker)
// After the agent writes output back to Postgres:
const post = await db.blogPost.update({ where: { id: jobId }, data: { content, status: "dm_review" } })
await enqueueSearchSync({
collection: "blogs",
operation: "upsert",
recordId: post.id,
tenantId: post.tenantId,
})Step 6 — Environment Variables
Add to every service that calls enqueueSearchSync (the Fastify API and all agent workers)
and to the search-indexer server:
TYPESENSE_URL=http://localhost:8108
TYPESENSE_ADMIN_API_KEY=your-admin-key-hereThe search-indexer also needs REDIS_URL and DATABASE_URL, which it already has by
convention with other server apps.
See docs/env-vars.md for the full per-service list.
Step 7 — Docker Compose
Add to docker-compose.yml:
services:
typesense:
image: typesense/typesense:27.1
restart: unless-stopped
ports:
- "8108:8108"
volumes:
- typesense-data:/data
command: >
--data-dir /data
--api-key=${TYPESENSE_ADMIN_API_KEY}
--enable-cors
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8108/health"]
interval: 10s
timeout: 5s
retries: 5
volumes:
typesense-data:Testing
Unit tests — sync worker (packages/queue + apps/servers/search-indexer)
- Mock
db.*findUnique calls - Mock
getTypesenseClient()→ return object withcollections().documents().upsert()spy - Test: upsert path calls
upsertwith correctly shaped document - Test: delete path calls
deletewith correct id - Test: delete with 404 response does not throw
- Test: missing record (null from Postgres) is silently skipped
Unit tests — search router
- Mock
getTypesenseClient().multiSearch.perform() - Test:
tenantIdfrom JWT is injected intofilter_by(not taken from body) - Test:
entityTypesfilter reduces the collections searched - Test: Typesense error returns 503 with
{ error: "search_unavailable" }
E2E — GlobalSearch
- Press Ctrl+K → modal opens
- Type at least 2 characters → results appear within 1s
- Click a result → navigates to the correct deep-link URL
- Press Escape → modal closes
- Non-matching query → “No results” message shown (not an error state)