Shared from "Edward" on Inkdown

Edward Stream Continuation Architecture - Deep Dive

Principal Engineer's Technical Reference


[Figure: Easy-to-digest HLD diagram]


Table of Contents

  1. Executive Summary
  2. Architecture Principles
  3. Core Components Deep Dive
  4. Stream Continuation Flow
  5. Checkpoint Mechanism
  6. Event Sourcing & Replay
  7. Frontend Reconnection Strategy
  8. Worker Resumption
  9. Failure Modes & Recovery
  10. Infrastructure Cross-Questions
  11. Key Files Reference

    1. Executive Summary

    What is Stream Continuation?

    Stream Continuation is Edward's resilience architecture that enables:

    • Seamless reconnection after client disconnects (page refresh, network blip, tab switch)
    • Worker crash recovery without losing in-progress code generation
    • Exact state replay from any point in the stream using cursor-based tracking
    • Multi-turn agent loop checkpointing for long-running operations

    Why This Matters

    Without stream continuation:

    • Page refresh = lost work, user frustration
    • Worker restart = orphaned runs, inconsistent state
    • Network hiccup = incomplete code generation
    • Long operations = no recovery from mid-execution failures

    With stream continuation:

    • Durable execution: Every event persisted, every state checkpointed
    • Resumable UX: Users can refresh/reconnect without losing progress
    • Operational resilience: Workers can restart without data loss
    • Audit trail: Complete event history for debugging and compliance

    2. Architecture Principles

    2.1 Event Sourcing

    All stream events are persisted as an immutable log:

    Plain text
    ┌─────────────────────────────────────────────────────────┐
    │  PostgreSQL: run_events table                           │
    │  ┌──────┬─────────┬──────┬──────────────┬─────────────┐│
    │  │ id   │ run_id  │ seq  │ event_type   │ event       ││
    │  ├──────┼─────────┼──────┼──────────────┼─────────────┤│
    │  │ uuid │ run-123 │ 1    │ meta         │ {...}       ││
    │  │ uuid │ run-123 │ 2    │ thinking_start│ {...}      ││
    │  │ uuid │ run-123 │ 3    │ text         │ {...}       ││
    │  │ uuid │ run-123 │ 4    │ sandbox_start│ {...}       ││
    │  │ uuid │ run-123 │ 5    │ file_start   │ {...}       ││
    │  └──────┴─────────┴──────┴──────────────┴─────────────┘│
    └─────────────────────────────────────────────────────────┘

    Benefits:

    • Exact replay from any sequence number
    • Audit trail for debugging
    • Supports multiple reconnections
    • Enables time-travel debugging
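
The append-only log and "replay from any sequence number" idea can be illustrated with a small in-memory stand-in for the run_events table. This is a sketch under assumptions: `InMemoryRunEventLog` and `LoggedEvent` are hypothetical names, and the real system persists to PostgreSQL rather than an array.

```typescript
// Hypothetical in-memory stand-in for the run_events table.
interface LoggedEvent {
  runId: string;
  seq: number;
  eventType: string;
  event: Record<string, unknown>;
}

class InMemoryRunEventLog {
  private readonly rows: LoggedEvent[] = [];
  private readonly nextSeq: Record<string, number> = {};

  // Append only: earlier rows are never mutated, and seq grows by one per run.
  append(runId: string, eventType: string, event: Record<string, unknown>): LoggedEvent {
    const seq = (this.nextSeq[runId] ?? 0) + 1;
    this.nextSeq[runId] = seq;
    const row: LoggedEvent = { runId, seq, eventType, event };
    this.rows.push(row);
    return row;
  }

  // Replay: rows for one run with seq strictly greater than the cursor, in order.
  eventsAfter(runId: string, lastSeq: number, limit: number): LoggedEvent[] {
    return this.rows
      .filter((row) => row.runId === runId && row.seq > lastSeq)
      .sort((a, b) => a.seq - b.seq)
      .slice(0, limit);
  }
}

const log = new InMemoryRunEventLog();
log.append("run-123", "meta", {});
log.append("run-123", "thinking_start", {});
log.append("run-123", "text", { content: "hello" });
log.append("run-456", "meta", {});

// A client that already processed seq 1 receives only seq 2 and 3.
const replayed = log.eventsAfter("run-123", 1, 500).map((row) => row.seq);
console.log(replayed); // [2, 3]
```

Because the cursor is just a number, any number of reconnects can replay from the same log without coordination between them.
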

    2.2 Dual-Channel Delivery

    Each event is delivered through two channels: it is persisted to PostgreSQL, then published to Redis Pub/Sub:

    Plain text
                        ┌─────────────────┐
                        │  Event Emitter  │
                        └────────┬────────┘
                                 │
                  ┌──────────────┴──────────────┐
                  │                             │
                  ▼                             ▼
        ┌─────────────────┐           ┌─────────────────┐
        │   PostgreSQL    │           │   Redis Pub/Sub │
        │  (Durable Store)│           │  (Real-time)    │
        └────────┬────────┘           └────────┬────────┘
                 │                             │
                 │                             │
                 ▼                             ▼
        ┌─────────────────┐           ┌─────────────────┐
        │ Replay on       │           │ Live events to  │
        │ reconnection    │           │ connected clients│
        └─────────────────┘           └─────────────────┘

    Why Both?

    • PostgreSQL: Durable, queryable, supports historical replay
    • Redis Pub/Sub: Low-latency, push-based, supports many subscribers
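
To make the division of labour concrete, here is a minimal sketch in which an array stands in for PostgreSQL and a listener list stands in for Redis Pub/Sub. `DualChannelEmitter` is a hypothetical name, and the single global counter is a simplification of the per-run seq.

```typescript
interface Envelope {
  runId: string;
  seq: number;
  payload: string;
}

type Listener = (envelope: Envelope) => void;

// Hypothetical stand-ins: `durable` plays PostgreSQL, `listeners` plays Redis Pub/Sub.
class DualChannelEmitter {
  readonly durable: Envelope[] = [];
  private listeners: Listener[] = [];
  private seq = 0;

  subscribe(listener: Listener): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  emit(runId: string, payload: string): Envelope {
    this.seq += 1;
    const envelope: Envelope = { runId, seq: this.seq, payload };
    this.durable.push(envelope); // 1. durable write first
    for (const listener of this.listeners) listener(envelope); // 2. then live fan-out
    return envelope;
  }
}

const emitter = new DualChannelEmitter();
const live: Envelope[] = [];

emitter.emit("run-123", "first"); // no client connected: durable only
const unsubscribe = emitter.subscribe((envelope) => live.push(envelope));
emitter.emit("run-123", "second"); // connected: durable and live
unsubscribe();
emitter.emit("run-123", "third"); // disconnected again: durable only

console.log(emitter.durable.length, live.map((e) => e.seq)); // 3 [ 2 ]
```

The live channel only ever sees what was emitted while a subscriber was attached, which is why the durable log is needed to fill the gaps on reconnect.
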

    2.3 Cursor-Based Resumption

    The frontend tracks the last processed event:

    TypeScript
    // Cursor key format: sse_cursor:{chatId}:{runId}
    
    // Stored in sessionStorage for persistence across refreshes
    sessionStorage.setItem(
      `sse_cursor:${chatId}:${runId}`,
      lastEventId  // e.g., "seq:12345"
    );

    Resumption Flow:

    1. Page loads → read cursor from sessionStorage
    2. Open SSE stream with ?lastEventId=seq:12345
    3. Backend subscribes to Redis and buffers live events
    4. Backend replays events from PostgreSQL where seq > 12345, then flushes the buffer
    5. Client updates the cursor on each new event
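
The `seq:N` cursor format lends itself to a pair of small helpers. The sketch below is illustrative only: `formatSeqCursor` and `parseSeqCursor` are hypothetical names, and falling back to 0 (replay everything) for a missing or malformed cursor is an assumption, since the backend's own parsing is not shown here.

```typescript
const CURSOR_PREFIX = "seq:";

function formatSeqCursor(seq: number): string {
  return `${CURSOR_PREFIX}${seq}`;
}

// Assumption: a missing or malformed cursor means "replay from the start" (seq 0).
function parseSeqCursor(raw: string | undefined): number {
  if (!raw || !raw.startsWith(CURSOR_PREFIX)) return 0;
  const digits = raw.slice(CURSOR_PREFIX.length);
  if (!/^\d+$/.test(digits)) return 0;
  const seq = Number(digits);
  return Number.isSafeInteger(seq) ? seq : 0;
}

console.log(parseSeqCursor("seq:12345")); // 12345
console.log(parseSeqCursor(undefined)); // 0
console.log(formatSeqCursor(12346)); // "seq:12346"
```

Rejecting anything that is not a plain non-negative integer keeps a corrupted sessionStorage entry from turning into an invalid replay query.
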

    2.4 Checkpoint-Based Worker Resumption

    Agent loop state is checkpointed on continuation turns:

    TypeScript
    interface RunResumeCheckpoint {
      turn: number;                    // Current agent turn
      fullRawResponse: string;         // Complete LLM response
      agentMessages: LlmChatMessage[]; // Messages to resume with (the continuation prompt, see 5.2)
      sandboxTagDetected: boolean;     // Whether sandbox was triggered
      totalToolCallsInRun: number;     // Tool invocation count
      outputTokens?: number;           // Token usage tracking
      updatedAt: number;               // Timestamp
    }

    Persisted to: runs.metadata.resumeCheckpoint


    3. Core Components Deep Dive

    3.1 Event Persistence Layer

    File: apps/api/services/runs/runEvents.service.ts

    TypeScript
    export async function persistRunEvent(
      runId: string,
      event: StreamEvent,
      publisher?: Publisher,
    ): Promise<RunEventEnvelope> {
      // 1. Persist to PostgreSQL
      const row = await appendRunEvent({
        runId,
        eventType: event.type,
        event: event as unknown as Record<string, unknown>,
      });
    
      // 2. Build envelope with sequence number
      const envelope: RunEventEnvelope = {
        id: row.id,
        runId,
        seq: row.seq,  // Auto-increment sequence
        eventType: row.eventType,
        event: row.event as unknown as StreamEvent,
      };
    
      // 3. Publish to Redis for live delivery
      if (publisher) {
        await publisher.publish(
          getRunEventChannel(runId),
          JSON.stringify(envelope)
        );
      }
    
      return envelope;
    }

    Key Design Decisions:

    | Decision | Rationale |
    |---|---|
    | Sequential seq numbers | Enables cursor-based replay, ordering guarantee |
    | Dual write (Postgres + Redis) | Durability + real-time delivery |
    | Envelope pattern | Decouples storage format from event schema |
    | Optional publisher | Allows testing without Redis dependency |

    3.2 SSE Streaming with Replay

    File: apps/api/services/run-event-stream-utils/service.ts

    Core Function: streamRunEventsFromPersistence()

    TypeScript
    export async function streamRunEventsFromPersistence({
      req,
      res,
      runId,
      explicitLastEventId,
    }: StreamRunEventsOptions): Promise<void> {
      let lastSeq = readLastEventId(req, explicitLastEventId);
      let terminalEventSeen = false;
      let replaying = true;
      const bufferedLiveEvents = new Map<number, RunEventEnvelope>();
    
      // 1. Subscribe to Redis for live events
      unsubscribeRedisChannel = await subscribeToRedisChannel(
        channel,
        onMessage  // Buffers during replay, processes after
      );
    
      // 2. Replay historical events from PostgreSQL
      while (!closed) {
        const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
        if (replayRows.length === 0) break;
    
        for (const row of replayRows) {
          lastSeq = row.seq;
          emitPersistedEvent(row.id, row.event as StreamEvent);
        }
      }
    
      // 3. Flush buffered live events
      replaying = false;
      await flushBufferedLiveEvents();
    
      // 4. Continue with live events only
      // (Redis subscriber handles this)
    }

    Replay Logic:

    Plain text
    Client Request: GET /chat/{chatId}/runs/{runId}/stream?lastEventId=seq:12345
    
    Backend:
      1. Parse lastEventId → extract seq: 12345
      2. Subscribe to Redis: edward:run-events:{runId}
      3. Buffer live events that arrive during replay
      4. Query: SELECT * FROM run_events WHERE run_id = ? AND seq > 12345 ORDER BY seq ASC LIMIT 500 (repeat until no rows remain)
      5. Emit each replayed event with an SSE id: field
      6. Flush the buffer after replay completes
      7. Stream live events as they arrive

    Buffering During Replay:

    TypeScript
    const onMessage = (payload: string) => {
      const envelope = JSON.parse(payload) as RunEventEnvelope;
      
      if (replaying) {
        // Buffer live events during replay
        bufferedLiveEvents.set(envelope.seq, envelope);
        return;
      }
    
      // Process live events after replay
      if (envelope.seq > lastSeq) {
        lastSeq = envelope.seq;
        emitPersistedEvent(envelope.id, envelope.event);
      }
    };

    Why Buffer?

    • Live events may arrive during replay
    • Need to prevent duplicates
    • Need to maintain ordering (seq-based)
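
The flush step is referenced above but not shown. One plausible shape for it, as a sketch: emit buffered envelopes in seq order and skip anything the replay already delivered. `flushBuffered` is a hypothetical synchronous stand-in, not the actual `flushBufferedLiveEvents` implementation.

```typescript
interface Envelope {
  seq: number;
  payload: string;
}

// Hypothetical flush: emit buffered events in seq order, skipping any that the
// replay has already delivered, and return the advanced cursor.
function flushBuffered(
  buffered: Map<number, Envelope>,
  lastSeq: number,
  emit: (envelope: Envelope) => void,
): number {
  const ordered = Array.from(buffered.values()).sort((a, b) => a.seq - b.seq);
  let cursor = lastSeq;
  for (const envelope of ordered) {
    if (envelope.seq <= cursor) continue; // duplicate of a replayed event
    cursor = envelope.seq;
    emit(envelope);
  }
  buffered.clear();
  return cursor;
}

// Replay delivered up to seq 5; live events 4..7 arrived out of order meanwhile.
const buffered = new Map<number, Envelope>();
[7, 4, 6, 5].forEach((seq) => buffered.set(seq, { seq, payload: `event-${seq}` }));

const emitted: number[] = [];
const cursor = flushBuffered(buffered, 5, (envelope) => emitted.push(envelope.seq));

console.log(emitted, cursor); // [ 6, 7 ] 7
```

Keying the buffer by seq means a live event that is published twice collapses to one entry, and the `seq <= cursor` check covers overlap with the replayed rows.
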

    3.3 SSE Backpressure Handling

    File: apps/api/services/sse-utils/service.ts

    Problem: Slow clients can cause memory buildup

    Solution: Queue-based backpressure with graceful degradation

    TypeScript
    interface SSEWriterState {
      res: Response;
      queue: string[];        // Buffered writes
      queueBytes: number;     // Total bytes queued
      backpressured: boolean; // Currently backpressured
      ending: boolean;        // Graceful shutdown in progress
    }
    
    function enqueueWrite(state: SSEWriterState, data: string): boolean {
      if (state.res.writableEnded || !state.res.writable) {
        return false;
      }
    
      const payloadBytes = Buffer.byteLength(data);
    
      // Fast path: no backlog, write directly
      if (!state.backpressured && state.queue.length === 0) {
        const ok = state.res.write(data);
        if (!ok) {
          state.backpressured = true;
        }
        return true;
      }
    
      // Slow path: queue for later
      state.queue.push(data);
      state.queueBytes += payloadBytes;
      return true;
    }
    
    // On 'drain' event from Response
    res.on("drain", () => {
      flushQueue(state);  // Drain queued writes
    });

    Backpressure Configuration:

    TypeScript
    declare function configureSSEBackpressure(
      res: Response,
      options: {
        maxQueueBytes?: number;      // Max bytes before dropping
        maxQueueEvents?: number;     // Max events before dropping
        onSlowClient?: () => void;   // Callback when client is slow
      },
    ): void; // Return type assumed; it is not shown in the source

    Graceful Degradation:

    • Queue builds up → monitor size
    • Exceeds threshold → trigger onSlowClient
    • Close stream gracefully → client can reconnect
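
The threshold check itself is not shown in the excerpt above. A minimal sketch of how the limits might be enforced follows; `BoundedWriteQueue` is a hypothetical name, and string length is used as a stand-in for the byte count (the original uses `Buffer.byteLength`) to keep the example dependency-free.

```typescript
interface QueueLimits {
  maxQueueBytes: number;
  maxQueueEvents: number;
}

// Hypothetical bounded queue. Size is approximated by string length here.
class BoundedWriteQueue {
  private readonly queue: string[] = [];
  private queueBytes = 0;
  private readonly limits: QueueLimits;
  private readonly onSlowClient: () => void;

  constructor(limits: QueueLimits, onSlowClient: () => void) {
    this.limits = limits;
    this.onSlowClient = onSlowClient;
  }

  get length(): number {
    return this.queue.length;
  }

  // Returns false when a limit would be exceeded; the caller should then close the stream.
  enqueue(data: string): boolean {
    const nextBytes = this.queueBytes + data.length;
    const nextEvents = this.queue.length + 1;
    if (nextBytes > this.limits.maxQueueBytes || nextEvents > this.limits.maxQueueEvents) {
      this.onSlowClient();
      return false;
    }
    this.queue.push(data);
    this.queueBytes = nextBytes;
    return true;
  }

  // Called on "drain": hand queued frames to the writer until it pushes back again.
  flush(write: (data: string) => boolean): void {
    while (this.queue.length > 0) {
      const data = this.queue.shift() as string;
      this.queueBytes -= data.length;
      if (!write(data)) return; // still backpressured, wait for the next drain
    }
  }
}

let slowClientCalls = 0;
const queue = new BoundedWriteQueue({ maxQueueBytes: 10, maxQueueEvents: 2 }, () => {
  slowClientCalls += 1;
});
const accepted = [queue.enqueue("aaaa"), queue.enqueue("bbbb"), queue.enqueue("cc")];

const written: string[] = [];
queue.flush((data) => {
  written.push(data);
  return true;
});

console.log(accepted, slowClientCalls, written); // [ true, true, false ] 1 [ 'aaaa', 'bbbb' ]
```

Closing a slow client is safe in this architecture because the events it missed remain in the durable log and are replayed when it reconnects.
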

    3.4 Worker Event Capture

    File: apps/api/services/runs/agent-run-worker/processor.helpers.ts

    Problem: The worker needs to capture the SSE stream and persist its events

    Solution: Create a mock Response object that intercepts writes

    TypeScript
    import { EventEmitter } from "node:events";
    import { StringDecoder } from "node:string_decoder";

    export function createRunEventCaptureResponse(
      onEvent: (event: StreamEvent) => Promise<void>,
    ): RunEventCaptureResponse {
      const response = new EventEmitter() as RunEventCaptureResponse;
      let sseBuffer = "";
      const utf8Decoder = new StringDecoder("utf8");
      let pending = Promise.resolve();
      let persistFailure: Error | null = null;
    
      response.write = (chunk: string | Buffer): boolean => {
        // 1. Decode chunk to text
        const text = typeof chunk === "string" ? chunk : utf8Decoder.write(chunk);
        sseBuffer += text;
    
        // 2. Parse SSE frames
        const normalized = sseBuffer.replaceAll("\r\n", "\n");
        const frames = normalized.split("\n\n");
        sseBuffer = frames.pop() ?? "";
    
        // 3. Extract event payloads
        for (const frame of frames) {
          const payload = frame
            .split("\n")
            .filter((line) => line.startsWith("data:"))
            .map((line) => line.slice(5).trimStart())
            .join("\n");
    
          if (!payload || payload === "[DONE]") continue;
    
          // 4. Persist each event
          pending = pending.then(async () => {
            try {
              const parsed = JSON.parse(payload) as StreamEvent;
              await onEvent(parsed);
            } catch (error) {
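              // `error` is `unknown` under strict TypeScript, so normalize it to an Error
              persistFailure = error instanceof Error ? error : new Error(String(error));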
              logger.error({ error, payload }, "Failed to persist captured run event");
            }
          });
        }
    
        return true;
      };
    
      response.flushPending = async () => {
        await pending;
        if (persistFailure) throw persistFailure;
      };
    
      return response;
    }

    Why This Pattern?

    • Decouples stream session from persistence mechanism
    • Enables testing without actual HTTP response
    • Serializes event persistence (prevents race conditions)
    • Tracks failures for finalization
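
The frame-splitting step is the part most sensitive to chunk boundaries, so it helps to see it in isolation. The sketch below extracts the same logic into a pure function; `extractSsePayloads` is a hypothetical name, and the regular expressions are equivalents of `replaceAll` and `trimStart` that do not depend on newer library targets.

```typescript
interface ParsedFrames {
  payloads: string[]; // complete data payloads found in the buffer
  remainder: string;  // trailing partial frame to carry into the next chunk
}

function extractSsePayloads(buffer: string): ParsedFrames {
  const normalized = buffer.replace(/\r\n/g, "\n");
  const frames = normalized.split("\n\n");
  const remainder = frames.pop() ?? "";
  const payloads: string[] = [];

  for (const frame of frames) {
    const payload = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice(5).replace(/^\s+/, ""))
      .join("\n");
    if (payload && payload !== "[DONE]") payloads.push(payload);
  }

  return { payloads, remainder };
}

// A frame split across two chunks is only emitted once it is complete.
const first = extractSsePayloads('data: {"type":"text"}\n\ndata: {"type":"me');
const second = extractSsePayloads(first.remainder + 'ta"}\n\ndata: [DONE]\n\n');

console.log(first.payloads);  // [ '{"type":"text"}' ]
console.log(second.payloads); // [ '{"type":"meta"}' ]
```

Carrying the remainder forward is what lets the capture response accept arbitrarily chunked writes without ever parsing half a JSON payload.
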

    4. Stream Continuation Flow

    4.1 Normal Flow (No Interruption)
    Plain text
    ┌─────────────────────────────────────────────────────────────────┐
    │  PHASE 1: ADMISSION                                             │
    │  ┌─────────────┐                                                │
    │  │ User Message│                                                │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Create Run  │ → run.status = "QUEUED"                        │
    │  └──────┬──────┘    run.metadata.resumeCheckpoint = null        │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Enqueue Job │ → Redis queue: agent-runs                      │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Stream SSE  │ → Open connection to browser                   │
    │  └─────────────┘                                                │
    └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │  PHASE 2: WORKER EXECUTION                                      │
    │  ┌─────────────┐                                                │
    │  │ Pick Up Job │ → Load run from DB                             │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Check State │ → Not terminal, no checkpoint                  │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Stream Sess │ → runStreamSession()                           │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Agent Loop  │ → Turn 1, 2, 3...                              │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Finalize    │ → run.status = "COMPLETED"                     │
    │  └─────────────┘                                                │
    └─────────────────────────────────────────────────────────────────┘

    4.2 Client Disconnect + Reconnect
    Plain text
    ┌─────────────────────────────────────────────────────────────────┐
    │  DISCONNECT                                                     │
    │  ┌─────────────┐                                                │
    │  │ User Refresh│ → Browser closes SSE connection                │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ req.close   │ → Backend cleans up SSE stream                 │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         │ (Worker continues execution in background)             │
    │         │ Events persisted to PostgreSQL + Redis                 │
    └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │  RECONNECT                                                      │
    │  ┌─────────────┐                                                │
    │  │ Page Load   │ → Read cursor from sessionStorage              │
    │  └──────┬──────┘    cursor = "seq:12345"                        │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ GET /stream │ → lastEventId=seq:12345                        │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Replay      │ → SELECT * WHERE seq > 12345                   │
    │  └──────┬──────┘    Emit events 12346, 12347, 12348...          │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Subscribe   │ → Redis: edward:run-events:{runId}             │
    │  └──────┬──────┘    Receive live events                         │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Update      │ → Persist new cursor on each event             │
    │  │ Cursor      │    sessionStorage.setItem(...)                  │
    │  └─────────────┘                                                │
    └─────────────────────────────────────────────────────────────────┘

    4.3 Worker Crash + Restart
    Plain text
    ┌─────────────────────────────────────────────────────────────────┐
    │  CRASH                                                          │
    │  ┌─────────────┐                                                │
    │  │ Worker Dies │ → Process killed, OOM, etc.                    │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         │ (Run remains in "RUNNING" state)                      │
    │         │ Events persisted up to crash point                    │
    └─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
    ┌─────────────────────────────────────────────────────────────────┐
    │  RESTART                                                        │
    │  ┌─────────────┐                                                │
    │  │ Job Retry   │ → Queue retries agent-run job                  │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Load Run    │ → run.metadata.resumeCheckpoint                │
    │  └──────┬──────┘    checkpoint.turn = 3                         │
    │         │         checkpoint.agentMessages = [...]               │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Initialize  │ → agentMessages = checkpoint.agentMessages     │
    │  │ from Checkpt│ → agentTurn = checkpoint.turn                  │
    │  └──────┬──────┘ → fullRawResponse = checkpoint.fullRawResponse │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Resume Loop │ → Continue from turn 4                         │
    │  └──────┬──────┘                                                │
    │         │                                                       │
    │         ▼                                                       │
    │  ┌─────────────┐                                                │
    │  │ Finalize    │ → Clear checkpoint on success                  │
    │  └─────────────┘                                                │
    └─────────────────────────────────────────────────────────────────┘
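
The branch a worker takes when it picks up a job can be summarized as a small decision function. This is a sketch under assumptions: `decideOnPickup` and its types are hypothetical, and "FAILED" is an assumed terminal status, since only QUEUED, RUNNING, and COMPLETED appear in the diagrams.

```typescript
// "FAILED" is an assumed terminal status; the diagrams only show the other three.
type RunStatus = "QUEUED" | "RUNNING" | "COMPLETED" | "FAILED";

interface ResumeCheckpoint {
  turn: number;
  fullRawResponse: string;
}

interface RunRecord {
  status: RunStatus;
  resumeCheckpoint: ResumeCheckpoint | null;
}

type PickupDecision =
  | { kind: "skip" }
  | { kind: "fresh"; startTurn: number }
  | { kind: "resume"; startTurn: number; fullRawResponse: string };

function decideOnPickup(run: RunRecord): PickupDecision {
  // A terminal run must not be executed again, even if the queue redelivers the job.
  if (run.status === "COMPLETED" || run.status === "FAILED") return { kind: "skip" };

  // A checkpoint means an earlier worker got at least this far: continue after it.
  if (run.resumeCheckpoint) {
    return {
      kind: "resume",
      startTurn: run.resumeCheckpoint.turn + 1,
      fullRawResponse: run.resumeCheckpoint.fullRawResponse,
    };
  }

  return { kind: "fresh", startTurn: 1 };
}

const crashed: RunRecord = {
  status: "RUNNING",
  resumeCheckpoint: { turn: 3, fullRawResponse: "partial output" },
};
console.log(decideOnPickup(crashed));
// { kind: 'resume', startTurn: 4, fullRawResponse: 'partial output' }
```

Note that a run left in "RUNNING" by a dead worker is indistinguishable from a live one by status alone; the job retry is what triggers this decision.
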

    5. Checkpoint Mechanism

    5.1 Checkpoint Structure

    File: apps/api/services/runs/runMetadata.ts

    TypeScript
    export interface RunResumeCheckpoint {
      turn: number;                    // Current agent turn (1-indexed)
      fullRawResponse: string;         // Complete raw LLM response accumulated
      agentMessages: LlmChatMessage[]; // LLM conversation for continuation
      sandboxTagDetected: boolean;     // Whether <edward_sandbox> was seen
      totalToolCallsInRun: number;     // Running count of tool invocations
      outputTokens?: number;           // Token usage (if trackable)
      updatedAt: number;               // Timestamp (ms since epoch)
    }

    Stored in: runs.metadata.resumeCheckpoint (JSONB column)

    5.2 When Checkpoints Are Created

    File: apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts

    Checkpoints are created on continuation turns (when the agent loop continues):

    TypeScript
    async function checkpointContinuationMessages(params): Promise<LlmChatMessage[]> {
      const agentMessages: LlmChatMessage[] = [{
        role: MessageRole.User,
        content: params.prompt,  // Continuation prompt
      }];
      
      await params.onCheckpoint?.({
        turn: params.turn,
        fullRawResponse: params.fullRawResponse,
        agentMessages,  // Only the continuation prompt
        sandboxTagDetected: params.sandboxTagDetected,
        totalToolCallsInRun: params.totalToolCallsInRun,
        outputTokens: params.outputTokens,
        updatedAt: Date.now(),
      });
      
      return agentMessages;
    }

    Continuation Scenarios:

    | Scenario | When | Why Checkpoint |
    |---|---|---|
    | Tool Results Continuation | Tools called but no code output | Need to send tool results back to LLM |
    | No-Progress Nudge | No tools called, no output | Need to send nudge prompt |

    NOT Checkpointed:

    • Initial turn (turn 0)
    • Final turn (when loop exits)
    • Turns that produce code output
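
The two continuation scenarios and the non-checkpointed cases reduce to a small classification. The sketch below is only an illustration of the rules in this section: `classifyTurn`, `TurnOutcome`, and the disposition names are hypothetical, and turn limits and other exit conditions are ignored.

```typescript
interface TurnOutcome {
  toolCalls: number;           // tools invoked during this turn
  producedCodeOutput: boolean; // whether the turn emitted code output
}

type TurnDisposition =
  | "tool_results_continuation" // checkpoint, then send tool results back to the LLM
  | "no_progress_nudge"         // checkpoint, then send a nudge prompt
  | "no_checkpoint";            // turn produced code output

function classifyTurn(outcome: TurnOutcome): TurnDisposition {
  if (outcome.producedCodeOutput) return "no_checkpoint";
  return outcome.toolCalls > 0 ? "tool_results_continuation" : "no_progress_nudge";
}

function shouldCheckpoint(disposition: TurnDisposition): boolean {
  return disposition !== "no_checkpoint";
}

console.log(classifyTurn({ toolCalls: 2, producedCodeOutput: false })); // "tool_results_continuation"
console.log(classifyTurn({ toolCalls: 0, producedCodeOutput: false })); // "no_progress_nudge"
console.log(classifyTurn({ toolCalls: 1, producedCodeOutput: true }));  // "no_checkpoint"
```
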

    5.3 Checkpoint Persistence Flow

    File: apps/api/services/runs/agent-run-worker/processor.session.ts

    TypeScript
    onCheckpoint: async (checkpoint: WorkerCheckpoint) => {
      // 1. Get current metadata
      const currentMetadata = getMetadata();
      
      // 2. Merge checkpoint into metadata
      const mergedMetadata: AgentRunMetadata = {
        ...currentMetadata,
        resumeCheckpoint: {
          turn: checkpoint.turn,
          fullRawResponse: checkpoint.fullRawResponse,
          agentMessages: checkpoint.agentMessages,
          sandboxTagDetected: checkpoint.sandboxTagDetected,
          totalToolCallsInRun: checkpoint.totalToolCallsInRun,
          outputTokens: checkpoint.outputTokens,
          updatedAt: checkpoint.updatedAt,
        } satisfies RunResumeCheckpoint,
      };
      
      // 3. Persist to database
      const metadataPatch: Record<string, unknown> = JSON.parse(
        JSON.stringify(mergedMetadata)
      );
      await updateRun(runId, {
        currentTurn: checkpoint.turn,
        metadata: metadataPatch,
      });
      
      // 4. Update in-memory metadata
      onMetadataUpdated(mergedMetadata);
      onTurnUpdated(checkpoint.turn);
    };

    5.4 Checkpoint Usage on Worker Restart

    File: apps/api/services/chat/session/loop/agentLoop.runner.ts

    TypeScript
    export async function runAgentLoop(
      params: RunAgentLoopParams,
    ): Promise<RunAgentLoopResult> {
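      // Assumed: both values are fields of RunAgentLoopParams (destructuring is not shown in the original excerpt)
      const { resumeCheckpoint, initialMessages } = params;

      // Initialize from checkpoint if present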
      let fullRawResponse = resumeCheckpoint?.fullRawResponse ?? "";
      let agentMessages: LlmChatMessage[] =
        resumeCheckpoint?.agentMessages ?? initialMessages;
      let agentTurn = resumeCheckpoint?.turn ?? 0;
      let totalToolCallsInRun = resumeCheckpoint?.totalToolCallsInRun ?? 0;
      let sandboxTagDetected = resumeCheckpoint?.sandboxTagDetected ?? false;
      let outputTokens = resumeCheckpoint?.outputTokens;
    
      // Continue loop from checkpoint.turn
      while (agentTurn < MAX_AGENT_TURNS) {
        agentTurn += 1;
        // ... execute turn
      }
    }

    5.5 Checkpoint Cleanup

    File: apps/api/services/runs/agent-run-worker/processor.finalize.ts

    On successful completion:

    TypeScript
    await updateRun(params.runId, {
      status: mapped.status,
      state: mapped.state,
      currentTurn: params.currentTurn,
      metadata: {
        ...params.metadata,
        resumeCheckpoint: null,  // Clear checkpoint
        firstTokenLatencyMs: params.firstTokenLatencyMs,
        runDurationMs: durationMs,
      },
    });

    Why Clear?

    • Run is complete, no need to resume
    • Reduces metadata size
    • Prevents accidental replay of completed runs

    6. Event Sourcing & Replay

    6.1 Event Schema

    File: packages/shared/src/streamEvents.ts

    TypeScript
    export interface RunEventEnvelope {
      id: string;      // UUID
      runId: string;   // Run identifier
      seq: number;     // Sequential number (auto-increment)
      eventType: string; // Event type (text, meta, file_start, etc.)
      event: StreamEvent; // Actual event payload
    }

    Event Types:

    | Category | Events |
    |---|---|
    | Lifecycle | meta (SESSION_START, TURN_START, TURN_COMPLETE, SESSION_COMPLETE) |
    | Content | text, thinking_*, file_* |
    | Sandbox | sandbox_*, install_*, command |
    | Tools | web_search, url_scrape |
    | Errors | error (fatal, recoverable) |
    | Metrics | metrics, rate_limit_status, preview_url, build_status |

    6.2 Event Persistence

    Database Schema:

    Sql
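    CREATE TABLE run_events (
      id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
      run_id UUID NOT NULL REFERENCES runs(id),
      seq BIGINT NOT NULL,  -- Per-run sequence; this DDL defines no default, so the value must be assigned at insert time
      event_type TEXT NOT NULL,
      event JSONB NOT NULL,
      created_at TIMESTAMPTZ DEFAULT NOW(),
      UNIQUE(run_id, seq)
    );
    
    -- Note: UNIQUE(run_id, seq) already creates an index on these columns,
    -- so this explicit index is redundant
    CREATE INDEX idx_run_events_run_seq ON run_events(run_id, seq);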

    Append Function:

    TypeScript
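    export async function appendRunEvent(params: {
      runId: string;
      eventType: string;
      event: Record<string, unknown>;
    }): Promise<{
      id: string;
      seq: number;
      eventType: string;
      event: Record<string, unknown>;
    }> {
      // Note: seq is not supplied here; how it is assigned per run is not shown in this excerpt
      const [row] = await db
        .insert(runEvents)
        .values({
          runId: params.runId,
          eventType: params.eventType,
          event: params.event,
        })
        // eventType and event are returned too, because persistRunEvent (3.1) reads both from the row
        .returning({
          id: runEvents.id,
          seq: runEvents.seq,
          eventType: runEvents.eventType,
          event: runEvents.event,
        });
      
      return row;
    }

    6.3 Event Replay Query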

    File: apps/api/services/run-event-stream-utils/service.ts

    TypeScript
    const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
    
    // Implementation:
    export async function getRunEventsAfter(
      runId: string,
      lastSeq: number,
      limit: number,
    ): Promise<RunEventRow[]> {
      const rows = await db
        .select({
          id: runEvents.id,
          seq: runEvents.seq,
          eventType: runEvents.eventType,
          event: runEvents.event,
        })
        .from(runEvents)
        .where(
          and(
            eq(runEvents.runId, runId),
            gt(runEvents.seq, lastSeq)
          )
        )
        .orderBy(runEvents.seq)
        .limit(limit);
      
      return rows;
    }

    Query Pattern:

    Sql
    SELECT id, seq, event_type, event
    FROM run_events
    WHERE run_id = $1 AND seq > $2
    ORDER BY seq ASC
    LIMIT $3;

    6.4 Replay Batching

    Why Batch?

    • Large runs may have thousands of events
    • Don't overwhelm client with single massive response
    • Allow progressive rendering

    Batching Logic:

    TypeScript
    const MAX_REPLAY_BATCH = 500;
    
    while (!closed) {
      const replayRows = await getRunEventsAfter(runId, lastSeq, MAX_REPLAY_BATCH);
      if (replayRows.length === 0) break;
    
      for (const row of replayRows) {
        if (closed) break;
        if (row.seq <= lastSeq) continue;
    
        lastSeq = row.seq;
        const ok = emitPersistedEvent(row.id, row.event as StreamEvent);
        if (!ok) {
          await closeStream({ sendDone: false });
          return;
        }
      }
    }

    Multiple Batches:

    • First batch: 500 events
    • If more events exist, loop continues
    • Next iteration: next 500 events
    • Continues until all events replayed
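
The batch arithmetic can be checked with a synchronous simplification of the loop. In this sketch `replayInBatches` and `Row` are hypothetical, the fetch function is an in-memory stand-in for `getRunEventsAfter`, and the no-progress guard is an addition that the original loop does not show.

```typescript
interface Row {
  seq: number;
}

interface ReplayResult {
  lastSeq: number;
  batches: number;    // number of non-empty batches fetched
  completed: boolean; // false when the client stopped accepting events
}

function replayInBatches(
  fetchAfter: (lastSeq: number, limit: number) => Row[],
  startSeq: number,
  batchSize: number,
  emit: (row: Row) => boolean,
): ReplayResult {
  let lastSeq = startSeq;
  let batches = 0;

  while (true) {
    const rows = fetchAfter(lastSeq, batchSize);
    if (rows.length === 0) return { lastSeq, batches, completed: true };
    batches += 1;

    const before = lastSeq;
    for (const row of rows) {
      if (row.seq <= lastSeq) continue; // skip anything already delivered
      lastSeq = row.seq;
      if (!emit(row)) return { lastSeq, batches, completed: false };
    }
    // Defensive: a batch that advances nothing would otherwise loop forever.
    if (lastSeq === before) return { lastSeq, batches, completed: true };
  }
}

// 1200 persisted events, replayed from the start in batches of 500.
const allRows: Row[] = Array.from({ length: 1200 }, (_, i) => ({ seq: i + 1 }));
const fetchAfter = (lastSeq: number, limit: number): Row[] =>
  allRows.filter((row) => row.seq > lastSeq).slice(0, limit);

let emittedCount = 0;
const result = replayInBatches(fetchAfter, 0, 500, () => {
  emittedCount += 1;
  return true;
});

console.log(result, emittedCount);
// { lastSeq: 1200, batches: 3, completed: true } 1200
```

The three batches hold 500, 500, and 200 rows; a fourth, empty fetch is what ends the loop.
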

    7. Frontend Reconnection Strategy

    7.1 Cursor Persistence

    File: apps/web/stores/chatStream/cursorPersistence.ts

    TypeScript
    const SSE_CURSOR_STORAGE_PREFIX = "sse_cursor:";
    
    export function createStreamCursorPersistence(): StreamCursorPersistence {
      const streamCursor = new Map<string, string>();  // In-memory cache
    
      const persistCursor = (chatId: string, runId: string, lastEventId: string): void => {
        const key = `${chatId}:${runId}`;
        streamCursor.set(key, lastEventId);
        try {
          sessionStorage.setItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`, lastEventId);
        } catch {
          // sessionStorage may be unavailable in private-browsing
        }
      };
    
      const readCursor = (chatId: string, runId: string): string | undefined => {
        const key = `${chatId}:${runId}`;
        const inMemory = streamCursor.get(key);
        if (inMemory) return inMemory;
        
        try {
          return sessionStorage.getItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`) ?? undefined;
        } catch {
          return undefined;
        }
      };
    
      const clearCursor = (chatId: string, runId: string): void => {
        const key = `${chatId}:${runId}`;
        streamCursor.delete(key);
        try {
          sessionStorage.removeItem(`${SSE_CURSOR_STORAGE_PREFIX}${key}`);
        } catch {
          // no-op
        }
      };
    
      return { persistCursor, readCursor, clearCursor };
    }

    Storage Strategy:

    | Layer | Purpose | Lifetime |
    |---|---|---|
    | In-memory Map | Fast access during session | Page lifetime (lost on refresh) |
    | sessionStorage | Persistence across refresh | Tab lifetime (survives refresh) |

    Why Not localStorage?

    • Cursors are session-specific
    • sessionStorage is cleared when the tab closes, so cursors do not persist across browser restarts
    • This avoids stale cursors for old runs
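
    The two-layer strategy can be exercised outside a browser by injecting the backing store. A minimal sketch, assuming only the getItem/setItem/removeItem subset of the Web Storage API is needed; KeyValueStore, MemoryStore, and createCursorStore are hypothetical names, not part of cursorPersistence.ts.

```typescript
// Minimal subset of the Web Storage API used by the cursor store.
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// In-memory stand-in for sessionStorage (hypothetical, for illustration only).
class MemoryStore implements KeyValueStore {
  private items = new Map<string, string>();
  getItem(key: string): string | null {
    return this.items.get(key) ?? null;
  }
  setItem(key: string, value: string): void {
    this.items.set(key, value);
  }
  removeItem(key: string): void {
    this.items.delete(key);
  }
}

const CURSOR_PREFIX = "sse_cursor:";

function createCursorStore(backing: KeyValueStore) {
  const cache = new Map<string, string>(); // recreated on every page load
  const keyOf = (chatId: string, runId: string): string => `${chatId}:${runId}`;
  return {
    persist(chatId: string, runId: string, lastEventId: string): void {
      cache.set(keyOf(chatId, runId), lastEventId);
      backing.setItem(CURSOR_PREFIX + keyOf(chatId, runId), lastEventId);
    },
    read(chatId: string, runId: string): string | undefined {
      return (
        cache.get(keyOf(chatId, runId)) ??
        backing.getItem(CURSOR_PREFIX + keyOf(chatId, runId)) ??
        undefined
      );
    },
  };
}

// Simulate a refresh: the in-memory cache is recreated, the backing store is not.
const tabStorage = new MemoryStore();
createCursorStore(tabStorage).persist("chat-1", "run-1", "seq:42");
const afterRefresh = createCursorStore(tabStorage).read("chat-1", "run-1");
const otherRun = createCursorStore(tabStorage).read("chat-1", "run-2");
```

    The second store instance has an empty cache, so the read falls through to the backing store, which is the path taken after a page refresh.
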
    7.2 Stream Resumption

    File: apps/web/stores/chatStream/resumeRunStream.ts

    TypeScript
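    // Note: controller, dispatch, resolvedChatId, and the cursor helpers are set up in code omitted from this excerpt.
    export function createResumeRunStream(params): (chatId: string, runId: string) => Promise<void> {
      // The inner function must be async because it awaits the stream calls below.
      return async (chatId: string, runId: string) => {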
        const resumeCursor = readCursor(chatId, runId);
        
        const response = await openRunEventsStream(chatId, runId, {
          signal: controller.signal,
          ...(resumeCursor ? { lastEventId: resumeCursor } : {}),
        });
    
        const streamResult = await processStreamResponse({
          response,
          chatId,
          dispatch,
          onCursorUpdate: (id: string, rId: string) =>
            persistCursor(resolvedChatId, rId, id),
          // ...
        });
    
        // Clear cursor on successful completion
        if (streamResult) {
          clearCursor(resolvedChatId, runId);
        }
      };
    }
    7.3 API Client

    File: apps/web/lib/api/chat.ts

    TypeScript
    export async function openRunEventsStream(
      chatId: string,
      runId: string,
      options?: { lastEventId?: string; signal?: AbortSignal },
    ): Promise<Response> {
      const params = new URLSearchParams();
      if (options?.lastEventId) {
        params.set("lastEventId", options.lastEventId);
      }
    
      const queryString = params.toString();
      return fetchApiResponse(
        `/chat/${chatId}/runs/${runId}/stream${queryString ? `?${queryString}` : ""}`,
        { method: "GET", signal: options?.signal }
      );
    }

    Request Format:

    Plain text
    GET /chat/{chatId}/runs/{runId}/stream?lastEventId=seq:12345
    Headers:
      Accept: text/event-stream
      Cache-Control: no-cache
      Connection: keep-alive
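
    On the server side, the lastEventId query parameter has to be turned back into a sequence number before replay can start. A minimal sketch, assuming the seq:<number> form shown in the request above is the only accepted format; parseSeqCursor is a hypothetical helper and the URL is a placeholder, not the service's actual parser or host.

```typescript
// Hypothetical helper: converts "seq:12345" into 12345, or null when the cursor is unusable.
function parseSeqCursor(raw: string | null | undefined): number | null {
  if (!raw) return null;
  const match = /^seq:(\d+)$/.exec(raw.trim());
  if (!match) return null;
  const seq = Number(match[1]);
  // Reject values that cannot be represented exactly as a JavaScript number.
  return Number.isSafeInteger(seq) ? seq : null;
}

// Reading the parameter from a request URL with the standard URL API.
const requestUrl = new URL(
  "https://example.invalid/chat/c1/runs/r1/stream?lastEventId=seq:12345",
);
const cursor = parseSeqCursor(requestUrl.searchParams.get("lastEventId"));
```

    A caller could treat a null result as "no usable cursor" and replay the run from its first event.
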
    7.4 Page-Level Orchestration

    File: apps/web/hooks/chat/useChatPageOrchestration.ts

    Active Run Lookup:

    TypeScript
    const AGGRESSIVE_ACTIVE_RUN_LOOKUP_ATTEMPTS = 6;
    const ACTIVE_RUN_LOOKUP_BASE_RETRY_MS = 350;
    const ACTIVE_RUN_LOOKUP_MAX_RETRY_MS = 2000;
    
    function getRetryDelayMs(attempt: number): number {
      return Math.min(
        ACTIVE_RUN_LOOKUP_MAX_RETRY_MS,
        ACTIVE_RUN_LOOKUP_BASE_RETRY_MS * 2 ** attempt
      );
    }
    
    // Exponential backoff lookup
    while (notFoundAttempts < maxNotFoundAttempts) {
      const response = await fetchActiveRun({
        signal: abortController.signal,
        staleTimeMs: 0,
      });
      const activeRun = response?.data.run;
    
      if (activeRun) {
        resumeRunStream(chatId, activeRun.id);
        return;
      }
    
      notFoundAttempts += 1;
      await waitForRetry(notFoundAttempts - 1, abortController.signal);
    }

    Lookup Strategy:

    | Mode | Attempts | Use Case |
    |---|---|---|
    | Aggressive | 6 | Page load, user expects active run |
    | Single | 1 | Background check |
    | Defer | 0 | Streaming already in progress |
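
    The delays produced by the constants above can be worked out directly. The sketch below recomputes the schedule for the aggressive mode; retryDelayMs mirrors getRetryDelayMs, and lookupDelaySchedule is a hypothetical helper added only for this illustration.

```typescript
const BASE_RETRY_MS = 350;
const MAX_RETRY_MS = 2000;

// Same formula as getRetryDelayMs above: exponential growth, capped at the maximum.
function retryDelayMs(attempt: number): number {
  return Math.min(MAX_RETRY_MS, BASE_RETRY_MS * 2 ** attempt);
}

// Hypothetical helper: the delay applied after each failed attempt, in order.
function lookupDelaySchedule(attempts: number): number[] {
  return Array.from({ length: attempts }, (_, attempt) => retryDelayMs(attempt));
}

const schedule = lookupDelaySchedule(6); // [350, 700, 1400, 2000, 2000, 2000]
const totalWaitMs = schedule.reduce((sum, ms) => sum + ms, 0); // 8450
```

    An aggressive lookup that never finds a run therefore spends about 8.5 seconds waiting, plus the time taken by the six requests themselves.
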
    7.5 Stream Processor with Replay

    File: apps/web/lib/streaming/processors/chatStreamProcessor.ts

    Built-in Replay:

    TypeScript
    const MAX_REPLAY_ATTEMPTS = 3;
    const REPLAY_BACKOFF_BASE_MS = 500;
    
    if (
      !fatalError &&
      !sessionCompleted &&
      replayAttempt < MAX_REPLAY_ATTEMPTS &&
      metaEvent?.runId
    ) {
      try {
        await new Promise((resolve) =>
          setTimeout(
            resolve,
            Math.min(REPLAY_BACKOFF_BASE_MS * 2 ** replayAttempt, 5_000)
          )
        );
        
        const replayResponse = await openRunEventsStream(
          activeChatId,
          metaEvent.runId,
          lastEventId ? { lastEventId } : undefined
        );
        
        const replayResult = await processStreamResponse({
          response: replayResponse,
          replayAttempt: replayAttempt + 1,
          replayCursor: lastEventId,
          // ...
        });
    
        if (replayResult) {
          return mergeStreamResults(result, replayResult);
        }
      } catch {
        // Replay failed, report error
      }
    }

    Why Client-Side Replay?

    • Backend stream may end prematurely
    • Client detects incomplete session
    • Automatic retry with backoff
    • Merges results seamlessly

    8. Worker Resumption

    8.1 Worker Lifecycle

    File: apps/api/services/runs/agent-run-worker/processor.ts

    Startup:

    TypeScript
    export async function processAgentRunJob(
      runId: string,
      publisher: Publisher,
    ): Promise<void> {
      // 1. Load run from DB
      const run = await getRunById(runId);
      if (!run) throw new Error(`Run not found: ${runId}`);
    
      // 2. Check if already terminal
      if (isTerminalRunStatus(run.status)) {
        return;  // Already finished
      }
    
      // 3. Check for existing terminal event
      const terminalEvent = await getLatestSessionCompleteEvent(runId);
      if (terminalEvent) {
        // Run completed but status not updated
        await updateRunWithLog(runId, {
          status: mapped.status,
          state: mapped.state,
        }, "terminal-event-already-present");
        return;
      }
    
      // 4. Subscribe to cancel signal
      await cancelSub.subscribe(runCancelChannel);
      cancelSub.on("message", () => {
        workerAbort.abort();  // Stop everything
      });
    
      // 5. Parse metadata (includes checkpoint)
      const metadata = parseAgentRunMetadata(run.metadata);
      // metadata.resumeCheckpoint may be present
    
      // 6. Initialize progress from checkpoint
      const progress: RunEventProgress = {
        currentTurn: metadata.resumeCheckpoint?.turn ?? run.currentTurn ?? 0,
        // ...
      };
    }
    8.2 Checkpoint Detection
    TypeScript
    const metadata = parseAgentRunMetadata(run.metadata);
    
    // Check for checkpoint
    if (metadata.resumeCheckpoint) {
      logger.info(
        { runId, turn: metadata.resumeCheckpoint.turn },
        "Resuming run from checkpoint"
      );
    } else {
      logger.info({ runId }, "Starting fresh run");
    }
    8.3 Resumption Flow
    TypeScript
    await runStreamSession(
      buildWorkerRunSessionInput({
        req: fakeReq,
        res: capturedRes,
        externalSignal: workerAbort.signal,
        workflow: metadata.workflow,
        run,
        decryptedApiKey,
        historyMessages,
        projectContext,
        runId,
        resumeCheckpoint: metadata.resumeCheckpoint,  // Pass checkpoint
        onCheckpoint: async (checkpoint) => {
          // Persist checkpoint on each continuation
          await updateRun(runId, {
            currentTurn: checkpoint.turn,
            metadata: metadataPatch,
          });
        },
      })
    );
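
    The interplay of resumeCheckpoint and onCheckpoint can be illustrated with a toy loop. This sketch assumes a checkpoint only records the next turn to execute; TurnCheckpoint, runTurns, and the simulated crash are hypothetical and far simpler than the real agent loop.

```typescript
// Hypothetical checkpoint shape: the turn to resume from.
interface TurnCheckpoint {
  turn: number;
}

// Runs turns from the checkpoint (or 0) up to maxTurns, checkpointing after each one.
// `crashAtTurn` simulates a worker dying before that turn completes.
function runTurns(
  maxTurns: number,
  resume: TurnCheckpoint | undefined,
  onCheckpoint: (cp: TurnCheckpoint) => void,
  executed: number[],
  crashAtTurn?: number,
): void {
  for (let turn = resume?.turn ?? 0; turn < maxTurns; turn += 1) {
    if (turn === crashAtTurn) throw new Error(`worker crashed in turn ${turn}`);
    executed.push(turn); // stands in for the LLM call and tool execution
    onCheckpoint({ turn: turn + 1 }); // persist progress before moving on
  }
}

// Stand-in for the run row whose metadata holds the checkpoint.
const runRecord: { checkpoint?: TurnCheckpoint } = {};
const executedTurns: number[] = [];
const save = (cp: TurnCheckpoint): void => {
  runRecord.checkpoint = cp;
};

try {
  runTurns(5, runRecord.checkpoint, save, executedTurns, 3); // first worker dies in turn 3
} catch {
  // the job queue would retry the job here
}
runTurns(5, runRecord.checkpoint, save, executedTurns); // second worker resumes at turn 3
```

    Turns 0 to 2 run once on the first worker and turns 3 to 4 run on the second, so no completed turn is repeated; only the partially executed turn 3 is redone.
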
    8.4 Finalization

    File: apps/api/services/runs/agent-run-worker/processor.finalize.ts

    On Success:

    TypeScript
    export async function finalizeSuccessfulRun(params): Promise<void> {
      await updateRun(params.runId, {
        status: mapped.status,
        state: mapped.state,
        currentTurn: params.currentTurn,
        terminationReason: params.latestTerminationReason,
        completedAt: finishedAt,
        metadata: {
          ...params.metadata,
          resumeCheckpoint: null,  // Clear checkpoint
          firstTokenLatencyMs: params.firstTokenLatencyMs,
          runDurationMs: durationMs,
        },
      });
    }

    On Failure:

    TypeScript
    export async function finalizeFailedRun(params): Promise<void> {
      // 1. Flush pending events
      await params.flushCapturedEvents();
    
      // 2. Persist error event
      await persistRunEventWithLog(params.runId, errorEvent, params.publisher, "...");
    
      // 3. Persist completion meta event
      await persistRunEventWithLog(params.runId, completionMetaEvent, params.publisher, "...");
    
      // 4. Update run status (checkpoint NOT cleared - allows retry)
      await updateRunWithLog(params.runId, {
        status: RUN_STATUS.FAILED,
        state: "FAILED",
        currentTurn: params.currentTurn,
        terminationReason: StreamTerminationReason.STREAM_FAILED,
        errorMessage: latestErrorMessage,
        completedAt: new Date(),
      });
    }

    Why Keep Checkpoint on Failure?

    • Allows manual retry from checkpoint
    • Preserves state for debugging
    • Can be cleared by explicit retry logic

    9. Failure Modes & Recovery

    9.1 Failure Mode Matrix

    | Failure Mode | Detection | Recovery | Data Loss |
    |---|---|---|---|
    | Client disconnect | req.close event | Reconnect with cursor | None |
    | Slow client | Backpressure queue | Graceful close | None (events persisted) |
    | Worker crash | Job timeout | Queue retry + checkpoint | Minimal (since last checkpoint) |
    | Redis unavailable | Connection error | Fallback to PostgreSQL polling | None |
    | PostgreSQL unavailable | Query error | Retry with backoff | Temporary (events buffered) |
    | Stream timeout | Guard timer | Terminate with reason | None (events persisted) |
    | Network blip | SSE connection drop | Auto-reconnect | None |

    9.2 Client Disconnect Handling

    File: apps/api/services/run-event-stream-utils/service.ts

    TypeScript
    req.on("close", () => {
      void closeStream({ sendDone: false });
    });
    
    const closeStream = async (options?: { sendDone?: boolean }): Promise<void> => {
      if (closed) return;
      closed = true;
    
      // Clean up heartbeat
      if (heartbeat) {
        clearInterval(heartbeat);
        heartbeat = null;
      }
    
      // Unsubscribe from Redis
      if (unsubscribeRedisChannel) {
        await unsubscribeRedisChannel();
      }
    
      // Close response
      if (!res.writableEnded) {
        res.end();
      }
    };

    What Happens:

    1. Client closes connection (refresh, navigate away)
    2. Backend receives req.close event
    3. Unsubscribe from Redis pub/sub
    4. Stop heartbeats
    5. Close response gracefully
    6. Worker continues execution (independent of client connection)
    9.3 Slow Client Handling

    File: apps/api/services/run-event-stream-utils/service.ts

    TypeScript
    configureSSEBackpressure(res, {
      onSlowClient: () => {
        void closeStream({ sendDone: false });
      },
    });

    Detection:

    • Write queue exceeds threshold
    • res.write() returns false (backpressure)
    • Queue not draining fast enough

    Recovery:

    1. Close stream gracefully
    2. Client can reconnect with cursor
    3. Events already persisted → replay on reconnect
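
    The detection rules above amount to a bounded queue in front of the socket. The following is a simplified sketch of that idea, not the behaviour of configureSSEBackpressure; BoundedWriteQueue, its threshold, and the write callback are hypothetical. It relies on the Node.js convention that write() returning false means the chunk was accepted but the buffer is full until "drain" fires.

```typescript
// Hypothetical bounded queue in front of a socket-like `write` function.
class BoundedWriteQueue {
  private queue: string[] = [];
  private blocked = false; // true after write() reported a full buffer
  private slow = false;
  private readonly write: (frame: string) => boolean;
  private readonly maxQueued: number;
  private readonly onSlowClient: () => void;

  constructor(
    write: (frame: string) => boolean,
    maxQueued: number,
    onSlowClient: () => void,
  ) {
    this.write = write;
    this.maxQueued = maxQueued;
    this.onSlowClient = onSlowClient;
  }

  send(frame: string): void {
    if (this.slow) return; // already closing; the client replays from its cursor
    if (!this.blocked) {
      // A false return means the frame was accepted but the buffer is now full.
      this.blocked = !this.write(frame);
      return;
    }
    this.queue.push(frame);
    if (this.queue.length > this.maxQueued) {
      this.slow = true;
      this.queue = [];
      this.onSlowClient();
    }
  }

  // Call when the socket emits "drain".
  drain(): void {
    this.blocked = false;
    while (!this.blocked && this.queue.length > 0) {
      this.blocked = !this.write(this.queue.shift() as string);
    }
  }
}

const wire: string[] = [];
const stats = { slowClientCalls: 0 };
const stalled = new BoundedWriteQueue(
  (frame) => {
    wire.push(frame);
    return false; // this fake socket never drains
  },
  2,
  () => {
    stats.slowClientCalls += 1;
  },
);
for (const frame of ["a", "b", "c", "d", "e"]) stalled.send(frame);
```

    Dropping the queued frames is safe only because every event was persisted before it was sent, so the client can fetch them again on reconnect.
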
    9.4 Worker Crash Recovery

    File: apps/api/services/runs/agent-run-worker/processor.ts

    Crash Detection:

    • Worker process dies (OOM, panic, etc.)
    • Job queue detects timeout
    • Run remains in "RUNNING" state

    Recovery:

    1. Queue retries job (configurable retries)
    2. New worker picks up job
    3. Load run from DB
    4. Check for checkpoint in metadata
    5. Resume from checkpoint.turn

    What's Lost:

    • Work since last checkpoint
    • Typically: partial turn execution
    • Events persisted before crash are safe
    9.5 Stream Timeout

    File: apps/api/services/chat/session/orchestrator/streamGuards.ts

    TypeScript
    const STREAM_GUARD_TIMEOUT_MS = 20 * 60 * 1000;  // 20 minutes
    
    export function setupStreamGuards(params): StreamGuardHandles {
      // Timeout guard
      const timeout = setTimeout(() => {
        params.abortController.abort();
        params.setTerminationReason(StreamTerminationReason.STREAM_TIMEOUT);
      }, STREAM_GUARD_TIMEOUT_MS);
    
      return { clear: () => clearTimeout(timeout) };
    }

    On Timeout:

    1. Abort controller triggered
    2. Agent loop stops
    3. Termination reason set to STREAM_TIMEOUT
    4. Session finalized with timeout reason
    5. Client receives error event
    9.6 Redis Unavailable

    File: apps/api/lib/redisPubSub.ts

    TypeScript
    export async function subscribeToRedisChannel(
      channel: string,
      handler: (payload: string) => void,
    ): Promise<() => Promise<void>> {
      // ... subscription logic
    
      subscriber.on("error", (error: unknown) => {
        logger.error({ error }, "Redis pub/sub subscriber error");
      });
    }

    Fallback Strategy:

    • Events still persisted to PostgreSQL
    • Client reconnects → replay from PostgreSQL
    • Live events missed during Redis outage
    • Mitigation: Poll PostgreSQL for new events
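
    If polling is added as a fallback, the same event can arrive from both Redis and PostgreSQL, so delivery has to be idempotent on seq. A minimal sketch, assuming every event carries a per-run, monotonically increasing seq; SequencedEvent and SeqGate are hypothetical names.

```typescript
// Hypothetical event envelope; only `seq` matters for de-duplication.
interface SequencedEvent {
  seq: number;
  body: string;
}

// Forwards each seq at most once, regardless of which source delivered it.
class SeqGate {
  private lastSeq: number;
  private readonly deliver: (e: SequencedEvent) => void;

  constructor(startAfterSeq: number, deliver: (e: SequencedEvent) => void) {
    this.lastSeq = startAfterSeq;
    this.deliver = deliver;
  }

  // Accepts a batch from either source; returns how many events were forwarded.
  accept(events: SequencedEvent[]): number {
    let forwarded = 0;
    for (const e of [...events].sort((a, b) => a.seq - b.seq)) {
      if (e.seq <= this.lastSeq) continue; // already delivered
      this.lastSeq = e.seq;
      this.deliver(e);
      forwarded += 1;
    }
    return forwarded;
  }
}

const delivered: number[] = [];
const gate = new SeqGate(10, (e) => {
  delivered.push(e.seq);
});

gate.accept([{ seq: 11, body: "live" }]); // arrived via Redis
gate.accept([
  { seq: 11, body: "polled" },
  { seq: 12, body: "polled" },
]); // arrived via a PostgreSQL poll
gate.accept([{ seq: 10, body: "stale" }]); // older than the cursor
```

    This gate does not detect gaps: a live event that arrives ahead of a missing earlier one would advance the cursor past it, so a real implementation would need to backfill from PostgreSQL before forwarding.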

    10. Infrastructure Cross-Questions

    10.1 PostgreSQL

    Q: What's the event volume per run?

    Typical run:

    • Meta events: 5-10 (session start, turn start/complete x5, session complete)
    • Text events: 50-200 (narrative, explanations)
    • File events: 10-50 (file_start, file_content xN, file_end)
    • Command events: 5-20 (command execution results)
    • Total: 70-300 events per run

    Q: What's the storage requirement?

    Per run estimate:

    • Event envelope: ~200 bytes overhead
    • Event payload: ~500 bytes average (varies widely)
    • Per event: ~700 bytes
    • Per run (200 events): ~140 KB

    At 10,000 runs/day:

    • Daily: 1.4 GB
    • Monthly (30 days): 42 GB
    • Yearly: ~510 GB
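
    The estimate can be reproduced with a few lines of arithmetic. The inputs are the figures quoted above; decimal gigabytes (1 GB = 10^9 bytes) are an assumption of this sketch.

```typescript
const BYTES_PER_EVENT = 200 + 500; // envelope overhead + average payload
const EVENTS_PER_RUN = 200;
const RUNS_PER_DAY = 10_000;

const bytesPerRun = BYTES_PER_EVENT * EVENTS_PER_RUN; // 140,000 bytes, about 140 KB
const bytesPerDay = bytesPerRun * RUNS_PER_DAY;
const toGB = (bytes: number): number => bytes / 1e9; // decimal gigabytes

const dailyGB = toGB(bytesPerDay); // 1.4
const monthlyGB = toGB(bytesPerDay * 30); // 42
const yearlyGB = toGB(bytesPerDay * 365); // 511
```

    The yearly figure works out to 511 GB before index overhead and before any retention policy is applied. Under a 30-day retention window, the steady-state footprint would stay close to the monthly figure.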

    Q: Should we partition the run_events table?

    Recommendation: Yes, partition by created_at (monthly partitions)

    Sql
    CREATE TABLE run_events (
      id UUID DEFAULT gen_random_uuid(),
      run_id UUID NOT NULL,
      seq BIGINT NOT NULL,
      event_type TEXT NOT NULL,
      event JSONB NOT NULL,
      created_at TIMESTAMPTZ DEFAULT NOW(),
      PRIMARY KEY (run_id, seq, created_at)
    ) PARTITION BY RANGE (created_at);
    
    CREATE TABLE run_events_2025_01 PARTITION OF run_events
      FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

    Benefits:

    • Faster cleanup (drop old partitions)
    • Improved query performance (partition pruning)
    • Easier backup/restore
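
    Monthly partitions have to be created ahead of time, which is usually scripted. A minimal sketch of a DDL generator, assuming the run_events_YYYY_MM naming used in the example above; monthlyPartitionDdl is a hypothetical helper, and scheduling it is left out.

```typescript
// Hypothetical helper: DDL for the partition covering one calendar month (month is 1-12).
function monthlyPartitionDdl(year: number, month: number): string {
  if (!Number.isInteger(month) || month < 1 || month > 12) {
    throw new RangeError(`month must be 1-12, got ${month}`);
  }
  const pad = (n: number): string => String(n).padStart(2, "0");
  // The upper bound is exclusive, so it is the first day of the following month.
  const nextYear = month === 12 ? year + 1 : year;
  const nextMonth = month === 12 ? 1 : month + 1;
  const from = `${year}-${pad(month)}-01`;
  const to = `${nextYear}-${pad(nextMonth)}-01`;
  return (
    `CREATE TABLE IF NOT EXISTS run_events_${year}_${pad(month)} ` +
    `PARTITION OF run_events FOR VALUES FROM ('${from}') TO ('${to}');`
  );
}

const january = monthlyPartitionDdl(2025, 1);
const december = monthlyPartitionDdl(2025, 12); // upper bound rolls over to 2026-01-01
```

    Generating the statement as a string keeps the sketch free of database dependencies; in practice the values would come from a trusted scheduler rather than user input.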

    Q: What indexes are needed?

    Sql
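    -- Primary lookup (replay); redundant if the primary key already leads with (run_id, seq)
    CREATE INDEX idx_run_events_run_seq ON run_events(run_id, seq);
    
    -- Cleanup (find old events)
    CREATE INDEX idx_run_events_created_at ON run_events(created_at);
    
    -- Covering index for replay (avoid table lookup)
    -- Caution: INCLUDE columns count toward the B-tree index row size limit, so large
    -- JSONB payloads can make inserts fail; measure event sizes before adopting this
    CREATE INDEX idx_run_events_run_seq_covering 
      ON run_events(run_id, seq) 
      INCLUDE (id, event_type, event);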
    10.2 Redis

    Q: What's the Redis memory footprint?

    Pub/sub channels:

    • One channel per active run: edward:run-events:{runId}
    • Channel name: ~50 bytes
    • Subscriber overhead: ~100 bytes per subscriber
    • Per active run: ~150 bytes

    At 1,000 concurrent runs:

    • Channel memory: ~150 KB (negligible)

    Q: What about event buffering?

    Events are NOT buffered in Redis (only published):

    • Publisher sends event → all subscribers receive
    • No persistence in Redis
    • PostgreSQL is the source of truth

    Q: Redis cluster vs standalone?

    Recommendation: Standalone for pub/sub

    Why:

    • Pub/sub doesn't benefit from clustering: classic pub/sub messages are broadcast to every cluster node and are not persisted
    • Simpler deployment
    • Lower latency

    Failover:

    • Sentinel for automatic failover
    • Clients reconnect on failover
    • Events persisted to PostgreSQL during outage

    Q: What's the pub/sub latency?

    Typical latency:

    • Publish to subscriber: <1ms (same region)
    • 99th percentile: <5ms

    Impact:

    • Live events arrive within milliseconds
    • Replay from PostgreSQL is the bottleneck (not Redis)
    10.3 SSE Infrastructure

    Q: How many concurrent SSE connections?

    Estimate:

    • Active runs: 1,000
    • Connections per run: 1-3 (user may have multiple tabs)
    • Concurrent connections: 1,000-3,000

    Q: Can a single server handle this?

    Node.js SSE capacity:

    • Memory per connection: ~10-50 KB
    • CPU per connection: minimal (event-driven)
    • Single server: 10,000+ connections feasible

    Q: What about horizontal scaling?

    Challenge: SSE connections are sticky (can't load balance mid-stream)

    Solutions:

    1. Sticky sessions (recommended)

      • Load balancer routes by cookie/session
      • Same server handles entire stream
      • Simple, effective
    2. Connection migration

      • On server shutdown, notify clients
      • Clients reconnect to new server
      • Complex, rarely needed
    3. WebSocket + migration

      • Use WebSocket instead of SSE
      • Supports connection migration
      • More complex protocol

    Q: What's the heartbeat strategy?

    TypeScript
    const HEARTBEAT_INTERVAL_MS = 15_000;  // 15 seconds
    
    heartbeat = setInterval(() => {
      sendSSEComment(res, "run-events-heartbeat");
    }, HEARTBEAT_INTERVAL_MS);

    Why Heartbeats?

    • Keep connection alive (prevent timeout)
    • Detect dead connections
    • Load balancers may close idle connections
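
    On the wire, a heartbeat is an SSE comment: a line that starts with a colon, which EventSource clients ignore. A minimal sketch of the framing, assuming sendSSEComment writes something equivalent; formatSSEComment and formatSSEEvent are hypothetical helpers, and the seq:42 id is only an example value.

```typescript
// Comment frame: ignored by clients, but keeps bytes flowing through proxies.
function formatSSEComment(text: string): string {
  // Every line of a comment starts with ":"; a blank line ends the frame.
  return text.split(/\r?\n/).map((line) => `: ${line}`).join("\n") + "\n\n";
}

// Event frame with an id, so the client can later resume from it.
function formatSSEEvent(id: string, data: string): string {
  // Multi-line data is sent as one "data:" line per line of payload.
  const dataLines = data.split(/\r?\n/).map((line) => `data: ${line}`).join("\n");
  return `id: ${id}\n${dataLines}\n\n`;
}

const heartbeatFrame = formatSSEComment("run-events-heartbeat");
const eventFrame = formatSSEEvent("seq:42", JSON.stringify({ type: "text", content: "hi" }));
```

    Because comments carry no id, a heartbeat never moves the client's cursor; only event frames do.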

    Q: How to handle connection limits?

    Browser limits:

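    • Chrome, Firefox, and Safari: 6 concurrent connections per domain over HTTP/1.1
    • Over HTTP/2 the cap is the negotiated maximum number of concurrent streams (100 by default), so the limit mainly matters on HTTP/1.1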

    Mitigation:

    • Single SSE connection per chat
    • Reuse connection for multiple runs
    • Use domain sharding if needed (not recommended)
    10.4 Worker Scaling

    Q: How many workers?

    Formula:

    Plain text
    workers = ceil(concurrent_runs / runs_per_worker)

    Typical:

    • Concurrent runs: 1,000
    • Runs per worker: 10-50 (depends on LLM latency)
    • Workers needed: 20-100
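
    The formula and the typical range above can be checked with a small helper. workersNeeded is a hypothetical function name; the inputs are the figures from the list.

```typescript
// workers = ceil(concurrent_runs / runs_per_worker)
function workersNeeded(concurrentRuns: number, runsPerWorker: number): number {
  if (runsPerWorker <= 0) throw new RangeError("runsPerWorker must be positive");
  return Math.ceil(concurrentRuns / runsPerWorker);
}

const lowDensity = workersNeeded(1_000, 10); // 100 workers at 10 runs each
const highDensity = workersNeeded(1_000, 50); // 20 workers at 50 runs each
const unevenSplit = workersNeeded(1_001, 50); // 21: the remainder needs one more worker
```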

    Q: Worker autoscaling?

    Metrics to track:

    • Queue depth (Redis)
    • Average run duration
    • Worker CPU/memory

    Scaling policy:

    • Scale up: Queue depth > threshold for 2 minutes
    • Scale down: Queue depth = 0 for 10 minutes
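
    The policy above can be expressed as a pure function over recent queue-depth samples. This is a sketch under assumptions: one sample per minute, a caller-supplied threshold, and the hypothetical names ScaleDecision and decideScaling; a real autoscaler would also enforce cooldowns and min/max worker counts.

```typescript
type ScaleDecision = "scale-up" | "scale-down" | "hold";

// `samples` holds one queue-depth reading per minute, oldest first.
function decideScaling(samples: number[], threshold: number): ScaleDecision {
  const lastN = (n: number): number[] => samples.slice(-n);
  const hasAtLeast = (n: number): boolean => samples.length >= n;

  // Scale up: depth above the threshold for the last 2 minutes.
  if (hasAtLeast(2) && lastN(2).every((depth) => depth > threshold)) return "scale-up";
  // Scale down: queue empty for the last 10 minutes.
  if (hasAtLeast(10) && lastN(10).every((depth) => depth === 0)) return "scale-down";
  return "hold";
}

const burst = decideScaling([3, 40, 55], 25); // two consecutive readings above 25
const idle = decideScaling(new Array<number>(10).fill(0), 25); // ten empty readings
const spike = decideScaling([0, 0, 60], 25); // a single high reading is not enough
```

    Requiring consecutive samples on both sides gives the policy some hysteresis, so a one-minute spike or a brief lull does not trigger scaling.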

    Q: What about worker affinity?

    No affinity needed:

    • Workers are stateless
    • State in PostgreSQL + checkpoint
    • Any worker can process any job
    10.5 Data Retention

    Q: How long to keep events?

    Recommendation: 30 days

    Why:

    • Supports debugging recent issues
    • Allows replay for active users
    • Compliance (audit trail)

    Cleanup strategy:

    Sql
    -- Daily cleanup job (if run_events is partitioned by month, dropping expired partitions is cheaper than DELETE)
    DELETE FROM run_events
    WHERE created_at < NOW() - INTERVAL '30 days';

    Q: What about checkpoint retention?

    Recommendation: Clear on completion, keep on failure

    Why:

    • Completed runs don't need checkpoint
    • Failed runs may need manual retry
    • Checkpoints are small (few KB)
    10.6 Monitoring

    Q: What metrics to track?

    Stream Health:

    • stream_reconnect_count: Reconnections per run
    • stream_replay_events: Events replayed per reconnect
    • stream_duration_ms: Total stream duration
    • stream_termination_reason: Distribution of termination reasons

    Checkpoint Health:

    • checkpoint_created_count: Checkpoints created
    • checkpoint_resume_count: Runs resumed from checkpoint
    • checkpoint_turn_distribution: Turns per checkpoint

    Event Volume:

    • events_persisted_count: Events per run
    • events_replay_lag_ms: Time to replay events
    • redis_pubsub_latency_ms: Pub/sub latency

    Q: What alerts to set?

    | Metric | Threshold | Action |
    |---|---|---|
    | stream_reconnect_count > 5 | Per run | Investigate network issues |
    | checkpoint_resume_count > 10% | Of runs | Investigate worker stability |
    | events_replay_lag_ms > 5000 | P99 | Optimize replay queries |
    | redis_pubsub_latency_ms > 100 | P99 | Check Redis health |

    11. Key Files Reference

    Core Orchestration

    | File | Purpose | Lines |
    |---|---|---|
    | apps/api/services/runs/runEvents.service.ts | Event persistence | ~40 |
    | apps/api/services/run-event-stream-utils/service.ts | SSE streaming + replay | ~250 |
    | apps/api/services/sse-utils/service.ts | SSE backpressure handling | ~200 |
    | apps/api/services/runs/agent-run-worker/processor.ts | Worker execution | ~350 |
    | apps/api/services/runs/agent-run-worker/processor.helpers.ts | Event capture | ~150 |
    | apps/api/services/runs/agent-run-worker/processor.finalize.ts | Run finalization | ~150 |

    Checkpoint & Continuation

    | File | Purpose | Lines |
    |---|---|---|
    | apps/api/services/runs/runMetadata.ts | Checkpoint schema | ~100 |
    | apps/api/services/runs/agent-run-worker/processor.session.ts | Checkpoint persistence | ~100 |
    | apps/api/services/chat/session/loop/agentLoop.runner.ts | Agent loop with checkpoint | ~200 |
    | apps/api/services/chat/session/loop/agentLoop.turnOutcome.ts | Continuation logic | ~250 |
    | apps/api/services/chat/session/shared/checkpoint.types.ts | Checkpoint types | ~15 |
    | apps/api/services/chat/session/shared/continuation.ts | Continuation prompts | ~250 |

    Frontend

    | File | Purpose | Lines |
    |---|---|---|
    | apps/web/stores/chatStream/cursorPersistence.ts | Cursor storage | ~50 |
    | apps/web/stores/chatStream/resumeRunStream.ts | Stream resumption | ~150 |
    | apps/web/lib/api/chat.ts | SSE API client | ~100 |
    | apps/web/lib/streaming/processors/chatStreamProcessor.ts | Stream processing | ~400 |
    | apps/web/hooks/chat/useChatPageOrchestration.ts | Page orchestration | ~250 |

    Shared Types

    | File | Purpose | Lines |
    |---|---|---|
    | packages/shared/src/streamEvents.ts | Event type definitions | ~250 |

    Infrastructure

    | File | Purpose | Lines |
    |---|---|---|
    | apps/api/lib/redisPubSub.ts | Redis pub/sub | ~80 |
    | apps/api/lib/redis.ts | Redis client creation | ~20 |

    Tests

    | File | Purpose |
    |---|---|
    | apps/api/tests/services/runs/agentRun.processor.session.test.ts | Checkpoint tests |
    | apps/api/tests/services/runs/runMetadata.test.ts | Metadata parsing tests |
    | apps/api/tests/controllers/chat/streamSession.shared.test.ts | Continuation prompt tests |
    | apps/api/tests/services/chat/runEventStream.utils.service.test.ts | Stream replay tests |

    Appendix A: Glossary

    | Term | Definition |
    |---|---|
    | Checkpoint | Snapshot of agent loop state for resumption |
    | Cursor | Last processed event ID (client-side) |
    | Event Sourcing | Pattern of persisting all state changes as events |
    | Replay | Re-emitting historical events on reconnection |
    | SSE | Server-Sent Events (one-way streaming from server to client) |
    | Turn | Single iteration of agent loop (LLM call + tool execution) |
    | Worker | Background process that executes agent runs |

    Appendix B: Sequence Diagrams

    B.1 Normal Stream Flow
    Plain text
    Browser         API              Worker           Postgres         Redis
      |              |                |                 |               |
      |--POST /chat->|                |                 |               |
      |              |--Create Run (insert)------------>|               |
      |              |--Enqueue Job-->|                 |               |
      |              |                |                 |               |
      |<--SSE Open---|                |                 |               |
      |              |                |                 |               |
      |              |              Pick Up             |               |
      |              |                |                 |               |
      |              |                |--Load Run------>|               |
      |              |                |                 |               |
      |              |              Stream Session      |               |
      |              |                |                 |               |
      |              |                |--Persist Event->|               |
      |              |                |--Publish----------------------->|
      |              |<--Live Event-------------------------------------|
      |<--Event------|                |                 |               |
      |              |                |                 |               |
      |              |              (repeat events)     |               |
      |              |                |                 |               |
      |              |                |--Finalize------>|               |
      |<--[DONE]-----|                |                 |               |
      |              |                |                 |               |
    B.2 Reconnection Flow
    Plain text
    Browser         API              Worker           Postgres         Redis
      |              |                |                 |               |
      |--Refresh---->|                |                 |               |
      |              |                |                 |               |
      |  Read Cursor |                |                 |               |
      |  (sessionStorage)              |                 |               |
      |              |                |                 |               |
      |--GET /stream?lastEventId=12345                  |               |
      |              |                |                 |               |
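      |              |--Replay Events------------------>|               |
      |              |<---------------------------------|               |
      |<--Replay Events (seq > 12345)                   |               |
      |              |                |                 |               |
      |              |--Subscribe-------------------------------------->|
      |              |                |                 |               |
      |              |                |--Publish----------------------->|
      |              |<--Live Event-------------------------------------|
      |<--Live Event-|                |                 |               |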
      |              |                |                 |               |
      |  Save Cursor |                |                 |               |
      |  (sessionStorage)              |                 |               |
      |              |                |                 |               |

    Appendix C: Configuration Reference

    Environment Variables

    | Variable | Default | Description |
    |---|---|---|
    | STREAM_GUARD_TIMEOUT_MS | 1200000 | Stream timeout (20 min) |
    | MAX_AGENT_CONTINUATION_PROMPT_CHARS | 20000 | Continuation prompt limit |
    | MAX_AGENT_TURNS | 10 | Max agent loop iterations |
    | MAX_AGENT_TOOL_CALLS_PER_TURN | 10 | Tool budget per turn |
    | MAX_AGENT_TOOL_CALLS_PER_RUN | 50 | Tool budget per run |
    | MAX_REPLAY_BATCH | 500 | Events per replay batch |
    | HEARTBEAT_INTERVAL_MS | 15000 | SSE heartbeat interval |

    Redis Channels

    | Channel | Purpose |
    |---|---|
    | edward:run-events:{runId} | Live event pub/sub |
    | edward:run-cancel:{runId} | Cancel signal |
    | agent-runs | Worker job queue |

    Storage Keys

    | Key | Purpose |
    |---|---|
    | sse_cursor:{chatId}:{runId} | Frontend cursor |

    Document Version: 1.0
    Last Updated: March 25, 2026
    Author: Principal Engineering Review