
Notification Server: Design & Implementation

Detailed walkthrough of apps/servers/notifications/: file structure, startup flow, per-handler dispatch logic, template system, dev-mode filtering, and configuration.


File Structure

    apps/servers/notifications/
    |-- src/
    |   |-- index.ts        Entry point: wires Redis, starts all 5 workers, handles shutdown
    |   |-- config.ts       Zod env validation; fails fast on missing/bad config
    |   |-- types.ts        NotificationJob, Recipient, DispatchResult, NotificationType
    |   |-- resolver.ts     Per-tenant provider resolution (email / whatsapp / telegram)
    |   |-- dev-filter.ts   Outbound allow-list enforcement in dev/test environments
    |   |
    |   |-- handlers/
    |   |   |-- email.handler.ts      Queue: notifications__email     Concurrency: 10
    |   |   |-- whatsapp.handler.ts   Queue: notifications__whatsapp  Concurrency: 3
    |   |   |-- telegram.handler.ts   Queue: notifications__telegram  Concurrency: 5
    |   |   |-- sms.handler.ts        Queue: notifications__sms       Concurrency: 2 (stub)
    |   |   +-- web.handler.ts        Queue: notifications__web       Concurrency: 20
    |   |
    |   +-- templates/
    |       |-- renderer.ts            Handlebars compile + render
    |       |-- email-loader.ts        DB template loader with 5-min in-process TTL cache
    |       |-- sms-templates.ts       Plain-text SMS strings keyed by NotificationType
    |       +-- whatsapp-templates.ts  Meta pre-approved template name registry
    |
    |-- .env
    |-- .env.example
    |-- package.json
    +-- tsconfig.json

Startup Flow (src/index.ts)

  1. Config validation: loadConfig() runs a Zod parse of process.env and throws if any required variable is missing or invalid, so the process never starts in a broken state.

  2. Redis connection: a single IORedis instance is shared across all 5 workers. maxRetriesPerRequest: null is required by BullMQ v5.

  3. Worker registration: each handler factory receives the shared Redis instance:

    const workers = [
      startEmailWorker(redis),
      startWhatsAppWorker(redis),
      startTelegramWorker(redis),
      startSmsWorker(redis),
      startWebWorker(redis),
    ];

  4. Graceful shutdown: SIGTERM / SIGINT call worker.close() on every worker, then redis.quit(). Workers drain in-flight jobs before closing. There is no hard timeout by default; the OS process manager (Docker / systemd) enforces the deadline.
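
If an in-process deadline is ever wanted in addition to the one enforced by the process manager, the shutdown step could be wrapped roughly as follows. This is a minimal sketch, not the code in src/index.ts: the names Closable, Quittable, shutdown, and timeoutMs are hypothetical, and the only assumption about the real objects is that worker.close() and redis.quit() return promises.

```typescript
// Structural types so the sketch does not depend on BullMQ or ioredis directly.
interface Closable { close(): Promise<void>; }
interface Quittable { quit(): Promise<unknown>; }

// Hypothetical helper: drain every worker, then quit Redis, but stop waiting
// after timeoutMs. Resolves to "clean" if everything closed in time.
async function shutdown(
  workers: Closable[],
  redis: Quittable,
  timeoutMs: number,
): Promise<"clean" | "timeout"> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<"timeout">((resolve) => {
    timer = setTimeout(() => resolve("timeout"), timeoutMs);
  });
  const drain = (async (): Promise<"clean"> => {
    // Workers first, so in-flight jobs can still reach Redis while draining.
    await Promise.all(workers.map((w) => w.close()));
    await redis.quit();
    return "clean";
  })();
  try {
    return await Promise.race([drain, deadline]);
  } finally {
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

A signal handler would call shutdown(workers, redis, someDeadline) and pick the exit code from the result; the deadline value itself is a deployment decision and is not specified by this document.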


Email Handler (src/handlers/email.handler.ts)

Queue: notifications__email | Concurrency: 10 | Attempts: 3 | Backoff: exponential 5 s
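
To make the retry timing concrete, the sketch below computes the waits implied by these settings. It assumes BullMQ's built-in exponential strategy, which to our understanding delays each retry by delay * 2^(attemptsMade - 1), and it reads "exponential 5 s" as a base delay of 5000 ms; retryDelays is a hypothetical helper, not part of the handler.

```typescript
// Delays (in ms) before each retry, assuming delay * 2^(attemptsMade - 1).
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A job with N attempts is retried at most N - 1 times.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(Math.pow(2, attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}

console.log(retryDelays(3, 5000)); // [ 5000, 10000 ]
```

Under these assumptions a failing email job is retried after about 5 s and then about 10 s before it is marked failed.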

Dispatch flow:

    Job arrives

    1. filterRecipientsForDev(recipients, "email")
       -> in dev: keep only addresses whose domain is in DEV_ALLOWED_EMAIL_DOMAINS
       -> in production/staging: pass all through
       -> if 0 recipients after filter: return { status: "skipped_dev_filter" }

    2. resolveEmailProvider(tenantId)
       -> query notification_provider
            WHERE tenantId = X AND channel = "email" AND verifiedAt IS NOT NULL
       -> found: new SmtpProvider(cfg) | SendGridProvider(cfg) | SesProvider(cfg)
       -> not found: platform SendGrid default (SENDGRID_API_KEY + EMAIL_FROM_ADDRESS env)

    3. loadEmailTemplate(templateSlug ?? type, tenantId)
       -> check in-process cache (5-min TTL, key = "<tenantId>:<slug>")
       -> DB: SELECT WHERE slug = ? AND (tenantId = ? OR tenantId IS NULL)
              ORDER BY tenantId DESC  -- tenant row sorts first
       -> returns { subject, html, text }
       -> miss: returns { subject: slug, html: "", text: "" } (soft fallback)

    4. renderTemplate(tpl.subject | html | text, variables)
       -> Handlebars.compile(template)({ ...variables })

    5. provider.send({ to, subject, html, text })
       -> returns { messageId, provider }

    6. Return { status: "sent", messageId }

Worker events:

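    // job can be undefined in the "failed" event, hence the optional chaining.
    worker.on("failed", (job, err) =>
      log.error({ jobId: job?.id, tenantId: job?.data.tenantId, err }, "Email job failed"));
    worker.on("completed", (job, result) =>
      log.info({ jobId: job.id, tenantId: job.data.tenantId, status: result.status }, "Email job completed"));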

WhatsApp Handler (src/handlers/whatsapp.handler.ts)

Queue: notifications__whatsapp | Concurrency: 3 | Attempts: 3

The WhatsApp Business API only accepts pre-approved Meta templates for business-initiated messages. The handler maps NotificationType -> Meta template name:

    const TEMPLATE_MAP: Partial<Record<string, string>> = {
      approval_required: "leadmetrics_approval_required",
      credits_exhausted: "leadmetrics_credits_exhausted",
      credits_warning_80: "leadmetrics_credits_warning",
      payment_failed: "leadmetrics_payment_failed",
      pipeline_blocked: "leadmetrics_pipeline_blocked",
      monthly_report_ready: "leadmetrics_monthly_report_ready",
    };

If type is not in the map, the job returns { status: "no_template" } (not an error, so no retry is triggered).

Dispatch flow:

    1. filterRecipientsForDev(recipients, "whatsapp")
       -> keep only numbers starting with DEV_ALLOWED_PHONE_PREFIX in dev
    2. resolveWhatsAppProvider(tenantId)
       -> tenant own Meta Business account, or platform default
    3. per-recipient:
       provider.sendTemplate({ to: phone, template: { name, languageCode }, variables })
    4. Promise.allSettled: partial success returns { status: "partial" }
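
The per-recipient fan-out in steps 3 and 4 can be sketched as below. This is an illustration under assumptions rather than the handler's actual code: sendToAll and SendOutcome are hypothetical names, and the "failed" status for the all-failed case is our own choice (a real handler might throw instead so that BullMQ retries the job). Only the "partial" status is taken from the flow above.

```typescript
type SendOutcome = {
  status: "sent" | "partial" | "failed";
  sent: number;
  failed: number;
};

// Hypothetical aggregator: one send per recipient, summarised after all settle.
async function sendToAll<T>(
  recipients: T[],
  send: (recipient: T) => Promise<unknown>,
): Promise<SendOutcome> {
  // The async wrapper turns synchronous throws into rejections, so allSettled sees them.
  const results = await Promise.allSettled(recipients.map(async (r) => send(r)));
  const sent = results.filter((r) => r.status === "fulfilled").length;
  const failed = results.length - sent;
  // Callers are expected to have handled the empty-recipient case earlier
  // (for example via the dev filter), so zero failures is reported as "sent".
  const status = failed === 0 ? "sent" : sent === 0 ? "failed" : "partial";
  return { status, sent, failed };
}
```

Because Promise.allSettled never rejects, one bad phone number cannot prevent delivery to the remaining recipients.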

Telegram Handler (src/handlers/telegram.handler.ts)

Queue: notifications__telegram | Concurrency: 5 | Attempts: 3

Telegram does not use DB templates; messages are rendered inline via renderTemplate with a key of telegram_<type> (e.g. telegram_agent_error). This is intentional: ops alerts are short structured HTML text strings, not marketing email templates.

Recipients may supply an explicit chatId. If none is supplied the message falls back to TELEGRAM_DEFAULT_CHAT_ID (the platform ops channel). This makes Telegram the primary channel for internal ops alerting.
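
One way to read the fallback rule is sketched below. The names TelegramRecipient and resolveChatIds are hypothetical, and two details are assumptions on our part: the fallback is applied per recipient, and duplicate chat IDs are collapsed so the ops channel receives a single message.

```typescript
interface TelegramRecipient { chatId?: string }

// Hypothetical helper: explicit chat IDs win; anything without one goes to the default chat.
function resolveChatIds(recipients: TelegramRecipient[], defaultChatId?: string): string[] {
  // With no recipients at all, target the default chat once.
  const candidates = recipients.length > 0
    ? recipients.map((r) => r.chatId ?? defaultChatId)
    : [defaultChatId];
  const ids = candidates.filter((id): id is string => typeof id === "string" && id !== "");
  // Assumption: de-duplicate so a shared fallback chat is messaged only once.
  return Array.from(new Set(ids));
}
```

In the server, defaultChatId would come from TELEGRAM_DEFAULT_CHAT_ID; an empty result means there is nowhere to send the alert.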

Dispatch flow:

    1. resolveTelegramProvider(tenantId)
    2. renderTemplate("telegram_<type>", variables) -> HTML string
    3. per-recipient: provider.send({ chatId, text, parseMode: "HTML" })
    4. Promise.allSettled: partial success if any chatId send fails

SMS Handler (src/handlers/sms.handler.ts)

Queue: notifications__sms | Concurrency: 2 | Stub

The BullMQ worker is wired and running, but the actual dispatch is a no-op. The handler logs a warning and returns { status: "sent", sent: 0 }.

This is intentional: the queue infrastructure is live, so callers can already enqueue SMS jobs. When @leadmetrics/provider-msg91 and/or @leadmetrics/provider-twilio are built, only resolver.ts and sms.handler.ts need to change.


Web Handler (src/handlers/web.handler.ts)

Queue: notifications__web | Concurrency: 20 | Attempts: 3

In-app notifications are stored in the notification PostgreSQL table (visible in the Dashboard notification centre) and optionally pushed live via an external SSE hub.

Dispatch flow:

    1. renderTemplate("web_<type>", variables) -> short message string
    2. Per-recipient:
       db.notification.create({ tenantId, userId, type, message, refId, refType, read: false })
    3. If NOTIFICATION_HUB_URL is configured (optional):
       POST <hub>/push
       { tenantId, type, message, recipients: [userId, ...] }
       x-hub-key: NOTIFICATION_HUB_KEY
       -> SSE push to connected Dashboard clients
       -> best-effort: hub unavailable = notification persisted, no live push
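
The best-effort nature of step 3 can be sketched as follows. This is an illustration under assumptions, not the handler's code: WebNotification, PushFn, pushBestEffort, and httpPush are hypothetical names, and httpPush relies on the global fetch available in Node 18+. The path, body fields, and x-hub-key header follow the flow above.

```typescript
interface WebNotification {
  tenantId: string;
  type: string;
  message: string;
  recipients: string[];
}

type PushFn = (n: WebNotification) => Promise<void>;

// Hypothetical wrapper: the rows are already persisted, so a hub failure must not fail the job.
async function pushBestEffort(n: WebNotification, push: PushFn | undefined): Promise<boolean> {
  if (!push) return false; // NOTIFICATION_HUB_URL not configured
  try {
    await push(n);
    return true;
  } catch {
    return false; // hub unavailable: notification persisted, no live push
  }
}

// One possible PushFn built on fetch; treats any non-2xx response as a failure.
function httpPush(hubUrl: string, hubKey: string): PushFn {
  return async (n) => {
    const res = await fetch(`${hubUrl}/push`, {
      method: "POST",
      headers: { "content-type": "application/json", "x-hub-key": hubKey },
      body: JSON.stringify(n),
    });
    if (!res.ok) throw new Error(`hub responded ${res.status}`);
  };
}
```

Keeping the transport behind PushFn means the swallow-and-continue behaviour can be exercised in tests without a running hub.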

Template System

Handlebars renderer (src/templates/renderer.ts)

All templates (email HTML/text, Telegram messages, web messages) pass through:

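    import Handlebars from "handlebars";

    // Compile a Handlebars template and render it with the given variables.
    export function renderTemplate(
      template: string,
      variables: Record<string, unknown> = {}
    ): string {
      const compiled = Handlebars.compile(template, { noEscape: true });
      return compiled(variables);
    }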

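Variables use {{variable}} Mustache syntax. noEscape: true turns off Handlebars' default HTML escaping of {{variable}} values, so HTML passed in variables reaches email bodies verbatim instead of entity-encoded. Values that originate from untrusted input are therefore not escaped by the renderer.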

Email loader (src/templates/email-loader.ts)

Email templates are stored in the email_template PostgreSQL table (Prisma model).

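Resolution order:

    1. In-memory cache (5-min TTL, key = "<tenantId>:<slug>")
    2. Tenant-specific (WHERE tenantId = X AND slug = Y)
    3. Platform default (WHERE tenantId IS NULL AND slug = Y)
       -> Prisma: orderBy tenantId desc (non-null rows sort before null)
    4. Soft miss -> { subject: slug, html: "", text: "" }

Caveat on step 3: PostgreSQL places NULLs first under DESC unless NULLS LAST is requested, so the tenant row is only guaranteed to come first if the query specifies nulls: "last" in the Prisma orderBy, or if the loader picks the tenant row in code.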

invalidateEmailTemplateCache(slug, tenantId) is exported for future dashboard UI use (tenant updates their template -> invalidate so the next job picks it up immediately).
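
The resolution order and cache can be sketched as below. This is a minimal illustration, not the contents of email-loader.ts: FetchRows stands in for the Prisma query and is hypothetical, the extra fetchRows and now parameters exist only to make the sketch testable, and caching soft misses is an assumption. The sketch chooses the tenant row explicitly in code, so it does not depend on how the database orders NULLs.

```typescript
interface EmailTemplate { subject: string; html: string; text: string }
interface TemplateRow extends EmailTemplate { tenantId: string | null }

// Hypothetical DB accessor: rows matching the slug for this tenant or the platform (tenantId null).
type FetchRows = (slug: string, tenantId: string) => Promise<TemplateRow[]>;

const TTL_MS = 5 * 60 * 1000; // 5-minute in-process TTL
const cache = new Map<string, { value: EmailTemplate; expiresAt: number }>();

async function loadEmailTemplate(
  slug: string,
  tenantId: string,
  fetchRows: FetchRows,
  now: number = Date.now(),
): Promise<EmailTemplate> {
  const key = `${tenantId}:${slug}`;
  const hit = cache.get(key);
  if (hit && hit.expiresAt > now) return hit.value;

  const rows = await fetchRows(slug, tenantId);
  // Prefer the tenant-specific row, then the platform default.
  const row =
    rows.find((r) => r.tenantId === tenantId) ?? rows.find((r) => r.tenantId === null);
  const value: EmailTemplate = row
    ? { subject: row.subject, html: row.html, text: row.text }
    : { subject: slug, html: "", text: "" }; // soft miss
  // Assumption: soft misses are cached too, to avoid a DB query per job.
  cache.set(key, { value, expiresAt: now + TTL_MS });
  return value;
}

function invalidateEmailTemplateCache(slug: string, tenantId: string): void {
  cache.delete(`${tenantId}:${slug}`);
}
```

Calling invalidateEmailTemplateCache after a template edit removes the entry, so the next job for that tenant and slug reads the database again instead of waiting for the TTL to lapse.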

Seeded platform templates

packages/db/src/seed.ts seeds the welcome template as the platform default (tenantId = null). Use findFirst + conditional create/update: Prisma cannot upsert on a nullable composite unique key.


Dev-Mode Recipient Filtering (src/dev-filter.ts)

In development and test environments the server enforces outbound allow-lists.

| Channel | Allow-list config | Behaviour when not set |
|---|---|---|
| Email | DEV_ALLOWED_EMAIL_DOMAINS (comma-sep) | All recipients pass through |
| SMS / WhatsApp | DEV_ALLOWED_PHONE_PREFIX (E.164 prefix) | All recipients pass through |
| Telegram | - | Always passes through |
| Web | - | Always passes through |

In production and staging all allow-lists are bypassed entirely.

    # .env (development)
    DEV_ALLOWED_EMAIL_DOMAINS=leadmetrics.ai
    DEV_ALLOWED_PHONE_PREFIX=+91
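
The filtering rules in this section can be sketched as a pure function. This is a minimal illustration, not the contents of dev-filter.ts: the Recipient shape and DevFilterEnv type are assumptions, and the third env parameter is added here only so the sketch can be tested without touching process.env (the handlers call filterRecipientsForDev with two arguments).

```typescript
type Channel = "email" | "whatsapp" | "sms" | "telegram" | "web";
interface Recipient { email?: string; phone?: string; chatId?: string; userId?: string }
interface DevFilterEnv {
  NODE_ENV?: string;
  DEV_ALLOWED_EMAIL_DOMAINS?: string;
  DEV_ALLOWED_PHONE_PREFIX?: string;
}

function filterRecipientsForDev(
  recipients: Recipient[],
  channel: Channel,
  env: DevFilterEnv,
): Recipient[] {
  const nodeEnv = env.NODE_ENV ?? "development"; // same default as the config schema
  if (nodeEnv !== "development" && nodeEnv !== "test") return recipients; // staging / production

  if (channel === "email") {
    const domains = (env.DEV_ALLOWED_EMAIL_DOMAINS ?? "")
      .split(",")
      .map((d) => d.trim().toLowerCase())
      .filter((d) => d !== "");
    if (domains.length === 0) return recipients; // allow-list not set
    return recipients.filter((r) => {
      const domain = r.email?.split("@").pop()?.toLowerCase();
      return domain !== undefined && domains.includes(domain);
    });
  }

  if (channel === "sms" || channel === "whatsapp") {
    const prefix = env.DEV_ALLOWED_PHONE_PREFIX?.trim();
    if (!prefix) return recipients; // allow-list not set
    return recipients.filter((r) => r.phone?.startsWith(prefix) === true);
  }

  return recipients; // telegram and web always pass through
}
```

With the .env values shown above, a job addressed to someone@leadmetrics.ai and someone@gmail.com would keep only the first recipient in development, and both in production.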

Provider Resolution (src/resolver.ts)

The resolver is the only file that imports concrete provider packages. Handlers depend only on the provider interface.

    resolveEmailProvider(tenantId)
    |-- query notification_provider
    |     WHERE tenantId = X AND channel = "email"
    |     AND verifiedAt IS NOT NULL
    |-- found, provider = "smtp"     -> new SmtpProvider(decrypt(row.config))
    |-- found, provider = "sendgrid" -> new SendGridProvider(decrypt(row.config))
    |-- found, provider = "ses"      -> new SesProvider(decrypt(row.config))
    +-- no row / unverified          -> new SendGridProvider(platform env defaults)

    resolveWhatsAppProvider(tenantId)
    +-- no row / unverified -> new WhatsAppBusinessProvider(platform env defaults)

    resolveTelegramProvider(tenantId)
    +-- no row / unverified -> new TelegramBotProvider(platform env defaults)

Encryption note: decrypt() is currently a JSON.parse passthrough. In production this must be replaced with AES-256-GCM decryption from @leadmetrics/crypto.
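
For orientation, an AES-256-GCM round trip built on Node's built-in crypto module might look like the sketch below. It is a stand-in for illustration only: the API of @leadmetrics/crypto is not described in this document, and the envelope layout (base64 of a 12-byte IV, a 16-byte auth tag, then the ciphertext) as well as the names encryptConfig and decryptConfig are assumptions.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from "node:crypto";

// Assumed envelope layout: base64( iv[12] | authTag[16] | ciphertext ).
function encryptConfig(config: unknown, key: Buffer): string {
  const iv = randomBytes(12); // fresh IV per message; never reuse with the same key
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([
    cipher.update(JSON.stringify(config), "utf8"),
    cipher.final(),
  ]);
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]).toString("base64");
}

function decryptConfig<T = unknown>(payload: string, key: Buffer): T {
  const raw = Buffer.from(payload, "base64");
  const iv = raw.subarray(0, 12);
  const tag = raw.subarray(12, 28);
  const ciphertext = raw.subarray(28);
  const decipher = createDecipheriv("aes-256-gcm", key, iv);
  decipher.setAuthTag(tag);
  // final() throws if the ciphertext or tag was tampered with.
  const plaintext = Buffer.concat([decipher.update(ciphertext), decipher.final()]);
  return JSON.parse(plaintext.toString("utf8")) as T;
}
```

The key must be exactly 32 bytes. Because GCM is authenticated, a modified row.config fails loudly at decrypt time rather than yielding a corrupted provider configuration.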


Config (src/config.ts)

Zod schema with safeParse; prints all invalid fields on startup failure.

| Variable | Required | Default | Description |
|---|---|---|---|
| NODE_ENV | no | development | development / staging / production / test |
| DATABASE_URL | yes | - | Prisma connection string |
| REDIS_URL | no | redis://localhost:6379 | BullMQ connection |
| EMAIL_FROM_ADDRESS | yes | - | Sender address for all outbound email |
| EMAIL_FROM_NAME | no | Leadmetrics | Sender display name |
| SENDGRID_API_KEY | no | - | Platform-default SendGrid key |
| WHATSAPP_API_KEY | no | - | Platform-default Meta Graph API token |
| WHATSAPP_BASE_URL | no | https://graph.facebook.com/v19.0 | Meta Graph API base |
| WHATSAPP_PHONE_NUMBER_ID | no | - | Meta phone number ID |
| TELEGRAM_BOT_TOKEN | no | - | Platform-default Telegram bot token |
| TELEGRAM_DEFAULT_CHAT_ID | no | - | Fallback ops chat ID |
| NOTIFICATION_HUB_URL | no | - | SSE hub URL for live in-app push |
| NOTIFICATION_HUB_KEY | no | - | Shared secret for the hub |
| DEV_ALLOWED_EMAIL_DOMAINS | no | - | Comma-sep; email allow-list in dev/test |
| DEV_ALLOWED_PHONE_PREFIX | no | - | E.164 prefix; SMS/WhatsApp allow-list |

© 2026 Leadmetrics — Internal use only