Real-Time Agent Status — Socket.io
Status: [To Build]
Purpose: Push live job-status events from BullMQ workers to connected dashboard clients so users see agent progress without polling or page refreshes.
Problem
The current /settings/context page (and any future agent-progress UI) relies on full-page reloads to pick up status changes. Users sit on a spinning “Generating…” screen with no feedback on which agent is running or how far along it is. The same problem will recur for every long-running agent job (strategy writer, deliverable planner, etc.).
Solution Overview
BullMQ Worker
│ emits job lifecycle events (active / progress / completed / failed)
▼
Redis Pub/Sub channel ─────────────────────────────────────────────────────┐
│
apps/api ←── subscribes to Redis channel │
│ Socket.io server attached to Fastify HTTP server │
│ re-publishes to tenant-scoped Socket.io room │
▼
apps/dashboard ←── Socket.io client │
│ joins room `tenant:{tenantId}` │
│ receives typed events, updates local React state │
▼
UI components re-render in real time (no reload, no polling)
Why Socket.io over raw WebSockets or SSE?
| | Socket.io | Raw WebSocket | SSE |
|---|---|---|---|
| Reconnection | Automatic | Manual | Browser-native |
| Rooms (tenant isolation) | Built-in | Manual | No |
| Fallback (long-poll) | Built-in | No | No |
| Bidirectional | Yes | Yes | No (server→client only) |
| Next.js Server Components | Works with a separate WS origin | Same | Edge-compatible |
Socket.io rooms give us tenant isolation for free: each connected client joins tenant:{tenantId} on connect and only receives events for their tenant.
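The join rule is small enough to isolate as a pure helper, which keeps it unit-testable without a live server. A sketch, not existing code: the injected `verify` callback stands in for the existing `verifyAccessToken` JWT util, and the helper name is illustrative.

```typescript
// Decide which room (if any) a handshake may join. Returns null to reject.
// `verify` stands in for the existing verifyAccessToken JWT util.
type TokenVerifier = (token: string) => { tenantId?: string } | null;

function roomForHandshake(
  token: string | undefined,
  verify: TokenVerifier
): string | null {
  if (!token) return null;              // no token sent on handshake
  const payload = verify(token);
  if (!payload?.tenantId) return null;  // invalid or tenant-less token
  return `tenant:${payload.tenantId}`;
}
```

The connection handler then reduces to joining the returned room, or disconnecting when the helper returns null.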
Events Schema
All events are namespaced under /agents.
agent:active
Fired when a worker picks up a job and begins processing.
{
tenantId: string;
jobId: string;
queue: string; // e.g. "client-researcher"
agentRole: string; // same as queue name
runId: string;
startedAt: string; // ISO
}
agent:progress
Fired periodically as the agent streams output (optional — for agents that support streaming).
{
tenantId: string;
jobId: string;
agentRole: string;
progress: number; // 0–100
message: string; // e.g. "Researching competitors..."
}
agent:completed
Fired when a job finishes successfully.
{
tenantId: string;
jobId: string;
agentRole: string;
runId: string;
completedAt: string; // ISO
nextAgent?: string; // e.g. "competitor-researcher" (if chaining)
}
agent:failed
Fired when a job exhausts retries or throws an unrecoverable error.
{
tenantId: string;
jobId: string;
agentRole: string;
error: string;
failedAt: string; // ISO
}
context:status_changed
High-level event specifically for context file status transitions (pending → generating → completed → approved). Consumed by /settings/context UI.
{
tenantId: string;
status: "pending" | "generating" | "completed" | "approved";
version?: number;
}
Architecture — Server Side
1. Socket.io server (apps/api)
Socket.io is mounted on the existing Fastify HTTP server. No separate process needed.
// apps/api/src/realtime/socket.ts
import { Server } from "socket.io";
import type { FastifyInstance } from "fastify";
import { verifyAccessToken } from "../auth"; // existing JWT util (import path assumed)
export function mountSocketIO(fastify: FastifyInstance): Server {
const io = new Server(fastify.server, {
cors: { origin: [process.env.DASHBOARD_URL!, process.env.DM_URL!] },
path: "/rt", // /rt instead of /socket.io to avoid conflicts
});
io.on("connection", async (socket) => {
// Client sends its access token on connect
const token = socket.handshake.auth.token as string;
const payload = verifyAccessToken(token); // existing JWT util
if (!payload?.tenantId) { socket.disconnect(); return; }
// Join tenant-scoped room — all events are isolated per tenant
socket.join(`tenant:${payload.tenantId}`);
socket.data.tenantId = payload.tenantId;
});
return io;
}
2. Redis bridge (packages/queue)
BullMQ exposes QueueEvents which fires on every job lifecycle change. A lightweight bridge process subscribes to these and re-publishes to the Socket.io server via Redis pub/sub (so it works across multiple API replicas).
// packages/queue/src/realtime/bridge.ts
import { QueueEvents } from "bullmq";
import { getRedisConnection } from "../connection";
import type { Server } from "socket.io";
/**
* Subscribe to BullMQ job lifecycle events for a tenant's queues
* and re-emit them to connected Socket.io clients.
*/
export function bridgeQueueEvents(io: Server, tenantId: string, role: string): void {
const queueName = `${tenantId}__${role}`;
const queueEvents = new QueueEvents(queueName, { connection: getRedisConnection() });
queueEvents.on("active", ({ jobId }) => {
io.to(`tenant:${tenantId}`).emit("agent:active", { tenantId, jobId, agentRole: role });
});
queueEvents.on("progress", ({ jobId, data }) => {
io.to(`tenant:${tenantId}`).emit("agent:progress", { tenantId, jobId, agentRole: role, ...data });
});
queueEvents.on("completed", ({ jobId }) => {
io.to(`tenant:${tenantId}`).emit("agent:completed", { tenantId, jobId, agentRole: role });
});
queueEvents.on("failed", ({ jobId, failedReason }) => {
io.to(`tenant:${tenantId}`).emit("agent:failed", { tenantId, jobId, agentRole: role, error: failedReason });
});
}
Global bridge — called once per API startup for all active tenants. For new tenants (post-registration), bridgeQueueEvents is called inline right after enqueueSetupChain.
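The startup loop itself is not shown above; here is a minimal sketch with the tenant list and the bridge function injected as parameters (the role list and function names are assumptions, and `bridge` would be `bridgeQueueEvents` partially applied with the `io` instance):

```typescript
// Wire one QueueEvents bridge per (tenant, role) pair at API startup.
// Returns the queue names wired, following the `${tenantId}__${role}` convention.
const AGENT_ROLES = ["client-researcher", "competitor-researcher", "context-file-writer"];

function bootstrapBridges(
  tenantIds: string[],
  bridge: (tenantId: string, role: string) => void
): string[] {
  const wired: string[] = [];
  for (const tenantId of tenantIds) {
    for (const role of AGENT_ROLES) {
      bridge(tenantId, role); // one QueueEvents subscriber per tenant-scoped queue
      wired.push(`${tenantId}__${role}`);
    }
  }
  return wired;
}
```

Injecting the bridge keeps the loop trivially testable and avoids a hard dependency on BullMQ in the bootstrap path.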
3. context:status_changed emission
The setup worker already updates clientContext.status in the DB. After each DB write, it should also emit context:status_changed:
// In processSetupJob, after each DB update:
io.to(`tenant:${data.tenantId}`).emit("context:status_changed", {
tenantId: data.tenantId,
status: "generating", // or "completed"
});
The io instance is passed into the worker factory at startup.
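One way to thread io into the worker without coupling queue code to Socket.io is to hand the worker a narrow emitter closure. This is a sketch, not existing code: the `Emitter` interface and factory name are assumptions.

```typescript
// Structural stand-in for the slice of the Socket.io Server API the worker needs.
interface Emitter {
  to(room: string): { emit(event: string, payload: unknown): void };
}

type ContextStatus = "pending" | "generating" | "completed" | "approved";

// Built once at startup with the real io; the worker calls the returned
// function after each clientContext.status DB write.
function makeContextStatusEmitter(io: Emitter) {
  return (tenantId: string, status: ContextStatus, version?: number) => {
    io.to(`tenant:${tenantId}`).emit("context:status_changed", { tenantId, status, version });
  };
}
```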
Architecture — Client Side
Package
pnpm add socket.io-client --filter dashboard
Singleton client (apps/dashboard/src/lib/socket.ts)
import { io, Socket } from "socket.io-client";
import { getAccessToken } from "./auth-client"; // existing util
let _socket: Socket | null = null;
export function getSocket(): Socket {
if (!_socket) {
_socket = io(process.env.NEXT_PUBLIC_API_URL!, {
path: "/rt",
auth: { token: getAccessToken() },
transports: ["websocket"],
autoConnect: false,
});
}
return _socket;
}
React hook (apps/dashboard/src/hooks/useAgentStatus.ts)
"use client";
import { useEffect, useState } from "react";
import { getSocket } from "@/lib/socket";
export type AgentRole = "client-researcher" | "competitor-researcher" | "context-file-writer";
export type AgentStatus = {
role: AgentRole;
state: "idle" | "active" | "completed" | "failed";
progress?: number;
message?: string;
};
export function useAgentStatus(roles: AgentRole[]) {
const [statuses, setStatuses] = useState<Record<AgentRole, AgentStatus>>(
Object.fromEntries(roles.map(r => [r, { role: r, state: "idle" }])) as Record<AgentRole, AgentStatus>
);
useEffect(() => {
const socket = getSocket();
socket.connect();
const patch = (role: AgentRole, update: Partial<AgentStatus>) =>
setStatuses(prev => ({ ...prev, [role]: { ...prev[role], ...update } }));
socket.on("agent:active", e => patch(e.agentRole, { state: "active" }));
socket.on("agent:progress", e => patch(e.agentRole, { progress: e.progress, message: e.message }));
socket.on("agent:completed", e => patch(e.agentRole, { state: "completed" }));
socket.on("agent:failed", e => patch(e.agentRole, { state: "failed" }));
return () => {
socket.off("agent:active");
socket.off("agent:progress");
socket.off("agent:completed");
socket.off("agent:failed");
socket.disconnect();
};
}, []);
return statuses;
}
export function useContextStatus() {
const [status, setStatus] = useState<string | null>(null);
useEffect(() => {
const socket = getSocket();
socket.connect();
socket.on("context:status_changed", e => setStatus(e.status));
return () => { socket.off("context:status_changed"); socket.disconnect(); };
}, []);
return status;
}
Usage in /settings/context page
The page currently server-renders and reads DB status. With real-time:
- Initial render — still server-rendered (shows DB state, correct on first load).
- Client hydration — the useContextStatus() hook subscribes to context:status_changed.
- On a status === "completed" event → re-fetch the page (Next.js router.refresh()) to show the approve banner and content.
// apps/dashboard/src/app/(dashboard)/settings/context/ContextStatusListener.tsx
"use client";
import { useEffect } from "react";
import { useRouter } from "next/navigation";
import { useContextStatus } from "@/hooks/useAgentStatus";
export function ContextStatusListener({ initialStatus }: { initialStatus: string }) {
const router = useRouter();
const rtStatus = useContextStatus();
useEffect(() => {
if (rtStatus === "completed" && initialStatus !== "completed") {
router.refresh(); // re-run the server component to show approve banner
}
}, [rtStatus, initialStatus, router]);
return null;
}
Package — @leadmetrics/realtime
To keep the Socket.io server and event types shareable across apps/api and apps/dashboard, create a new internal package:
packages/realtime/
├── package.json
├── src/
│ ├── events.ts # Typed event map (shared between server & client)
│ ├── server.ts # mountSocketIO() + bridgeQueueEvents()
│ └── index.ts
This avoids duplicating the event type definitions and ensures the client and server always agree on payload shapes.
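A sketch of what events.ts could contain, assembled from the payload shapes in the Events Schema section above (the file contents are an assumption; only the event names and fields come from this doc):

```typescript
// packages/realtime/src/events.ts — shared payload types and event map (sketch)
export interface AgentActiveEvent {
  tenantId: string;
  jobId: string;
  queue: string;      // e.g. "client-researcher"
  agentRole: string;  // same as queue name
  runId: string;
  startedAt: string;  // ISO
}

export interface AgentProgressEvent {
  tenantId: string;
  jobId: string;
  agentRole: string;
  progress: number;   // 0–100
  message: string;
}

export interface AgentCompletedEvent {
  tenantId: string;
  jobId: string;
  agentRole: string;
  runId: string;
  completedAt: string; // ISO
  nextAgent?: string;  // e.g. "competitor-researcher" (if chaining)
}

export interface AgentFailedEvent {
  tenantId: string;
  jobId: string;
  agentRole: string;
  error: string;
  failedAt: string;    // ISO
}

export interface ContextStatusChangedEvent {
  tenantId: string;
  status: "pending" | "generating" | "completed" | "approved";
  version?: number;
}

// Server→client event map: Socket.io's typed-events generics accept this as the
// second type argument of `Server` and the first of the client `Socket`, so
// emit/on payloads are checked at compile time on both sides.
export interface ServerToClientEvents {
  "agent:active": (e: AgentActiveEvent) => void;
  "agent:progress": (e: AgentProgressEvent) => void;
  "agent:completed": (e: AgentCompletedEvent) => void;
  "agent:failed": (e: AgentFailedEvent) => void;
  "context:status_changed": (e: ContextStatusChangedEvent) => void;
}
```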
Deployment Considerations
| Concern | Solution |
|---|---|
| Multiple API replicas | Use Socket.io Redis adapter (@socket.io/redis-adapter) so events emitted on replica A reach clients connected to replica B |
| Auth token expiry | Client re-connects with a fresh token; server re-validates on reconnect |
| Tenant isolation | Rooms enforced server-side; client cannot join another tenant’s room |
| Missing events (reconnect gap) | On reconnect, client triggers a router.refresh() to re-read DB state as ground truth |
| Worker process restart | QueueEvents are stateless (Redis-backed); bridge restarts cleanly with the worker |
Implementation Plan
| Step | Task | Owner |
|---|---|---|
| 1 | Create packages/realtime with shared event types | |
| 2 | Attach Socket.io to Fastify in apps/api/src/index.ts | |
| 3 | Add bridgeQueueEvents() call per tenant at API startup | |
| 4 | Emit context:status_changed from setup.worker.ts after each DB write | |
| 5 | Add socket.io-client to dashboard; implement getSocket() singleton | |
| 6 | Implement useAgentStatus and useContextStatus hooks | |
| 7 | Add <ContextStatusListener> to /settings/context page | |
| 8 | Update /settings/context generating UI to show per-agent live state (active/done/failed icons) | |
| 9 | Add Redis adapter for multi-replica support | |
| 10 | Write Playwright tests: register → watch context page update live without refresh |
Affected Files (when implemented)
| File | Change |
|---|---|
| apps/api/src/index.ts | Mount Socket.io server |
| apps/api/src/routers/auth.ts | Call bridgeQueueEvents after enqueueSetupChain |
| packages/queue/src/workers/setup.worker.ts | Emit context:status_changed after each status DB write |
| packages/realtime/ | New package (events, server mount, bridge) |
| apps/dashboard/src/lib/socket.ts | New — singleton Socket.io client |
| apps/dashboard/src/hooks/useAgentStatus.ts | New — hooks for agent and context status |
| apps/dashboard/src/app/(dashboard)/settings/context/page.tsx | Add <ContextStatusListener> |
| apps/dashboard/src/app/(dashboard)/settings/context/ContextStatusListener.tsx | New client component |