Notification Server: Design & Implementation
Detailed walkthrough of apps/servers/notifications/: file structure, startup flow,
per-handler dispatch logic, template system, dev-mode filtering, and configuration.
File Structure
apps/servers/notifications/
|-- src/
| |-- index.ts Entry point: wires Redis, starts all 5 workers, handles shutdown
| |-- config.ts Zod env validation; fails fast on missing/bad config
| |-- types.ts NotificationJob, Recipient, DispatchResult, NotificationType
| |-- resolver.ts Per-tenant provider resolution (email / whatsapp / telegram)
| |-- dev-filter.ts Outbound allow-list enforcement in dev/test environments
| |
| |-- handlers/
| | |-- email.handler.ts Queue: notifications__email Concurrency: 10
| | |-- whatsapp.handler.ts Queue: notifications__whatsapp Concurrency: 3
| | |-- telegram.handler.ts Queue: notifications__telegram Concurrency: 5
| | |-- sms.handler.ts Queue: notifications__sms Concurrency: 2 (stub)
| | +-- web.handler.ts Queue: notifications__web Concurrency: 20
| |
| +-- templates/
| |-- renderer.ts Handlebars compile + render
| |-- email-loader.ts DB template loader with 5-min in-process TTL cache
| |-- sms-templates.ts Plain-text SMS strings keyed by NotificationType
| +-- whatsapp-templates.ts Meta pre-approved template name registry
|
|-- .env
|-- .env.example
|-- package.json
+-- tsconfig.json

Startup Flow (src/index.ts)
1. Config validation: loadConfig() runs a Zod parse of process.env and throws if any required variable is missing or invalid, so the process never starts in a broken state.
2. Redis connection: a single IORedis instance is shared across all 5 workers. maxRetriesPerRequest: null is required by BullMQ v5.
3. Worker registration: each handler factory receives the shared Redis instance:
   const workers = [
     startEmailWorker(redis),
     startWhatsAppWorker(redis),
     startTelegramWorker(redis),
     startSmsWorker(redis),
     startWebWorker(redis),
   ];
4. Graceful shutdown: SIGTERM/SIGINT call worker.close() on every worker, then redis.quit(). Workers drain in-flight jobs before closing. There is no hard timeout by default; the OS process manager (Docker / systemd) enforces the deadline.
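The shutdown step can be sketched as follows. Closable and Quittable are hypothetical minimal interfaces standing in for BullMQ's Worker and the IORedis client, so the snippet runs without either package; closing the workers in parallel and ignoring a repeated signal are choices of this sketch, not documented behaviour of index.ts.

```typescript
// Sketch only: Closable / Quittable are hypothetical stand-ins for BullMQ's Worker
// and the IORedis client, so this runs without either package installed.
interface Closable {
  close(): Promise<void>;
}

interface Quittable {
  quit(): Promise<unknown>;
}

// Close every worker (each close() waits for its in-flight jobs), then quit Redis.
async function shutdown(workers: Closable[], redis: Quittable): Promise<void> {
  // allSettled: one worker failing to close must not stop the others draining.
  const results = await Promise.allSettled(workers.map((w) => w.close()));
  for (const r of results) {
    if (r.status === "rejected") console.error("worker.close() failed:", r.reason);
  }
  // Redis goes last because draining workers still use the shared connection.
  await redis.quit();
}

// Install SIGTERM/SIGINT handlers; no hard timeout, the process manager enforces one.
function registerShutdown(workers: Closable[], redis: Quittable): void {
  let shuttingDown = false;
  const onSignal = (signal: string): void => {
    if (shuttingDown) return; // ignore a repeated signal while draining
    shuttingDown = true;
    console.info(`${signal} received, draining ${workers.length} workers`);
    shutdown(workers, redis).then(
      () => process.exit(0),
      (err: unknown) => {
        console.error("shutdown failed:", err);
        process.exit(1);
      },
    );
  };
  process.once("SIGTERM", onSignal);
  process.once("SIGINT", onSignal);
}
```

In index.ts this would be called once, right after the workers array is built: registerShutdown(workers, redis).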
Email Handler (src/handlers/email.handler.ts)
Queue: notifications__email | Concurrency: 10 | Attempts: 3 | Backoff: exponential 5 s
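As a worked example of what Attempts: 3 with exponential 5 s backoff means: assuming BullMQ's built-in exponential strategy without jitter (delay * 2^(attemptsMade - 1)), a job that keeps failing is retried after 5 s and then after 10 s before it is marked failed. emailJobOptions shows the matching BullMQ job-option shape; whether the producer sets it per job or as a queue's defaultJobOptions is not stated here, and exponentialDelays is a hypothetical helper.

```typescript
// Job options matching "Attempts: 3 | Backoff: exponential 5 s" (BullMQ JobsOptions shape).
const emailJobOptions = {
  attempts: 3,
  backoff: { type: "exponential" as const, delay: 5_000 },
};

// Retry delays, assuming BullMQ's built-in exponential strategy without jitter:
// delay(attemptsMade) = baseDelayMs * 2^(attemptsMade - 1).
function exponentialDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  // A retry is scheduled after every failed attempt except the last one.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.round(2 ** (attemptsMade - 1) * baseDelayMs));
  }
  return delays;
}

// 5000 ms before the second attempt, 10000 ms before the third.
console.log(exponentialDelays(emailJobOptions.attempts, emailJobOptions.backoff.delay));
```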
Dispatch flow:
Job arrives
1. filterRecipientsForDev(recipients, "email")
-> in dev: keep only addresses whose domain is in DEV_ALLOWED_EMAIL_DOMAINS
-> in production/staging: pass all through
-> if 0 recipients after filter: return { status: "skipped_dev_filter" }
2. resolveEmailProvider(tenantId)
-> query notification_provider WHERE tenantId = X AND channel = "email" AND verifiedAt IS NOT NULL
-> found: new SmtpProvider(cfg) | SendGridProvider(cfg) | SesProvider(cfg)
-> not found: platform SendGrid default (SENDGRID_API_KEY + EMAIL_FROM_ADDRESS env)
3. loadEmailTemplate(templateSlug ?? type, tenantId)
-> check in-process cache (5-min TTL, key = "<tenantId>:<slug>")
-> DB: SELECT WHERE slug = ? AND (tenantId = ? OR tenantId IS NULL)
ORDER BY tenantId DESC NULLS LAST -- tenant row sorts first (a plain DESC puts NULLs first in PostgreSQL)
-> returns { subject, html, text }
-> miss: returns { subject: slug, html: "", text: "" } (soft fallback)
4. renderTemplate(field, variables) for each of tpl.subject, tpl.html and tpl.text
-> Handlebars.compile(field, { noEscape: true })(variables)
5. provider.send({ to, subject, html, text })
-> returns { messageId, provider }
6. Return { status: "sent", messageId }

Worker events:
worker.on("failed", (job, err) => log.error({ jobId: job?.id, tenantId: job?.data.tenantId, err }, "Email job failed"));
worker.on("completed", (job, result) => log.info({ jobId: job.id, tenantId: job.data.tenantId, status: result.status }, "Email job completed"));

WhatsApp Handler (src/handlers/whatsapp.handler.ts)
Queue: notifications__whatsapp | Concurrency: 3 | Attempts: 3
The WhatsApp Business API requires Meta pre-approved templates for business-initiated messages, so the handler maps
NotificationType -> Meta template name:
const TEMPLATE_MAP: Partial<Record<string, string>> = {
approval_required: "leadmetrics_approval_required",
credits_exhausted: "leadmetrics_credits_exhausted",
credits_warning_80: "leadmetrics_credits_warning",
payment_failed: "leadmetrics_payment_failed",
pipeline_blocked: "leadmetrics_pipeline_blocked",
monthly_report_ready: "leadmetrics_monthly_report_ready",
};

If type is not in the map, the job returns { status: "no_template" }. This is not an error, so no retry is triggered.
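The template lookup and the per-recipient fan-out from the dispatch flow below can be sketched together. dispatchWhatsApp is a hypothetical name and sendTemplate stands in for provider.sendTemplate(); returning "sent" on full success and throwing when every send fails (so BullMQ retries) are assumptions, since the document only specifies "no_template" and "partial".

```typescript
// Sketch: `sendTemplate` stands in for provider.sendTemplate(); the "sent" status for
// full success and throwing when every send fails are assumptions.
type WhatsAppResult =
  | { status: "no_template" }
  | { status: "sent" | "partial"; sent: number; failed: number };

const TEMPLATE_MAP: Partial<Record<string, string>> = {
  approval_required: "leadmetrics_approval_required",
  credits_exhausted: "leadmetrics_credits_exhausted",
  credits_warning_80: "leadmetrics_credits_warning",
  payment_failed: "leadmetrics_payment_failed",
  pipeline_blocked: "leadmetrics_pipeline_blocked",
  monthly_report_ready: "leadmetrics_monthly_report_ready",
};

async function dispatchWhatsApp(
  type: string,
  phones: string[],
  sendTemplate: (to: string, templateName: string) => Promise<void>,
): Promise<WhatsAppResult> {
  // Own-property check so inherited keys like "toString" never count as templates.
  const templateName = Object.prototype.hasOwnProperty.call(TEMPLATE_MAP, type)
    ? TEMPLATE_MAP[type]
    : undefined;
  // Unmapped type: resolve (not throw) so BullMQ records success and does not retry.
  if (templateName === undefined) return { status: "no_template" };

  // One API call per recipient; allSettled keeps going past individual failures.
  const results = await Promise.allSettled(phones.map((to) => sendTemplate(to, templateName)));
  const sent = results.filter((r) => r.status === "fulfilled").length;
  const failed = results.length - sent;

  // Assumption: a total failure throws so the job is retried (Attempts: 3).
  if (sent === 0 && failed > 0) throw new Error(`all ${failed} WhatsApp sends failed`);
  return { status: failed > 0 ? "partial" : "sent", sent, failed };
}
```

Typing the map as Partial<Record<string, string>> makes every lookup string | undefined, which forces the no_template branch to be handled explicitly.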
Dispatch flow:
1. filterRecipientsForDev(recipients, "whatsapp")
-> keep only numbers starting with DEV_ALLOWED_PHONE_PREFIX in dev
2. resolveWhatsAppProvider(tenantId)
-> the tenant's own Meta Business account, or the platform default
3. per-recipient: provider.sendTemplate({ to: phone, template: { name, languageCode }, variables })
4. Promise.allSettled: partial success returns { status: "partial" }

Telegram Handler (src/handlers/telegram.handler.ts)
Queue: notifications__telegram | Concurrency: 5 | Attempts: 3
Telegram does not use DB templates; messages are rendered inline via renderTemplate
with a key of telegram_<type> (e.g. telegram_agent_error). This is intentional: ops
alerts are short structured HTML text strings, not marketing email templates.
Recipients may supply an explicit chatId. If none is supplied the message falls back to
TELEGRAM_DEFAULT_CHAT_ID (the platform ops channel). This makes Telegram the primary
channel for internal ops alerting.
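A sketch of the chatId fallback, under two assumptions the text leaves open: the fallback applies per recipient (a recipient without a chatId goes to TELEGRAM_DEFAULT_CHAT_ID), and an empty recipient list is treated like a single recipient without a chatId. TelegramRecipient, resolveChatIds and telegramTemplateKey are hypothetical names.

```typescript
// Sketch: TelegramRecipient and resolveChatIds are hypothetical names. Assumes the
// fallback applies per recipient and that an empty list means "ops channel only".
interface TelegramRecipient {
  chatId?: string;
}

function resolveChatIds(recipients: TelegramRecipient[], defaultChatId?: string): string[] {
  // No recipients at all: treat it like one recipient without an explicit chatId.
  const list: TelegramRecipient[] = recipients.length > 0 ? recipients : [{}];
  const ids = list
    .map((r) => r.chatId || defaultChatId) // fall back to TELEGRAM_DEFAULT_CHAT_ID
    .filter((id): id is string => typeof id === "string" && id.length > 0);
  // De-duplicate so the ops channel is not messaged twice for one job.
  return Array.from(new Set(ids));
}

// Template key used with renderTemplate, e.g. "agent_error" -> "telegram_agent_error".
function telegramTemplateKey(type: string): string {
  return `telegram_${type}`;
}
```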
Dispatch flow:
1. resolveTelegramProvider(tenantId)
2. renderTemplate("telegram_<type>", variables) -> HTML string
3. per-recipient: provider.send({ chatId, text, parseMode: "HTML" })
4. Promise.allSettled: partial success if any chatId send fails

SMS Handler (src/handlers/sms.handler.ts)
Queue: notifications__sms | Concurrency: 2 | Stub
The BullMQ worker is wired and running, but the actual dispatch is a no-op.
The handler logs a warning and returns { status: "sent", sent: 0 }.
This is intentional: the queue infrastructure is live, so callers can already enqueue
SMS jobs. When @leadmetrics/provider-msg91 and/or @leadmetrics/provider-twilio are
built, only resolver.ts and sms.handler.ts need to change.
Web Handler (src/handlers/web.handler.ts)
Queue: notifications__web | Concurrency: 20 | Attempts: 3
In-app notifications are stored in the notification PostgreSQL table (visible in the
Dashboard notification centre) and optionally pushed live via an external SSE hub.
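The optional live push (step 3 of the dispatch flow below) is best-effort, which a sketch makes concrete: the hub call can fail without failing the job. The /push path, body fields and x-hub-key header come from the flow; pushToHub, the 3 s timeout and the boolean result are assumptions, and global fetch requires Node 18+.

```typescript
// Sketch of the best-effort hub push. Assumptions: Node 18+ global fetch, a 3 s
// timeout and a boolean result; path, body and x-hub-key header follow the flow.
interface HubPush {
  tenantId: string;
  type: string;
  message: string;
  recipients: string[]; // userIds whose notification rows were just persisted
}

async function pushToHub(
  hubUrl: string | undefined,
  hubKey: string | undefined,
  payload: HubPush,
): Promise<boolean> {
  // NOTIFICATION_HUB_URL not configured: the persisted rows are the whole result.
  if (!hubUrl) return false;
  const headers: Record<string, string> = { "content-type": "application/json" };
  if (hubKey) headers["x-hub-key"] = hubKey;
  try {
    const res = await fetch(`${hubUrl.replace(/\/+$/, "")}/push`, {
      method: "POST",
      headers,
      body: JSON.stringify(payload),
      signal: AbortSignal.timeout(3_000), // a slow hub must not stall the worker
    });
    return res.ok;
  } catch {
    // Hub unreachable, bad URL or timeout: swallow it, the job still succeeds.
    return false;
  }
}
```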
Dispatch flow:
1. renderTemplate("web_<type>", variables) -> short message string
2. Per-recipient: db.notification.create({
tenantId, userId, type, message, refId, refType, read: false
})
3. If NOTIFICATION_HUB_URL is configured (optional):
POST <hub>/push { tenantId, type, message, recipients: [userId, ...] }
x-hub-key: NOTIFICATION_HUB_KEY
-> SSE push to connected Dashboard clients
-> best-effort: if the hub is unavailable, the notification is still persisted, just not pushed live

Template System
Handlebars renderer (src/templates/renderer.ts)
All templates (email HTML/text, Telegram messages, web messages) pass through:
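import Handlebars from "handlebars";

// Compile and render a Handlebars template; noEscape leaves {{variable}} values unescaped.
export function renderTemplate(
  template: string,
  variables: Record<string, unknown> = {}
): string {
  const compiled = Handlebars.compile(template, { noEscape: true });
  return compiled(variables);
}

Variables use {{variable}} Mustache syntax. noEscape: true turns off Handlebars' HTML escaping of {{variable}} values, so values that already contain HTML are inserted verbatim into email bodies instead of being escaped. The flip side is that untrusted values are not escaped either, so callers should sanitise them before rendering.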
Email loader (src/templates/email-loader.ts)
Email templates are stored in the email_template PostgreSQL table (Prisma model).
Resolution order:
1. In-memory cache (5-min TTL, key = "<tenantId>:<slug>")
2. Tenant-specific (WHERE tenantId = X AND slug = Y)
3. Platform default (WHERE tenantId IS NULL AND slug = Y)
-> Prisma: orderBy { tenantId: { sort: "desc", nulls: "last" } } so the tenant row sorts before the null platform row (a plain desc puts NULLs first in PostgreSQL)
4. Soft miss -> { subject: slug, html: "", text: "" }

invalidateEmailTemplateCache(slug, tenantId) is exported for future dashboard UI use
(when a tenant updates their template, invalidating the entry lets the next job pick it up immediately).
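The resolution order above can be sketched as a small loader. findTemplates is a hypothetical stand-in for the Prisma query, and not caching soft misses (so a newly created template is picked up on the next job) is an assumption of this sketch. Picking the tenant row in code, rather than relying on ORDER BY, also sidesteps the question of where PostgreSQL sorts NULLs.

```typescript
// Sketch: findTemplates is a hypothetical stand-in for the Prisma query
// (slug = ? AND (tenantId = ? OR tenantId IS NULL)). Soft misses are not cached.
interface EmailTemplate {
  subject: string;
  html: string;
  text: string;
}

interface TemplateRow extends EmailTemplate {
  tenantId: string | null;
}

type FindTemplates = (slug: string, tenantId: string) => Promise<TemplateRow[]>;

const TTL_MS = 5 * 60 * 1000; // 5-minute in-process TTL
const cache = new Map<string, { value: EmailTemplate; expiresAt: number }>();

async function loadEmailTemplate(
  slug: string,
  tenantId: string,
  findTemplates: FindTemplates,
  now: () => number = Date.now,
): Promise<EmailTemplate> {
  const key = `${tenantId}:${slug}`;
  const hit = cache.get(key);
  if (hit && hit.expiresAt > now()) return hit.value;

  const rows = await findTemplates(slug, tenantId);
  // Tenant row wins; otherwise fall back to the platform default (tenantId === null).
  const row = rows.find((r) => r.tenantId === tenantId) ?? rows.find((r) => r.tenantId === null);
  if (!row) return { subject: slug, html: "", text: "" }; // soft miss

  const value: EmailTemplate = { subject: row.subject, html: row.html, text: row.text };
  cache.set(key, { value, expiresAt: now() + TTL_MS });
  return value;
}

function invalidateEmailTemplateCache(slug: string, tenantId: string): void {
  cache.delete(`${tenantId}:${slug}`);
}
```

Because the cache lives in process memory, invalidation only affects the process that calls it; other worker processes keep their entry until the TTL expires.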
Seeded platform templates
packages/db/src/seed.ts seeds the welcome template as the platform default
(tenantId = null). It uses findFirst + a conditional create/update rather than upsert, because Prisma cannot
upsert on a nullable composite unique key.
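The find-then-create-or-update pattern can be sketched against a trimmed-down delegate. EmailTemplateDelegate is a hypothetical stand-in for prisma.emailTemplate exposing only the three calls used, and the id column is assumed. The explicit lookup also matters at the database level: by default a PostgreSQL unique constraint treats NULLs as distinct, so blindly inserting on every seed run could accumulate duplicate platform rows.

```typescript
// Sketch: EmailTemplateDelegate is a hypothetical, trimmed-down stand-in for
// prisma.emailTemplate with just the three calls used; the `id` column is assumed.
interface TemplateData {
  slug: string;
  tenantId: string | null;
  subject: string;
  html: string;
  text: string;
}

interface TemplateRecord extends TemplateData {
  id: string;
}

interface EmailTemplateDelegate {
  findFirst(args: { where: { slug: string; tenantId: null } }): Promise<TemplateRecord | null>;
  create(args: { data: TemplateData }): Promise<TemplateRecord>;
  update(args: { where: { id: string }; data: Partial<TemplateData> }): Promise<TemplateRecord>;
}

// Idempotent seed of a platform default (tenantId = null) without upsert.
async function seedPlatformTemplate(
  db: EmailTemplateDelegate,
  tpl: Omit<TemplateData, "tenantId">,
): Promise<TemplateRecord> {
  const existing = await db.findFirst({ where: { slug: tpl.slug, tenantId: null } });
  if (existing) {
    // Re-running the seed refreshes content instead of inserting a duplicate row.
    return db.update({
      where: { id: existing.id },
      data: { subject: tpl.subject, html: tpl.html, text: tpl.text },
    });
  }
  return db.create({ data: { ...tpl, tenantId: null } });
}
```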
Dev-Mode Recipient Filtering (src/dev-filter.ts)
In development and test environments the server enforces outbound allow-lists.

| Channel | Allow-list config | Behaviour when not set |
|---|---|---|
| Email | DEV_ALLOWED_EMAIL_DOMAINS (comma-separated) | All recipients pass through |
| SMS / WhatsApp | DEV_ALLOWED_PHONE_PREFIX (E.164 prefix) | All recipients pass through |
| Telegram | (none) | Always passes through |
| Web | (none) | Always passes through |
In production and staging all allow-lists are bypassed entirely.
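The allow-list rules in the table above fit in one function. This is a sketch, not src/dev-filter.ts itself: the Recipient fields, passing the config explicitly instead of reading env, and matching the exact domain after the last "@" (no subdomains) are assumptions.

```typescript
// Sketch: Recipient's fields and passing config explicitly (rather than reading env)
// are assumptions; the rules follow the allow-list table.
type Channel = "email" | "sms" | "whatsapp" | "telegram" | "web";

interface Recipient {
  email?: string;
  phone?: string;
  chatId?: string;
  userId?: string;
}

interface DevFilterConfig {
  nodeEnv: string;
  allowedEmailDomains?: string; // DEV_ALLOWED_EMAIL_DOMAINS, comma-separated
  allowedPhonePrefix?: string; // DEV_ALLOWED_PHONE_PREFIX, E.164 prefix such as +91
}

function filterRecipientsForDev(recipients: Recipient[], channel: Channel, cfg: DevFilterConfig): Recipient[] {
  // Only development and test are filtered; staging and production bypass everything.
  if (cfg.nodeEnv !== "development" && cfg.nodeEnv !== "test") return recipients;

  if (channel === "email" && cfg.allowedEmailDomains) {
    const domains = new Set(
      cfg.allowedEmailDomains.split(",").map((d) => d.trim().toLowerCase()).filter((d) => d.length > 0),
    );
    return recipients.filter((r) => {
      const at = r.email?.lastIndexOf("@") ?? -1;
      // Exact match on the domain after the last "@".
      return at >= 0 && domains.has(r.email!.slice(at + 1).toLowerCase());
    });
  }

  if ((channel === "sms" || channel === "whatsapp") && cfg.allowedPhonePrefix) {
    const prefix = cfg.allowedPhonePrefix;
    return recipients.filter((r) => r.phone?.startsWith(prefix) ?? false);
  }

  // Telegram, web, or no allow-list configured: everyone passes.
  return recipients;
}
```

A handler then returns { status: "skipped_dev_filter" } when the filtered list is empty, as in step 1 of the email flow.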
# .env (development)
DEV_ALLOWED_EMAIL_DOMAINS=leadmetrics.ai
DEV_ALLOWED_PHONE_PREFIX=+91

Provider Resolution (src/resolver.ts)
The resolver is the only file that imports concrete provider packages. Handlers depend only on the provider interface.
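The email branch of the resolver can be sketched as a factory table. Everything concrete here is hypothetical: FACTORIES returns plain objects in place of new SmtpProvider(cfg) / new SendGridProvider(cfg) / new SesProvider(cfg), findVerifiedRow replaces the notification_provider query, and throwing on an unknown provider string (rather than falling back) is a choice of this sketch.

```typescript
// Sketch of resolveEmailProvider's branching; providers, row shape and injected
// helpers are hypothetical stand-ins for the real Prisma query and provider classes.
type EmailProviderKind = "smtp" | "sendgrid" | "ses";

interface EmailProvider {
  kind: EmailProviderKind;
  config: unknown;
}

interface ProviderRow {
  provider: string; // stored as text in the DB, so validated below
  config: string; // encrypted config blob
}

// One factory per supported provider; stand-ins for `new XxxProvider(cfg)`.
const FACTORIES: Record<EmailProviderKind, (config: unknown) => EmailProvider> = {
  smtp: (config) => ({ kind: "smtp", config }),
  sendgrid: (config) => ({ kind: "sendgrid", config }),
  ses: (config) => ({ kind: "ses", config }),
};

function isKind(value: string): value is EmailProviderKind {
  return Object.prototype.hasOwnProperty.call(FACTORIES, value);
}

async function resolveEmailProvider(
  tenantId: string,
  findVerifiedRow: (tenantId: string, channel: "email") => Promise<ProviderRow | null>,
  decrypt: (blob: string) => unknown,
  platformDefaults: { apiKey?: string; fromAddress: string },
): Promise<EmailProvider> {
  // Expected filter: tenantId = X AND channel = "email" AND verifiedAt IS NOT NULL
  const row = await findVerifiedRow(tenantId, "email");
  if (!row) return FACTORIES.sendgrid(platformDefaults); // platform SendGrid default
  if (!isKind(row.provider)) throw new Error(`Unsupported email provider: ${row.provider}`);
  return FACTORIES[row.provider](decrypt(row.config));
}
```

Keeping the factories in one table means handlers only ever see the EmailProvider interface, matching the rule that resolver.ts is the only importer of concrete providers.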
resolveEmailProvider(tenantId)
|-- query notification_provider
| WHERE tenantId = X AND channel = "email"
| AND verifiedAt IS NOT NULL
|-- found, provider = "smtp" -> new SmtpProvider(decrypt(row.config))
|-- found, provider = "sendgrid" -> new SendGridProvider(decrypt(row.config))
|-- found, provider = "ses" -> new SesProvider(decrypt(row.config))
+-- no row / unverified -> new SendGridProvider(platform env defaults)
resolveWhatsAppProvider(tenantId)
+-- no row / unverified -> new WhatsAppBusinessProvider(platform env defaults)
resolveTelegramProvider(tenantId)
+-- no row / unverified -> new TelegramBotProvider(platform env defaults)

Encryption note: decrypt() is currently a JSON.parse passthrough. In production this must be replaced with AES-256-GCM decryption from @leadmetrics/crypto.
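A minimal sketch of what an AES-256-GCM decrypt() could look like with Node's built-in node:crypto. The blob layout (base64 of a 12-byte IV, the 16-byte auth tag, then the ciphertext) and passing a 32-byte key explicitly are assumptions; @leadmetrics/crypto's actual API and storage format are not described in this document. encryptConfig is a hypothetical helper included only so the round trip can be exercised.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from "node:crypto";

// Assumed blob layout: base64( iv[12] | authTag[16] | ciphertext ).
const IV_LEN = 12;
const TAG_LEN = 16;

// Hypothetical helper: encrypt a provider config object with a 32-byte key.
function encryptConfig(config: unknown, key: Buffer): string {
  const iv = randomBytes(IV_LEN); // fresh IV per encryption; never reuse with a key
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([cipher.update(JSON.stringify(config), "utf8"), cipher.final()]);
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]).toString("base64");
}

// Replacement for the JSON.parse passthrough: authenticate, decrypt, then parse.
function decrypt(blob: string, key: Buffer): unknown {
  const raw = Buffer.from(blob, "base64");
  const iv = raw.subarray(0, IV_LEN);
  const tag = raw.subarray(IV_LEN, IV_LEN + TAG_LEN);
  const ciphertext = raw.subarray(IV_LEN + TAG_LEN);
  const decipher = createDecipheriv("aes-256-gcm", key, iv);
  decipher.setAuthTag(tag); // final() throws if the tag does not verify
  const plaintext = Buffer.concat([decipher.update(ciphertext), decipher.final()]);
  return JSON.parse(plaintext.toString("utf8"));
}
```

GCM's auth tag means a tampered row or a wrong key fails loudly in final() instead of yielding garbage config.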
Config (src/config.ts)
Zod schema with safeParse: prints all invalid fields on startup failure.

| Variable | Required | Default | Description |
|---|---|---|---|
| NODE_ENV | no | development | development / staging / production / test |
| DATABASE_URL | yes | (none) | Prisma connection string |
| REDIS_URL | no | redis://localhost:6379 | BullMQ connection |
| EMAIL_FROM_ADDRESS | yes | (none) | Sender address for all outbound email |
| EMAIL_FROM_NAME | no | Leadmetrics | Sender display name |
| SENDGRID_API_KEY | no | (none) | Platform-default SendGrid key |
| WHATSAPP_API_KEY | no | (none) | Platform-default Meta Graph API token |
| WHATSAPP_BASE_URL | no | https://graph.facebook.com/v19.0 | Meta Graph API base URL |
| WHATSAPP_PHONE_NUMBER_ID | no | (none) | Meta phone number ID |
| TELEGRAM_BOT_TOKEN | no | (none) | Platform-default Telegram bot token |
| TELEGRAM_DEFAULT_CHAT_ID | no | (none) | Fallback ops chat ID |
| NOTIFICATION_HUB_URL | no | (none) | SSE hub URL for live in-app push |
| NOTIFICATION_HUB_KEY | no | (none) | Shared secret for the hub |
| DEV_ALLOWED_EMAIL_DOMAINS | no | (none) | Comma-separated email domain allow-list (dev/test) |
| DEV_ALLOWED_PHONE_PREFIX | no | (none) | E.164 prefix; SMS/WhatsApp allow-list (dev/test) |
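A dependency-free sketch of the fail-fast behaviour for a subset of the variables above. The real config.ts uses Zod's safeParse; loadConfig here is a hand-written stand-in that mirrors two of its properties: every problem is collected before failing, and the defaults from the table are applied. The simple email regex is an assumption.

```typescript
// Dependency-free stand-in for the Zod schema; only a subset of the table is modelled.
const NODE_ENVS = ["development", "staging", "production", "test"] as const;

interface Config {
  NODE_ENV: (typeof NODE_ENVS)[number];
  DATABASE_URL: string;
  REDIS_URL: string;
  EMAIL_FROM_ADDRESS: string;
  EMAIL_FROM_NAME: string;
  TELEGRAM_DEFAULT_CHAT_ID?: string;
}

function loadConfig(env: Record<string, string | undefined>): Config {
  const issues: string[] = [];
  const nodeEnv = env.NODE_ENV ?? "development";
  if (!(NODE_ENVS as readonly string[]).includes(nodeEnv)) {
    issues.push(`NODE_ENV: expected one of ${NODE_ENVS.join(", ")}`);
  }
  if (!env.DATABASE_URL) issues.push("DATABASE_URL: required");
  if (!env.EMAIL_FROM_ADDRESS) issues.push("EMAIL_FROM_ADDRESS: required");
  else if (!/^[^@\s]+@[^@\s]+$/.test(env.EMAIL_FROM_ADDRESS)) issues.push("EMAIL_FROM_ADDRESS: not an email address");

  // Report every problem at once (like safeParse) instead of stopping at the first.
  if (issues.length > 0) throw new Error(`Invalid configuration:\n  ${issues.join("\n  ")}`);

  return {
    NODE_ENV: nodeEnv as Config["NODE_ENV"],
    DATABASE_URL: env.DATABASE_URL!,
    REDIS_URL: env.REDIS_URL ?? "redis://localhost:6379",
    EMAIL_FROM_ADDRESS: env.EMAIL_FROM_ADDRESS!,
    EMAIL_FROM_NAME: env.EMAIL_FROM_NAME ?? "Leadmetrics",
    TELEGRAM_DEFAULT_CHAT_ID: env.TELEGRAM_DEFAULT_CHAT_ID,
  };
}
```

At startup this would run as const config = loadConfig(process.env); before any Redis connection or worker is created, so a bad environment never gets as far as consuming jobs.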