Stream Continuation
Last updated on March 25, 2026
Edward Stream Continuation Architecture - Deep Dive
Principal Engineer's Technical Reference
Easy to digest HLD

Table of Contents
- Executive Summary
- Architecture Principles
- Core Components Deep Dive
- Stream Continuation Flow
- Checkpoint Mechanism
- Event Sourcing & Replay
- Frontend Reconnection Strategy
- Worker Resumption
- Failure Modes & Recovery
- Infrastructure Cross-Questions
- Key Files Reference
1. Executive Summary
What is Stream Continuation?
Stream Continuation is Edward's resilience architecture that enables:
- Seamless reconnection after client disconnects (page refresh, network blip, tab switch)
- Worker crash recovery without losing in-progress code generation
- Exact state replay from any point in the stream using cursor-based tracking
- Multi-turn agent loop checkpointing for long-running operations
Why This Matters
Without stream continuation:
- Page refresh = lost work, user frustration
- Worker restart = orphaned runs, inconsistent state
- Network hiccup = incomplete code generation
- Long operations = no recovery from mid-execution failures
With stream continuation:
- Durable execution: Every event persisted, every state checkpointed
- Resumable UX: Users can refresh/reconnect without losing progress
- Operational resilience: Workers can restart without data loss
- Audit trail: Complete event history for debugging and compliance
2. Architecture Principles
2.1 Event Sourcing
All stream events are persisted as an immutable log:
┌─────────────────────────────────────────────────────────┐
│ PostgreSQL: run_events table │
│ ┌──────┬─────────┬──────┬──────────────┬─────────────┐│
│ │ id │ run_id │ seq │ event_type │ event ││
│ ├──────┼─────────┼──────┼──────────────┼─────────────┤│
│ │ uuid │ run-123 │ 1 │ meta │ {...} ││
│ │ uuid │ run-123 │ 2 │ thinking_start│ {...} ││
│ │ uuid │ run-123 │ 3 │ text │ {...} ││
│ │ uuid │ run-123 │ 4 │ sandbox_start│ {...} ││
│ │ uuid │ run-123 │ 5 │ file_start │ {...} ││
│ └──────┴─────────┴──────┴──────────────┴─────────────┘│
└─────────────────────────────────────────────────────────┘
Benefits:
- Exact replay from any sequence number
- Audit trail for debugging
- Supports multiple reconnections
- Enables time-travel debugging
2.2 Dual-Channel Delivery
Events delivered via two channels simultaneously:
┌─────────────────┐
│ Event Emitter │
└────────┬────────┘
│
┌──────────────┴──────────────┐
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ PostgreSQL │ │ Redis Pub/Sub │
│ (Durable Store)│ │ (Real-time) │
└────────┬────────┘ └────────┬────────┘
│ │
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Replay on │ │ Live events to │
│ reconnection │ │ connected clients│
└─────────────────┘ └─────────────────┘
Why Both?
- PostgreSQL: Durable, queryable, supports historical replay
- Redis Pub/Sub: Low-latency, push-based, supports many subscribers
2.3 Cursor-Based Resumption
Frontend tracks last processed event:
// Cursor key format
`sse_cursor:{chatId}:{runId}`
// Stored in sessionStorage for persistence across refreshes
sessionStorage.setItem(
`sse_cursor:${chatId}:${runId}`,
lastEventId // e.g., "seq:12345"
);
Resumption Flow:
- Page loads → read cursor from sessionStorage
- Open SSE stream with ?lastEventId=seq:12345
- Backend replays events from PostgreSQL where seq > 12345
- Subscribe to Redis for live events
- Update cursor on each new event
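The cursor format above can be parsed with a small helper. This is a minimal sketch, not the actual `readLastEventId` implementation (which is not shown in this document); the names `parseSeqCursor` and `formatSeqCursor` are hypothetical, and falling back to 0 (replay from the start) for a missing or malformed cursor is an assumption.

```typescript
// Hypothetical helper: parses a cursor such as "seq:12345" into a number.
// Assumption: a missing or malformed cursor means "replay from the beginning" (0).
function parseSeqCursor(raw: string | null | undefined): number {
  if (!raw) return 0;
  const match = /^seq:(\d+)$/.exec(raw.trim());
  if (!match) return 0;
  const seq = Number(match[1]);
  return Number.isSafeInteger(seq) ? seq : 0;
}

// Hypothetical inverse: formats a sequence number as a cursor string.
function formatSeqCursor(seq: number): string {
  return `seq:${seq}`;
}
```

Rejecting anything that does not match the `seq:<digits>` shape keeps a corrupted sessionStorage value from turning into a surprising replay offset.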
2.4 Checkpoint-Based Worker Resumption
Agent loop state checkpointed on continuation turns:
interface RunResumeCheckpoint {
turn: number; // Current agent turn
fullRawResponse: string; // Complete LLM response
agentMessages: LlmChatMessage[]; // Conversation history
sandboxTagDetected: boolean; // Whether sandbox was triggered
totalToolCallsInRun: number; // Tool invocation count
outputTokens?: number; // Token usage tracking
updatedAt: number; // Timestamp
}
Persisted to: runs.metadata.resumeCheckpoint
3. Core Components Deep Dive
3.1 Event Persistence Layer
File: apps/api/services/runs/runEvents.service.ts
export async function persistRunEvent(
runId: string,
event: StreamEvent,
publisher?: Publisher,
): Promise<RunEventEnvelope> {
// 1. Persist to PostgreSQL
const row = await appendRunEvent({
runId,
eventType: event.type,
event: event as unknown as Record<string, unknown>,
});
// 2. Build envelope with sequence number
const envelope: RunEventEnvelope = {
id: row.id,
runId,
seq: row.seq, // Auto-increment sequence
eventType: row.eventType,
event: row.event as unknown as StreamEvent,
};
// 3. Publish to Redis for live delivery
if (publisher) {
await publisher.publish(
getRunEventChannel(runId),
JSON.stringify(envelope)
);
}
return envelope;
}
Key Design Decisions:
| Decision | Rationale |
|---|---|
| Sequential seq numbers | Enables cursor-based replay, ordering guarantee |
| Dual write (Postgres + Redis) | Durability + real-time delivery |
| Envelope pattern | Decouples storage format from event schema |
| Optional publisher | Allows testing without Redis dependency |
3.2 SSE Streaming with Replay
File: apps/api/services/run-event-stream-utils/service.ts
Core Function: streamRunEventsFromPersistence()
export async function streamRunEventsFromPersistence({
req,
res,
runId,
explicitLastEventId,
}: StreamRunEventsOptions): Promise<void> {
let lastSeq = readLastEventId(req, explicitLastEventId);
let terminalEventSeen = false;
let replaying = true;
const bufferedLiveEvents = new Map<number, RunEventEnvelope>();
// 1. Subscribe to Redis for live events
unsubscribeRedisChannel = await subscribeToRedisChannel(
channel,
onMessage // Buffers during replay, processes after
);
// 2. Replay historical events from PostgreSQL
while (!closed) {
const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
if (replayRows.length === 0) break;
for (const row of replayRows) {
lastSeq = row.seq;
emitPersistedEvent(row.id, row.event as StreamEvent);
}
}
// 3. Flush buffered live events
replaying = false;
await flushBufferedLiveEvents();
// 4. Continue with live events only
// (Redis subscriber handles this)
}
Replay Logic:
Client Request: GET /chat/{chatId}/runs/{runId}/stream?lastEventId=seq:12345
Backend:
1. Parse lastEventId → extract seq: 12345
2. Subscribe to Redis: edward:run-events:{runId}
3. Buffer live events that arrive during replay
4. Query: SELECT * FROM run_events WHERE run_id = ? AND seq > 12345 ORDER BY seq ASC LIMIT 500 (repeat until a batch comes back empty)
5. Emit each replayed event with SSE id: field
6. Flush buffer after replay complete
7. Stream live events as they arrive
Buffering During Replay:
const onMessage = (payload: string) => {
const envelope = JSON.parse(payload) as RunEventEnvelope;
if (replaying) {
// Buffer live events during replay
bufferedLiveEvents.set(envelope.seq, envelope);
return;
}
// Process live events after replay
if (envelope.seq > lastSeq) {
lastSeq = envelope.seq;
emitPersistedEvent(envelope.id, envelope.event);
}
};
Why Buffer?
- Live events may arrive during replay
- Need to prevent duplicates
- Need to maintain ordering (seq-based)
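The flush step is referenced but not shown. The following is a minimal sketch of what a `flushBufferedLiveEvents`-style routine could do, under the assumption that it emits buffered envelopes in ascending seq order and skips any that replay already delivered. `drainBufferedEvents` and `BufferedEnvelope` are hypothetical names.

```typescript
interface BufferedEnvelope {
  id: string;
  seq: number;
}

// Hypothetical sketch: emit buffered live events in seq order, skipping
// anything at or below the last sequence already sent during replay.
// Returns the new cursor (highest seq delivered so far).
function drainBufferedEvents<T extends BufferedEnvelope>(
  buffered: Map<number, T>,
  lastSeq: number,
  emit: (envelope: T) => void,
): number {
  const ordered = Array.from(buffered.values()).sort((a, b) => a.seq - b.seq);
  let cursor = lastSeq;
  for (const envelope of ordered) {
    if (envelope.seq <= cursor) continue; // duplicate of a replayed event
    emit(envelope);
    cursor = envelope.seq;
  }
  buffered.clear();
  return cursor;
}
```

Keying the buffer by seq, as the source does with a Map, already removes duplicate deliveries of the same event; the sort restores ordering when messages were buffered out of order.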
3.3 SSE Backpressure Handling
File: apps/api/services/sse-utils/service.ts
Problem: Slow clients can cause memory buildup
Solution: Queue-based backpressure with graceful degradation
interface SSEWriterState {
res: Response;
queue: string[]; // Buffered writes
queueBytes: number; // Total bytes queued
backpressured: boolean; // Currently backpressured
ending: boolean; // Graceful shutdown in progress
}
function enqueueWrite(state: SSEWriterState, data: string): boolean {
if (state.res.writableEnded || !state.res.writable) {
return false;
}
const payloadBytes = Buffer.byteLength(data);
// Fast path: no backlog, write directly
if (!state.backpressured && state.queue.length === 0) {
const ok = state.res.write(data);
if (!ok) {
state.backpressured = true;
}
return true;
}
// Slow path: queue for later
state.queue.push(data);
state.queueBytes += payloadBytes;
return true;
}
// On 'drain' event from Response
res.on("drain", () => {
flushQueue(state); // Drain queued writes
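});
Backpressure Configuration:
// Options accepted as the second argument of configureSSEBackpressure(res, options).
// The interface name is illustrative; only the field names come from the source.
interface SSEBackpressureOptions {
maxQueueBytes?: number; // Max queued bytes before the client is dropped
maxQueueEvents?: number; // Max queued events before the client is dropped
onSlowClient?: () => void; // Callback when client is slow
}
Graceful Degradation: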
- Queue builds up → monitor size
- Exceeds threshold → trigger onSlowClient
- Close stream gracefully → client can reconnect
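The threshold check itself is not shown in the source. As a rough sketch, it could look like the predicate below; `isSlowClient`, `QueueSnapshot`, and `BackpressureLimits` are hypothetical names, and treating an unset limit as "no limit" and a limit as exceeded only when strictly greater are assumptions.

```typescript
interface BackpressureLimits {
  maxQueueBytes?: number;
  maxQueueEvents?: number;
}

interface QueueSnapshot {
  queueBytes: number;
  queueLength: number;
}

// Hypothetical predicate: true when either configured limit is exceeded.
// Assumption: an undefined limit disables that particular check.
function isSlowClient(queue: QueueSnapshot, limits: BackpressureLimits): boolean {
  const overBytes =
    limits.maxQueueBytes !== undefined && queue.queueBytes > limits.maxQueueBytes;
  const overEvents =
    limits.maxQueueEvents !== undefined && queue.queueLength > limits.maxQueueEvents;
  return overBytes || overEvents;
}
```

A caller would evaluate this after each enqueue and invoke `onSlowClient` once it returns true, relying on cursor-based reconnection to recover the dropped stream.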
3.4 Worker Event Capture
File: apps/api/services/runs/agent-run-worker/processor.helpers.ts
Problem: Worker needs to capture SSE stream and persist events
Solution: Create mock Response object that intercepts writes
export function createRunEventCaptureResponse(
onEvent: (event: StreamEvent) => Promise<void>,
): RunEventCaptureResponse {
const response = new EventEmitter() as RunEventCaptureResponse;
let sseBuffer = "";
const utf8Decoder = new StringDecoder("utf8");
let pending = Promise.resolve();
let persistFailure: Error | null = null;
response.write = (chunk: string | Buffer): boolean => {
// 1. Decode chunk to text
const text = typeof chunk === "string" ? chunk : utf8Decoder.write(chunk);
sseBuffer += text;
// 2. Parse SSE frames
const normalized = sseBuffer.replaceAll("\r\n", "\n");
const frames = normalized.split("\n\n");
sseBuffer = frames.pop() ?? "";
// 3. Extract event payloads
for (const frame of frames) {
const payload = frame
.split("\n")
.filter((line) => line.startsWith("data:"))
.map((line) => line.slice(5).trimStart())
.join("\n");
if (!payload || payload === "[DONE]") continue;
// 4. Persist each event
pending = pending.then(async () => {
try {
const parsed = JSON.parse(payload) as StreamEvent;
await onEvent(parsed);
} catch (error) {
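// catch variables are typed unknown, so normalize before storing
persistFailure = error instanceof Error ? error : new Error(String(error));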
logger.error({ error, payload }, "Failed to persist captured run event");
}
});
}
return true;
};
response.flushPending = async () => {
await pending;
if (persistFailure) throw persistFailure;
};
return response;
}
Why This Pattern?
- Decouples stream session from persistence mechanism
- Enables testing without actual HTTP response
- Serializes event persistence (prevents race conditions)
- Tracks failures for finalization
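To make the chunk-boundary behavior concrete, here is the frame-parsing step from the capture response isolated as a pure function. It is a sketch that mirrors the logic shown above; `extractSsePayloads` and `SseParseResult` are hypothetical names, not part of the codebase.

```typescript
interface SseParseResult {
  payloads: string[]; // complete data payloads found in the buffer
  rest: string; // trailing partial frame to carry into the next write
}

// Hypothetical pure version of the frame parser: splits on blank lines,
// keeps only data: lines, and returns any incomplete tail for later.
function extractSsePayloads(buffer: string): SseParseResult {
  const normalized = buffer.replace(/\r\n/g, "\n");
  const frames = normalized.split("\n\n");
  const rest = frames.pop() ?? "";
  const payloads: string[] = [];
  for (const frame of frames) {
    const payload = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).trimStart())
      .join("\n");
    if (payload && payload !== "[DONE]") payloads.push(payload);
  }
  return { payloads, rest };
}

// A frame split across two writes is only emitted once it is complete.
const firstWrite = extractSsePayloads('data: {"type":"text"}\n\ndata: {"ty');
const secondWrite = extractSsePayloads(firstWrite.rest + 'pe":"meta"}\n\n');
```

Carrying `rest` forward is what lets the worker accept arbitrary chunk sizes from the stream session without dropping or double-persisting an event.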
4. Stream Continuation Flow
4.1 Normal Flow (No Interruption)
┌─────────────────────────────────────────────────────────────────┐
│ PHASE 1: ADMISSION │
│ ┌─────────────┐ │
│ │ User Message│ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Create Run │ → run.status = "QUEUED" │
│ └──────┬──────┘ run.metadata.resumeCheckpoint = null │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Enqueue Job │ → Redis queue: agent-runs │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Stream SSE │ → Open connection to browser │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ PHASE 2: WORKER EXECUTION │
│ ┌─────────────┐ │
│ │ Pick Up Job │ → Load run from DB │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Check State │ → Not terminal, no checkpoint │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Stream Sess │ → runStreamSession() │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Agent Loop │ → Turn 1, 2, 3... │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Finalize │ → run.status = "COMPLETED" │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
4.2 Client Disconnect + Reconnect
┌─────────────────────────────────────────────────────────────────┐
│ DISCONNECT │
│ ┌─────────────┐ │
│ │ User Refresh│ → Browser closes SSE connection │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ req.close │ → Backend cleans up SSE stream │
│ └──────┬──────┘ │
│ │ │
│ │ (Worker continues execution in background) │
│ │ Events persisted to PostgreSQL + Redis │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ RECONNECT │
│ ┌─────────────┐ │
│ │ Page Load │ → Read cursor from sessionStorage │
│ └──────┬──────┘ cursor = "seq:12345" │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ GET /stream │ → lastEventId=seq:12345 │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Replay │ → SELECT * WHERE seq > 12345 │
│ └──────┬──────┘ Emit events 12346, 12347, 12348... │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Subscribe │ → Redis: edward:run-events:{runId} │
│ └──────┬──────┘ Receive live events │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Update │ → Persist new cursor on each event │
│ │ Cursor │ sessionStorage.setItem(...) │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
4.3 Worker Crash + Restart
┌─────────────────────────────────────────────────────────────────┐
│ CRASH │
│ ┌─────────────┐ │
│ │ Worker Dies │ → Process killed, OOM, etc. │
│ └──────┬──────┘ │
│ │ │
│ │ (Run remains in "RUNNING" state) │
│ │ Events persisted up to crash point │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ RESTART │
│ ┌─────────────┐ │
│ │ Job Retry │ → Queue retries agent-run job │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Load Run │ → run.metadata.resumeCheckpoint │
│ └──────┬──────┘ checkpoint.turn = 3 │
│ │ checkpoint.agentMessages = [...] │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Initialize │ → agentMessages = checkpoint.agentMessages │
│ │ from Checkpt│ → agentTurn = checkpoint.turn │
│ └──────┬──────┘ → fullRawResponse = checkpoint.fullRawResponse │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Resume Loop │ → Continue from turn 4 │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Finalize │ → Clear checkpoint on success │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────┘
5. Checkpoint Mechanism
5.1 Checkpoint Structure
File: apps/api/services/runs/runMetadata.ts
export interface RunResumeCheckpoint {
turn: number; // Current agent turn (1-indexed)
fullRawResponse: string; // Complete raw LLM response accumulated
agentMessages: LlmChatMessage[]; // LLM conversation for continuation
sandboxTagDetected: boolean; // Whether <edward_sandbox> was seen
totalToolCallsInRun: number; // Running count of tool invocations
outputTokens?: number; // Token usage (if trackable)
updatedAt: number; // Timestamp (ms since epoch)
}
Stored in: runs.metadata.resumeCheckpoint (JSONB column)
5.2 When Checkpoints Are Created
File: apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts
Checkpoints created on continuation turns (when agent loop continues):
async function checkpointContinuationMessages(params): Promise<LlmChatMessage[]> {
const agentMessages: LlmChatMessage[] = [{
role: MessageRole.User,
content: params.prompt, // Continuation prompt
}];
await params.onCheckpoint?.({
turn: params.turn,
fullRawResponse: params.fullRawResponse,
agentMessages, // Only the continuation prompt
sandboxTagDetected: params.sandboxTagDetected,
totalToolCallsInRun: params.totalToolCallsInRun,
outputTokens: params.outputTokens,
updatedAt: Date.now(),
});
return agentMessages;
}
Continuation Scenarios:
| Scenario | When | Why Checkpoint |
|---|---|---|
| Tool Results Continuation | Tools called but no code output | Need to send tool results back to LLM |
| No-Progress Nudge | No tools called, no output | Need to send nudge prompt |
NOT Checkpointed:
- Initial turn (the first turn of a fresh run; turns are 1-indexed)
- Final turn (when loop exits)
- Turns that produce code output
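The rules in the table and list above can be summarized as a small decision function applied to the outcome of a completed turn. This is an illustrative sketch only; `TurnOutcome`, `ContinuationKind`, and `continuationKind` are hypothetical names and do not appear in agentLoop.turnOutcome.ts as shown.

```typescript
interface TurnOutcome {
  toolCalls: number; // tools invoked during the turn
  producedCodeOutput: boolean; // whether the turn emitted code output
  isFinalTurn: boolean; // whether the loop exits after this turn
}

type ContinuationKind = "tool_results" | "no_progress_nudge" | null;

// Hypothetical classifier: returns the continuation scenario that would
// trigger a checkpoint, or null when no checkpoint is written.
function continuationKind(outcome: TurnOutcome): ContinuationKind {
  if (outcome.isFinalTurn || outcome.producedCodeOutput) return null;
  return outcome.toolCalls > 0 ? "tool_results" : "no_progress_nudge";
}
```

A non-null result corresponds to a row in the Continuation Scenarios table; a null result corresponds to one of the not-checkpointed cases.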
5.3 Checkpoint Persistence Flow
File: apps/api/services/runs/agent-run-worker/processor.session.ts
onCheckpoint: async (checkpoint: WorkerCheckpoint) => {
// 1. Get current metadata
const currentMetadata = getMetadata();
// 2. Merge checkpoint into metadata
const mergedMetadata: AgentRunMetadata = {
...currentMetadata,
resumeCheckpoint: {
turn: checkpoint.turn,
fullRawResponse: checkpoint.fullRawResponse,
agentMessages: checkpoint.agentMessages,
sandboxTagDetected: checkpoint.sandboxTagDetected,
totalToolCallsInRun: checkpoint.totalToolCallsInRun,
outputTokens: checkpoint.outputTokens,
updatedAt: checkpoint.updatedAt,
} satisfies RunResumeCheckpoint,
};
// 3. Persist to database
const metadataPatch: Record<string, unknown> = JSON.parse(
JSON.stringify(mergedMetadata)
);
await updateRun(runId, {
currentTurn: checkpoint.turn,
metadata: metadataPatch,
});
// 4. Update in-memory metadata
onMetadataUpdated(mergedMetadata);
onTurnUpdated(checkpoint.turn);
};
5.4 Checkpoint Usage on Worker Restart
File: apps/api/services/chat/session/loop/agentLoop.runner.ts
export async function runAgentLoop(
params: RunAgentLoopParams,
): Promise<RunAgentLoopResult> {
// Initialize from checkpoint if present
let fullRawResponse = resumeCheckpoint?.fullRawResponse ?? "";
let agentMessages: LlmChatMessage[] =
resumeCheckpoint?.agentMessages ?? initialMessages;
let agentTurn = resumeCheckpoint?.turn ?? 0;
let totalToolCallsInRun = resumeCheckpoint?.totalToolCallsInRun ?? 0;
let sandboxTagDetected = resumeCheckpoint?.sandboxTagDetected ?? false;
let outputTokens = resumeCheckpoint?.outputTokens;
// Continue loop from checkpoint.turn
while (agentTurn < MAX_AGENT_TURNS) {
agentTurn += 1;
// ... execute turn
}
}
5.5 Checkpoint Cleanup
File: apps/api/services/runs/agent-run-worker/processor.finalize.ts
On successful completion:
await updateRun(params.runId, {
status: mapped.status,
state: mapped.state,
currentTurn: params.currentTurn,
metadata: {
...params.metadata,
resumeCheckpoint: null, // Clear checkpoint
firstTokenLatencyMs: params.firstTokenLatencyMs,
runDurationMs: durationMs,
},
});
Why Clear?
- Run is complete, no need to resume
- Reduces metadata size
- Prevents accidental replay of completed runs
6. Event Sourcing & Replay
6.1 Event Schema
File: packages/shared/src/streamEvents.ts
export interface RunEventEnvelope {
id: string; // UUID
runId: string; // Run identifier
seq: number; // Sequential number (auto-increment)
eventType: string; // Event type (text, meta, file_start, etc.)
event: StreamEvent; // Actual event payload
}
Event Types:
| Category | Events |
|---|---|
| Lifecycle | meta (SESSION_START, TURN_START, TURN_COMPLETE, SESSION_COMPLETE) |
| Content | text, thinking_*, file_* |
| Sandbox | sandbox_*, install_*, command |
| Tools | web_search, url_scrape |
| Errors | error (fatal, recoverable) |
| Metrics | metrics, rate_limit_status, preview_url, build_status |
6.2 Event Persistence
Database Schema:
CREATE TABLE run_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID NOT NULL REFERENCES runs(id),
seq BIGINT NOT NULL, -- Monotonic per run; assigned at insert time (allocation mechanism not shown here)
event_type TEXT NOT NULL,
event JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(run_id, seq)
);
CREATE INDEX idx_run_events_run_seq ON run_events(run_id, seq);
Append Function:
export async function appendRunEvent(params: {
runId: string;
eventType: string;
event: Record<string, unknown>;
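}): Promise<{ id: string; seq: number; eventType: string; event: unknown }> {
const [row] = await db
.insert(runEvents)
.values({
runId: params.runId,
eventType: params.eventType,
event: params.event,
})
// Return every column persistRunEvent reads when it builds the envelope
.returning({
id: runEvents.id,
seq: runEvents.seq,
eventType: runEvents.eventType,
event: runEvents.event,
});
return row;
}
6.3 Event Replay Query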
File: apps/api/services/run-event-stream-utils/service.ts
const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
// Implementation:
export async function getRunEventsAfter(
runId: string,
lastSeq: number,
limit: number,
): Promise<RunEventRow[]> {
const rows = await db
.select({
id: runEvents.id,
seq: runEvents.seq,
eventType: runEvents.eventType,
event: runEvents.event,
})
.from(runEvents)
.where(
and(
eq(runEvents.runId, runId),
gt(runEvents.seq, lastSeq)
)
)
.orderBy(runEvents.seq)
.limit(limit);
return rows;
}
Query Pattern:
SELECT id, seq, event_type, event
FROM run_events
WHERE run_id = $1 AND seq > $2
ORDER BY seq ASC
LIMIT $3;
6.4 Replay Batching
Why Batch?
- Large runs may have thousands of events
- Don't overwhelm client with single massive response
- Allow progressive rendering
Batching Logic:
const MAX_REPLAY_BATCH = 500;
while (!closed) {
const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
if (replayRows.length === 0) break;
for (const row of replayRows) {
if (closed) break;
if (row.seq <= lastSeq) continue;
lastSeq = row.seq;
const ok = emitPersistedEvent(row.id, row.event as StreamEvent);
if (!ok) {
await closeStream({ sendDone: false });
return;
}
}
}
Multiple Batches:
- First batch: 500 events
- If more events exist, loop continues
- Next iteration: next 500 events
- Continues until all events replayed
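As a worked example of the batching loop, the sketch below replays 1,200 in-memory rows with a batch size of 500. It is a simplified, synchronous stand-in for the real loop: `replayInBatches` and the in-memory `fetchAfter` are hypothetical, and the actual code queries PostgreSQL asynchronously through `getRunEventsAfter`.

```typescript
interface ReplayRow {
  seq: number;
}

// Hypothetical synchronous version of the replay loop, for illustration.
function replayInBatches<T extends ReplayRow>(
  fetchAfter: (lastSeq: number, limit: number) => T[],
  startSeq: number,
  batchSize: number,
  emit: (row: T) => void,
): { lastSeq: number; queries: number } {
  let lastSeq = startSeq;
  let queries = 0;
  while (true) {
    const rows = fetchAfter(lastSeq, batchSize);
    queries += 1;
    if (rows.length === 0) break; // an empty batch ends the replay
    for (const row of rows) {
      if (row.seq <= lastSeq) continue; // defensive duplicate guard
      lastSeq = row.seq;
      emit(row);
    }
  }
  return { lastSeq, queries };
}

// In-memory stand-in for the run_events table: seq 1..1200.
const allRows: ReplayRow[] = Array.from({ length: 1200 }, (_, i) => ({ seq: i + 1 }));
const fetchAfter = (lastSeq: number, limit: number): ReplayRow[] =>
  allRows.filter((row) => row.seq > lastSeq).slice(0, limit);

const emittedSeqs: number[] = [];
const replaySummary = replayInBatches(fetchAfter, 0, 500, (row) => {
  emittedSeqs.push(row.seq);
});
// Batches of 500, 500, and 200 rows, then one empty batch that stops the loop.
```

Note that the loop always issues one extra query that returns nothing; that empty result is the only signal that replay has caught up.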
7. Frontend Reconnection Strategy
7.1 Cursor Persistence
File: apps/web/stores/chatStream/cursorPersistence.ts
const SSE_CURSOR_STORAGE_PREFIX = "sse_cursor:";
export function createStreamCursorPersistence(): StreamCursorPersistence {
const streamCursor = new Map<string, string>(); // In-memory cache
const persistCursor = (chatId: string, runId: string, lastEventId: string): void => {
const key = `${chatId}:${runId}`;
streamCursor.set(key, lastEventId);
try {
sessionStorage.setItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`, lastEventId);
} catch {
// sessionStorage may be unavailable in private-browsing
}
};
const readCursor = (chatId: string, runId: string): string | undefined => {
const key = `${chatId}:${runId}`;
const inMemory = streamCursor.get(key);
if (inMemory) return inMemory;
try {
return sessionStorage.getItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`) ?? undefined;
} catch {
return undefined;
}
};
const clearCursor = (chatId: string, runId: string): void => {
const key = `${chatId}:${runId}`;
streamCursor.delete(key);
try {
sessionStorage.removeItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`);
} catch {
// no-op
}
};
return { persistCursor, readCursor, clearCursor };
}
Storage Strategy:
| Layer | Purpose | Lifetime |
|---|---|---|
| In-memory Map | Fast access during session | Tab lifetime |
| sessionStorage | Persistence across refresh | Tab lifetime (survives refresh) |
Why Not localStorage?
- Cursors are session-specific
- Cursors should not persist across browser restarts
- Avoid stale cursors for old runs
7.2 Stream Resumption
File: apps/web/stores/chatStream/resumeRunStream.ts
export function createResumeRunStream(params): (chatId: string, runId: string) => Promise<void> {
// The returned function must be async because its body awaits the stream calls
return async (chatId: string, runId: string) => {
const resumeCursor = readCursor(chatId, runId);
const response = await openRunEventsStream(chatId, runId, {
signal: controller.signal,
...(resumeCursor ? { lastEventId: resumeCursor } : {}),
});
const streamResult = await processStreamResponse({
response,
chatId,
dispatch,
onCursorUpdate: (id: string, rId: string) =>
persistCursor(resolvedChatId, rId, id),
// ...
});
// Clear cursor on successful completion
if (streamResult) {
clearCursor(resolvedChatId, runId);
}
};
}
7.3 API Client
File: apps/web/lib/api/chat.ts
export async function openRunEventsStream(
chatId: string,
runId: string,
options?: { lastEventId?: string; signal?: AbortSignal },
): Promise<Response> {
const params = new URLSearchParams();
if (options?.lastEventId) {
params.set("lastEventId", options.lastEventId);
}
const queryString = params.toString();
return fetchApiResponse(
`/chat/${chatId}/runs/${runId}/stream${queryString ? `?${queryString}` : ""}`,
{ method: "GET", signal: options?.signal }
);
}
Request Format:
GET /chat/{chatId}/runs/{runId}/stream?lastEventId=seq:12345
Headers:
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
7.4 Page-Level Orchestration
File: apps/web/hooks/chat/useChatPageOrchestration.ts
Active Run Lookup:
const AGGRESSIVE_ACTIVE_RUN_LOOKUP_ATTEMPTS = 6;
const ACTIVE_RUN_LOOKUP_BASE_RETRY_MS = 350;
const ACTIVE_RUN_LOOKUP_MAX_RETRY_MS = 2000;
function getRetryDelayMs(attempt: number): number {
return Math.min(
ACTIVE_RUN_LOOKUP_MAX_RETRY_MS,
ACTIVE_RUN_LOOKUP_BASE_RETRY_MS * 2 ** attempt
);
}
// Exponential backoff lookup
while (notFoundAttempts < maxNotFoundAttempts) {
const response = await fetchActiveRun({
signal: abortController.signal,
staleTimeMs: 0,
});
const activeRun = response?.data.run;
if (activeRun) {
resumeRunStream(chatId, activeRun.id);
return;
}
notFoundAttempts += 1;
await waitForRetry(notFoundAttempts - 1, abortController.signal);
}
Lookup Strategy:
| Mode | Attempts | Use Case |
|---|---|---|
| Aggressive | 6 | Page load, user expects active run |
| Single | 1 | Background check |
| Defer | 0 | Streaming already in progress |
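For the aggressive mode, the retry schedule implied by the constants above can be computed directly. The snippet below is a small worked example; the constant and function names are shortened, hypothetical versions of the ones in the source.

```typescript
const BASE_RETRY_MS = 350;
const MAX_RETRY_MS = 2000;
const AGGRESSIVE_ATTEMPTS = 6;

// Same formula as getRetryDelayMs: exponential growth, capped at the max.
function retryDelayMs(attempt: number): number {
  return Math.min(MAX_RETRY_MS, BASE_RETRY_MS * 2 ** attempt);
}

// Delay after each not-found attempt (attempt index 0..5).
const retrySchedule = Array.from({ length: AGGRESSIVE_ATTEMPTS }, (_, attempt) =>
  retryDelayMs(attempt),
);
// retrySchedule is [350, 700, 1400, 2000, 2000, 2000]

const totalWaitMs = retrySchedule.reduce((sum, ms) => sum + ms, 0);
// totalWaitMs is 8450
```

The cap takes effect from the fourth attempt onward (350 × 8 = 2800 exceeds 2000), so a full aggressive lookup waits at most about 8.45 seconds between requests, excluding the request time itself.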
7.5 Stream Processor with Replay
File: apps/web/lib/streaming/processors/chatStreamProcessor.ts
Built-in Replay:
const MAX_REPLAY_ATTEMPTS = 3;
const REPLAY_BACKOFF_BASE_MS = 500;
if (
!fatalError &&
!sessionCompleted &&
replayAttempt < MAX_REPLAY_ATTEMPTS &&
metaEvent?.runId
) {
try {
await new Promise((resolve) =>
setTimeout(
resolve,
Math.min(REPLAY_BACKOFF_BASE_MS * 2 ** replayAttempt, 5_000)
)
);
const replayResponse = await openRunEventsStream(
activeChatId,
metaEvent.runId,
lastEventId ? { lastEventId } : undefined
);
const replayResult = await processStreamResponse({
response: replayResponse,
replayAttempt: replayAttempt + 1,
replayCursor: lastEventId,
// ...
});
if (replayResult) {
return mergeStreamResults(result, replayResult);
}
} catch {
// Replay failed, report error
}
}
Why Client-Side Replay?
- Backend stream may end prematurely
- Client detects incomplete session
- Automatic retry with backoff
- Merges results seamlessly
8. Worker Resumption
8.1 Worker Lifecycle
File: apps/api/services/runs/agent-run-worker/processor.ts
Startup:
export async function processAgentRunJob(
runId: string,
publisher: Publisher,
): Promise<void> {
// 1. Load run from DB
const run = await getRunById(runId);
if (!run) throw new Error(`Run not found: ${runId}`);
// 2. Check if already terminal
if (isTerminalRunStatus(run.status)) {
return; // Already finished
}
// 3. Check for existing terminal event
const terminalEvent = await getLatestSessionCompleteEvent(runId);
if (terminalEvent) {
// Run completed but status not updated
await updateRunWithLog(runId, {
status: mapped.status,
state: mapped.state,
}, "terminal-event-already-present");
return;
}
// 4. Subscribe to cancel signal
await cancelSub.subscribe(runCancelChannel);
cancelSub.on("message", () => {
workerAbort.abort(); // Stop everything
});
// 5. Parse metadata (includes checkpoint)
const metadata = parseAgentRunMetadata(run.metadata);
// metadata.resumeCheckpoint may be present
// 6. Initialize progress from checkpoint
const progress: RunEventProgress = {
currentTurn: metadata.resumeCheckpoint?.turn ?? run.currentTurn ?? 0,
// ...
};
}
8.2 Checkpoint Detection
const metadata = parseAgentRunMetadata(run.metadata);
// Check for checkpoint
if (metadata.resumeCheckpoint) {
logger.info(
{ runId, turn: metadata.resumeCheckpoint.turn },
"Resuming run from checkpoint"
);
} else {
logger.info({ runId }, "Starting fresh run");
}
8.3 Resumption Flow
await runStreamSession(
buildWorkerRunSessionInput({
req: fakeReq,
res: capturedRes,
externalSignal: workerAbort.signal,
workflow: metadata.workflow,
run,
decryptedApiKey,
historyMessages,
projectContext,
runId,
resumeCheckpoint: metadata.resumeCheckpoint, // Pass checkpoint
onCheckpoint: async (checkpoint) => {
// Persist checkpoint on each continuation
await updateRun(runId, {
currentTurn: checkpoint.turn,
metadata: metadataPatch,
});
},
})
);
8.4 Finalization
File: apps/api/services/runs/agent-run-worker/processor.finalize.ts
On Success:
export async function finalizeSuccessfulRun(params): Promise<void> {
await updateRun(params.runId, {
status: mapped.status,
state: mapped.state,
currentTurn: params.currentTurn,
terminationReason: params.latestTerminationReason,
completedAt: finishedAt,
metadata: {
...params.metadata,
resumeCheckpoint: null, // Clear checkpoint
firstTokenLatencyMs: params.firstTokenLatencyMs,
runDurationMs: durationMs,
},
});
}
On Failure:
export async function finalizeFailedRun(params): Promise<void> {
// 1. Flush pending events
await params.flushCapturedEvents();
// 2. Persist error event
await persistRunEventWithLog(params.runId, errorEvent, params.publisher, "...");
// 3. Persist completion meta event
await persistRunEventWithLog(params.runId, completionMetaEvent, params.publisher, "...");
// 4. Update run status (checkpoint NOT cleared - allows retry)
await updateRunWithLog(params.runId, {
status: RUN_STATUS.FAILED,
state: "FAILED",
currentTurn: params.currentTurn,
terminationReason: StreamTerminationReason.STREAM_FAILED,
errorMessage: latestErrorMessage,
completedAt: new Date(),
});
}
Why Keep Checkpoint on Failure?
- Allows manual retry from checkpoint
- Preserves state for debugging
- Can be cleared by explicit retry logic
9. Failure Modes & Recovery
9.1 Failure Mode Matrix
| Failure Mode | Detection | Recovery | Data Loss |
|---|---|---|---|
| Client disconnect | req.close event | Reconnect with cursor | None |
| Slow client | Backpressure queue | Graceful close | None (events persisted) |
| Worker crash | Job timeout | Queue retry + checkpoint | Minimal (since last checkpoint) |
| Redis unavailable | Connection error | Fallback to PostgreSQL polling | None |
| PostgreSQL unavailable | Query error | Retry with backoff | Temporary (events buffered) |
| Stream timeout | Guard timer | Terminate with reason | None (events persisted) |
| Network blip | SSE connection drop | Auto-reconnect | None |
9.2 Client Disconnect Handling
File: apps/api/services/run-event-stream-utils/service.ts
req.on("close", () => {
void closeStream({ sendDone: false });
});
const closeStream = async (options?: { sendDone?: boolean }): Promise<void> => {
if (closed) return;
closed = true;
// Clean up heartbeat
if (heartbeat) {
clearInterval(heartbeat);
heartbeat = null;
}
// Unsubscribe from Redis
if (unsubscribeRedisChannel) {
await unsubscribeRedisChannel();
}
// Close response
if (!res.writableEnded) {
res.end();
}
};
What Happens:
- Client closes connection (refresh, navigate away)
- Backend receives req.close event
- Stop heartbeats
- Unsubscribe from Redis pub/sub
- Close response gracefully
- Worker continues execution (independent of client connection)
9.3 Slow Client Handling
File: apps/api/services/run-event-stream-utils/service.ts
configureSSEBackpressure(res, {
onSlowClient: () => {
void closeStream({ sendDone: false });
},
});
Detection:
- Write queue exceeds threshold
- res.write() returns false (backpressure)
- Queue not draining fast enough
Recovery:
- Close stream gracefully
- Client can reconnect with cursor
- Events already persisted → replay on reconnect
9.4 Worker Crash Recovery
File: apps/api/services/runs/agent-run-worker/processor.ts
Crash Detection:
- Worker process dies (OOM, panic, etc.)
- Job queue detects timeout
- Run remains in "RUNNING" state
Recovery:
- Queue retries job (configurable retries)
- New worker picks up job
- Load run from DB
- Check for checkpoint in metadata
- Resume from checkpoint.turn
What's Lost:
- Work since last checkpoint
- Typically: partial turn execution
- Events persisted before crash are safe
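The turn arithmetic on resume is easy to get off by one, so here is a minimal sketch of it. It assumes, as in the loop shown in section 5.4, that the stored turn is the last checkpointed turn and that the loop increments before executing; `CheckpointLike`, `LoopState`, `initialLoopState`, and `nextTurn` are hypothetical names.

```typescript
interface CheckpointLike {
  turn: number;
  fullRawResponse: string;
  totalToolCallsInRun: number;
}

interface LoopState {
  completedTurns: number;
  fullRawResponse: string;
  totalToolCallsInRun: number;
}

// Hypothetical initializer: seed loop state from a checkpoint when present,
// otherwise start from an empty state.
function initialLoopState(checkpoint?: CheckpointLike | null): LoopState {
  return {
    completedTurns: checkpoint?.turn ?? 0,
    fullRawResponse: checkpoint?.fullRawResponse ?? "",
    totalToolCallsInRun: checkpoint?.totalToolCallsInRun ?? 0,
  };
}

// The loop increments before executing, so the next turn is completed + 1.
function nextTurn(state: LoopState): number {
  return state.completedTurns + 1;
}
```

With a checkpoint at turn 3, the retried worker re-enters the loop at turn 4; anything the crashed worker produced after that checkpoint is regenerated rather than recovered.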
9.5 Stream Timeout
File: apps/api/services/chat/session/orchestrator/streamGuards.ts
const STREAM_GUARD_TIMEOUT_MS = 20 * 60 * 1000; // 20 minutes
export function setupStreamGuards(params): StreamGuardHandles {
// Timeout guard
const timeout = setTimeout(() => {
params.abortController.abort();
params.setTerminationReason(StreamTerminationReason.STREAM_TIMEOUT);
}, STREAM_GUARD_TIMEOUT_MS);
return { clear: () => clearTimeout(timeout) };
}
On Timeout:
- Abort controller triggered
- Agent loop stops
- Termination reason set to STREAM_TIMEOUT
- Session finalized with timeout reason
- Client receives error event
9.6 Redis Unavailable
File: apps/api/lib/redisPubSub.ts
export async function subscribeToRedisChannel(
channel: string,
handler: (payload: string) => void,
): Promise<() => Promise<void>> {
// ... subscription logic
subscriber.on("error", (error: unknown) => {
logger.error({ error }, "Redis pub/sub subscriber error");
});
}
Fallback Strategy:
- Events still persisted to PostgreSQL
- Client reconnects → replay from PostgreSQL
- Live events missed during Redis outage
- Mitigation: Poll PostgreSQL for new events
10. Infrastructure Cross-Questions
10.1 PostgreSQL
Q: What's the event volume per run?
Typical run:
- Meta events: 5-10 (session start, turn start/complete x5, session complete)
- Text events: 50-200 (narrative, explanations)
- File events: 10-50 (file_start, file_content xN, file_end)
- Command events: 5-20 (command execution results)
- Total: 70-300 events per run
Q: What's the storage requirement?
Per run estimate:
- Event envelope: ~200 bytes overhead
- Event payload: ~500 bytes average (varies widely)
- Per event: ~700 bytes
- Per run (200 events): ~140 KB
At 10,000 runs/day:
- Daily: 1.4 GB
- Monthly: 42 GB
- Yearly: 500 GB
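The arithmetic behind these figures is easy to re-run with different inputs. The sketch below uses the estimates quoted above and assumes decimal units (1 GB = 10^9 bytes). The function and constant names are illustrative.

```typescript
// Figures are the estimates quoted above; decimal units (1 GB = 1e9 bytes) are assumed.
const ENVELOPE_BYTES = 200;
const PAYLOAD_BYTES = 500;

function estimateStorageBytes(runsPerDay: number, eventsPerRun: number, days: number): number {
  const bytesPerEvent = ENVELOPE_BYTES + PAYLOAD_BYTES; // ~700 bytes
  return runsPerDay * eventsPerRun * bytesPerEvent * days;
}

const toGB = (bytes: number): number => bytes / 1e9;

const dailyGB = toGB(estimateStorageBytes(10_000, 200, 1));
const monthlyGB = toGB(estimateStorageBytes(10_000, 200, 30));
const yearlyGB = toGB(estimateStorageBytes(10_000, 200, 365));
```

With these inputs the yearly total comes to about 511 GB, which the estimate above rounds to 500 GB. Index and TOAST overhead are not included.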
Q: Should we partition the run_events table?
Recommendation: Yes, partition by created_at (monthly partitions)
CREATE TABLE run_events (
  id UUID DEFAULT gen_random_uuid(),
  run_id UUID NOT NULL,
  seq BIGINT NOT NULL,
  event_type TEXT NOT NULL,
  event JSONB NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  PRIMARY KEY (run_id, seq, created_at)
) PARTITION BY RANGE (created_at);
CREATE TABLE run_events_2025_01 PARTITION OF run_events
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
Benefits:
- Faster cleanup (drop old partitions)
- Improved query performance for queries that filter on created_at (partition pruning)
- Easier backup/restore
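Monthly partitions have to exist before rows arrive, so a scheduled job typically creates them ahead of time. As a rough sketch, the helper below generates the DDL for one month in the same shape as the example above. `monthlyPartitionDdl` is a hypothetical name, and running the statement is left to whatever migration tooling is in use.

```typescript
// Hypothetical helper: builds the DDL for one monthly partition of run_events.
function monthlyPartitionDdl(year: number, month: number): string {
  if (Math.floor(month) !== month || month < 1 || month > 12) {
    throw new RangeError(`month must be an integer from 1 to 12, got ${month}`);
  }
  const pad = (n: number): string => (n < 10 ? `0${n}` : String(n));
  // The upper bound is exclusive, so it is the first day of the following month.
  const nextYear = month === 12 ? year + 1 : year;
  const nextMonth = month === 12 ? 1 : month + 1;
  const from = `${year}-${pad(month)}-01`;
  const to = `${nextYear}-${pad(nextMonth)}-01`;
  return (
    `CREATE TABLE run_events_${year}_${pad(month)} PARTITION OF run_events\n` +
    `  FOR VALUES FROM ('${from}') TO ('${to}');`
  );
}
```

The December case is the one worth checking: the upper bound rolls over to January of the next year.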
Q: What indexes are needed?
-- Primary lookup (replay)
CREATE INDEX idx_run_events_run_seq ON run_events(run_id, seq);
-- Cleanup (find old events)
CREATE INDEX idx_run_events_created_at ON run_events(created_at);
-- Covering index for replay (avoid table lookup)
CREATE INDEX idx_run_events_run_seq_covering
  ON run_events(run_id, seq)
  INCLUDE (id, event_type, event);
10.2 Redis
Q: What's the Redis memory footprint?
Pub/sub channels:
- One channel per active run: edward:run-events:{runId}
- Channel name: ~50 bytes
- Subscriber overhead: ~100 bytes per subscriber
- Per active run: ~150 bytes
At 1,000 concurrent runs:
- Channel memory: ~150 KB (negligible)
Q: What about event buffering?
Events are NOT buffered in Redis (only published):
- Publisher sends event → all subscribers receive
- No persistence in Redis
- PostgreSQL is the source of truth
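Because Redis only fans events out and never stores them, the ordering of the two writes matters: persist first, publish second. The sketch below illustrates that ordering with injected stand-ins for the PostgreSQL write and the Redis publish. The names are hypothetical, and the real calls would be asynchronous.

```typescript
// Hypothetical sketch: persist and publish are injected stand-ins for PostgreSQL and Redis.
interface StreamEventSketch {
  seq: number;
  payload: string;
}

interface EmitResult {
  persisted: boolean;
  published: boolean;
}

function emitRunEvent(
  persist: (e: StreamEventSketch) => void,
  publish: (e: StreamEventSketch) => void,
  e: StreamEventSketch,
): EmitResult {
  // Persist first: if this throws, the event is never announced to subscribers.
  persist(e);
  try {
    publish(e);
    return { persisted: true, published: true };
  } catch {
    // A failed publish is tolerable: reconnecting clients replay from the durable log.
    return { persisted: true, published: false };
  }
}
```

Publishing before persisting would invert the guarantee: a client could see an event live that a later replay cannot reproduce.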
Q: Redis cluster vs standalone?
Recommendation: Standalone for pub/sub
Why:
- Pub/sub doesn't benefit from clustering (messages not persisted)
- Simpler deployment
- Lower latency
Failover:
- Sentinel for automatic failover
- Clients reconnect on failover
- Events persisted to PostgreSQL during outage
Q: What's the pub/sub latency?
Typical latency:
- Publish to subscriber: <1ms (same region)
- 99th percentile: <5ms
Impact:
- Live events arrive within milliseconds
- Replay from PostgreSQL is the bottleneck (not Redis)
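Since replay cost is dominated by the PostgreSQL read, it helps to page through the log in bounded batches instead of loading a whole run at once. The following is a minimal sketch of that loop, written synchronously for brevity. `ReplayRow`, `fetchBatch`, and `replayInBatches` are hypothetical names, and the real query would be asynchronous.

```typescript
// Hypothetical sketch: ReplayRow and fetchBatch are stand-ins for the real replay query.
interface ReplayRow {
  seq: number;
  payload: string;
}

// Expected to return rows with seq > afterSeq, ordered by seq, at most `limit` of them.
type FetchBatch = (afterSeq: number, limit: number) => ReplayRow[];

// Emits every row after the cursor in bounded batches; returns the last seq emitted.
function replayInBatches(
  fetchBatch: FetchBatch,
  afterSeq: number,
  batchSize: number,
  emit: (row: ReplayRow) => void,
): number {
  if (batchSize <= 0) throw new RangeError("batchSize must be positive");
  let cursor = afterSeq;
  for (;;) {
    const batch = fetchBatch(cursor, batchSize);
    for (const row of batch) {
      emit(row);
      cursor = row.seq;
    }
    // A short batch means the log is exhausted.
    if (batch.length < batchSize) return cursor;
  }
}
```

Paging by `seq > cursor` rather than by offset keeps each query an index range scan on (run_id, seq), regardless of how far into the run the client reconnects.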
10.3 SSE Infrastructure
Q: How many concurrent SSE connections?
Estimate:
- Active runs: 1,000
- Connections per run: 1-3 (user may have multiple tabs)
- Concurrent connections: 1,000-3,000
Q: Can a single server handle this?
Node.js SSE capacity:
- Memory per connection: ~10-50 KB
- CPU per connection: minimal (event-driven)
- Single server: 10,000+ connections feasible
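To put the per-connection figure in context, the memory range for a given connection count can be worked out directly. The sketch below uses the 10-50 KB estimate above and assumes decimal units. The function name is illustrative.

```typescript
// Uses the 10-50 KB per-connection estimate quoted above; decimal units are assumed.
const MIN_KB_PER_CONNECTION = 10;
const MAX_KB_PER_CONNECTION = 50;

function sseMemoryRangeMB(connections: number): { minMB: number; maxMB: number } {
  return {
    minMB: (connections * MIN_KB_PER_CONNECTION) / 1000,
    maxMB: (connections * MAX_KB_PER_CONNECTION) / 1000,
  };
}

const peakEstimate = sseMemoryRangeMB(3_000);       // 30-150 MB at the upper connection estimate
const singleServerLimit = sseMemoryRangeMB(10_000); // 100-500 MB at 10,000 connections
```

Even the upper bound fits comfortably in a typical server's memory, which is why connection count alone is unlikely to be the first scaling limit.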
Q: What about horizontal scaling?
Challenge: SSE connections are sticky (can't load balance mid-stream)
Solutions:
- Sticky sessions (recommended)
  - Load balancer routes by cookie/session
  - Same server handles entire stream
  - Simple, effective
- Connection migration
  - On server shutdown, notify clients
  - Clients reconnect to new server
  - Complex, rarely needed
- WebSocket + migration
  - Use WebSocket instead of SSE
  - Supports connection migration
  - More complex protocol
Q: What's the heartbeat strategy?
const HEARTBEAT_INTERVAL_MS = 15_000; // 15 seconds
heartbeat = setInterval(() => {
  sendSSEComment(res, "run-events-heartbeat");
}, HEARTBEAT_INTERVAL_MS);
Why Heartbeats?
- Keep connection alive (prevent timeout)
- Detect dead connections
- Load balancers may close idle connections
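The body of `sendSSEComment` is not shown here, but the wire format it has to produce is fixed by the SSE specification: a comment is a line starting with a colon, and a blank line ends the frame. A minimal sketch of that formatting step follows. `formatSSEComment` is a hypothetical helper, not the actual function.

```typescript
// Hypothetical stand-in for the formatting step inside sendSSEComment.
function formatSSEComment(text: string): string {
  // Every line of a comment starts with ":"; a blank line terminates the frame.
  const lines = text.split(/\r\n|\r|\n/).map((line) => `: ${line}`);
  return lines.join("\n") + "\n\n";
}

const heartbeatFrame = formatSSEComment("run-events-heartbeat");
```

Comment lines are ignored by the browser's EventSource parser, so heartbeats keep the connection active without surfacing as messages or moving the client's cursor.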
Q: How to handle connection limits?
Browser limits (over HTTP/1.1; HTTP/2 multiplexes streams over a single connection):
- Chrome: 6 connections per domain
- Firefox: 6 connections per domain
- Safari: 6 connections per domain
Mitigation:
- Single SSE connection per chat
- Reuse connection for multiple runs
- Use domain sharding if needed (not recommended)
10.4 Worker Scaling
Q: How many workers?
Formula:
workers = ceil(concurrent_runs / runs_per_worker)
Typical:
- Concurrent runs: 1,000
- Runs per worker: 10-50 (depends on LLM latency)
- Workers needed: 20-100
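The formula and the typical range above can be checked with a few lines. The function name is illustrative.

```typescript
// Direct transcription of: workers = ceil(concurrent_runs / runs_per_worker)
function workersNeeded(concurrentRuns: number, runsPerWorker: number): number {
  if (runsPerWorker <= 0) throw new RangeError("runsPerWorker must be positive");
  return Math.ceil(concurrentRuns / runsPerWorker);
}

const workersAtLowDensity = workersNeeded(1_000, 10);  // 100
const workersAtHighDensity = workersNeeded(1_000, 50); // 20
```

The ceiling matters at the margins: 1,001 runs at 50 per worker needs 21 workers, not 20.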
Q: Worker autoscaling?
Metrics to track:
- Queue depth (Redis)
- Average run duration
- Worker CPU/memory
Scaling policy:
- Scale up: Queue depth > threshold for 2 minutes
- Scale down: Queue depth = 0 for 10 minutes
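Both rules are "condition held continuously for a window" checks over recent queue-depth samples. One way to evaluate them is sketched below, assuming samples arrive in chronological order. The sample shape, the threshold parameter, and the function names are hypothetical.

```typescript
// Hypothetical sketch of the scaling policy; samples must be in chronological order.
interface QueueSample {
  atMs: number;
  depth: number;
}

type ScalingDecision = "scale_up" | "scale_down" | "hold";

const SCALE_UP_WINDOW_MS = 2 * 60 * 1000;
const SCALE_DOWN_WINDOW_MS = 10 * 60 * 1000;

// How long, ending at nowMs, every consecutive trailing sample has satisfied the predicate.
function trailingDurationMs(
  samples: QueueSample[],
  nowMs: number,
  holds: (depth: number) => boolean,
): number {
  let earliest = nowMs;
  for (let i = samples.length - 1; i >= 0; i--) {
    const sample = samples[i];
    if (sample === undefined || !holds(sample.depth)) break;
    earliest = sample.atMs;
  }
  return nowMs - earliest;
}

function decideScaling(samples: QueueSample[], nowMs: number, depthThreshold: number): ScalingDecision {
  if (trailingDurationMs(samples, nowMs, (d) => d > depthThreshold) >= SCALE_UP_WINDOW_MS) {
    return "scale_up";
  }
  if (trailingDurationMs(samples, nowMs, (d) => d === 0) >= SCALE_DOWN_WINDOW_MS) {
    return "scale_down";
  }
  return "hold";
}
```

The asymmetric windows (2 minutes up, 10 minutes down) bias the policy toward keeping capacity, so a brief lull does not trigger a scale-down that has to be reversed moments later.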
Q: What about worker affinity?
No affinity needed:
- Workers are stateless
- State in PostgreSQL + checkpoint
- Any worker can process any job
10.5 Data Retention
Q: How long to keep events?
Recommendation: 30 days
Why:
- Supports debugging recent issues
- Allows replay for active users
- Compliance (audit trail)
Cleanup strategy:
-- Daily cleanup job
DELETE FROM run_events
WHERE created_at < NOW() - INTERVAL '30 days';
Q: What about checkpoint retention?
Recommendation: Clear on completion, keep on failure
Why:
- Completed runs don't need checkpoint
- Failed runs may need manual retry
- Checkpoints are small (few KB)
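The retention rule can be expressed as a pure function over run metadata applied at finalization. The sketch below assumes the checkpoint sits under a `checkpoint` key and that terminal statuses are named `COMPLETED` and `FAILED`. Both are assumptions for illustration, not confirmed by the schema.

```typescript
// Hypothetical sketch: status names and the metadata shape are assumptions.
type FinalRunStatus = "COMPLETED" | "FAILED";

interface MetadataWithCheckpoint {
  checkpoint?: { turn: number };
  [key: string]: unknown;
}

// Returns a copy of the metadata with the checkpoint cleared on success and kept on failure.
function metadataAfterFinish(
  metadata: MetadataWithCheckpoint,
  finalStatus: FinalRunStatus,
): MetadataWithCheckpoint {
  const next: MetadataWithCheckpoint = { ...metadata };
  if (finalStatus === "COMPLETED") {
    delete next.checkpoint;
  }
  return next;
}
```

Returning a copy rather than mutating the input keeps the function safe to call from retry paths that still hold a reference to the original metadata.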
10.6 Monitoring
Q: What metrics to track?
Stream Health:
- stream_reconnect_count: Reconnections per run
- stream_replay_events: Events replayed per reconnect
- stream_duration_ms: Total stream duration
- stream_termination_reason: Distribution of termination reasons
Checkpoint Health:
- checkpoint_created_count: Checkpoints created
- checkpoint_resume_count: Runs resumed from checkpoint
- checkpoint_turn_distribution: Turns per checkpoint
Event Volume:
- events_persisted_count: Events per run
- events_replay_lag_ms: Time to replay events
- redis_pubsub_latency_ms: Pub/sub latency
Q: What alerts to set?
| Condition | Scope | Action |
|---|---|---|
| stream_reconnect_count > 5 | Per run | Investigate network issues |
| checkpoint_resume_count > 10% | Of runs | Investigate worker stability |
| events_replay_lag_ms > 5000 | P99 | Optimize replay queries |
| redis_pubsub_latency_ms > 100 | P99 | Check Redis health |
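To make the thresholds concrete, the sketch below evaluates the four alert conditions over a window of raw samples, using a nearest-rank P99. The input shape and function names are hypothetical. In practice these checks would usually live in the metrics system's alert rules and not in application code.

```typescript
// Hypothetical sketch: input shape and names are illustrative.
interface AlertInputs {
  reconnectsPerRun: number[];
  totalRuns: number;
  resumedRuns: number;
  replayLagMs: number[];
  pubsubLatencyMs: number[];
}

// Nearest-rank percentile over a non-empty sample set.
function percentile(values: number[], p: number): number {
  if (values.length === 0) throw new RangeError("no samples");
  const sorted = values.slice().sort((a, b) => a - b);
  const rank = Math.max(1, Math.ceil((p * sorted.length) / 100));
  return sorted[rank - 1] as number;
}

// Returns the names of the metrics whose alert condition is currently met.
function firingAlerts(m: AlertInputs): string[] {
  const alerts: string[] = [];
  if (m.reconnectsPerRun.some((count) => count > 5)) alerts.push("stream_reconnect_count");
  if (m.totalRuns > 0 && m.resumedRuns / m.totalRuns > 0.1) alerts.push("checkpoint_resume_count");
  if (m.replayLagMs.length > 0 && percentile(m.replayLagMs, 99) > 5000) alerts.push("events_replay_lag_ms");
  if (m.pubsubLatencyMs.length > 0 && percentile(m.pubsubLatencyMs, 99) > 100) alerts.push("redis_pubsub_latency_ms");
  return alerts;
}
```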
11. Key Files Reference
Core Orchestration
| File | Purpose | Lines |
|---|---|---|
| apps/api/services/runs/runEvents.service.ts | Event persistence | ~40 |
| apps/api/services/run-event-stream-utils/service.ts | SSE streaming + replay | ~250 |
| apps/api/services/sse-utils/service.ts | SSE backpressure handling | ~200 |
| apps/api/services/runs/agent-run-worker/processor.ts | Worker execution | ~350 |
| apps/api/services/runs/agent-run-worker/processor.helpers.ts | Event capture | ~150 |
| apps/api/services/runs/agent-run-worker/processor.finalize.ts | Run finalization | ~150 |
Checkpoint & Continuation
| File | Purpose | Lines |
|---|---|---|
| apps/api/services/runs/runMetadata.ts | Checkpoint schema | ~100 |
| apps/api/services/runs/agent-run-worker/processor.session.ts | Checkpoint persistence | ~100 |
| apps/api/services/chat/session/loop/agentLoop.runner.ts | Agent loop with checkpoint | ~200 |
| apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts | Continuation logic | ~250 |
| apps/api/services/chat/session/shared/checkpoint.types.ts | Checkpoint types | ~15 |
| apps/api/services/chat/session/shared/continuation.ts | Continuation prompts | ~250 |
Frontend
| File | Purpose | Lines |
|---|---|---|
| apps/web/stores/chatStream/cursorPersistence.ts | Cursor storage | ~50 |
| apps/web/stores/chatStream/resumeRunStream.ts | Stream resumption | ~150 |
| apps/web/lib/api/chat.ts | SSE API client | ~100 |
| apps/web/lib/streaming/processors/chatStreamProcessor.ts | Stream processing | ~400 |
| apps/web/hooks/chat/useChatPageOrchestration.ts | Page orchestration | ~250 |
Shared Types
| File | Purpose | Lines |
|---|---|---|
| packages/shared/src/streamEvents.ts | Event type definitions | ~250 |
Infrastructure
| File | Purpose | Lines |
|---|---|---|
| apps/api/lib/redisPubSub.ts | Redis pub/sub | ~80 |
| apps/api/lib/redis.ts | Redis client creation | ~20 |
Tests
| File | Purpose |
|---|---|
| apps/api/tests/services/runs/agentRun.processor.session.test.ts | Checkpoint tests |
| apps/api/tests/services/runs/runMetadata.test.ts | Metadata parsing tests |
| apps/api/tests/controllers/chat/streamSession.shared.test.ts | Continuation prompt tests |
| apps/api/tests/services/chat/runEventStream.utils.service.test.ts | Stream replay tests |
Appendix A: Glossary
| Term | Definition |
|---|---|
| Checkpoint | Snapshot of agent loop state for resumption |
| Cursor | Last processed event ID (client-side) |
| Event Sourcing | Pattern of persisting all state changes as events |
| Replay | Re-emitting historical events on reconnection |
| SSE | Server-Sent Events (one-way streaming from server to client) |
| Turn | Single iteration of agent loop (LLM call + tool execution) |
| Worker | Background process that executes agent runs |
Appendix B: Sequence Diagrams
B.1 Normal Stream Flow
Browser API Worker Postgres Redis
| | | | |
|--POST /chat->| | | |
| |--Create Run---->| | |
| | | |--Insert------>|
| |--Enqueue Job-->| | |
| | | | |
|<--SSE Open---| | | |
| | | | |
| | Pick Up | |
| | | | |
| | |--Load Run------>| |
| | | | |
| | Stream Session | |
| | | | |
| | |--Persist Event->| |
| | | |--Publish----->|
|<--Event------| | | |
| | | | |
| | (repeat events) | |
| | | | |
| | |--Finalize------>| |
|<--[DONE]-----| | | |
| | | | |
B.2 Reconnection Flow
Browser API Worker Postgres Redis
| | | | |
|--Refresh---->| | | |
| | | | |
| Read Cursor | | | |
| (sessionStorage) | | |
| | | | |
|--GET /stream?lastEventId=12345 | |
| | | | |
| |--Replay Events->| | |
| |<----------------| | |
|<--Replay Events (seq > 12345) | |
| | | | |
| |--Subscribe--------------------->| |
| | | | |
| | | |--Live Event-->|
|<--Live Event-| | | |
| | | | |
| Save Cursor | | | |
| (sessionStorage) | | |
| | | | |
Appendix C: Configuration Reference
Environment Variables
| Variable | Default | Description |
|---|---|---|
| STREAM_GUARD_TIMEOUT_MS | 1200000 | Stream timeout (20 min) |
| MAX_AGENT_CONTINUATION_PROMPT_CHARS | 20000 | Continuation prompt limit |
| MAX_AGENT_TURNS | 10 | Max agent loop iterations |
| MAX_AGENT_TOOL_CALLS_PER_TURN | 10 | Tool budget per turn |
| MAX_AGENT_TOOL_CALLS_PER_RUN | 50 | Tool budget per run |
| MAX_REPLAY_BATCH | 500 | Events per replay batch |
| HEARTBEAT_INTERVAL_MS | 15000 | SSE heartbeat interval |
Redis Channels
| Channel | Purpose |
|---|---|
| edward:run-events:{runId} | Live event pub/sub |
| edward:run-cancel:{runId} | Cancel signal |
| agent-runs | Worker job queue |
Storage Keys
| Key | Purpose |
|---|---|
| sse_cursor:{chatId}:{runId} | Frontend cursor |
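The key format above is enough to sketch how the frontend cursor might be read and written. The code below is an illustration under stated assumptions, not the contents of cursorPersistence.ts: `KeyValueStore` mirrors the `getItem`/`setItem` subset of `sessionStorage`, and the helper names are hypothetical.

```typescript
// Hypothetical sketch: KeyValueStore mirrors the getItem/setItem subset of sessionStorage.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

// Builds the storage key in the documented sse_cursor:{chatId}:{runId} format.
function cursorKey(chatId: string, runId: string): string {
  return `sse_cursor:${chatId}:${runId}`;
}

// Records the ID of the last event the client has fully processed.
function saveCursor(store: KeyValueStore, chatId: string, runId: string, lastEventId: string): void {
  store.setItem(cursorKey(chatId, runId), lastEventId);
}

// Returns the saved cursor, or null when the run has never been streamed in this session.
function loadCursor(store: KeyValueStore, chatId: string, runId: string): string | null {
  return store.getItem(cursorKey(chatId, runId));
}
```

In the browser the store argument would be `sessionStorage` itself. Taking it as a parameter keeps the helpers testable without a DOM.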
Document Version: 1.0
Last Updated: March 25, 2026
Author: Principal Engineering Review