Skip to Content
Serversservers/scheduler — @leadmetrics/server-scheduler

servers/scheduler — @leadmetrics/server-scheduler

A dedicated Node.js background service that polls the scheduled_task database table and dispatches due tasks to handler functions. Unlike the reporting server (which uses node-cron for fixed recurring schedules), the scheduler server handles dynamic one-off tasks — records inserted into the DB by the API at the time of the triggering event.

Source: apps/servers/scheduler/


Why a Separate Service

ConcernReason
Dynamic schedulingTasks are written to the DB at event time (e.g. signup at 14:37 → reminder at 15:37); cron cannot express this
IsolationTask failures never affect the API or agent workers
Horizontal scalingAtomic DB claim (updateMany WHERE status = 'pending') prevents double-execution across replicas
PersistenceTasks survive process restarts — the DB is the source of truth

The reporting server uses node-cron for fixed recurring reports (daily, weekly). This server uses DB polling for dynamic one-off tasks. They are complementary patterns.


Task Types

Task typeHandlerWhen triggeredDelay
signup.reminder.earlyhandleSignupReminderEmailStep 1/2 abandonment+1 hour
signup.reminder.latehandleSignupReminderEmailStep 1/2 abandonment+24 hours
signup.alert.internalhandleSignupAlertEmailStep 1 completionImmediate

Tasks are created by the API (apps/api/src/routers/auth/signup.ts) and stored in the scheduled_task table. The scheduler polls and executes them.


Poll & Claim Pattern (src/task-runner.ts)

Every POLL_INTERVAL_MS (default 15 min): 1. SELECT * FROM scheduled_task WHERE status = 'pending' AND scheduledAt <= now() ORDER BY scheduledAt ASC LIMIT MAX_TASKS_PER_POLL 2. For each task: UPDATE scheduled_task SET status = 'running', attempts = attempts + 1 WHERE id = ? AND status = 'pending' ← optimistic lock → if 0 rows affected: another replica claimed it — skip 3. Execute handler 4. On success: UPDATE status = 'completed', completedAt = now() On failure: UPDATE status = 'failed', errorMessage = err.message

Tasks are not retried on failure — they move to failed and stay there. Deduplication is handled at the enqueue side (dedupeKey on the BullMQ job for the notification).


Signup Email Handlers (src/handlers/signup-emails.ts)

handleSignupReminderEmail

  1. Checks RegistrationSession still exists — if deleted (signup completed), skips silently.
  2. Checks currentStage !== "step_4" && !== "complete" — skips if user progressed past step 3.
  3. Enqueues enqueueNotification to notifications__email with templateSlug and resumeUrl.
  4. dedupeKey = <templateSlug>__<sessionId> prevents double-send on scheduler restart.

handleSignupAlertEmail

  1. No DB guard needed — always fires (internal sales alert).
  2. Enqueues to notifications__email with recipient = config.SIGNUP_ALERT_EMAIL.
  3. dedupeKey = signup-alert-internal__<email> — one alert per prospective email.

Both handlers use sessionId as tenantId for the notification job (pre-account creation).


Config (src/config.ts)

VariableRequiredDefaultDescription
DATABASE_URLyesPrisma connection string
REDIS_URLnoredis://localhost:6379For enqueueNotification
POLL_INTERVAL_MSno900000 (15 min)How often to poll for due tasks
MAX_TASKS_PER_POLLno50Max tasks claimed per poll cycle
DASHBOARD_URLnohttps://app.leadmetrics.aiUsed to build signup resume links
SIGNUP_ALERT_EMAILnomoble@leadmetrics.aiInternal sales alert recipient (dev default)

File Structure

apps/servers/scheduler/ |-- src/ | |-- index.ts Entry point — creates db singleton, starts task runner | |-- config.ts Zod env validation (safeParse + formatted errors) | |-- task-runner.ts Poll loop, atomic claim, executeTask() dispatch | | | |-- handlers/ | | +-- signup-emails.ts handleSignupReminderEmail, handleSignupAlertEmail | | | +-- __tests__/ | +-- signup-emails.test.ts Unit tests for both handlers (vi.hoisted mocks) | |-- .env |-- package.json +-- tsconfig.json

Startup Flow

  1. Load config via Zod safeParse.
  2. Import db singleton from @leadmetrics/db.
  3. Call startTaskRunner(db) — runs an immediate poll then schedules recurring polls.
  4. Graceful shutdown on SIGTERM/SIGINT: call stop() (clears the timer).

© 2026 Leadmetrics — Internal use only