
Real-Time Agent Status — Socket.io

Status: [To Build]

Purpose: Push live job-status events from BullMQ workers to connected dashboard clients so users see agent progress without polling or page refreshes.


Problem

The current /settings/context page (and any future agent-progress UI) relies on full-page reloads to pick up status changes. Users sit on a spinning “Generating…” screen with no feedback on which agent is running or how far along it is. The same problem will recur for every long-running agent job (strategy writer, deliverable planner, etc.).


Solution Overview

```text
BullMQ Worker
  │  emits job lifecycle events (active / progress / completed / failed)
  │
Redis (BullMQ job-events stream)
  │
apps/api  ←── subscribes to the Redis events
  │  Socket.io server attached to Fastify HTTP server
  │  re-publishes to tenant-scoped Socket.io room
  │
apps/dashboard  ←── Socket.io client
  │  is placed in room `tenant:{tenantId}` by the server
  │  receives typed events, updates local React state
  │
UI components re-render in real time (no reload, no polling)
```

Why Socket.io over raw WebSockets or SSE?

| Feature | Socket.io | Raw WebSocket | SSE |
|---|---|---|---|
| Reconnection | Automatic | Manual | Browser-native |
| Rooms (tenant isolation) | Built-in | Manual | No |
| Fallback (long-poll) | Built-in | No | No |
| Bidirectional | Yes | Yes | No (server→client only) |
| Next.js Server Components | Works with a separate WS origin | Same as Socket.io | Edge-compatible |

Socket.io rooms give us tenant isolation without extra plumbing: on connect, the server adds each authenticated client to tenant:{tenantId} (taken from its verified token), so the client only receives events for its own tenant.


Events Schema

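All events are intended to live under the /agents namespace. The code in this document emits on the default namespace (io.to(...)); to actually scope events to /agents, the server would use io.of("/agents") and the client would connect to `${NEXT_PUBLIC_API_URL}/agents`.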

agent:active

Fired when a worker picks up a job and begins processing.

```ts
{
  tenantId: string;
  jobId: string;
  queue: string;     // full BullMQ queue name, e.g. "<tenantId>__client-researcher"
  agentRole: string; // role part of the queue name, e.g. "client-researcher"
  runId: string;
  startedAt: string; // ISO 8601
}
```

agent:progress

Fired periodically as the agent streams output (optional — for agents that support streaming).

```ts
{
  tenantId: string;
  jobId: string;
  agentRole: string;
  progress: number; // 0–100
  message: string;  // e.g. "Researching competitors..."
}
```

agent:completed

Fired when a job finishes successfully.

```ts
{
  tenantId: string;
  jobId: string;
  agentRole: string;
  runId: string;
  completedAt: string; // ISO 8601
  nextAgent?: string;  // e.g. "competitor-researcher" (if chaining)
}
```

agent:failed

Fired when a job exhausts retries or throws an unrecoverable error.

```ts
{
  tenantId: string;
  jobId: string;
  agentRole: string;
  error: string;
  failedAt: string; // ISO 8601
}
```

context:status_changed

High-level event specifically for context file status transitions (pending → generating → completed → approved). Consumed by /settings/context UI.

```ts
{
  tenantId: string;
  status: "pending" | "generating" | "completed" | "approved";
  version?: number;
}
```

Architecture — Server Side

1. Socket.io server (apps/api)

Socket.io is mounted on the existing Fastify HTTP server, so no separate process is needed.

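```ts
// apps/api/src/realtime/socket.ts
import { Server } from "socket.io";
import type { FastifyInstance } from "fastify";
// Hypothetical import path: the doc only calls this "the existing JWT util".
import { verifyAccessToken } from "../auth/jwt";

export function mountSocketIO(fastify: FastifyInstance): Server {
  const io = new Server(fastify.server, {
    cors: { origin: [process.env.DASHBOARD_URL!, process.env.DM_URL!] },
    path: "/rt", // /rt instead of /socket.io to avoid conflicts
  });

  // Authenticate during the handshake, so unauthenticated sockets never connect.
  // The client sends its access token in `auth.token`.
  io.use((socket, next) => {
    const token = socket.handshake.auth?.token;
    let tenantId: string | undefined;
    try {
      tenantId = typeof token === "string" ? verifyAccessToken(token)?.tenantId : undefined;
    } catch {
      tenantId = undefined; // invalid or expired token
    }
    if (!tenantId) return next(new Error("unauthorized"));
    socket.data.tenantId = tenantId;
    next();
  });

  io.on("connection", (socket) => {
    // Join tenant-scoped room. The room name comes from the verified token,
    // never from client input, so events stay isolated per tenant.
    socket.join(`tenant:${socket.data.tenantId}`);
  });

  return io;
}
```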

2. Redis bridge (packages/queue)

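BullMQ's QueueEvents class emits an event on every job lifecycle change for a queue. It reads these from a Redis stream, so it works from any process that can reach Redis, not only the one running the worker. The bridge below runs inside the API process, listens to these events, and re-emits them to the tenant's Socket.io room. Delivery across multiple API replicas comes from the Socket.io Redis adapter (see Deployment Considerations), not from the bridge itself. If every replica runs the bridge while the adapter is enabled, each event is broadcast once per replica, so either run the bridge on a single replica or emit with io.local so each replica only delivers to its own sockets.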

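```ts
// packages/queue/src/realtime/bridge.ts
import { QueueEvents } from "bullmq";
import { getRedisConnection } from "../connection";
import type { Server } from "socket.io";

/**
 * Subscribe to BullMQ job lifecycle events for one tenant/role queue
 * and re-emit them to that tenant's Socket.io room.
 * Returns the QueueEvents instance so the caller can close() it on shutdown.
 */
export function bridgeQueueEvents(io: Server, tenantId: string, role: string): QueueEvents {
  const queueName = `${tenantId}__${role}`;
  const queueEvents = new QueueEvents(queueName, { connection: getRedisConnection() });
  const emit = (event: string, payload: object) => io.to(`tenant:${tenantId}`).emit(event, payload);
  // Timestamps are when the bridge saw the event, an approximation of the job times.
  const now = () => new Date().toISOString();

  // runId and nextAgent are not part of QueueEvents payloads; to match the schema
  // they would have to come from the job itself (its data or return value).
  queueEvents.on("active", ({ jobId }) => {
    emit("agent:active", { tenantId, jobId, queue: queueName, agentRole: role, startedAt: now() });
  });

  queueEvents.on("progress", ({ jobId, data }) => {
    // job.updateProgress() accepts a number or an object; normalize before spreading.
    const extra = typeof data === "object" && data !== null ? data : { progress: data };
    emit("agent:progress", { tenantId, jobId, agentRole: role, ...extra });
  });

  queueEvents.on("completed", ({ jobId }) => {
    emit("agent:completed", { tenantId, jobId, agentRole: role, completedAt: now() });
  });

  queueEvents.on("failed", ({ jobId, failedReason }) => {
    emit("agent:failed", { tenantId, jobId, agentRole: role, error: failedReason, failedAt: now() });
  });

  return queueEvents;
}
```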

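Global bridge: at API startup, bridgeQueueEvents is called once per agent role for every active tenant. For tenants created after startup (post-registration), it is called inline right after enqueueSetupChain. Each QueueEvents instance holds its own blocking Redis connection, so the connection count grows with tenants × roles.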
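Because bridgeQueueEvents is called from two places (startup and registration), a small registry keyed by tenant and role can make the call idempotent and give shutdown a single place to close every QueueEvents connection. This is a hypothetical sketch (BridgeRegistry is not existing code), and it assumes bridgeQueueEvents returns its QueueEvents instance, which has an async close(), rather than void.

```typescript
// Hypothetical registry (not existing code): one bridge per tenant/role.
export interface Closable {
  close(): Promise<void>;
}

export class BridgeRegistry<T extends Closable> {
  private readonly bridges = new Map<string, T>();
  private readonly create: (tenantId: string, role: string) => T;

  constructor(create: (tenantId: string, role: string) => T) {
    this.create = create;
  }

  // Idempotent: a second call for the same tenant/role returns the existing
  // bridge, so startup and post-registration calls cannot double-subscribe.
  ensure(tenantId: string, role: string): T {
    const key = `${tenantId}__${role}`;
    let bridge = this.bridges.get(key);
    if (!bridge) {
      bridge = this.create(tenantId, role);
      this.bridges.set(key, bridge);
    }
    return bridge;
  }

  get size(): number {
    return this.bridges.size;
  }

  // Close every bridge (e.g. each QueueEvents connection) and forget them.
  async closeAll(): Promise<void> {
    const all = [...this.bridges.values()];
    this.bridges.clear();
    await Promise.all(all.map((bridge) => bridge.close()));
  }
}
```

Wiring would look like `const bridges = new BridgeRegistry((t, r) => bridgeQueueEvents(io, t, r))`, with `bridges.ensure(tenantId, role)` at startup and after enqueueSetupChain, and `fastify.addHook("onClose", async () => { await bridges.closeAll(); })` for shutdown.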

3. context:status_changed emission

The setup worker already updates clientContext.status in the DB. After each DB write, it should also emit context:status_changed:

```ts
// In processSetupJob, after each DB update:
io.to(`tenant:${data.tenantId}`).emit("context:status_changed", {
  tenantId: data.tenantId,
  status: "generating", // or "completed"
});
```

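The io instance is passed into the worker factory at startup. This assumes the setup worker runs in the same process as the API; a worker running as its own process has no io instance to call.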
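If the setup worker does run as a separate process, one option is @socket.io/redis-emitter: its Emitter (created with `new Emitter(redisClient)`) publishes through Redis and exposes the same `to(room).emit(...)` call, but it only reaches clients when the API uses the Socket.io Redis adapter. The sketch below is a hypothetical helper, `emitContextStatus`, typed against that shared shape so processSetupJob can be handed either the API's `io` or an Emitter. The structural `RoomEmitter` type is an assumption and has not been checked against every Socket.io typing variant.

```typescript
// Hypothetical helper; the RoomEmitter shape is an assumption modelled on
// Socket.io's `to(room).emit(event, ...args)` broadcast API.
export type ContextStatus = "pending" | "generating" | "completed" | "approved";

export interface RoomEmitter {
  to(room: string): { emit(event: string, ...args: unknown[]): unknown };
}

export function emitContextStatus(
  target: RoomEmitter,
  tenantId: string,
  status: ContextStatus,
  version?: number,
): void {
  // Omit `version` when not provided, matching the optional field in the schema.
  const payload = version === undefined ? { tenantId, status } : { tenantId, status, version };
  target.to(`tenant:${tenantId}`).emit("context:status_changed", payload);
}
```

Inside processSetupJob the call becomes `emitContextStatus(realtime, data.tenantId, "generating")`, where `realtime` (a hypothetical name) is whichever emitter the worker factory received.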


Architecture — Client Side

Package

pnpm add socket.io-client --filter dashboard

Singleton client (apps/dashboard/src/lib/socket.ts)

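```ts
import { io, type Socket } from "socket.io-client";
import { getAccessToken } from "./auth-client"; // existing util

let _socket: Socket | null = null;

export function getSocket(): Socket {
  if (!_socket) {
    _socket = io(process.env.NEXT_PUBLIC_API_URL!, {
      path: "/rt",
      // Callback form: the token is re-read on every (re)connection attempt,
      // so a reconnect after a token refresh sends the current token.
      auth: (cb) => cb({ token: getAccessToken() }),
      // WebSocket only: gives up the long-polling fallback, but avoids the need
      // for sticky sessions when the API runs on multiple replicas.
      transports: ["websocket"],
      autoConnect: false, // hooks call connect() when they mount
    });
  }
  return _socket;
}
```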

React hook (apps/dashboard/src/hooks/useAgentStatus.ts)

"use client"; import { useEffect, useState } from "react"; import { getSocket } from "@/lib/socket"; export type AgentRole = "client-researcher" | "competitor-researcher" | "context-file-writer"; export type AgentStatus = { role: AgentRole; state: "idle" | "active" | "completed" | "failed"; progress?: number; message?: string; }; export function useAgentStatus(roles: AgentRole[]) { const [statuses, setStatuses] = useState<Record<AgentRole, AgentStatus>>( Object.fromEntries(roles.map(r => [r, { role: r, state: "idle" }])) as Record<AgentRole, AgentStatus> ); useEffect(() => { const socket = getSocket(); socket.connect(); const patch = (role: AgentRole, update: Partial<AgentStatus>) => setStatuses(prev => ({ ...prev, [role]: { ...prev[role], ...update } })); socket.on("agent:active", e => patch(e.agentRole, { state: "active" })); socket.on("agent:progress", e => patch(e.agentRole, { progress: e.progress, message: e.message })); socket.on("agent:completed", e => patch(e.agentRole, { state: "completed" })); socket.on("agent:failed", e => patch(e.agentRole, { state: "failed" })); return () => { socket.off("agent:active"); socket.off("agent:progress"); socket.off("agent:completed"); socket.off("agent:failed"); socket.disconnect(); }; }, []); return statuses; } export function useContextStatus() { const [status, setStatus] = useState<string | null>(null); useEffect(() => { const socket = getSocket(); socket.connect(); socket.on("context:status_changed", e => setStatus(e.status)); return () => { socket.off("context:status_changed"); socket.disconnect(); }; }, []); return status; }

Usage in /settings/context page

The page currently server-renders and reads DB status. With real-time:

  1. Initial render: still server-rendered (shows DB state, correct on first load).
  2. Client hydration: the useContextStatus() hook subscribes to context:status_changed.
  3. On a status === "completed" event: call router.refresh() so Next.js re-runs the server component and shows the approve banner and content.
```tsx
// apps/dashboard/src/app/(dashboard)/settings/context/ContextStatusListener.tsx
"use client";

import { useEffect } from "react";
import { useRouter } from "next/navigation";
import { useContextStatus } from "@/hooks/useAgentStatus";

export function ContextStatusListener({ initialStatus }: { initialStatus: string }) {
  const router = useRouter();
  const rtStatus = useContextStatus();

  useEffect(() => {
    if (rtStatus === "completed" && initialStatus !== "completed") {
      router.refresh(); // re-run the server component to show the approve banner
    }
  }, [rtStatus, initialStatus, router]);

  return null;
}
```
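The reconnect-gap row under Deployment Considerations calls for a router.refresh() after a reconnect, which the listener above does not do yet. A small hypothetical helper can cover it: socket.io-client's Manager (available as `getSocket().io`) emits a "reconnect" event after a successful reconnection, and the helper returns an unsubscribe function that fits useEffect's cleanup. The `ReconnectSource` interface is an assumed structural type for that Manager, not part of socket.io-client.

```typescript
// Hypothetical helper. ReconnectSource is an assumed structural type for
// socket.io-client's Manager (getSocket().io), which emits "reconnect".
export interface ReconnectSource {
  on(event: "reconnect", listener: (attempt: number) => void): unknown;
  off(event: "reconnect", listener: (attempt: number) => void): unknown;
}

// Run `callback` after every reconnect; returns a cleanup function for useEffect.
export function onReconnect(manager: ReconnectSource, callback: () => void): () => void {
  const listener = () => callback();
  manager.on("reconnect", listener);
  return () => {
    manager.off("reconnect", listener);
  };
}
```

In ContextStatusListener this would be one extra effect: `useEffect(() => onReconnect(getSocket().io, () => router.refresh()), [router]);`.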

Package — @leadmetrics/realtime

To keep the Socket.io server and event types shareable across apps/api and apps/dashboard, create a new internal package:

```text
packages/realtime/
├── package.json
├── src/
│   ├── events.ts   # Typed event map (shared between server & client)
│   ├── server.ts   # mountSocketIO() + bridgeQueueEvents()
│   └── index.ts
```

This avoids duplicating the event type definitions and ensures the client and server always agree on payload shapes.
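A minimal sketch of what events.ts could contain, assuming the payloads in Events Schema are final. All type and function names here (AgentActiveEvent, ServerToClientEvents, ClientToServerEvents, CONTEXT_STATUSES, isContextStatus) are hypothetical. The event map follows Socket.io's typed-events pattern, where the server is declared as `Server<ClientToServerEvents, ServerToClientEvents>` and the client socket as `Socket<ServerToClientEvents, ClientToServerEvents>`.

```typescript
// packages/realtime/src/events.ts (sketch; payloads mirror the Events Schema section)
export const CONTEXT_STATUSES = ["pending", "generating", "completed", "approved"] as const;
export type ContextStatus = (typeof CONTEXT_STATUSES)[number];

export type AgentActiveEvent = {
  tenantId: string; jobId: string; queue: string; agentRole: string; runId: string; startedAt: string;
};
export type AgentProgressEvent = {
  tenantId: string; jobId: string; agentRole: string; progress: number; message: string;
};
export type AgentCompletedEvent = {
  tenantId: string; jobId: string; agentRole: string; runId: string; completedAt: string; nextAgent?: string;
};
export type AgentFailedEvent = {
  tenantId: string; jobId: string; agentRole: string; error: string; failedAt: string;
};
export type ContextStatusChangedEvent = { tenantId: string; status: ContextStatus; version?: number };

// Server-to-client events, used as Socket.io generics on both sides.
export interface ServerToClientEvents {
  "agent:active": (e: AgentActiveEvent) => void;
  "agent:progress": (e: AgentProgressEvent) => void;
  "agent:completed": (e: AgentCompletedEvent) => void;
  "agent:failed": (e: AgentFailedEvent) => void;
  "context:status_changed": (e: ContextStatusChangedEvent) => void;
}

// Clients only listen in this design, so there are no client-to-server events yet.
export type ClientToServerEvents = Record<string, never>;

// Runtime guard: the event map is compile-time only, so validate untrusted
// payload fields before putting them into React state.
export function isContextStatus(value: unknown): value is ContextStatus {
  return typeof value === "string" && (CONTEXT_STATUSES as readonly string[]).includes(value);
}
```

Socket.io does not validate payloads at runtime, so the type map only describes what the server promises to send; the guard is the part that protects the client.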


Deployment Considerations

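| Concern | Solution |
|---|---|
| Multiple API replicas | Use the Socket.io Redis adapter (@socket.io/redis-adapter) so events emitted on replica A reach clients connected to replica B |
| Auth token expiry | Client reconnects with a fresh token (pass `auth` as a callback so each attempt reads the current token); the server re-validates on every handshake. An already-open connection is not re-checked when its token expires. |
| Tenant isolation | Rooms are enforced server-side: the room name comes from the verified token, so a client cannot join another tenant's room |
| Missing events (reconnect gap) | On reconnect, client triggers a router.refresh() to re-read DB state as ground truth |
| Worker process restart | QueueEvents reads from a Redis stream, so a worker restart does not affect the bridge running in the API. By default a new QueueEvents instance starts from the latest event, so events published while the API itself was down are not replayed; the reconnect refresh covers that gap. |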

Implementation Plan

| Step | Task | Owner |
|---|---|---|
| 1 | Create packages/realtime with shared event types | |
| 2 | Attach Socket.io to Fastify in apps/api/src/index.ts | |
| 3 | Add bridgeQueueEvents() call per tenant at API startup | |
| 4 | Emit context:status_changed from setup.worker.ts after each DB write | |
| 5 | Add socket.io-client to dashboard; implement getSocket() singleton | |
| 6 | Implement useAgentStatus and useContextStatus hooks | |
| 7 | Add <ContextStatusListener> to /settings/context page | |
| 8 | Update /settings/context generating UI to show per-agent live state (active/done/failed icons) | |
| 9 | Add Redis adapter for multi-replica support | |
| 10 | Write Playwright tests: register → watch context page update live without refresh | |

Affected Files (when implemented)

| File | Change |
|---|---|
| apps/api/src/index.ts | Mount Socket.io server |
| apps/api/src/routers/auth.ts | Call bridgeQueueEvents after enqueueSetupChain |
| packages/queue/src/workers/setup.worker.ts | Emit context:status_changed after each status DB write |
| packages/realtime/ | New package (events, server mount, bridge) |
| apps/dashboard/src/lib/socket.ts | New: singleton Socket.io client |
| apps/dashboard/src/hooks/useAgentStatus.ts | New: hooks for agent and context status |
| apps/dashboard/src/app/(dashboard)/settings/context/page.tsx | Add <ContextStatusListener> |
| apps/dashboard/src/app/(dashboard)/settings/context/ContextStatusListener.tsx | New client component |

© 2026 Leadmetrics — Internal use only