Gap 4: BullMQ Workers and LangGraph Agents Are Completely Isolated
Problem
Leadmetrics has two separate agent execution systems that do not communicate:
| System | Purpose | Communication |
|---|---|---|
| BullMQ workers | Background content generation (blog-writer, context-file-writer, setup chain, insights, etc.) | Job payloads via Redis |
| LangGraph agents | Interactive chat (analytics agent, future executor agent) | SSE stream to frontend |
The LangGraph executor agent (phase 3) is being built to take actions on behalf of users — but it has no way to trigger a BullMQ job. If a user asks “regenerate this blog post with a more casual tone”, the executor agent cannot act on it. Conversely, when a BullMQ worker finishes a long-running job, it has no way to push a result into an active LangGraph conversation thread.
This is the HuggingGPT architecture gap: in HuggingGPT, the planner, model selector, task executor, and response synthesiser all share state and can delegate to each other. In Leadmetrics, interactive and background agents are siloed.
What breaks without this bridge
- Executor agent (phase 3) cannot regenerate content, trigger setup chains, enqueue insight refreshes, or schedule publishing
- Users in Agent Chat have no way to act on the platform — only query it
- Long-running BullMQ completions cannot surface results back into an active chat session
- No way to compose: “run these 3 background agents, wait for results, then summarise”
What to Build
1. triggerAgentJob tool for LangGraph executor
Add a structured LangChain tool to the executor agent that enqueues a BullMQ job:
```typescript
// packages/ai-chat/src/tools/executor/trigger-agent-job.ts
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { enqueueAgentJob } from "@leadmetrics/queue";

export const triggerAgentJob = tool(
  async ({ role, inputSummary, options }, config) => {
    const { tenantId } = config.configurable.agentContext;
    const job = await enqueueAgentJob({
      role,
      tenantId,
      inputSummary,
      triggeredBy: "agent_chat",
      options,
    });
    return {
      jobId: job.id,
      status: "queued",
      message: `Enqueued ${role} agent. Job ID: ${job.id}`,
    };
  },
  {
    name: "triggerAgentJob",
    description:
      "Trigger a background agent job (e.g. regenerate blog post, refresh insights, run setup chain)",
    schema: z.object({
      role: z.enum([
        "blog-writer", "social-post-writer", "context-file-writer",
        "strategy-writer", "keyword-researcher", "insight-refresh",
      ]),
      inputSummary: z.string().describe("What this job should do"),
      options: z.record(z.unknown()).optional().describe("Additional job-specific parameters"),
    }),
  },
);
```
2. pollJobStatus tool
The executor needs to check whether a triggered job completed:
```typescript
// packages/ai-chat/src/tools/executor/poll-job-status.ts
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { db } from "@leadmetrics/db"; // assumed import path for the shared Prisma client

export const pollJobStatus = tool(
  async ({ jobId }) => {
    const run = await db.agentRun.findFirst({
      where: { runId: jobId },
      select: { status: true, output: true, error: true, completedAt: true },
    });
    if (!run) return { status: "not_found" };
    if (run.error) return { status: run.status, error: run.error };
    return { status: run.status, completedAt: run.completedAt, output: run.output };
  },
  {
    name: "pollJobStatus",
    description: "Check the status of a previously triggered background agent job",
    schema: z.object({ jobId: z.string() }),
  },
);
```
3. Chat thread notification from BullMQ completion
When a BullMQ worker completes a job with `triggeredBy: "agent_chat"`, push a notification into the originating chat thread:
```typescript
// In setup.worker.ts / blog-writer.worker.ts completion handler:
if (jobData.triggeredBy === "agent_chat" && jobData.chatThreadId) {
  await resumeChatThread(jobData.chatThreadId, {
    role: "tool",
    content: `Agent job completed: ${agentRole}. Output ready.`,
    toolCallId: jobData.chatToolCallId,
  });
}
```
`resumeChatThread` appends a message to the LangGraph thread via the checkpointer so the next user turn picks it up automatically.
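`resumeChatThread` is not shown in this section, so the shape below is an assumption: a minimal sketch in which the append goes through the compiled graph's `updateState`, which writes a new checkpoint for the thread. The graph is passed in as a structural type so the sketch stands alone; in the real code it would be the compiled executor graph from `@langchain/langgraph`, and the message a proper `ToolMessage`.

```typescript
// Hypothetical sketch — resumeChatThread is assumed, not taken from the codebase.
type ToolResult = { role: "tool"; content: string; toolCallId: string };

// Minimal structural stand-in for a compiled LangGraph app with a checkpointer.
interface ResumableGraph {
  updateState(
    config: { configurable: { thread_id: string } },
    values: { messages: unknown[] },
  ): Promise<unknown>;
}

export async function resumeChatThread(
  graph: ResumableGraph,
  threadId: string,
  msg: ToolResult,
): Promise<void> {
  // updateState persists the appended message against the thread's checkpoint,
  // so the next user turn in that thread sees the tool result in history.
  await graph.updateState(
    { configurable: { thread_id: threadId } },
    {
      messages: [
        // A plain object stands in for ToolMessage from @langchain/core/messages.
        { type: "tool", content: msg.content, tool_call_id: msg.toolCallId },
      ],
    },
  );
}
```

The worker never touches the chat transport directly; it only writes to the checkpointer, which keeps the SSE stream and the background workers decoupled.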
4. Compose pattern: fan-out and barrier
With this bridge in place, the executor can implement the parallel setup chain pattern described in Gap (original analysis item 3):
```
User: "Refresh my brand context"

Executor:
1. triggerAgentJob("client-researcher") → jobId_1
2. triggerAgentJob("competitor-researcher") → jobId_2
3. pollJobStatus(jobId_1) until complete
4. pollJobStatus(jobId_2) until complete
5. triggerAgentJob("context-file-writer", { clientJobId: jobId_1, competitorJobId: jobId_2 })
6. "Your brand context is being regenerated. I'll notify you when it's ready."
```
This replaces the hardcoded sequential setup chain with a dynamic, orchestrated version.
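The barrier in steps 3–4 can be sketched as a small polling helper. `waitForJobs` is a hypothetical name, and the injected `poll` function stands in for the `pollJobStatus` tool above (assumed to resolve to a `{ status, output? }` shape):

```typescript
// Hypothetical barrier helper — a sketch, not part of the codebase.
type JobStatus = { status: string; output?: unknown };
type Poll = (jobId: string) => Promise<JobStatus>;

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Poll every job until all reach a terminal state, or give up after maxAttempts.
export async function waitForJobs(
  jobIds: string[],
  poll: Poll,
  { intervalMs = 2000, maxAttempts = 30 } = {},
): Promise<Record<string, JobStatus>> {
  const done: Record<string, JobStatus> = {};
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    for (const id of jobIds) {
      if (done[id]) continue; // already terminal, skip re-polling
      const status = await poll(id);
      if (status.status === "completed" || status.status === "failed") {
        done[id] = status;
      }
    }
    if (Object.keys(done).length === jobIds.length) return done; // barrier met
    await sleep(intervalMs);
  }
  throw new Error(`Jobs still pending after ${maxAttempts} polls`);
}
```

In practice the executor agent would loop on `pollJobStatus` itself via tool calls; this helper just makes the fan-out/barrier semantics explicit.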
Files to Change
- New file: `packages/ai-chat/src/tools/executor/trigger-agent-job.ts`
- New file: `packages/ai-chat/src/tools/executor/poll-job-status.ts`
- `packages/ai-chat/src/agents/executor.ts` — wire in new tools (phase 3 executor agent)
- `packages/queue/src/queues.ts` — add `triggeredBy` and `chatThreadId` to job data types
- `packages/agents/src/workers/setup.worker.ts` — notify chat thread on completion
- `packages/agents/src/workers/blog-writer.worker.ts` — same
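The job-data change in `packages/queue/src/queues.ts` might look like the following sketch; the pre-existing field names are assumptions inferred from the `triggerAgentJob` tool above, not the actual type:

```typescript
// Hypothetical sketch of the extended job data type in packages/queue/src/queues.ts.
export type AgentJobData = {
  // Assumed existing fields (inferred from the triggerAgentJob payload):
  role: string;
  tenantId: string;
  inputSummary: string;
  options?: Record<string, unknown>;
  // New fields for the chat bridge:
  triggeredBy?: "agent_chat" | "schedule" | "api";
  chatThreadId?: string;   // originating LangGraph thread, if any
  chatToolCallId?: string; // tool call to resolve when the job completes
};
```

Keeping the bridge fields optional means existing producers (cron schedules, API routes) need no changes; only chat-triggered jobs carry a thread to notify.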
Related
- Gap 7: Priority queue differentiation (executor-triggered jobs need higher priority)
- Original improvement #3: parallel setup chain (enabled once the bridge exists)