servers/scheduler — @leadmetrics/server-scheduler
A dedicated Node.js background service that polls the scheduled_task database table and
dispatches due tasks to handler functions. Unlike the reporting server (which uses node-cron
for fixed recurring schedules), the scheduler server handles dynamic one-off tasks — records
inserted into the DB by the API at the time of the triggering event.
Source: apps/servers/scheduler/
Why a Separate Service
| Concern | Reason |
|---|---|
| Dynamic scheduling | Tasks are written to the DB at event time (e.g. signup at 14:37 → reminder at 15:37); cron cannot express this |
| Isolation | Task failures never affect the API or agent workers |
| Horizontal scaling | Atomic DB claim (updateMany WHERE status = 'pending') prevents double-execution across replicas |
| Persistence | Tasks survive process restarts — the DB is the source of truth |
The reporting server uses node-cron for fixed recurring reports (daily, weekly). This server uses DB polling for dynamic one-off tasks. They are complementary patterns.
Task Types
| Task type | Handler | When triggered | Delay |
|---|---|---|---|
| signup.reminder.early | handleSignupReminderEmail | Step 1/2 abandonment | +1 hour |
| signup.reminder.late | handleSignupReminderEmail | Step 1/2 abandonment | +24 hours |
| signup.alert.internal | handleSignupAlertEmail | Step 1 completion | Immediate |
Tasks are created by the API (apps/api/src/routers/auth/signup.ts) and stored in the
scheduled_task table. The scheduler polls and executes them.
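
As a rough illustration of how the API side might turn a signup event into rows for the table above, the sketch below derives scheduledAt from the Delay column. The NewScheduledTask shape, the payload field, and the buildSignupTask helper are hypothetical; only the task type names and the delays come from this document.

```typescript
// Hypothetical row shape; the real scheduled_task columns may differ.
interface NewScheduledTask {
  type: string;
  status: "pending";
  scheduledAt: Date;
  payload: { sessionId: string };
}

const HOUR_MS = 60 * 60 * 1000;

// Delays taken from the Task Types table.
const TASK_DELAYS_MS: Record<string, number> = {
  "signup.reminder.early": 1 * HOUR_MS,
  "signup.reminder.late": 24 * HOUR_MS,
  "signup.alert.internal": 0,
};

// Builds a pending task whose due time is the event time plus the type's delay.
function buildSignupTask(type: string, sessionId: string, eventTime: Date): NewScheduledTask {
  const delayMs = TASK_DELAYS_MS[type];
  if (delayMs === undefined) {
    throw new Error(`Unknown task type: ${type}`);
  }
  return {
    type,
    status: "pending",
    scheduledAt: new Date(eventTime.getTime() + delayMs),
    payload: { sessionId },
  };
}

// A signup event at 14:37 UTC yields an early reminder due at 15:37 UTC.
const earlyReminder = buildSignupTask(
  "signup.reminder.early",
  "sess_123",
  new Date("2024-01-01T14:37:00Z"),
);
console.log(earlyReminder.scheduledAt.toISOString()); // 2024-01-01T15:37:00.000Z
```

Because the due time is stored as data rather than encoded in a cron expression, each signup gets its own schedule relative to its own event time.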
Poll & Claim Pattern (src/task-runner.ts)
Every POLL_INTERVAL_MS (default 15 min):
1. SELECT * FROM scheduled_task WHERE status = 'pending' AND scheduledAt <= now()
ORDER BY scheduledAt ASC LIMIT MAX_TASKS_PER_POLL
2. For each task:
UPDATE scheduled_task SET status = 'running', attempts = attempts + 1
WHERE id = ? AND status = 'pending' ← optimistic lock
→ if 0 rows affected: another replica claimed it — skip
3. Execute handler
4. On success: UPDATE status = 'completed', completedAt = now()
On failure: UPDATE status = 'failed', errorMessage = err.message
Tasks are not retried on failure; they move to failed and stay there. Deduplication
is handled at the enqueue side (dedupeKey on the BullMQ job for the notification).
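
The poll and claim steps can be sketched roughly as follows. This is not the code in src/task-runner.ts: the InMemoryTaskStore class is a hypothetical stand-in for the scheduled_task table so the example runs without a database, and pollOnce, the Handler type, and the "no handler" failure path are assumptions made for illustration. The part that mirrors the document is the conditional claim, which succeeds only while the row is still pending.

```typescript
type TaskStatus = "pending" | "running" | "completed" | "failed";

interface ScheduledTask {
  id: string;
  type: string;
  status: TaskStatus;
  scheduledAt: Date;
  attempts: number;
  completedAt?: Date;
  errorMessage?: string;
}

// Hypothetical in-memory stand-in for the scheduled_task table.
class InMemoryTaskStore {
  tasks: ScheduledTask[];

  constructor(tasks: ScheduledTask[]) {
    this.tasks = tasks;
  }

  // Step 1: due pending tasks, oldest first, capped at `limit`.
  findDue(now: Date, limit: number): ScheduledTask[] {
    return this.tasks
      .filter((t) => t.status === "pending" && t.scheduledAt.getTime() <= now.getTime())
      .sort((a, b) => a.scheduledAt.getTime() - b.scheduledAt.getTime())
      .slice(0, limit);
  }

  // Step 2: conditional update. Returns rows affected: 1 if claimed, 0 if no longer pending.
  claim(id: string): number {
    const task = this.tasks.find((t) => t.id === id && t.status === "pending");
    if (task === undefined) return 0;
    task.status = "running";
    task.attempts += 1;
    return 1;
  }

  // Step 4: record the outcome on the row.
  finish(id: string, patch: Partial<ScheduledTask>): void {
    const task = this.tasks.find((t) => t.id === id);
    if (task !== undefined) Object.assign(task, patch);
  }
}

type Handler = (task: ScheduledTask) => Promise<void>;

// Runs one poll cycle and returns how many tasks this caller claimed.
async function pollOnce(
  store: InMemoryTaskStore,
  handlers: Record<string, Handler>,
  now: Date,
  limit: number,
): Promise<number> {
  let claimed = 0;
  for (const due of store.findDue(now, limit)) {
    if (store.claim(due.id) === 0) continue; // another replica claimed it
    claimed += 1;
    try {
      const handler = handlers[due.type];
      if (handler === undefined) throw new Error(`No handler for ${due.type}`);
      await handler(due); // Step 3: execute
      store.finish(due.id, { status: "completed", completedAt: now });
    } catch (err) {
      const errorMessage = err instanceof Error ? err.message : String(err);
      store.finish(due.id, { status: "failed", errorMessage }); // no retry
    }
  }
  return claimed;
}
```

Against a real database the claim would be a single conditional UPDATE (for example an updateMany filtered on id and status = 'pending'), so the check and the write happen atomically and two replicas cannot both see 1 row affected.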
Signup Email Handlers (src/handlers/signup-emails.ts)
handleSignupReminderEmail
- Checks that the RegistrationSession still exists; if it was deleted (signup completed), skips silently.
- Checks currentStage !== "step_4" && currentStage !== "complete"; skips if the user has progressed past step 3.
- Calls enqueueNotification to enqueue on notifications__email with templateSlug and resumeUrl. dedupeKey = <templateSlug>__<sessionId> prevents double-send on scheduler restart.
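
The two guards and the dedupe key can be expressed as small pure functions. This is a minimal sketch, not the handler itself: RegistrationSessionView, shouldSendReminder, and reminderDedupeKey are hypothetical names, and the template slug in the usage lines is made up for illustration.

```typescript
// Hypothetical minimal view of a RegistrationSession row.
interface RegistrationSessionView {
  id: string;
  currentStage: string;
}

// Returns true only when a reminder should still be sent.
function shouldSendReminder(session: RegistrationSessionView | null): boolean {
  if (session === null) return false; // session deleted: signup completed
  return session.currentStage !== "step_4" && session.currentStage !== "complete";
}

// Key format from this document: <templateSlug>__<sessionId>.
function reminderDedupeKey(templateSlug: string, sessionId: string): string {
  return `${templateSlug}__${sessionId}`;
}

// Usage with an illustrative (made-up) template slug.
const session: RegistrationSessionView = { id: "sess_123", currentStage: "step_2" };
if (shouldSendReminder(session)) {
  console.log(reminderDedupeKey("signup-reminder-early", session.id));
}
```

Since the key depends only on the template and the session, re-running the same task after a restart produces the same key, which is what lets the queue drop the duplicate.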
handleSignupAlertEmail
- No DB guard needed — always fires (internal sales alert).
- Enqueues to notifications__email with recipient = config.SIGNUP_ALERT_EMAIL. dedupeKey = signup-alert-internal__<email>, giving one alert per prospective email.
Both handlers use sessionId as tenantId for the notification job (pre-account creation).
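
To make the alert payload concrete, here is a small sketch of how such a job might be assembled. The EmailJob interface, its queue and recipient field names, and the buildAlertJob helper are assumptions; the document only specifies the queue name, the dedupeKey format, and that sessionId is used as tenantId.

```typescript
// Hypothetical job payload; field names other than dedupeKey and tenantId are assumptions.
interface EmailJob {
  queue: "notifications__email";
  tenantId: string;
  recipient: string;
  dedupeKey: string;
}

function buildAlertJob(sessionId: string, prospectEmail: string, alertRecipient: string): EmailJob {
  return {
    queue: "notifications__email",
    tenantId: sessionId, // no account exists yet, so the session id stands in
    recipient: alertRecipient,
    dedupeKey: `signup-alert-internal__${prospectEmail}`,
  };
}

const alertJob = buildAlertJob("sess_123", "jane@example.com", "sales@example.com");
console.log(alertJob.dedupeKey); // signup-alert-internal__jane@example.com
```

Keying on the prospect's email rather than the session means a second session started with the same address would map to the same key.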
Config (src/config.ts)
| Variable | Required | Default | Description |
|---|---|---|---|
| DATABASE_URL | yes | — | Prisma connection string |
| REDIS_URL | no | redis://localhost:6379 | For enqueueNotification |
| POLL_INTERVAL_MS | no | 900000 (15 min) | How often to poll for due tasks |
| MAX_TASKS_PER_POLL | no | 50 | Max tasks claimed per poll cycle |
| DASHBOARD_URL | no | https://app.leadmetrics.ai | Used to build signup resume links |
| SIGNUP_ALERT_EMAIL | no | moble@leadmetrics.ai | Internal sales alert recipient (dev default) |
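
The required/default rules in the table can be illustrated with a dependency-free parser. The actual src/config.ts uses Zod; this sketch deliberately swaps in hand-written validation so it runs without any packages, and only borrows the success/failure result shape in the spirit of safeParse. The parseConfig and parsePositiveInt names and the positive-integer rule for the numeric variables are assumptions.

```typescript
interface SchedulerConfig {
  DATABASE_URL: string;
  REDIS_URL: string;
  POLL_INTERVAL_MS: number;
  MAX_TASKS_PER_POLL: number;
  DASHBOARD_URL: string;
  SIGNUP_ALERT_EMAIL: string;
}

type ParseResult =
  | { success: true; data: SchedulerConfig }
  | { success: false; errors: string[] };

// Parses an optional numeric variable, collecting an error instead of throwing.
function parsePositiveInt(name: string, raw: string | undefined, fallback: number, errors: string[]): number {
  if (raw === undefined || raw === "") return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    errors.push(`${name} must be a positive integer, got "${raw}"`);
    return fallback;
  }
  return value;
}

function parseConfig(env: Record<string, string | undefined>): ParseResult {
  const errors: string[] = [];
  const databaseUrl = env["DATABASE_URL"];
  if (databaseUrl === undefined || databaseUrl === "") {
    errors.push("DATABASE_URL is required");
  }
  const pollIntervalMs = parsePositiveInt("POLL_INTERVAL_MS", env["POLL_INTERVAL_MS"], 900000, errors);
  const maxTasksPerPoll = parsePositiveInt("MAX_TASKS_PER_POLL", env["MAX_TASKS_PER_POLL"], 50, errors);

  if (errors.length > 0 || databaseUrl === undefined || databaseUrl === "") {
    return { success: false, errors };
  }
  return {
    success: true,
    data: {
      DATABASE_URL: databaseUrl,
      REDIS_URL: env["REDIS_URL"] ?? "redis://localhost:6379",
      POLL_INTERVAL_MS: pollIntervalMs,
      MAX_TASKS_PER_POLL: maxTasksPerPoll,
      DASHBOARD_URL: env["DASHBOARD_URL"] ?? "https://app.leadmetrics.ai",
      SIGNUP_ALERT_EMAIL: env["SIGNUP_ALERT_EMAIL"] ?? "moble@leadmetrics.ai",
    },
  };
}
```

Returning all errors at once, rather than throwing on the first, lets the process print one formatted report at startup before exiting.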
File Structure
apps/servers/scheduler/
|-- src/
| |-- index.ts Entry point — creates db singleton, starts task runner
| |-- config.ts Zod env validation (safeParse + formatted errors)
| |-- task-runner.ts Poll loop, atomic claim, executeTask() dispatch
| |
| |-- handlers/
| | +-- signup-emails.ts handleSignupReminderEmail, handleSignupAlertEmail
| |
| +-- __tests__/
| +-- signup-emails.test.ts Unit tests for both handlers (vi.hoisted mocks)
|
|-- .env
|-- package.json
+-- tsconfig.json
Startup Flow
- Load config via Zod safeParse.
- Import the db singleton from @leadmetrics/db.
- Call startTaskRunner(db), which runs an immediate poll and then schedules recurring polls.
- Graceful shutdown on SIGTERM/SIGINT: call stop() (clears the timer).
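
The "immediate poll, then recurring polls, then stop()" behaviour might look roughly like the sketch below. It is not the real startTaskRunner(db): startPolling is a hypothetical helper that takes the poll function as a parameter so the example stays self-contained, and the Runner interface is an assumption.

```typescript
interface Runner {
  stop: () => void;
}

function startPolling(poll: () => Promise<void>, intervalMs: number): Runner {
  // Wrap each poll so a rejected promise is logged instead of crashing the process.
  const runSafely = (): void => {
    poll().catch((err: unknown) => {
      console.error("poll failed:", err);
    });
  };

  runSafely(); // immediate first poll
  const timer = setInterval(runSafely, intervalMs); // recurring polls

  return {
    stop: () => clearInterval(timer), // clears the timer; an in-flight poll is not cancelled
  };
}
```

In an entry point, stop() would typically be called from process.on("SIGTERM", ...) and process.on("SIGINT", ...) handlers. Note that this sketch does not guard against a poll that runs longer than the interval; the atomic claim described earlier is what keeps overlapping polls from executing the same task twice.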