
Global Search — Implementation

Overview

This document is the build guide. It lists every new file and package to create, shows the code patterns to follow, and identifies every existing file that needs a sync call added. For the collection schemas and API contract, see architecture.md. For the UI component, see ui.md.


Monorepo Additions

```
leadmetrics-v3/
├── packages/
│   └── provider-typesense/        NEW — Typesense client + collection bootstrap
│       ├── src/
│       │   ├── index.ts           exports: getTypesenseClient, bootstrapCollections, SearchCollection
│       │   ├── client.ts          singleton Typesense Client from env vars
│       │   ├── collections.ts     bootstrapCollections(), collectionExists()
│       │   ├── schemas.ts         COLLECTION_SCHEMAS (the JSON schemas from architecture.md)
│       │   └── types.ts           SearchDocument, SearchCollection, SearchResult
│       └── package.json
├── packages/queue/src/
│   ├── types.ts                   ADD: SearchSyncJobData, SearchCollection types
│   └── index.ts                   ADD: enqueueSearchSync()
├── apps/servers/
│   └── search-indexer/            NEW — BullMQ worker server
│       ├── src/
│       │   ├── index.ts           starts worker, graceful shutdown
│       │   ├── config.ts          Zod config (TYPESENSE_URL, TYPESENSE_ADMIN_API_KEY, REDIS_URL, DATABASE_URL)
│       │   └── workers/
│       │       └── sync.worker.ts reads Postgres, upserts/deletes in Typesense
│       ├── package.json
│       └── tsconfig.json
├── apps/api/src/routers/
│   ├── search.ts                  NEW — POST /v1/search (tenant-scoped)
│   └── admin/search.ts            NEW — POST /v1/admin/search (cross-tenant)
└── packages/ui/src/components/
    ├── GlobalSearch.tsx           NEW — Ctrl+K modal (see ui.md)
    └── GlobalSearchResult.tsx     NEW — individual result row
```

Step 1 — packages/provider-typesense

package.json

```json
{
  "name": "@leadmetrics/provider-typesense",
  "version": "0.0.1",
  "main": "./src/index.ts",
  "dependencies": {
    "typesense": "^1.8.2"
  }
}
```

src/client.ts

```typescript
import { Client } from "typesense"

let _client: Client | null = null

export function getTypesenseClient(): Client {
  if (_client) return _client
  _client = new Client({
    nodes: [{ url: process.env.TYPESENSE_URL! }],
    apiKey: process.env.TYPESENSE_ADMIN_API_KEY!,
    connectionTimeoutSeconds: 5,
  })
  return _client
}
```

src/collections.ts

```typescript
import { getTypesenseClient } from "./client"
import { COLLECTION_SCHEMAS } from "./schemas" // the JSON schemas from architecture.md

export async function collectionExists(name: string): Promise<boolean> {
  try {
    await getTypesenseClient().collections(name).retrieve()
    return true
  } catch {
    return false
  }
}

export async function bootstrapCollections(): Promise<void> {
  const client = getTypesenseClient()
  for (const schema of COLLECTION_SCHEMAS) {
    if (!(await collectionExists(schema.name))) {
      await client.collections().create(schema)
    }
  }
}
```

src/index.ts

```typescript
export { getTypesenseClient } from "./client"
export { bootstrapCollections, collectionExists } from "./collections"
export { COLLECTION_SCHEMAS } from "./schemas"
export type { SearchDocument, SearchCollection, SearchResult } from "./types"
```

src/types.ts

```typescript
export type SearchCollection =
  | "blogs"
  | "social_posts"
  | "landing_pages"
  | "newsletters"
  | "activities"
  | "campaigns"
  | "content_briefs"
  | "contacts"
  | "leads"
  | "keywords"
  | "reports"
  | "backlinks"
  | "tenants"

export interface SearchDocument {
  id: string
  tenantId: string
  updatedAt: number // Unix timestamp (ms) — Typesense requires int64
  [key: string]: unknown
}

export interface SearchResult {
  type: SearchCollection
  id: string
  title: string
  subtitle?: string
  href: string
  status?: string
  tenantId?: string
  tenantName?: string
  updatedAt: string
}
```

Step 2 — Queue additions in packages/queue

Add to src/types.ts (or wherever job types live)

```typescript
export type SearchCollection =
  | "blogs"
  | "social_posts"
  | "landing_pages"
  | "newsletters"
  | "activities"
  | "campaigns"
  | "content_briefs"
  | "contacts"
  | "leads"
  | "keywords"
  | "reports"
  | "backlinks"
  | "tenants"

export type SearchSyncJobData = {
  collection: SearchCollection
  operation: "upsert" | "delete"
  recordId: string
  tenantId: string
}
```

Add to src/index.ts

```typescript
const SEARCH_SYNC_QUEUE = "search__sync"

export async function enqueueSearchSync(data: SearchSyncJobData): Promise<void> {
  const queue = getQueue<SearchSyncJobData>(SEARCH_SYNC_QUEUE)
  await queue.add("sync", data, {
    attempts: 5,
    backoff: { type: "exponential", delay: 2000 },
    removeOnComplete: { count: 100 },
    removeOnFail: { count: 50 },
  })
}
```

No deduplication key — every write must produce a sync job, even rapid successive updates.
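For intuition about the retry behavior: BullMQ's built-in exponential strategy waits `delay * 2^(n - 1)` before retry `n`, so the options above produce waits of 2s, 4s, 8s, and 16s between the five attempts. A minimal restatement (the helper name is illustrative, not part of the codebase):

```typescript
// Illustrative restatement of BullMQ's exponential backoff formula:
// wait before retry n (1-indexed) = baseDelayMs * 2^(n - 1).
// With attempts: 5 and delay: 2000, the waits are 2s, 4s, 8s, 16s.
function retryDelayMs(attempt: number, baseDelayMs = 2000): number {
  return baseDelayMs * 2 ** (attempt - 1)
}
```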


Step 3 — apps/servers/search-indexer

src/config.ts

```typescript
import { z } from "zod"

const schema = z.object({
  TYPESENSE_URL: z.string().url(),
  TYPESENSE_ADMIN_API_KEY: z.string().min(1),
  REDIS_URL: z.string().url(),
  DATABASE_URL: z.string().url(),
  NODE_ENV: z.enum(["development", "production", "test"]).default("development"),
})

export const config = schema.parse(process.env)
```

src/workers/sync.worker.ts

The worker maps each collection to a fetch function and a document shaper:

```typescript
import { Worker } from "bullmq"
import type IORedis from "ioredis"
import { db } from "@leadmetrics/db"
import { getTypesenseClient } from "@leadmetrics/provider-typesense"
import type { SearchSyncJobData } from "@leadmetrics/queue"

const FETCHERS: Record<string, (id: string) => Promise<unknown>> = {
  blogs: (id) => db.blogPost.findUnique({ where: { id } }),
  social_posts: (id) => db.socialPost.findUnique({ where: { id } }),
  landing_pages: (id) => db.landingPage.findUnique({ where: { id } }),
  newsletters: (id) => db.emailNewsletter.findUnique({ where: { id } }),
  activities: (id) => db.activity.findUnique({ where: { id } }),
  campaigns: (id) => db.campaign.findUnique({ where: { id } }),
  content_briefs: (id) => db.contentBrief.findUnique({ where: { id } }),
  contacts: (id) => db.contact.findUnique({ where: { id } }),
  leads: (id) => db.lead.findUnique({ where: { id } }),
  keywords: (id) => db.keyword.findUnique({ where: { id } }),
  reports: (id) => db.report.findUnique({ where: { id } }),
  backlinks: (id) => db.backlink.findUnique({ where: { id } }),
  tenants: (id) => db.tenant.findUnique({ where: { id } }),
}

const FIELD_MAP: Record<string, string[]> = {
  blogs: ["title", "metaDescription", "status"],
  social_posts: ["bodyText", "engagementHook", "platform", "status"],
  landing_pages: ["title", "metaDescription", "status"],
  newsletters: ["subject", "previewText", "status"],
  activities: ["label", "notes", "type", "status"],
  campaigns: ["name", "status"],
  content_briefs: ["title", "topic", "angle", "status"],
  contacts: ["name", "email", "company", "stage"],
  leads: ["name", "company", "jobTitle", "status"],
  keywords: ["keyword", "source"],
  reports: ["label"],
  backlinks: ["sourceDomain", "anchorText", "status"],
  tenants: ["name", "website", "pocName", "industry", "status"],
}

// Maps a Postgres record to a flat Typesense document.
// Only include fields declared in the collection schema.
function toDocument(collection: string, record: Record<string, unknown>) {
  const doc: Record<string, unknown> = {
    id: record.id as string,
    tenantId: (record.tenantId ?? "") as string,
    updatedAt: new Date(record.updatedAt as string).getTime(),
  }
  for (const field of FIELD_MAP[collection] ?? []) {
    if (record[field] !== undefined && record[field] !== null) {
      doc[field] = record[field]
    }
  }
  return doc
}

export function startSyncWorker(redis: IORedis): Worker<SearchSyncJobData> {
  const client = getTypesenseClient()
  return new Worker<SearchSyncJobData>(
    "search__sync",
    async (job) => {
      const { collection, operation, recordId } = job.data

      if (operation === "delete") {
        try {
          await client.collections(collection).documents(recordId).delete()
        } catch (err: unknown) {
          // 404 means already deleted — not an error
          if ((err as { httpStatus?: number }).httpStatus !== 404) throw err
        }
        return
      }

      const fetcher = FETCHERS[collection]
      if (!fetcher) throw new Error(`Unknown collection: ${collection}`)

      const record = await fetcher(recordId)
      if (!record) return // deleted before job ran

      const document = toDocument(collection, record as Record<string, unknown>)
      await client.collections(collection).documents().upsert(document)
    },
    { connection: redis, concurrency: 10 }
  )
}
```
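As a sanity check of the shaping rule, here is a standalone restatement for the `blogs` collection only (`shapeBlogDoc` is an illustrative name; the real worker uses the generic `toDocument` with `FIELD_MAP`):

```typescript
// Standalone copy of the shaping rule for "blogs": base fields always present,
// schema fields copied only when non-null, updatedAt as epoch ms (int64).
function shapeBlogDoc(record: {
  id: string
  tenantId?: string
  updatedAt: string
  title?: string
  metaDescription?: string
  status?: string
}): Record<string, unknown> {
  const doc: Record<string, unknown> = {
    id: record.id,
    tenantId: record.tenantId ?? "",
    updatedAt: new Date(record.updatedAt).getTime(),
  }
  for (const f of ["title", "metaDescription", "status"] as const) {
    if (record[f] !== undefined && record[f] !== null) doc[f] = record[f]
  }
  return doc
}
```

Note that absent fields are omitted entirely rather than sent as `null`, matching the "only include fields declared in the collection schema" rule above.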

src/index.ts

```typescript
import IORedis from "ioredis"
import { bootstrapCollections } from "@leadmetrics/provider-typesense"
import { startSyncWorker } from "./workers/sync.worker"
import { config } from "./config"

async function main() {
  await bootstrapCollections()
  const redis = new IORedis(config.REDIS_URL, { maxRetriesPerRequest: null })
  const worker = startSyncWorker(redis)

  const shutdown = async () => {
    await worker.close()
    await redis.quit()
    process.exit(0)
  }
  process.on("SIGTERM", shutdown)
  process.on("SIGINT", shutdown)
}

main().catch((err) => {
  console.error(err)
  process.exit(1)
})
```

Step 4 — Fastify Search Router

apps/api/src/routers/search.ts

```typescript
import type { FastifyInstance } from "fastify"
import { requireAuth } from "../lib/auth"
import { getTypesenseClient, type SearchCollection } from "@leadmetrics/provider-typesense"
import type { SearchResult } from "@leadmetrics/provider-typesense"

const QUERY_FIELDS: Record<SearchCollection, string> = {
  blogs: "title,metaDescription",
  social_posts: "bodyText,engagementHook",
  landing_pages: "title,metaDescription",
  newsletters: "subject,previewText",
  activities: "label,notes",
  campaigns: "name",
  content_briefs: "title,topic,angle",
  contacts: "name,email,company",
  leads: "name,company,jobTitle",
  keywords: "keyword",
  reports: "label",
  backlinks: "sourceDomain,anchorText",
  tenants: "name,website,pocName", // included for admin route reuse
}

const TENANT_COLLECTIONS: SearchCollection[] = [
  "blogs", "social_posts", "landing_pages", "newsletters", "activities",
  "campaigns", "content_briefs", "contacts", "leads", "keywords",
  "reports", "backlinks",
]

function toHref(type: SearchCollection, id: string): string {
  const MAP: Record<SearchCollection, string> = {
    blogs: `/blog/${id}`,
    social_posts: `/social/${id}`,
    landing_pages: `/landing-pages/${id}`,
    newsletters: `/newsletters/${id}`,
    activities: `/activities/${id}`,
    campaigns: `/campaigns/${id}`,
    content_briefs: `/content-briefs/${id}`,
    contacts: `/contacts/${id}`,
    leads: `/leads/${id}`,
    keywords: `/seo/keywords`,
    reports: `/reports/${id}`,
    backlinks: `/seo/backlinks/${id}`,
    tenants: `/tenants/${id}`,
  }
  return MAP[type]
}

function toTitle(type: SearchCollection, doc: Record<string, unknown>): string {
  const titleField: Record<SearchCollection, string> = {
    blogs: "title",
    social_posts: "bodyText",
    landing_pages: "title",
    newsletters: "subject",
    activities: "label",
    campaigns: "name",
    content_briefs: "title",
    contacts: "name",
    leads: "name",
    keywords: "keyword",
    reports: "label",
    backlinks: "sourceDomain",
    tenants: "name",
  }
  const val = doc[titleField[type]]
  return typeof val === "string" ? val.slice(0, 120) : "Untitled"
}

export async function searchRouter(fastify: FastifyInstance) {
  fastify.post<{
    Body: { query: string; entityTypes?: string[]; limit?: number; offset?: number }
  }>(
    "/",
    {
      schema: {
        body: {
          type: "object",
          required: ["query"],
          properties: {
            query: { type: "string", minLength: 2 },
            entityTypes: { type: "array", items: { type: "string" } },
            limit: { type: "number", minimum: 1, maximum: 50, default: 20 },
            offset: { type: "number", minimum: 0, default: 0 },
          },
        },
        response: { 200: { type: "object", additionalProperties: true } },
      },
    },
    async (request, reply) => {
      const { tenantId } = await requireAuth(request, reply)
      const { query, entityTypes, limit = 20, offset = 0 } = request.body

      const collections = entityTypes
        ? TENANT_COLLECTIONS.filter((c) => entityTypes.includes(c))
        : TENANT_COLLECTIONS

      const client = getTypesenseClient()
      const perCollection = Math.max(5, Math.ceil(limit / collections.length))

      const searches = collections.map((collection) => ({
        collection,
        q: query,
        query_by: QUERY_FIELDS[collection],
        filter_by: `tenantId:=${tenantId}`,
        per_page: perCollection,
        offset,
        sort_by: "_text_match:desc,updatedAt:desc",
      }))

      let multiSearchResults
      try {
        multiSearchResults = await client.multiSearch.perform({ searches })
      } catch {
        return reply.status(503).send({ error: "search_unavailable" })
      }

      const results: SearchResult[] = []
      for (let i = 0; i < collections.length; i++) {
        const collection = collections[i]
        const hits = multiSearchResults.results[i]?.hits ?? []
        for (const hit of hits) {
          const doc = hit.document as Record<string, unknown>
          results.push({
            type: collection,
            id: doc.id as string,
            title: toTitle(collection, doc),
            subtitle: typeof doc.status === "string" ? doc.status : undefined,
            href: toHref(collection, doc.id as string),
            status: doc.status as string | undefined,
            updatedAt: new Date(doc.updatedAt as number).toISOString(),
          })
        }
      }

      results.sort(
        (a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime()
      )

      return reply.send({
        results: results.slice(0, limit),
        total: results.length,
        hasMore: results.length > limit,
      })
    }
  )
}
```

Register in both apps/api/src/app.ts and apps/api/src/index.ts:

```typescript
import { searchRouter } from "./routers/search"

await fastify.register(searchRouter, { prefix: "/v1/search" })
```

apps/api/src/routers/admin/search.ts

Same structure as above, but:

  • Requires requireSuperAdmin() instead of requireAuth()
  • Includes tenants in the collections list
  • Accepts optional tenantId body param to narrow scope
  • Each result in the response includes tenantId and tenantName
  • No filter_by when no tenantId is provided (cross-tenant)
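The scoping rule in the last two bullets reduces to a conditional filter. A sketch under an illustrative helper name (not part of the codebase):

```typescript
// Hypothetical helper for the admin route: scope the Typesense query to a
// tenant only when tenantId is supplied; return undefined for cross-tenant
// searches so that no filter_by parameter is sent at all.
function adminFilterFor(tenantId?: string): string | undefined {
  return tenantId ? `tenantId:=${tenantId}` : undefined
}
```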

Register in both entry files:

```typescript
import { adminSearchRouter } from "./routers/admin/search"

await fastify.register(adminSearchRouter, { prefix: "/v1/admin/search" })
```

Step 5 — Adding Sync Calls to Existing Code

Pattern (API router example)

```typescript
import { enqueueSearchSync } from "@leadmetrics/queue"

// Inside a PUT /blog/:id handler, after the Prisma write:
const updated = await db.blogPost.update({ where: { id }, data })
await enqueueSearchSync({
  collection: "blogs",
  operation: "upsert",
  recordId: updated.id,
  tenantId: updated.tenantId,
})
```

Pattern (delete handler)

```typescript
await db.blogPost.delete({ where: { id } })
await enqueueSearchSync({
  collection: "blogs",
  operation: "delete",
  recordId: id,
  tenantId, // from JWT — record is already gone from DB
})
```

Pattern (agent worker)

```typescript
// After the agent writes output back to Postgres:
const post = await db.blogPost.update({
  where: { id: jobId },
  data: { content, status: "dm_review" },
})
await enqueueSearchSync({
  collection: "blogs",
  operation: "upsert",
  recordId: post.id,
  tenantId: post.tenantId,
})
```

Step 6 — Environment Variables

Add to every service that calls enqueueSearchSync (the Fastify API and all agent workers) and to the search-indexer server:

```
TYPESENSE_URL=http://localhost:8108
TYPESENSE_ADMIN_API_KEY=your-admin-key-here
```

The search-indexer also needs REDIS_URL and DATABASE_URL, which it picks up through the same convention as the other server apps.

See docs/env-vars.md for the full per-service list.


Step 7 — Docker Compose

Add to docker-compose.yml:

```yaml
services:
  typesense:
    image: typesense/typesense:27.1
    restart: unless-stopped
    ports:
      - "8108:8108"
    volumes:
      - typesense-data:/data
    command: >
      --data-dir /data
      --api-key=${TYPESENSE_ADMIN_API_KEY}
      --enable-cors
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8108/health"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  typesense-data:
```

Testing

Unit tests — sync worker (packages/queue + apps/servers/search-indexer)

  • Mock db.* findUnique calls
  • Mock getTypesenseClient() → return object with collections().documents().upsert() spy
  • Test: upsert path calls upsert with correctly shaped document
  • Test: delete path calls delete with correct id
  • Test: delete with 404 response does not throw
  • Test: missing record (null from Postgres) is silently skipped
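The 404-tolerance case can be covered without a mocking framework by abstracting the worker's delete guard over a thunk. This is a framework-free sketch, not the worker's actual code: the Typesense `documents(id).delete()` call is replaced by an injected function so a test can simulate success, a 404, or a hard failure.

```typescript
// Stand-in for the worker's delete path. httpStatus 404 ("already deleted")
// is swallowed; any other error propagates so BullMQ can retry the job.
async function deleteTolerating404(doDelete: () => Promise<void>): Promise<boolean> {
  try {
    await doDelete()
    return true // document existed and was deleted
  } catch (err: unknown) {
    if ((err as { httpStatus?: number }).httpStatus === 404) return false // already gone
    throw err // real failure — let the queue retry
  }
}
```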

Unit tests — search router

  • Mock getTypesenseClient().multiSearch.perform()
  • Test: tenantId from JWT is injected into filter_by (not taken from body)
  • Test: entityTypes filter reduces the collections searched
  • Test: Typesense error returns 503 with { error: "search_unavailable" }
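Another pure piece worth pinning down is the router's per-collection page size, `Math.max(5, Math.ceil(limit / collections.length))`, extracted here under an illustrative name:

```typescript
// Mirror of the router's per-collection page-size rule: split the overall
// limit across the collections searched, but never request fewer than 5
// hits per collection.
function perCollectionPageSize(limit: number, collectionCount: number): number {
  return Math.max(5, Math.ceil(limit / collectionCount))
}
```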

E2E — GlobalSearch

  • Press Ctrl+K → modal opens
  • Type at least 2 characters → results appear within 1s
  • Click a result → navigates to the correct deep-link URL
  • Press Escape → modal closes
  • Non-matching query → “No results” message shown (not an error state)
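The states these E2E cases walk through can be captured as a pure mapper. This sketch mirrors the 2-character minimum and the "No results is not an error" rule; the names are illustrative, not the component's actual API:

```typescript
type SearchUiState = "idle" | "loading" | "results" | "empty"

// Query below the 2-char minimum → idle (no request fired).
// In-flight request (results not yet available) → loading.
// Zero hits → "empty", which renders "No results" rather than an error.
function uiState(query: string, results: unknown[] | null): SearchUiState {
  if (query.trim().length < 2) return "idle"
  if (results === null) return "loading"
  return results.length > 0 ? "results" : "empty"
}
```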

© 2026 Leadmetrics — Internal use only