
Stream Continuation

Last updated on March 25, 2026

Edward Stream Continuation Architecture - Deep Dive

Principal Engineer's Technical Reference


Easy to digest HLD



Table of Contents

  1. Executive Summary
  2. Architecture Principles
  3. Core Components Deep Dive
  4. Stream Continuation Flow
  5. Checkpoint Mechanism
  6. Event Sourcing & Replay
  7. Frontend Reconnection Strategy
  8. Worker Resumption
  9. Failure Modes & Recovery
  10. Infrastructure Cross-Questions
  11. Key Files Reference

1. Executive Summary

What is Stream Continuation?

Stream Continuation is Edward's resilience architecture that enables:

  • Seamless reconnection after client disconnects (page refresh, network blip, tab switch)
  • Worker crash recovery without losing in-progress code generation
  • Exact state replay from any point in the stream using cursor-based tracking
  • Multi-turn agent loop checkpointing for long-running operations

Why This Matters

Without stream continuation:

  • Page refresh = lost work, user frustration
  • Worker restart = orphaned runs, inconsistent state
  • Network hiccup = incomplete code generation
  • Long operations = no recovery from mid-execution failures

With stream continuation:

  • Durable execution: Every event persisted; agent-loop state checkpointed on continuation turns
  • Resumable UX: Users can refresh/reconnect without losing progress
  • Operational resilience: Workers can restart without data loss
  • Audit trail: Complete event history for debugging and compliance

2. Architecture Principles

2.1 Event Sourcing

All stream events are persisted as an immutable log:

Plain text
┌─────────────────────────────────────────────────────────┐
│  PostgreSQL: run_events table                           │
│  ┌──────┬─────────┬──────┬──────────────┬─────────────┐│
│  │ id   │ run_id  │ seq  │ event_type   │ event       ││
│  ├──────┼─────────┼──────┼──────────────┼─────────────┤│
│  │ uuid │ run-123 │ 1    │ meta         │ {...}       ││
│  │ uuid │ run-123 │ 2    │ thinking_start│ {...}      ││
│  │ uuid │ run-123 │ 3    │ text         │ {...}       ││
│  │ uuid │ run-123 │ 4    │ sandbox_start│ {...}       ││
│  │ uuid │ run-123 │ 5    │ file_start   │ {...}       ││
│  └──────┴─────────┴──────┴──────────────┴─────────────┘│
└─────────────────────────────────────────────────────────┘

Benefits:

  • Exact replay from any sequence number
  • Audit trail for debugging
  • Supports multiple reconnections
  • Enables time-travel debugging
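
These benefits come down to two operations on the log: append with a per-run sequence number, and read everything after a cursor. The sketch below illustrates them with an in-memory stand-in for the run_events table. The `InMemoryRunEventLog` class and its method names are hypothetical and are not taken from Edward's code.

```typescript
// Hypothetical in-memory stand-in for the run_events table.
interface LoggedEvent {
  runId: string;
  seq: number;
  eventType: string;
  event: Record<string, unknown>;
}

class InMemoryRunEventLog {
  private readonly rows: LoggedEvent[] = [];
  private readonly nextSeq = new Map<string, number>();

  // Append-only: rows are never updated or deleted.
  append(runId: string, eventType: string, event: Record<string, unknown>): LoggedEvent {
    const seq = this.nextSeq.get(runId) ?? 1;
    this.nextSeq.set(runId, seq + 1);
    const row: LoggedEvent = { runId, seq, eventType, event };
    this.rows.push(row);
    return row;
  }

  // Replay: every event of this run strictly after the cursor, in seq order.
  eventsAfter(runId: string, lastSeq: number): LoggedEvent[] {
    return this.rows
      .filter((row) => row.runId === runId && row.seq > lastSeq)
      .sort((a, b) => a.seq - b.seq);
  }
}

const log = new InMemoryRunEventLog();
log.append("run-123", "meta", {});
log.append("run-123", "thinking_start", {});
log.append("run-123", "text", { content: "hello" });

// A client that last saw seq 1 gets seq 2 and 3, no matter how often it reconnects.
const replayed = log.eventsAfter("run-123", 1).map((row) => row.seq); // → [2, 3]
```

Because reads never mutate the log, the same cursor always yields the same replay, which is what makes repeated reconnections safe.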

2.2 Dual-Channel Delivery

Events delivered via two channels simultaneously:

Plain text
                    ┌─────────────────┐
                    │  Event Emitter  │
                    └────────┬────────┘
                             │
              ┌──────────────┴──────────────┐
              │                             │
              ▼                             ▼
    ┌─────────────────┐           ┌─────────────────┐
    │   PostgreSQL    │           │   Redis Pub/Sub │
    │  (Durable Store)│           │  (Real-time)    │
    └────────┬────────┘           └────────┬────────┘
             │                             │
             │                             │
             ▼                             ▼
    ┌─────────────────┐           ┌─────────────────┐
    │ Replay on       │           │ Live events to  │
    │ reconnection    │           │ connected clients│
    └─────────────────┘           └─────────────────┘

Why Both?

  • PostgreSQL: Durable, queryable, supports historical replay
  • Redis Pub/Sub: Low-latency, push-based, supports many subscribers

2.3 Cursor-Based Resumption

Frontend tracks last processed event:

TypeScript
// Cursor key format
`sse_cursor:{chatId}:{runId}`

// Stored in sessionStorage for persistence across refreshes
sessionStorage.setItem(
  `sse_cursor:${chatId}:${runId}`,
  lastEventId  // e.g., "seq:12345"
);

Resumption Flow:

  1. Page loads → read cursor from sessionStorage
  2. Open SSE stream with ?lastEventId=seq:12345
  3. Backend subscribes to Redis and buffers live events
  4. Backend replays events from PostgreSQL where seq > 12345, then flushes the buffer
  5. Update cursor on each new event
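
To make the cursor handling concrete, here is a minimal sketch of turning a cursor such as "seq:12345" into the number used in the replay query. Both helper names are hypothetical. Falling back to the standard SSE `Last-Event-ID` request header when no query parameter is present is an assumption, not confirmed behaviour of the backend.

```typescript
// Hypothetical helper: parse a cursor such as "seq:12345".
// Returns 0 (replay from the start) when the cursor is missing or malformed.
function parseSeqCursor(cursor: string | undefined | null): number {
  if (!cursor) return 0;
  const match = /^seq:(\d+)$/.exec(cursor.trim());
  if (!match) return 0;
  const seq = Number(match[1]);
  return Number.isSafeInteger(seq) ? seq : 0;
}

// Assumed precedence: an explicit ?lastEventId= value wins over the
// Last-Event-ID header value.
function resolveLastSeq(queryValue?: string, headerValue?: string): number {
  return parseSeqCursor(queryValue || headerValue);
}

const fromQuery = resolveLastSeq("seq:12345", "seq:7"); // → 12345
const fromHeader = resolveLastSeq(undefined, "seq:7");  // → 7
const malformed = resolveLastSeq("not-a-cursor");       // → 0
```

Treating a malformed cursor as 0 replays the whole run. That is safe but potentially expensive, so a real implementation might prefer to reject the request instead.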

2.4 Checkpoint-Based Worker Resumption

Agent loop state checkpointed on continuation turns:

TypeScript
interface RunResumeCheckpoint {
  turn: number;                    // Current agent turn
  fullRawResponse: string;         // Raw LLM response accumulated so far
  agentMessages: LlmChatMessage[]; // Messages to send on the next turn (continuation prompt)
  sandboxTagDetected: boolean;     // Whether sandbox was triggered
  totalToolCallsInRun: number;     // Tool invocation count
  outputTokens?: number;           // Token usage tracking
  updatedAt: number;               // Timestamp
}

Persisted to: runs.metadata.resumeCheckpoint


3. Core Components Deep Dive

3.1 Event Persistence Layer

File: apps/api/services/runs/runEvents.service.ts

TypeScript
export async function persistRunEvent(
  runId: string,
  event: StreamEvent,
  publisher?: Publisher,
): Promise<RunEventEnvelope> {
  // 1. Persist to PostgreSQL
  const row = await appendRunEvent({
    runId,
    eventType: event.type,
    event: event as unknown as Record<string, unknown>,
  });

  // 2. Build envelope with sequence number
  const envelope: RunEventEnvelope = {
    id: row.id,
    runId,
    seq: row.seq,  // Auto-increment sequence
    eventType: row.eventType,
    event: row.event as unknown as StreamEvent,
  };

  // 3. Publish to Redis for live delivery
  if (publisher) {
    await publisher.publish(
      getRunEventChannel(runId),
      JSON.stringify(envelope)
    );
  }

  return envelope;
}

Key Design Decisions:

| Decision | Rationale |
|---|---|
| Sequential seq numbers | Enables cursor-based replay, ordering guarantee |
| Dual write (Postgres + Redis) | Durability + real-time delivery |
| Envelope pattern | Decouples storage format from event schema |
| Optional publisher | Allows testing without Redis dependency |

3.2 SSE Streaming with Replay

File: apps/api/services/run-event-stream-utils/service.ts

Core Function: streamRunEventsFromPersistence()

TypeScript
export async function streamRunEventsFromPersistence({
  req,
  res,
  runId,
  explicitLastEventId,
}: StreamRunEventsOptions): Promise<void> {
  let lastSeq = readLastEventId(req, explicitLastEventId);
  let terminalEventSeen = false;
  let replaying = true;
  const bufferedLiveEvents = new Map<number, RunEventEnvelope>();

  // 1. Subscribe to Redis for live events
  unsubscribeRedisChannel = await subscribeToRedisChannel(
    channel,
    onMessage  // Buffers during replay, processes after
  );

  // 2. Replay historical events from PostgreSQL
  while (!closed) {
    const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
    if (replayRows.length === 0) break;

    for (const row of replayRows) {
      lastSeq = row.seq;
      emitPersistedEvent(row.id, row.event as StreamEvent);
    }
  }

  // 3. Flush buffered live events
  replaying = false;
  await flushBufferedLiveEvents();

  // 4. Continue with live events only
  // (Redis subscriber handles this)
}

Replay Logic:

Plain text
Client Request: GET /chat/{chatId}/runs/{runId}/stream?lastEventId=seq:12345

Backend:
  1. Parse lastEventId → extract seq: 12345
  2. Subscribe to Redis: edward:run-events:{runId}
  3. Buffer live events that arrive during replay
  4. Query: SELECT * FROM run_events WHERE run_id = ? AND seq > 12345 ORDER BY seq ASC LIMIT 500
  5. Emit each replayed event with an SSE id: field; repeat the query until it returns no rows
  6. Flush the buffer after replay completes, skipping events already replayed
  7. Stream live events as they arrive
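
The SSE `id:` field is what lets the client record a cursor for each event. A minimal sketch of the wire format follows. The `formatSseFrame` helper is hypothetical, and the assumption is that the id carries the "seq:<n>" cursor format used in the request above.

```typescript
// Hypothetical formatter for a single SSE frame.
function formatSseFrame(seq: number, event: unknown): string {
  // JSON.stringify escapes newlines inside strings, so one data: line suffices.
  return `id: seq:${seq}\ndata: ${JSON.stringify(event)}\n\n`;
}

const frame = formatSseFrame(12346, { type: "text", content: "hi" });
// frame is:
//   id: seq:12346
//   data: {"type":"text","content":"hi"}
//   (blank line terminates the frame)
```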

Buffering During Replay:

TypeScript
const onMessage = (payload: string) => {
  const envelope = JSON.parse(payload) as RunEventEnvelope;
  
  if (replaying) {
    // Buffer live events during replay
    bufferedLiveEvents.set(envelope.seq, envelope);
    return;
  }

  // Process live events after replay
  if (envelope.seq > lastSeq) {
    lastSeq = envelope.seq;
    emitPersistedEvent(envelope.id, envelope.event);
  }
};

Why Buffer?

  • Live events may arrive during replay
  • Need to prevent duplicates
  • Need to maintain ordering (seq-based)
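
The flush step itself is not shown in the excerpt. One way it could work, as a sketch under that caveat, is to drain the buffer in seq order and drop anything the PostgreSQL replay already delivered. `flushBuffered` and the `Envelope` type here are hypothetical.

```typescript
interface Envelope {
  id: string;
  seq: number;
  event: unknown;
}

// Hypothetical flush: emit buffered events in seq order, skipping duplicates,
// and return the new high-water mark.
function flushBuffered(
  buffered: Map<number, Envelope>,
  lastSeq: number,
  emit: (envelope: Envelope) => void,
): number {
  const ordered = Array.from(buffered.values()).sort((a, b) => a.seq - b.seq);
  buffered.clear();
  for (const envelope of ordered) {
    if (envelope.seq <= lastSeq) continue; // already delivered by the replay
    emit(envelope);
    lastSeq = envelope.seq;
  }
  return lastSeq;
}

// Replay delivered everything up to seq 5; the buffer holds 6, 4 and 5.
const buffer = new Map<number, Envelope>([
  [6, { id: "c", seq: 6, event: {} }],
  [4, { id: "a", seq: 4, event: {} }],
  [5, { id: "b", seq: 5, event: {} }],
]);
const emitted: number[] = [];
const newLastSeq = flushBuffered(buffer, 5, (e) => emitted.push(e.seq));
// emitted → [6], newLastSeq → 6
```

Keying the buffer by seq also collapses repeated deliveries of the same event into one entry.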

3.3 SSE Backpressure Handling

File: apps/api/services/sse-utils/service.ts

Problem: Slow clients can cause memory buildup

Solution: Queue-based backpressure with graceful degradation

TypeScript
interface SSEWriterState {
  res: Response;
  queue: string[];        // Buffered writes
  queueBytes: number;     // Total bytes queued
  backpressured: boolean; // Currently backpressured
  ending: boolean;        // Graceful shutdown in progress
}

function enqueueWrite(state: SSEWriterState, data: string): boolean {
  if (state.res.writableEnded || !state.res.writable) {
    return false;
  }

  const payloadBytes = Buffer.byteLength(data);

  // Fast path: no backlog, write directly
  if (!state.backpressured && state.queue.length === 0) {
    const ok = state.res.write(data);
    if (!ok) {
      state.backpressured = true;
    }
    return true;
  }

  // Slow path: queue for later
  state.queue.push(data);
  state.queueBytes += payloadBytes;
  return true;
}

// On 'drain' event from Response
res.on("drain", () => {
  flushQueue(state);  // Drain queued writes
});

Backpressure Configuration:

TypeScript
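// Signature sketch; the void return type is an assumption
declare function configureSSEBackpressure(
  res: Response,
  options: {
    maxQueueBytes?: number;      // Max bytes before dropping
    maxQueueEvents?: number;     // Max events before dropping
    onSlowClient?: () => void;   // Callback when client is slow
  },
): void;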

Graceful Degradation:

  • Queue builds up → monitor size
  • Exceeds threshold → trigger onSlowClient
  • Close stream gracefully → client can reconnect
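
The `flushQueue` call and the threshold check are referenced but not shown. The sketch below is one plausible shape for them, not the actual implementation. `WritableLike`, `QueueState`, and `isOverLimit` are hypothetical, and the limit is counted in events rather than bytes to keep the example free of Node-specific imports.

```typescript
// Minimal stand-in for the part of the HTTP response used here.
interface WritableLike {
  write(data: string): boolean; // false signals backpressure
}

interface QueueState {
  res: WritableLike;
  queue: string[];
  backpressured: boolean;
}

// Drain queued writes until the socket pushes back again.
function flushQueue(state: QueueState): void {
  state.backpressured = false;
  while (state.queue.length > 0) {
    const next = state.queue.shift() as string;
    if (!state.res.write(next)) {
      state.backpressured = true; // stop and wait for the next "drain"
      return;
    }
  }
}

// True when the backlog exceeds the limit and the stream should be closed.
function isOverLimit(state: QueueState, maxQueueEvents: number): boolean {
  return state.queue.length > maxQueueEvents;
}

// Fake socket that signals backpressure on its second write.
const written: string[] = [];
let budget = 2;
const state: QueueState = {
  res: {
    write(data) {
      written.push(data);
      budget -= 1;
      return budget > 0;
    },
  },
  queue: ["a", "b", "c"],
  backpressured: true,
};

flushQueue(state);
// written → ["a", "b"], state.queue → ["c"], state.backpressured → true
```

As with Node's writable streams, a write that returns false has still been accepted, so "b" counts as written and only "c" stays queued.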

3.4 Worker Event Capture

File: apps/api/services/runs/agent-run-worker/processor.helpers.ts

Problem: Worker needs to capture SSE stream and persist events

Solution: Create mock Response object that intercepts writes

TypeScript
export function createRunEventCaptureResponse(
  onEvent: (event: StreamEvent) => Promise<void>,
): RunEventCaptureResponse {
  const response = new EventEmitter() as RunEventCaptureResponse;
  let sseBuffer = "";
  const utf8Decoder = new StringDecoder("utf8");
  let pending = Promise.resolve();
  let persistFailure: Error | null = null;

  response.write = (chunk: string | Buffer): boolean => {
    // 1. Decode chunk to text
    const text = typeof chunk === "string" ? chunk : utf8Decoder.write(chunk);
    sseBuffer += text;

    // 2. Parse SSE frames
    const normalized = sseBuffer.replaceAll("\r\n", "\n");
    const frames = normalized.split("\n\n");
    sseBuffer = frames.pop() ?? "";

    // 3. Extract event payloads
    for (const frame of frames) {
      const payload = frame
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .map((line) => line.slice(5).trimStart())
        .join("\n");

      if (!payload || payload === "[DONE]") continue;

      // 4. Persist each event
      pending = pending.then(async () => {
        try {
          const parsed = JSON.parse(payload) as StreamEvent;
          await onEvent(parsed);
        } catch (error) {
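          // Normalize non-Error throwables so flushPending() always rethrows an Error
          persistFailure = error instanceof Error ? error : new Error(String(error));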
          logger.error({ error, payload }, "Failed to persist captured run event");
        }
      });
    }

    return true;
  };

  response.flushPending = async () => {
    await pending;
    if (persistFailure) throw persistFailure;
  };

  return response;
}

Why This Pattern?

  • Decouples stream session from persistence mechanism
  • Enables testing without actual HTTP response
  • Serializes event persistence (prevents race conditions)
  • Tracks failures for finalization
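
The frame-splitting step matters because a network chunk can end in the middle of a frame. The standalone sketch below isolates that logic to show how the partial tail is carried over to the next chunk. `extractPayloads` is a hypothetical helper written for this illustration.

```typescript
// Hypothetical standalone version of the frame-splitting step.
function extractPayloads(buffer: string): { payloads: string[]; rest: string } {
  const frames = buffer.replace(/\r\n/g, "\n").split("\n\n");
  const rest = frames.pop() ?? ""; // trailing partial frame, kept for the next chunk
  const payloads = frames
    .map((frame) =>
      frame
        .split("\n")
        .filter((line) => line.startsWith("data:"))
        .map((line) => line.slice(5).replace(/^ /, "")) // drop the optional space after the colon
        .join("\n"),
    )
    .filter((payload) => payload !== "" && payload !== "[DONE]");
  return { payloads, rest };
}

// A JSON payload split across two network chunks.
const first = extractPayloads('data: {"type":"te');
const second = extractPayloads(first.rest + 'xt"}\n\ndata: [DONE]\n\n');
// first.payloads  → []
// second.payloads → ['{"type":"text"}']
```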

4. Stream Continuation Flow

4.1 Normal Flow (No Interruption)

Plain text
┌─────────────────────────────────────────────────────────────────┐
│  PHASE 1: ADMISSION                                             │
│  ┌─────────────┐                                                │
│  │ User Message│                                                │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Create Run  │ → run.status = "QUEUED"                        │
│  └──────┬──────┘    run.metadata.resumeCheckpoint = null        │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Enqueue Job │ → Redis queue: agent-runs                      │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Stream SSE  │ → Open connection to browser                   │
│  └─────────────┘                                                │
└─────────────────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│  PHASE 2: WORKER EXECUTION                                      │
│  ┌─────────────┐                                                │
│  │ Pick Up Job │ → Load run from DB                             │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Check State │ → Not terminal, no checkpoint                  │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Stream Sess │ → runStreamSession()                           │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Agent Loop  │ → Turn 1, 2, 3...                              │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Finalize    │ → run.status = "COMPLETED"                     │
│  └─────────────┘                                                │
└─────────────────────────────────────────────────────────────────┘

4.2 Client Disconnect + Reconnect

Plain text
┌─────────────────────────────────────────────────────────────────┐
│  DISCONNECT                                                     │
│  ┌─────────────┐                                                │
│  │ User Refresh│ → Browser closes SSE connection                │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ req.close   │ → Backend cleans up SSE stream                 │
│  └──────┬──────┘                                                │
│         │                                                       │
│         │ (Worker continues execution in background)            │
│         │ Events persisted to PostgreSQL + Redis                │
└─────────────────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│  RECONNECT                                                      │
│  ┌─────────────┐                                                │
│  │ Page Load   │ → Read cursor from sessionStorage              │
│  └──────┬──────┘    cursor = "seq:12345"                        │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ GET /stream │ → lastEventId=seq:12345                        │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Subscribe   │ → Redis: edward:run-events:{runId}             │
│  └──────┬──────┘    Buffer live events during replay            │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Replay      │ → SELECT * WHERE seq > 12345                   │
│  └──────┬──────┘    Emit 12346..., then flush buffer            │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Update      │ → Persist new cursor on each event             │
│  │ Cursor      │    sessionStorage.setItem(...)                 │
│  └─────────────┘                                                │
└─────────────────────────────────────────────────────────────────┘

4.3 Worker Crash + Restart

Plain text
┌─────────────────────────────────────────────────────────────────┐
│  CRASH                                                          │
│  ┌─────────────┐                                                │
│  │ Worker Dies │ → Process killed, OOM, etc.                    │
│  └──────┬──────┘                                                │
│         │                                                       │
│         │ (Run remains in "RUNNING" state)                      │
│         │ Events persisted up to crash point                    │
└─────────────────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────────┐
│  RESTART                                                        │
│  ┌─────────────┐                                                │
│  │ Job Retry   │ → Queue retries agent-run job                  │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Load Run    │ → run.metadata.resumeCheckpoint                │
│  └──────┬──────┘    checkpoint.turn = 3                         │
│         │           checkpoint.agentMessages = [...]            │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Initialize  │ → agentMessages = checkpoint.agentMessages     │
│  │ from Checkpt│ → agentTurn = checkpoint.turn                  │
│  └──────┬──────┘ → fullRawResponse = checkpoint.fullRawResponse │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Resume Loop │ → Continue from turn 4                         │
│  └──────┬──────┘                                                │
│         │                                                       │
│         ▼                                                       │
│  ┌─────────────┐                                                │
│  │ Finalize    │ → Clear checkpoint on success                  │
│  └─────────────┘                                                │
└─────────────────────────────────────────────────────────────────┘

5. Checkpoint Mechanism

5.1 Checkpoint Structure

File: apps/api/services/runs/runMetadata.ts

TypeScript
export interface RunResumeCheckpoint {
  turn: number;                    // Current agent turn (1-indexed)
  fullRawResponse: string;         // Complete raw LLM response accumulated
  agentMessages: LlmChatMessage[]; // LLM conversation for continuation
  sandboxTagDetected: boolean;     // Whether <edward_sandbox> was seen
  totalToolCallsInRun: number;     // Running count of tool invocations
  outputTokens?: number;           // Token usage (if trackable)
  updatedAt: number;               // Timestamp (ms since epoch)
}

Stored in: runs.metadata.resumeCheckpoint (JSONB column)

5.2 When Checkpoints Are Created

File: apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts

Checkpoints created on continuation turns (when agent loop continues):

TypeScript
async function checkpointContinuationMessages(params): Promise<LlmChatMessage[]> {
  const agentMessages: LlmChatMessage[] = [{
    role: MessageRole.User,
    content: params.prompt,  // Continuation prompt
  }];
  
  await params.onCheckpoint?.({
    turn: params.turn,
    fullRawResponse: params.fullRawResponse,
    agentMessages,  // Only the continuation prompt
    sandboxTagDetected: params.sandboxTagDetected,
    totalToolCallsInRun: params.totalToolCallsInRun,
    outputTokens: params.outputTokens,
    updatedAt: Date.now(),
  });
  
  return agentMessages;
}

Continuation Scenarios:

| Scenario | When | Why Checkpoint |
|---|---|---|
| Tool Results Continuation | Tools called but no code output | Need to send tool results back to LLM |
| No-Progress Nudge | No tools called, no output | Need to send nudge prompt |

NOT Checkpointed:

  • Run start (no checkpoint exists before the first turn completes; the turn counter starts at 0)
  • Final turn (when loop exits)
  • Turns that produce code output
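
The scenarios and exclusions above reduce to one small decision per finished turn. The sketch below expresses that rule under stated assumptions. `TurnOutcome`, its fields, and `checkpointReason` are hypothetical names, and the real loop may track more state than this.

```typescript
// Hypothetical summary of one finished agent turn.
interface TurnOutcome {
  toolCalls: number;     // tools invoked during the turn
  producedCode: boolean; // the turn emitted code output
  loopExiting: boolean;  // no further turn will run
}

type CheckpointReason = "tool_results" | "no_progress_nudge" | null;

// Returns why a checkpoint should be written, or null when none is needed.
function checkpointReason(outcome: TurnOutcome): CheckpointReason {
  if (outcome.loopExiting || outcome.producedCode) return null;
  return outcome.toolCalls > 0 ? "tool_results" : "no_progress_nudge";
}

const afterTools = checkpointReason({ toolCalls: 2, producedCode: false, loopExiting: false });
// → "tool_results"
const stalled = checkpointReason({ toolCalls: 0, producedCode: false, loopExiting: false });
// → "no_progress_nudge"
const finalTurn = checkpointReason({ toolCalls: 1, producedCode: true, loopExiting: true });
// → null
```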

5.3 Checkpoint Persistence Flow

File: apps/api/services/runs/agent-run-worker/processor.session.ts

TypeScript
onCheckpoint: async (checkpoint: WorkerCheckpoint) => {
  // 1. Get current metadata
  const currentMetadata = getMetadata();
  
  // 2. Merge checkpoint into metadata
  const mergedMetadata: AgentRunMetadata = {
    ...currentMetadata,
    resumeCheckpoint: {
      turn: checkpoint.turn,
      fullRawResponse: checkpoint.fullRawResponse,
      agentMessages: checkpoint.agentMessages,
      sandboxTagDetected: checkpoint.sandboxTagDetected,
      totalToolCallsInRun: checkpoint.totalToolCallsInRun,
      outputTokens: checkpoint.outputTokens,
      updatedAt: checkpoint.updatedAt,
    } satisfies RunResumeCheckpoint,
  };
  
  // 3. Persist to database
  const metadataPatch: Record<string, unknown> = JSON.parse(
    JSON.stringify(mergedMetadata)
  );
  await updateRun(runId, {
    currentTurn: checkpoint.turn,
    metadata: metadataPatch,
  });
  
  // 4. Update in-memory metadata
  onMetadataUpdated(mergedMetadata);
  onTurnUpdated(checkpoint.turn);
};

5.4 Checkpoint Usage on Worker Restart

File: apps/api/services/chat/session/loop/agentLoop.runner.ts

TypeScript
export async function runAgentLoop(
  params: RunAgentLoopParams,
): Promise<RunAgentLoopResult> {
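  // resumeCheckpoint and initialMessages are assumed to be fields of params
  const { resumeCheckpoint, initialMessages } = params;

  // Initialize from checkpoint if present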
  let fullRawResponse = resumeCheckpoint?.fullRawResponse ?? "";
  let agentMessages: LlmChatMessage[] =
    resumeCheckpoint?.agentMessages ?? initialMessages;
  let agentTurn = resumeCheckpoint?.turn ?? 0;
  let totalToolCallsInRun = resumeCheckpoint?.totalToolCallsInRun ?? 0;
  let sandboxTagDetected = resumeCheckpoint?.sandboxTagDetected ?? false;
  let outputTokens = resumeCheckpoint?.outputTokens;

  // Continue loop from checkpoint.turn
  while (agentTurn < MAX_AGENT_TURNS) {
    agentTurn += 1;
    // ... execute turn
  }
}

5.5 Checkpoint Cleanup

File: apps/api/services/runs/agent-run-worker/processor.finalize.ts

On successful completion:

TypeScript
await updateRun(params.runId, {
  status: mapped.status,
  state: mapped.state,
  currentTurn: params.currentTurn,
  metadata: {
    ...params.metadata,
    resumeCheckpoint: null,  // Clear checkpoint
    firstTokenLatencyMs: params.firstTokenLatencyMs,
    runDurationMs: durationMs,
  },
});

Why Clear?

  • Run is complete, no need to resume
  • Reduces metadata size
  • Prevents accidental replay of completed runs

6. Event Sourcing & Replay

6.1 Event Schema

File: packages/shared/src/streamEvents.ts

TypeScript
export interface RunEventEnvelope {
  id: string;      // UUID
  runId: string;   // Run identifier
  seq: number;     // Sequential number (auto-increment)
  eventType: string; // Event type (text, meta, file_start, etc.)
  event: StreamEvent; // Actual event payload
}

Event Types:

| Category | Events |
|---|---|
| Lifecycle | meta (SESSION_START, TURN_START, TURN_COMPLETE, SESSION_COMPLETE) |
| Content | text, thinking_*, file_* |
| Sandbox | sandbox_*, install_*, command |
| Tools | web_search, url_scrape |
| Errors | error (fatal, recoverable) |
| Metrics | metrics, rate_limit_status, preview_url, build_status |

6.2 Event Persistence

Database Schema:

SQL
CREATE TABLE run_events (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  run_id UUID NOT NULL REFERENCES runs(id),
  seq BIGINT NOT NULL,  -- Auto-increment per run
  event_type TEXT NOT NULL,
  event JSONB NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  UNIQUE(run_id, seq)
);

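-- Note: in PostgreSQL the UNIQUE(run_id, seq) constraint already creates an
-- index on (run_id, seq), so this explicit index is redundant.
CREATE INDEX idx_run_events_run_seq ON run_events(run_id, seq);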

Append Function:

TypeScript
export async function appendRunEvent(params: {
  runId: string;
  eventType: string;
  event: Record<string, unknown>;
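}): Promise<{ id: string; seq: number; eventType: string; event: Record<string, unknown> }> {
  // seq is not supplied here; it is assumed to be assigned by the database
  // on insert (the mechanism is not shown in the schema above).
  const [row] = await db
    .insert(runEvents)
    .values({
      runId: params.runId,
      eventType: params.eventType,
      event: params.event,
    })
    // Return every field that persistRunEvent reads from the row
    .returning({
      id: runEvents.id,
      seq: runEvents.seq,
      eventType: runEvents.eventType,
      event: runEvents.event,
    });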
  
  return row;
}

6.3 Event Replay Query

File: apps/api/services/run-event-stream-utils/service.ts

TypeScript
const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);

// Implementation:
export async function getRunEventsAfter(
  runId: string,
  lastSeq: number,
  limit: number,
): Promise<RunEventRow[]> {
  const rows = await db
    .select({
      id: runEvents.id,
      seq: runEvents.seq,
      eventType: runEvents.eventType,
      event: runEvents.event,
    })
    .from(runEvents)
    .where(
      and(
        eq(runEvents.runId, runId),
        gt(runEvents.seq, lastSeq)
      )
    )
    .orderBy(runEvents.seq)
    .limit(limit);
  
  return rows;
}

Query Pattern:

SQL
SELECT id, seq, event_type, event
FROM run_events
WHERE run_id = $1 AND seq > $2
ORDER BY seq ASC
LIMIT $3;

6.4 Replay Batching

Why Batch?

  • Large runs may have thousands of events
  • Don't overwhelm client with single massive response
  • Allow progressive rendering

Batching Logic:

TypeScript
const MAX_REPLAY_BATCH = 500;

while (!closed) {
  const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
  if (replayRows.length === 0) break;

  for (const row of replayRows) {
    if (closed) break;
    if (row.seq <= lastSeq) continue;

    lastSeq = row.seq;
    const ok = emitPersistedEvent(row.id, row.event as StreamEvent);
    if (!ok) {
      await closeStream({ sendDone: false });
      return;
    }
  }
}

Multiple Batches:

  • First batch: 500 events
  • If more events exist, loop continues
  • Next iteration: next 500 events
  • Continues until all events replayed
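
A quick way to see the cost of this loop is to count the queries it issues for a given backlog. The sketch below models only the termination rule of the loop above (stop on an empty batch) and assumes no new events arrive during replay. `replayBatchSizes` is a hypothetical helper.

```typescript
// Sizes of the batches the replay loop fetches for a given backlog.
// The loop stops only on an empty batch, so the last entry is always 0.
function replayBatchSizes(pendingEvents: number, batchSize: number): number[] {
  const sizes: number[] = [];
  let remaining = pendingEvents;
  for (;;) {
    const fetched = Math.min(remaining, batchSize);
    sizes.push(fetched);
    if (fetched === 0) break;
    remaining -= fetched;
  }
  return sizes;
}

const sizes = replayBatchSizes(1234, 500); // → [500, 500, 234, 0]
```

So 1,234 pending events cost four queries, the last of which returns nothing. Stopping as soon as a batch comes back shorter than the limit would save that final round trip, at the price of relying on the buffered live channel for anything written in between.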

7. Frontend Reconnection Strategy

7.1 Cursor Persistence

File: apps/web/stores/chatStream/cursorPersistence.ts

TypeScript
const SSE_CURSOR_STORAGE_PREFIX = "sse_cursor:";

export function createStreamCursorPersistence(): StreamCursorPersistence {
  const streamCursor = new Map<string, string>();  // In-memory cache

  const persistCursor = (chatId: string, runId: string, lastEventId: string): void => {
    const key = `${chatId}:${runId}`;
    streamCursor.set(key, lastEventId);
    try {
      sessionStorage.setItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`, lastEventId);
    } catch {
      // sessionStorage may be unavailable in private-browsing
    }
  };

  const readCursor = (chatId: string, runId: string): string | undefined => {
    const key = `${chatId}:${runId}`;
    const inMemory = streamCursor.get(key);
    if (inMemory) return inMemory;
    
    try {
      return sessionStorage.getItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`) ?? undefined;
    } catch {
      return undefined;
    }
  };

  const clearCursor = (chatId: string, runId: string): void => {
    const key = `${chatId}:${runId}`;
    streamCursor.delete(key);
    try {
      sessionStorage.removeItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`);
    } catch {
      // no-op
    }
  };

  return { persistCursor, readCursor, clearCursor };
}

Storage Strategy:

| Layer | Purpose | Lifetime |
|---|---|---|
| In-memory Map | Fast access during session | Page lifetime (lost on refresh) |
| sessionStorage | Persistence across refresh | Tab lifetime (survives refresh) |

Why Not localStorage?

  • Cursors are session-specific
  • They should not persist across browser restarts
  • Discarding them with the tab avoids stale cursors for old runs

7.2 Stream Resumption

File: apps/web/stores/chatStream/resumeRunStream.ts

TypeScript
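export function createResumeRunStream(params): (chatId: string, runId: string) => Promise<void> {
  // readCursor, persistCursor, clearCursor, controller, dispatch and
  // resolvedChatId come from params or the enclosing scope (elided here)
  return async (chatId: string, runId: string) => {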
    const resumeCursor = readCursor(chatId, runId);
    
    const response = await openRunEventsStream(chatId, runId, {
      signal: controller.signal,
      ...(resumeCursor ? { lastEventId: resumeCursor } : {}),
    });

    const streamResult = await processStreamResponse({
      response,
      chatId,
      dispatch,
      onCursorUpdate: (id: string, rId: string) =>
        persistCursor(resolvedChatId, rId, id),
      // ...
    });

    // Clear cursor on successful completion
    if (streamResult) {
      clearCursor(resolvedChatId, runId);
    }
  };
}

7.3 API Client

File: apps/web/lib/api/chat.ts

TypeScript
export async function openRunEventsStream(
  chatId: string,
  runId: string,
  options?: { lastEventId?: string; signal?: AbortSignal },
): Promise<Response> {
  const params = new URLSearchParams();
  if (options?.lastEventId) {
    params.set("lastEventId", options.lastEventId);
  }

  const queryString = params.toString();
  return fetchApiResponse(
    `/chat/${chatId}/runs/${runId}/stream${queryString ? `?${queryString}` : ""}`,
    { method: "GET", signal: options?.signal }
  );
}

Request Format:

Plain text
GET /chat/{chatId}/runs/{runId}/stream?lastEventId=seq:12345
Headers:
  Accept: text/event-stream
  Cache-Control: no-cache
  Connection: keep-alive

7.4 Page-Level Orchestration

File: apps/web/hooks/chat/useChatPageOrchestration.ts

Active Run Lookup:

TypeScript
const AGGRESSIVE_ACTIVE_RUN_LOOKUP_ATTEMPTS = 6;
const ACTIVE_RUN_LOOKUP_BASE_RETRY_MS = 350;
const ACTIVE_RUN_LOOKUP_MAX_RETRY_MS = 2000;

function getRetryDelayMs(attempt: number): number {
  return Math.min(
    ACTIVE_RUN_LOOKUP_MAX_RETRY_MS,
    ACTIVE_RUN_LOOKUP_BASE_RETRY_MS * 2 ** attempt
  );
}

// Exponential backoff lookup
while (notFoundAttempts < maxNotFoundAttempts) {
  const response = await fetchActiveRun({
    signal: abortController.signal,
    staleTimeMs: 0,
  });
  const activeRun = response?.data.run;

  if (activeRun) {
    resumeRunStream(chatId, activeRun.id);
    return;
  }

  notFoundAttempts += 1;
  await waitForRetry(notFoundAttempts - 1, abortController.signal);
}

Lookup Strategy:

| Mode | Attempts | Use Case |
|---|---|---|
| Aggressive | 6 | Page load, user expects active run |
| Single | 1 | Background check |
| Defer | 0 | Streaming already in progress |
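
For a sense of how long aggressive mode can take, the delays can be worked out from the constants above. This sketch copies `getRetryDelayMs` with shortened constant names and assumes `waitForRetry` sleeps for exactly that delay, which the excerpt does not show.

```typescript
const BASE_RETRY_MS = 350;
const MAX_RETRY_MS = 2000;

function getRetryDelayMs(attempt: number): number {
  return Math.min(MAX_RETRY_MS, BASE_RETRY_MS * 2 ** attempt);
}

// Delays after each of the six failed lookups (attempt indices 0..5).
const delays = Array.from({ length: 6 }, (_, attempt) => getRetryDelayMs(attempt));
// → [350, 700, 1400, 2000, 2000, 2000]

const totalWaitMs = delays.reduce((sum, ms) => sum + ms, 0); // → 8450
```

The cap takes effect from the fourth delay onward. Because the loop as written also waits after the final failed attempt, an unsuccessful aggressive lookup spends about 8.45 seconds waiting, plus request time.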

7.5 Stream Processor with Replay

File: apps/web/lib/streaming/processors/chatStreamProcessor.ts

Built-in Replay:

TypeScript
const MAX_REPLAY_ATTEMPTS = 3;
const REPLAY_BACKOFF_BASE_MS = 500;

if (
  !fatalError &&
  !sessionCompleted &&
  replayAttempt < MAX_REPLAY_ATTEMPTS &&
  metaEvent?.runId
) {
  try {
    await new Promise((resolve) =>
      setTimeout(
        resolve,
        Math.min(REPLAY_BACKOFF_BASE_MS * 2 ** replayAttempt, 5_000)
      )
    );
    
    const replayResponse = await openRunEventsStream(
      activeChatId,
      metaEvent.runId,
      lastEventId ? { lastEventId } : undefined
    );
    
    const replayResult = await processStreamResponse({
      response: replayResponse,
      replayAttempt: replayAttempt + 1,
      replayCursor: lastEventId,
      // ...
    });

    if (replayResult) {
      return mergeStreamResults(result, replayResult);
    }
  } catch {
    // Replay failed, report error
  }
}

Why Client-Side Replay?

  • Backend stream may end prematurely
  • Client detects incomplete session
  • Automatic retry with backoff
  • Merges results seamlessly
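
`mergeStreamResults` is not shown in this document. One thing a merge of this kind probably has to handle is overlap, since a replay may return events the client already processed. The sketch below shows deduplication by sequence number. It assumes every event carries a numeric `seq`, and the `SeqEvent` shape and `mergeEventsBySeq` name are hypothetical.

```typescript
// Hypothetical event shape; the real definitions live in packages/shared/src/streamEvents.ts.
interface SeqEvent {
  seq: number;
  type: string;
}

// Combine events from the original stream and a replay, keeping one copy
// per seq and returning them in ascending seq order.
function mergeEventsBySeq(original: SeqEvent[], replayed: SeqEvent[]): SeqEvent[] {
  const bySeq = new Map<number, SeqEvent>();
  for (const event of original) {
    bySeq.set(event.seq, event);
  }
  for (const event of replayed) {
    // Keep the copy that was already processed; skip replayed duplicates.
    if (!bySeq.has(event.seq)) {
      bySeq.set(event.seq, event);
    }
  }
  return Array.from(bySeq.values()).sort((a, b) => a.seq - b.seq);
}
```

Keying on `seq` makes the merge idempotent: applying the same replay twice yields the same result, which matters when up to `MAX_REPLAY_ATTEMPTS` replays may run back to back.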

8. Worker Resumption

8.1 Worker Lifecycle

File: apps/api/services/runs/agent-run-worker/processor.ts

Startup:

TypeScript
export async function processAgentRunJob(
  runId: string,
  publisher: Publisher,
): Promise<void> {
  // 1. Load run from DB
  const run = await getRunById(runId);
  if (!run) throw new Error(`Run not found: ${runId}`);

  // 2. Check if already terminal
  if (isTerminalRunStatus(run.status)) {
    return;  // Already finished
  }

  // 3. Check for existing terminal event
  const terminalEvent = await getLatestSessionCompleteEvent(runId);
  if (terminalEvent) {
    // Run completed but status not updated.
    // `mapped` holds the terminal status/state; its derivation is omitted from this excerpt.
    await updateRunWithLog(runId, {
      status: mapped.status,
      state: mapped.state,
    }, "terminal-event-already-present");
    return;
  }

  // 4. Subscribe to cancel signal
  await cancelSub.subscribe(runCancelChannel);
  cancelSub.on("message", () => {
    workerAbort.abort();  // Stop everything
  });

  // 5. Parse metadata (includes checkpoint)
  const metadata = parseAgentRunMetadata(run.metadata);
  // metadata.resumeCheckpoint may be present

  // 6. Initialize progress from checkpoint
  const progress: RunEventProgress = {
    currentTurn: metadata.resumeCheckpoint?.turn ?? run.currentTurn ?? 0,
    // ...
  };
}

8.2 Checkpoint Detection

TypeScript
const metadata = parseAgentRunMetadata(run.metadata);

// Check for checkpoint
if (metadata.resumeCheckpoint) {
  logger.info(
    { runId, turn: metadata.resumeCheckpoint.turn },
    "Resuming run from checkpoint"
  );
} else {
  logger.info({ runId }, "Starting fresh run");
}
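
The precedence used to pick the starting turn (checkpoint first, then the stored run value, then zero) can be isolated in a small function. This is a sketch with deliberately minimal, hypothetical types. The real shapes are defined in `runMetadata.ts`, and `resolveStartTurn` is not a name from the codebase.

```typescript
// Hypothetical minimal shapes for illustration.
interface ResumeCheckpoint {
  turn: number;
}
interface RunLike {
  currentTurn?: number | null;
}
interface MetadataLike {
  resumeCheckpoint?: ResumeCheckpoint | null;
}

function resolveStartTurn(run: RunLike, metadata: MetadataLike): number {
  // Same precedence as the `progress` initializer in 8.1:
  // checkpoint turn, then the run's stored turn, then 0 for a fresh run.
  return metadata.resumeCheckpoint?.turn ?? run.currentTurn ?? 0;
}
```

Using `??` instead of `||` matters here: a checkpoint taken at turn 0 is still honored, because only `null` and `undefined` fall through to the next candidate.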

8.3 Resumption Flow

TypeScript
await runStreamSession(
  buildWorkerRunSessionInput({
    req: fakeReq,
    res: capturedRes,
    externalSignal: workerAbort.signal,
    workflow: metadata.workflow,
    run,
    decryptedApiKey,
    historyMessages,
    projectContext,
    runId,
    resumeCheckpoint: metadata.resumeCheckpoint,  // Pass checkpoint
    onCheckpoint: async (checkpoint) => {
      // Persist checkpoint on each continuation
      await updateRun(runId, {
        currentTurn: checkpoint.turn,
        metadata: metadataPatch,
      });
    },
  })
);

8.4 Finalization

File: apps/api/services/runs/agent-run-worker/processor.finalize.ts

On Success:

TypeScript
export async function finalizeSuccessfulRun(params): Promise<void> {
  await updateRun(params.runId, {
    status: mapped.status,
    state: mapped.state,
    currentTurn: params.currentTurn,
    terminationReason: params.latestTerminationReason,
    completedAt: finishedAt,
    metadata: {
      ...params.metadata,
      resumeCheckpoint: null,  // Clear checkpoint
      firstTokenLatencyMs: params.firstTokenLatencyMs,
      runDurationMs: durationMs,
    },
  });
}

On Failure:

TypeScript
export async function finalizeFailedRun(params): Promise<void> {
  // 1. Flush pending events
  await params.flushCapturedEvents();

  // 2. Persist error event
  await persistRunEventWithLog(params.runId, errorEvent, params.publisher, "...");

  // 3. Persist completion meta event
  await persistRunEventWithLog(params.runId, completionMetaEvent, params.publisher, "...");

  // 4. Update run status (checkpoint NOT cleared - allows retry)
  await updateRunWithLog(params.runId, {
    status: RUN_STATUS.FAILED,
    state: "FAILED",
    currentTurn: params.currentTurn,
    terminationReason: StreamTerminationReason.STREAM_FAILED,
    errorMessage: latestErrorMessage,
    completedAt: new Date(),
  });
}

Why Keep Checkpoint on Failure?

  • Allows manual retry from checkpoint
  • Preserves state for debugging
  • Can be cleared by explicit retry logic

9. Failure Modes & Recovery

9.1 Failure Mode Matrix

| Failure Mode | Detection | Recovery | Data Loss |
| --- | --- | --- | --- |
| Client disconnect | req.close event | Reconnect with cursor | None |
| Slow client | Backpressure queue | Graceful close | None (events persisted) |
| Worker crash | Job timeout | Queue retry + checkpoint | Minimal (since last checkpoint) |
| Redis unavailable | Connection error | Fallback to PostgreSQL polling | None |
| PostgreSQL unavailable | Query error | Retry with backoff | Temporary (events buffered) |
| Stream timeout | Guard timer | Terminate with reason | None (events persisted) |
| Network blip | SSE connection drop | Auto-reconnect | None |

9.2 Client Disconnect Handling

File: apps/api/services/run-event-stream-utils/service.ts

TypeScript
req.on("close", () => {
  void closeStream({ sendDone: false });
});

const closeStream = async (options?: { sendDone?: boolean }): Promise<void> => {
  if (closed) return;
  closed = true;

  // Clean up heartbeat
  if (heartbeat) {
    clearInterval(heartbeat);
    heartbeat = null;
  }

  // Unsubscribe from Redis
  if (unsubscribeRedisChannel) {
    await unsubscribeRedisChannel();
  }

  // Close response
  if (!res.writableEnded) {
    res.end();
  }
};

What Happens:

  1. Client closes connection (refresh, navigate away)
  2. Backend receives req.close event
  3. Unsubscribe from Redis pub/sub
  4. Stop heartbeats
  5. Close response gracefully
  6. Worker continues execution (independent of client connection)

9.3 Slow Client Handling

File: apps/api/services/run-event-stream-utils/service.ts

TypeScript
configureSSEBackpressure(res, {
  onSlowClient: () => {
    void closeStream({ sendDone: false });
  },
});

Detection:

  • Write queue exceeds threshold
  • res.write() returns false (backpressure)
  • Queue not draining fast enough

Recovery:

  1. Close stream gracefully
  2. Client can reconnect with cursor
  3. Events already persisted → replay on reconnect
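
The internals of `configureSSEBackpressure` are not shown in this document. The sketch below illustrates the detection rules listed above with a bounded in-memory queue: chunks are queued while the socket is waiting for `drain`, and the slow-client callback fires once the queue is full. `createBackpressureWriter`, `WritableLike`, and `maxQueuedChunks` are hypothetical names, and the actual implementation may differ.

```typescript
// Minimal subset of a Node.js writable stream that the sketch relies on.
interface WritableLike {
  write(chunk: string): boolean;
  once(event: "drain", listener: () => void): unknown;
}

interface BackpressureOptions {
  maxQueuedChunks: number; // hypothetical threshold
  onSlowClient: () => void;
}

function createBackpressureWriter(res: WritableLike, options: BackpressureOptions) {
  const queue: string[] = [];
  let waitingForDrain = false;
  let tripped = false;

  const writeOrWait = (chunk: string): void => {
    // write() returning false means the chunk was accepted but the
    // internal buffer is full; hold further writes until "drain".
    if (!res.write(chunk)) {
      waitingForDrain = true;
      res.once("drain", flush);
    }
  };

  function flush(): void {
    waitingForDrain = false;
    while (queue.length > 0 && !waitingForDrain) {
      writeOrWait(queue.shift() as string);
    }
  }

  return {
    send(chunk: string): void {
      if (tripped) return; // stream already handed to onSlowClient
      if (!waitingForDrain) {
        writeOrWait(chunk);
        return;
      }
      if (queue.length >= options.maxQueuedChunks) {
        // Queue is not draining fast enough: drop it and let the caller close.
        tripped = true;
        queue.length = 0;
        options.onSlowClient();
        return;
      }
      queue.push(chunk);
    },
    queuedChunks: (): number => queue.length,
  };
}
```

Dropping the queue is safe in this architecture only because every event is already persisted, so the client recovers the dropped chunks by reconnecting with its cursor.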

9.4 Worker Crash Recovery

File: apps/api/services/runs/agent-run-worker/processor.ts

Crash Detection:

  • Worker process dies (OOM kill, uncaught exception, host failure)
  • Job queue detects timeout
  • Run remains in "RUNNING" state

Recovery:

  1. Queue retries job (configurable retries)
  2. New worker picks up job
  3. Load run from DB
  4. Check for checkpoint in metadata
  5. Resume from checkpoint.turn

What's Lost:

  • Work since last checkpoint
  • Typically: partial turn execution
  • Events persisted before crash are safe

9.5 Stream Timeout

File: apps/api/services/chat/session/orchestrator/streamGuards.ts

TypeScript
const STREAM_GUARD_TIMEOUT_MS = 20 * 60 * 1000;  // 20 minutes

export function setupStreamGuards(params): StreamGuardHandles {
  // Timeout guard
  const timeout = setTimeout(() => {
    params.abortController.abort();
    params.setTerminationReason(StreamTerminationReason.STREAM_TIMEOUT);
  }, STREAM_GUARD_TIMEOUT_MS);

  return { clear: () => clearTimeout(timeout) };
}

On Timeout:

  1. Abort controller triggered
  2. Agent loop stops
  3. Termination reason set to STREAM_TIMEOUT
  4. Session finalized with timeout reason
  5. Client receives error event

9.6 Redis Unavailable

File: apps/api/lib/redisPubSub.ts

TypeScript
export async function subscribeToRedisChannel(
  channel: string,
  handler: (payload: string) => void,
): Promise<() => Promise<void>> {
  // ... subscription logic

  subscriber.on("error", (error: unknown) => {
    logger.error({ error }, "Redis pub/sub subscriber error");
  });
}

Fallback Strategy:

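  • Events are still persisted to PostgreSQL, so nothing is lost
  • Live delivery stops while Redis is down: events published during the outage do not reach connected clients
  • When a client reconnects, it replays the missed events from PostgreSQL using its cursor
  • Mitigation: poll PostgreSQL for new events until the subscription recovers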

10. Infrastructure Cross-Questions

10.1 PostgreSQL

Q: What's the event volume per run?

Typical run:

  • Meta events: 5-10 (session start, turn start/complete x5, session complete)
  • Text events: 50-200 (narrative, explanations)
  • File events: 10-50 (file_start, file_content xN, file_end)
  • Command events: 5-20 (command execution results)
  • Total: roughly 70-280 events per run (sum of the ranges above)

Q: What's the storage requirement?

Per run estimate:

  • Event envelope: ~200 bytes overhead
  • Event payload: ~500 bytes average (varies widely)
  • Per event: ~700 bytes
  • Per run (200 events): ~140 KB

At 10,000 runs/day:

  • Daily: 1.4 GB
  • Monthly (30 days): 42 GB
  • Yearly (365 days): ~511 GB
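
The arithmetic behind these figures is spelled out below as a worked example. It uses decimal units (1 GB = 10^9 bytes) and the per-event sizes from the estimate above. It covers event rows only, so index and storage overhead would come on top. Computed over 365 days, the yearly figure works out to about 511 GB.

```typescript
// Inputs taken from the estimate above; decimal units (1 KB = 1,000 bytes).
const BYTES_PER_EVENT = 200 + 500; // envelope overhead + average payload
const EVENTS_PER_RUN = 200;
const RUNS_PER_DAY = 10_000;

const bytesPerRun = BYTES_PER_EVENT * EVENTS_PER_RUN; // 140,000 bytes = 140 KB
const bytesPerDay = bytesPerRun * RUNS_PER_DAY; // 1,400,000,000 bytes

const toGB = (bytes: number): number => bytes / 1e9;

const dailyGB = toGB(bytesPerDay); // 1.4
const monthlyGB = toGB(bytesPerDay * 30); // 42
const yearlyGB = toGB(bytesPerDay * 365); // 511
```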

Q: Should we partition the run_events table?

Recommendation: Yes, partition by created_at (monthly partitions)

Sql
CREATE TABLE run_events (
  id UUID DEFAULT gen_random_uuid(),
  run_id UUID NOT NULL,
  seq BIGINT NOT NULL,
  event_type TEXT NOT NULL,
  event JSONB NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  PRIMARY KEY (run_id, seq, created_at)
) PARTITION BY RANGE (created_at);

CREATE TABLE run_events_2025_01 PARTITION OF run_events
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

Benefits:

  • Faster cleanup (drop old partitions)
  • Improved query performance (partition pruning)
  • Easier backup/restore
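
Monthly partitions have to exist before rows for that month arrive, so they are usually created ahead of time by a scheduled job. A minimal sketch of generating the DDL for one month follows. `monthlyPartitionDdl` is a hypothetical helper, and the partition naming follows the `run_events_2025_01` example above.

```typescript
// Hypothetical helper: builds the DDL for one monthly partition of run_events.
function monthlyPartitionDdl(year: number, month: number): string {
  if (!Number.isInteger(year) || !Number.isInteger(month) || month < 1 || month > 12) {
    throw new RangeError(`Invalid year/month: ${year}-${month}`);
  }
  const pad = (n: number): string => (n < 10 ? `0${n}` : String(n));

  // The upper bound is exclusive, so it is the first day of the following month.
  const nextYear = month === 12 ? year + 1 : year;
  const nextMonth = month === 12 ? 1 : month + 1;

  return [
    `CREATE TABLE IF NOT EXISTS run_events_${year}_${pad(month)} PARTITION OF run_events`,
    `  FOR VALUES FROM ('${year}-${pad(month)}-01') TO ('${nextYear}-${pad(nextMonth)}-01');`,
  ].join("\n");
}
```

Calling `monthlyPartitionDdl(2025, 1)` yields the same bounds as the hand-written example, and December rolls over to January of the next year.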

Q: What indexes are needed?

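Sql
-- Primary lookup (replay). On the partitioned table above, the primary key
-- (run_id, seq, created_at) already covers this prefix, so a separate
-- index is only needed if the key differs.
CREATE INDEX idx_run_events_run_seq ON run_events(run_id, seq);

-- Cleanup (find old events)
CREATE INDEX idx_run_events_created_at ON run_events(created_at);

-- Covering index for replay (avoids the heap lookup). It supersedes
-- idx_run_events_run_seq, so create one or the other, not both.
-- Caveat: B-tree index entries are limited to roughly a third of a page
-- (about 2.7 KB); inserts fail if an entry, including the event payload,
-- exceeds that limit.
CREATE INDEX idx_run_events_run_seq_covering 
  ON run_events(run_id, seq) 
  INCLUDE (id, event_type, event);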

10.2 Redis

Q: What's the Redis memory footprint?

Pub/sub channels:

  • One channel per active run: edward:run-events:{runId}
  • Channel name: ~50 bytes
  • Subscriber overhead: ~100 bytes per subscriber
  • Per active run: ~150 bytes

At 1,000 concurrent runs:

  • Channel memory: ~150 KB (negligible)

Q: What about event buffering?

Events are NOT buffered in Redis (only published):

  • Publisher sends event → all subscribers receive
  • No persistence in Redis
  • PostgreSQL is the source of truth

Q: Redis cluster vs standalone?

Recommendation: Standalone for pub/sub

Why:

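  • Pub/sub gains little from clustering: messages are fire-and-forget, and in Redis Cluster a classic PUBLISH is broadcast to every node, so extra nodes do not add pub/sub throughput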
  • Simpler deployment
  • Lower latency

Failover:

  • Sentinel for automatic failover
  • Clients reconnect on failover
  • Events persisted to PostgreSQL during outage

Q: What's the pub/sub latency?

Typical latency:

  • Publish to subscriber: <1ms (same region)
  • 99th percentile: <5ms

Impact:

  • Live events arrive within milliseconds
  • Replay from PostgreSQL is the bottleneck (not Redis)

10.3 SSE Infrastructure

Q: How many concurrent SSE connections?

Estimate:

  • Active runs: 1,000
  • Connections per run: 1-3 (user may have multiple tabs)
  • Concurrent connections: 1,000-3,000

Q: Can a single server handle this?

Node.js SSE capacity:

  • Memory per connection: ~10-50 KB
  • CPU per connection: minimal (event-driven)
  • Single server: 10,000+ connections feasible

Q: What about horizontal scaling?

Challenge: Each SSE stream is a single long-lived HTTP response, so it stays on the server that accepted it and cannot be rebalanced mid-stream

Solutions:

  1. Sticky sessions (recommended)

    • Load balancer routes by cookie/session
    • Same server handles entire stream
    • Simple, effective
  2. Connection migration

    • On server shutdown, notify clients
    • Clients reconnect to new server
    • Complex, rarely needed
  3. WebSocket + migration

    • Use WebSocket instead of SSE
    • Supports connection migration
    • More complex protocol

Q: What's the heartbeat strategy?

TypeScript
const HEARTBEAT_INTERVAL_MS = 15_000;  // 15 seconds

heartbeat = setInterval(() => {
  sendSSEComment(res, "run-events-heartbeat");
}, HEARTBEAT_INTERVAL_MS);

Why Heartbeats?

  • Keep connection alive (prevent timeout)
  • Detect dead connections
  • Load balancers may close idle connections
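
On the wire, a heartbeat like this is an SSE comment: a line that starts with a colon, which clients ignore and which does not dispatch an event. The sketch below shows that format. `formatSSEComment` and `sendComment` are hypothetical stand-ins, since the real `sendSSEComment` is not shown in this document.

```typescript
// Minimal subset of a response object that the sketch relies on.
interface WritableLike {
  write(chunk: string): boolean;
}

function formatSSEComment(text: string): string {
  // A line beginning with ":" is a comment in the SSE format.
  // Line breaks are collapsed so the text cannot start a new field.
  const singleLine = text.replace(/[\r\n]+/g, " ");
  return `: ${singleLine}\n\n`;
}

function sendComment(res: WritableLike, text: string): boolean {
  return res.write(formatSSEComment(text));
}
```

Because comments never reach the client's event handlers, heartbeats do not advance the cursor and have no effect on replay.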

Q: How to handle connection limits?

Browser limits (HTTP/1.1; over HTTP/2 the stream limit is negotiated and defaults to 100):

  • Chrome: 6 connections per domain
  • Firefox: 6 connections per domain
  • Safari: 6 connections per domain

Mitigation:

  • Single SSE connection per chat
  • Reuse connection for multiple runs
  • Use domain sharding if needed (not recommended)

10.4 Worker Scaling

Q: How many workers?

Formula:

Plain text
workers = ceil(concurrent_runs / runs_per_worker)

Typical:

  • Concurrent runs: 1,000
  • Runs per worker: 10-50 (depends on LLM latency)
  • Workers needed: 20-100
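
The range above follows directly from the formula. A small sketch, with `workersNeeded` as a hypothetical helper name:

```typescript
function workersNeeded(concurrentRuns: number, runsPerWorker: number): number {
  if (runsPerWorker <= 0) {
    throw new RangeError("runsPerWorker must be positive");
  }
  return Math.ceil(concurrentRuns / runsPerWorker);
}

const fewestWorkers = workersNeeded(1000, 50); // 20
const mostWorkers = workersNeeded(1000, 10); // 100
```

Rounding up matters at the margins: 1,001 concurrent runs at 50 runs per worker needs 21 workers, not 20.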

Q: Worker autoscaling?

Metrics to track:

  • Queue depth (Redis)
  • Average run duration
  • Worker CPU/memory

Scaling policy:

  • Scale up: Queue depth > threshold for 2 minutes
  • Scale down: Queue depth = 0 for 10 minutes
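
The policy above can be expressed as a pure function over recent queue-depth samples. This is a sketch under stated assumptions: the scale-up threshold is not specified in this document and is passed in as a parameter, and `QueueSample`, `heldFor`, and `decideScaling` are hypothetical names. A condition counts as held only when the samples cover the whole window.

```typescript
type ScaleDecision = "scale_up" | "scale_down" | "hold";

interface QueueSample {
  timestampMs: number;
  depth: number;
}

const SCALE_UP_WINDOW_MS = 2 * 60 * 1000; // 2 minutes
const SCALE_DOWN_WINDOW_MS = 10 * 60 * 1000; // 10 minutes

// True when there is history reaching back to the start of the window and
// every sample inside the window satisfies the predicate.
function heldFor(
  samples: QueueSample[],
  nowMs: number,
  windowMs: number,
  predicate: (depth: number) => boolean,
): boolean {
  const windowStart = nowMs - windowMs;
  const hasFullHistory = samples.some((s) => s.timestampMs <= windowStart);
  const inWindow = samples.filter(
    (s) => s.timestampMs >= windowStart && s.timestampMs <= nowMs,
  );
  return hasFullHistory && inWindow.length > 0 && inWindow.every((s) => predicate(s.depth));
}

function decideScaling(
  samples: QueueSample[],
  nowMs: number,
  scaleUpThreshold: number,
): ScaleDecision {
  if (heldFor(samples, nowMs, SCALE_UP_WINDOW_MS, (d) => d > scaleUpThreshold)) {
    return "scale_up";
  }
  if (heldFor(samples, nowMs, SCALE_DOWN_WINDOW_MS, (d) => d === 0)) {
    return "scale_down";
  }
  return "hold";
}
```

The asymmetric windows make scale-up react faster than scale-down, which avoids removing workers during a brief lull between bursts.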

Q: What about worker affinity?

No affinity needed:

  • Workers are stateless
  • State in PostgreSQL + checkpoint
  • Any worker can process any job

10.5 Data Retention

Q: How long to keep events?

Recommendation: 30 days

Why:

  • Supports debugging recent issues
  • Allows replay for active users
  • Compliance (audit trail)

Cleanup strategy:

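Sql
-- Daily cleanup job
-- (If run_events is partitioned by month as recommended in 10.1, dropping
-- an expired partition is cheaper than a bulk DELETE.)
DELETE FROM run_events
WHERE created_at < NOW() - INTERVAL '30 days';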

Q: What about checkpoint retention?

Recommendation: Clear on completion, keep on failure

Why:

  • Completed runs don't need checkpoint
  • Failed runs may need manual retry
  • Checkpoints are small (few KB)

10.6 Monitoring

Q: What metrics to track?

Stream Health:

  • stream_reconnect_count: Reconnections per run
  • stream_replay_events: Events replayed per reconnect
  • stream_duration_ms: Total stream duration
  • stream_termination_reason: Distribution of termination reasons

Checkpoint Health:

  • checkpoint_created_count: Checkpoints created
  • checkpoint_resume_count: Runs resumed from checkpoint
  • checkpoint_turn_distribution: Turns per checkpoint

Event Volume:

  • events_persisted_count: Events per run
  • events_replay_lag_ms: Time to replay events
  • redis_pubsub_latency_ms: Pub/sub latency

Q: What alerts to set?

| Metric | Threshold | Scope | Action |
| --- | --- | --- | --- |
| stream_reconnect_count | > 5 | Per run | Investigate network issues |
| checkpoint_resume_count | > 10% | Of runs | Investigate worker stability |
| events_replay_lag_ms | > 5000 | P99 | Optimize replay queries |
| redis_pubsub_latency_ms | > 100 | P99 | Check Redis health |

11. Key Files Reference

Core Orchestration

| File | Purpose | Lines |
| --- | --- | --- |
| apps/api/services/runs/runEvents.service.ts | Event persistence | ~40 |
| apps/api/services/run-event-stream-utils/service.ts | SSE streaming + replay | ~250 |
| apps/api/services/sse-utils/service.ts | SSE backpressure handling | ~200 |
| apps/api/services/runs/agent-run-worker/processor.ts | Worker execution | ~350 |
| apps/api/services/runs/agent-run-worker/processor.helpers.ts | Event capture | ~150 |
| apps/api/services/runs/agent-run-worker/processor.finalize.ts | Run finalization | ~150 |

Checkpoint & Continuation

| File | Purpose | Lines |
| --- | --- | --- |
| apps/api/services/runs/runMetadata.ts | Checkpoint schema | ~100 |
| apps/api/services/runs/agent-run-worker/processor.session.ts | Checkpoint persistence | ~100 |
| apps/api/services/chat/session/loop/agentLoop.runner.ts | Agent loop with checkpoint | ~200 |
| apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts | Continuation logic | ~250 |
| apps/api/services/chat/session/shared/checkpoint.types.ts | Checkpoint types | ~15 |
| apps/api/services/chat/session/shared/continuation.ts | Continuation prompts | ~250 |

Frontend

| File | Purpose | Lines |
| --- | --- | --- |
| apps/web/stores/chatStream/cursorPersistence.ts | Cursor storage | ~50 |
| apps/web/stores/chatStream/resumeRunStream.ts | Stream resumption | ~150 |
| apps/web/lib/api/chat.ts | SSE API client | ~100 |
| apps/web/lib/streaming/processors/chatStreamProcessor.ts | Stream processing | ~400 |
| apps/web/hooks/chat/useChatPageOrchestration.ts | Page orchestration | ~250 |

Shared Types

| File | Purpose | Lines |
| --- | --- | --- |
| packages/shared/src/streamEvents.ts | Event type definitions | ~250 |

Infrastructure

| File | Purpose | Lines |
| --- | --- | --- |
| apps/api/lib/redisPubSub.ts | Redis pub/sub | ~80 |
| apps/api/lib/redis.ts | Redis client creation | ~20 |

Tests

| File | Purpose |
| --- | --- |
| apps/api/tests/services/runs/agentRun.processor.session.test.ts | Checkpoint tests |
| apps/api/tests/services/runs/runMetadata.test.ts | Metadata parsing tests |
| apps/api/tests/controllers/chat/streamSession.shared.test.ts | Continuation prompt tests |
| apps/api/tests/services/chat/runEventStream.utils.service.test.ts | Stream replay tests |

Appendix A: Glossary

| Term | Definition |
| --- | --- |
| Checkpoint | Snapshot of agent loop state for resumption |
| Cursor | Last processed event ID (client-side) |
| Event Sourcing | Pattern of persisting all state changes as events |
| Replay | Re-emitting historical events on reconnection |
| SSE | Server-Sent Events (one-way streaming from server to client) |
| Turn | Single iteration of agent loop (LLM call + tool execution) |
| Worker | Background process that executes agent runs |

Appendix B: Sequence Diagrams

B.1 Normal Stream Flow

Plain text
Browser         API              Worker           Postgres         Redis
  |              |                |                 |               |
  |--POST /chat->|                |                 |               |
  |              |--Create Run--------------------->|               |
  |              |--Enqueue Job------------------------------------>|
  |              |                |                 |               |
  |<--SSE Open---|                |                 |               |
  |              |                |                 |               |
  |              |                |<--Pick Up Job-------------------|
  |              |                |                 |               |
  |              |                |--Load Run------>|               |
  |              |                |                 |               |
  |              |              Stream Session      |               |
  |              |                |                 |               |
  |              |                |--Persist Event->|               |
  |              |                |--Publish----------------------->|
  |              |<--Live Event-------------------------------------|
  |<--Event------|                |                 |               |
  |              |                |                 |               |
  |              |              (repeat events)     |               |
  |              |                |                 |               |
  |              |                |--Finalize------>|               |
  |<--[DONE]-----|                |                 |               |
  |              |                |                 |               |

B.2 Reconnection Flow

Plain text
Browser         API              Worker           Postgres         Redis
  |              |                |                 |               |
  |--Refresh---->|                |                 |               |
  |              |                |                 |               |
  |  Read Cursor (sessionStorage) |                 |               |
  |              |                |                 |               |
  |--GET /stream?lastEventId=seq:12345              |               |
  |              |                |                 |               |
  |              |--Query Events (seq > 12345)----->|               |
  |              |<--Events-------------------------|               |
  |<--Replay-----|                |                 |               |
  |              |                |                 |               |
  |              |--Subscribe-------------------------------------->|
  |              |                |                 |               |
  |              |                |--Publish----------------------->|
  |              |<--Live Event-------------------------------------|
  |<--Live Event-|                |                 |               |
  |              |                |                 |               |
  |  Save Cursor (sessionStorage) |                 |               |
  |              |                |                 |               |

Appendix C: Configuration Reference

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| STREAM_GUARD_TIMEOUT_MS | 1200000 | Stream timeout (20 min) |
| MAX_AGENT_CONTINUATION_PROMPT_CHARS | 20000 | Continuation prompt limit |
| MAX_AGENT_TURNS | 10 | Max agent loop iterations |
| MAX_AGENT_TOOL_CALLS_PER_TURN | 10 | Tool budget per turn |
| MAX_AGENT_TOOL_CALLS_PER_RUN | 50 | Tool budget per run |
| MAX_REPLAY_BATCH | 500 | Events per replay batch |
| HEARTBEAT_INTERVAL_MS | 15000 | SSE heartbeat interval |

Redis Channels

| Channel | Purpose |
| --- | --- |
| edward:run-events:{runId} | Live event pub/sub |
| edward:run-cancel:{runId} | Cancel signal |
| agent-runs | Worker job queue |

Storage Keys

| Key | Purpose |
| --- | --- |
| sse_cursor:{chatId}:{runId} | Frontend cursor |

Document Version: 1.0
Last Updated: March 25, 2026
Author: Principal Engineering Review
