Streaming Architecture

Overview

The streaming system delivers Server-Sent Events (SSE) to clients in real time, so the UI updates live while the AI processes a request, executes tools, and generates its response.


SSE (Server-Sent Events) Protocol

Plain text
Client                    Server
  │                         │
  │ ─────── REQUEST ──────> │
  │                         │
  │ <───── SSE STREAM ─────│
  │   event: message       │
  │   data: {"type":...}   │
  │                         │
  │   event: message       │
  │   data: {"content":..} │
  │                         │
  │   event: message       │
  │   data: {"toolCall":..}│
  │                         │
  │   event: end           │
  │   data: {}             │
  │                         │

SSE Format:

Plain text
event: message
data: {"index": 0, "type": "text", "text": "Hello"}

event: progress
data: {"payload": {"name": "Searching", "status": "IN_PROGRESS"}}

event: end
data: {}
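
The framing above can be sketched as a pair of helpers: one that serializes a frame and one that splits buffered text back into complete frames. This is a minimal illustration, not the project's code: formatSseFrame and parseSseFrames are hypothetical names, and the real getMessage and extractEventsFromChunk are not shown in these notes and may behave differently.

```typescript
// Hypothetical helpers for illustration only.
type SseFrame = { event: string; data: unknown };

// Serialize one frame: an "event:" line, a "data:" line, then a blank line as terminator.
function formatSseFrame(frame: SseFrame): string {
	return `event: ${frame.event}\ndata: ${JSON.stringify(frame.data)}\n\n`;
}

// Split buffered text into complete frames; the unterminated tail is returned as `rest`.
function parseSseFrames(buffer: string): { frames: SseFrame[]; rest: string } {
	const parts = buffer.split("\n\n");
	const rest = parts.pop() ?? ""; // the last piece has no terminator yet
	const frames: SseFrame[] = [];
	for (const part of parts) {
		let eventName = "message"; // SSE default when no "event:" line is present
		const dataLines: string[] = [];
		for (const line of part.split("\n")) {
			if (line.startsWith("event:")) eventName = line.slice(6).trim();
			else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
		}
		if (dataLines.length > 0) {
			frames.push({ event: eventName, data: JSON.parse(dataLines.join("\n")) });
		}
	}
	return { frames, rest };
}

const wire =
	formatSseFrame({ event: "message", data: { index: 0, type: "text", text: "Hello" } }) +
	formatSseFrame({ event: "end", data: {} });

// Simulate a network chunk boundary in the middle of the second frame.
const firstPass = parseSseFrames(wire.slice(0, wire.length - 5));
const secondPass = parseSseFrames(firstPass.rest + wire.slice(wire.length - 5));
console.log(firstPass.frames, secondPass.frames);
```

Carrying the unterminated tail forward is what lets a reader cope with frames that are split across network chunks, which is the role chunkBuffer plays in the stream functions below.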

Streamer Core Functions

File: src/server/repositories/streamer/streamer.ts

1. Initialize SSE Connection

File: src/server/repositories/streamer/streamer.ts:129

TypeScript
const init = (response: Response) => {
	response.writeHead(200, {
		"Cache-Control": "no-cache",
		"Content-Type": "text/event-stream",
		"Access-Control-Allow-Origin": "*",
		"Access-Control-Allow-Headers": "*",
		"Connection": "keep-alive",
	});
};

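Opens the SSE connection by writing a 200 response with the headers the protocol needs: the text/event-stream content type, caching disabled, and a keep-alive connection. The wildcard CORS headers let any origin connect.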

2. Stream V1 (Legacy)

File: src/server/repositories/streamer/streamer.ts:208

TypeScript
const stream = async (
	_req: Request,
	response: NodeJS.WritableStream,
	inputStream: NodeJS.ReadableStream,
): Promise<StreamedChunks> => {
	inputStream.setEncoding("utf8");

	// Signal initialization
	appendEvent("INIT_MESSAGE_CONTENT", {});

	return new Promise<StreamedChunks>((resolve) => {
		let chunkBuffer = "";
		const contentChunks: string[] = [];
		const reasoningContentChunks: Array<ChoiceDelta["reasoning"]> = [];
		const citations: CitationBlocks[] = [];
		const passThrough = new PassThrough();

		let toolCallsAgg:
			| Array<{ name: string; arguments: string; id: string }>
			| undefined;
		let usage:
			| {
					prompt_tokens: number;
					completion_tokens: number;
					total_tokens: number;
			  }
			| undefined;

		inputStream.pipe(passThrough);

		// Process stream chunks
		(async (): Promise<StreamedChunks> => {
			let contentSize = 5; // Chars per write
			let timeout = 5; // Ms delay
			let citationIndex = -1;

			outer: for await (const chunk of passThrough) {
				const { events, bfr } = extractEventsFromChunk(chunk, chunkBuffer);
				chunkBuffer = bfr;

				for (let i = 0; i < events.length; i++) {
					// CONTENT PROCESSING
					contents: {
						const content = events?.[i]?.choices?.[0]?.delta?.content ?? "";
						const contentIndex = events?.[i]?.choices?.[0]?.delta?.index ?? -1;

						// Handle citations
						if (
							citationIndex != -1 &&
							contentIndex != -1 &&
							citationIndex < contentIndex
						) {
							contentChunks.push(sendCitation(response, citations.at(-1)));
							citationIndex = -1;
						}

						// Stream content in small chunks with adaptive pacing
						let idx = 0;
						while (idx < content.length) {
							if (shouldReturnEarly()) break outer;

							const contentSlice = content.slice(idx, idx + contentSize);
							contentChunks.push(contentSlice);

							response.write(
								Buffer.from(getMessage({ data: { content: contentSlice } })),
							);

							idx += contentSize;

							// Adaptive pacing based on buffer
							// @ts-expect-error Accessing internal state
							const bufferLength = passThrough._readableState.buffer.length;
							switch (true) {
								case bufferLength < 1:
									contentSize = 2;
									timeout = 15;
									break;
								case bufferLength < 5:
									contentSize = 3;
									timeout = 10;
									break;
								case bufferLength < 10:
									contentSize = 4;
									timeout = 5;
									break;
								default:
									contentSize = 5;
									timeout = 0;
									break;
							}
							await new Promise((resolve) => setTimeout(resolve, timeout));
						}
					}

					// TOOL CALLS PROCESSING
					toolCalls: {
						try {
							const toolCalls = events?.[i]?.choices?.[0]?.delta?.tool_calls;
							if (toolCalls && toolCalls.length > 0) {
								if (!toolCallsAgg) toolCallsAgg = [];

								for (const toolCall of toolCalls) {
									if ("type" in toolCall) {
										// New tool call
										toolCallsAgg[toolCall.index] = {
											name: toolCall.function.name,
											arguments: toolCall.function.arguments,
											id: toolCall.id,
										};
									} else {
										// Continuation of existing
										const toolAtIndex = toolCallsAgg[toolCall.index];
										if (toolAtIndex) {
											toolAtIndex.arguments += toolCall.function.arguments;
										} else {
											throw new Error("Tool agg does not exist");
										}
									}
								}
							}
						} catch (e) {
							logger.error(e, "ERROR/STREAMER");
						}
					}

					// REASONING CONTENT (for models like Claude)
					reasoning_content: {
						const reasoningContent =
							events?.[i]?.choices?.[0]?.delta?.reasoning;
						if (reasoningContent) {
							if (shouldReturnEarly()) break outer;
							reasoningContentChunks.push(reasoningContent);
							// Note: Not streamed to client (internal use)
						}
					}

					// CITATIONS
					citation: {
						const citation = events?.[i].choices?.[0]?.delta?.citation;
						if (citation) {
							if (shouldReturnEarly()) break outer;
							citations.push(citation);
							citationIndex = events?.[i].choices?.[0]?.delta?.index ?? -1;
						}
					}

					// USAGE METRICS
					usage: {
						const usageChunk = events?.[i]?.usage;
						if (usageChunk) {
							usage = { ...clone(usage), ...usageChunk };
						}
					}
				}
			}

			// Finalize tool calls
			const toolCalls = toolCallsAgg?.reduce(
				(acc, curr) => {
					if (curr.arguments) {
						if (!acc) acc = [];
						try {
							acc.push({
								type: "function",
								function: { name: curr.name, arguments: curr.arguments },
								id: curr.id,
							});
						} catch (e) {
							logger.error(e, "ERROR/STREAMER");
						}
					}
					return acc;
				},
				undefined as StreamedChunks["toolCalls"],
			);

			return {
				content: contentChunks,
				reasoningContent: reasoningContentChunks,
				citations,
				toolCalls,
				usage,
			};
		})()
			.then(resolve)
			.catch((error) => {
				logger.error(error, "ERROR/STREAMER");
				passThrough.destroy();
				// Graceful degradation - resolve with partial results
				resolve({
					content: contentChunks,
					reasoningContent: reasoningContentChunks,
					citations,
					toolCalls: undefined,
					usage,
				});
			});
	});
};

Key Features:

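  • Adaptive Pacing: Writes smaller slices with longer delays when the upstream buffer is nearly empty, and larger slices with no delay when a backlog builds, so text appears at a steady rate (better UX)
  • Tool Call Assembly: Accumulates tool calls whose arguments arrive across multiple chunks
  • Citation Handling: Writes an inline [citation: N: M] marker for each source citation
  • Graceful Errors: Resolves with partial results on failure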
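
The pacing rule is easier to see as a pure function. The sketch below copies the thresholds from the switch in stream; pacingFor and charsPerSecond are hypothetical names introduced only for illustration, and the throughput figures ignore write overhead and timer granularity.

```typescript
type Pacing = { contentSize: number; timeout: number };

// Same thresholds as the switch in stream(): the emptier the buffer, the smaller and slower the writes.
function pacingFor(bufferLength: number): Pacing {
	if (bufferLength < 1) return { contentSize: 2, timeout: 15 };
	if (bufferLength < 5) return { contentSize: 3, timeout: 10 };
	if (bufferLength < 10) return { contentSize: 4, timeout: 5 };
	return { contentSize: 5, timeout: 0 };
}

// Rough characters per second for a tier; a zero delay is treated as unthrottled.
function charsPerSecond(p: Pacing): number {
	return p.timeout === 0 ? Infinity : (p.contentSize * 1000) / p.timeout;
}

for (const backlog of [0, 3, 7, 12]) {
	const tier = pacingFor(backlog);
	console.log(backlog, tier, charsPerSecond(tier));
}
```

With an empty buffer the writer trickles output so the client does not see a burst followed by a stall; once ten or more chunks are waiting it stops delaying so the backlog can drain.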

3. Stream V2 (Current)

File: src/server/repositories/streamer/streamer.ts:469

TypeScript
const streamV2 = async (
	_req: Request,
	response: NodeJS.WritableStream,
	inputStream: NodeJS.ReadableStream,
	config: {
		contentIndex: number; // CRITICAL: Index for ordering
		streamAsToolResult?: {
			// If streaming tool result
			toolResultIndex: number;
		};
		suppressUIStreaming?: boolean; // Don't send to UI (sub-agent)
	},
): Promise<StreamedChunks> => {
	inputStream.setEncoding("utf8");

	return new Promise<StreamedChunks>((resolve) => {
		let chunkBuffer = "";
		const contentChunks: string[] = [];
		const reasoningContentChunks: Array<ChoiceDelta["reasoning"]> = [];
		const citations: CitationBlocks[] = [];
		const passThrough = new PassThrough();

		const toolCallsAgg: Array<{ name: string; arguments: string; id: string }> =
			[];
		let usage: CompletionUsage | undefined;

		inputStream.pipe(passThrough);

		(async (): Promise<StreamedChunks> => {
			outer: for await (const chunk of passThrough) {
				if (shouldReturnEarly()) break outer;

				const { events, bfr } = extractEventsFromChunk(chunk, chunkBuffer);
				chunkBuffer = bfr;

				for (let i = 0; i < events.length; i++) {
					// CONTENT PROCESSING
					contents: {
						const content = events?.[i]?.choices?.[0]?.delta?.content ?? "";
						contentChunks.push(content);

						if (content && !config.suppressUIStreaming) {
							if (config.streamAsToolResult) {
								// Streaming as sub-agent tool result
								response.write(
									Buffer.from(
										getMessage({
											data: {
												content: "",
												index: config.contentIndex,
												type: "toolCallResult",
												toolCallResult: {
													index: config.streamAsToolResult.toolResultIndex,
													delta: content, // Partial content
												},
											},
										}),
									),
								);
							} else {
								// Standard text streaming
								response.write(
									Buffer.from(
										getMessage({
											data: {
												content: "",
												index: config.contentIndex,
												type: "text",
												text: content,
											},
										}),
									),
								);
							}
						}
					}

					// TOOL CALLS (simplified from V1)
					toolCalls: {
						try {
							const toolCalls = events?.[i]?.choices?.[0]?.delta?.tool_calls;
							if (toolCalls && toolCalls.length > 0) {
								const firstToolCall = toolCalls[0];
								if ("type" in firstToolCall) {
									toolCallsAgg[firstToolCall.index] = {
										name: firstToolCall.function.name,
										arguments: firstToolCall.function.arguments,
										id: firstToolCall.id,
									};
								} else {
									const toolAtIndex = toolCallsAgg[firstToolCall.index];
									if (toolAtIndex) {
										toolAtIndex.arguments += firstToolCall.function.arguments;
									} else {
										throw new Error("Tool agg does not exist");
									}
								}
							}
						} catch (e) {
							logger.error(e, "ERROR/STREAMER");
						}
					}

					// REASONING (streamed to client in V2)
					reasoning_content: {
						const reasoningContent =
							events?.[i]?.choices?.[0]?.delta?.reasoning;
						if (reasoningContent) {
							if (shouldReturnEarly()) break outer;
							reasoningContentChunks.push(reasoningContent);
							if (reasoningContent.text) {
								response.write(
									Buffer.from(
										getMessage({
											data: {
												content: "",
												index: config.contentIndex,
												type: "reasoning",
												reasoning: reasoningContent.text,
											},
										}),
									),
								);
							}
						}
					}

					// USAGE
					usage: {
						const usageChunk = events?.[i]?.usage;
						if (usageChunk) {
							usage = { ...clone(usage), ...usageChunk };
						}
					}
				}
			}

			const toolCalls = toolCallsAgg?.reduce(
				(acc, curr) => {
					if (!acc) acc = [];
					if (curr.arguments) {
						try {
							acc.push({
								type: "function",
								function: { name: curr.name, arguments: curr.arguments },
								id: curr.id,
							});
						} catch (e) {
							logger.error(e, "ERROR/STREAMER");
						}
					}
					return acc;
				},
				undefined as StreamedChunks["toolCalls"],
			);

			return {
				content: contentChunks,
				reasoningContent: reasoningContentChunks,
				citations,
				toolCalls,
				usage,
			};
		})()
			.then(resolve)
			.catch((error) => {
				logger.error(error, "ERROR/STREAMER");
				passThrough.destroy();
				resolve({
					content: contentChunks,
					reasoningContent: reasoningContentChunks,
					citations,
					toolCalls: undefined,
					usage,
				});
			});
	});
};

V2 Improvements:

  • Index-based: Each chunk has explicit index for ordering
  • Tool Result Streaming: Sub-agents stream through parent
  • Reasoning Display: Shows model reasoning to user
  • Simplified: Removed adaptive pacing (handled elsewhere)
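
To illustrate why the explicit index matters, here is a minimal sketch of how a client might reassemble interleaved V2 chunks. The V2Chunk and Block shapes and the applyChunk function are assumptions modeled on the payloads streamV2 writes; the actual client code is not part of these notes.

```typescript
// Hypothetical client-side shapes, modeled on the text and reasoning payloads above.
type V2Chunk =
	| { type: "text"; index: number; text: string }
	| { type: "reasoning"; index: number; reasoning: string };

type Block = { text: string; reasoning: string };

// Append each delta to the block at its index, so interleaving across indices is harmless.
function applyChunk(blocks: Map<number, Block>, chunk: V2Chunk): void {
	const block = blocks.get(chunk.index) ?? { text: "", reasoning: "" };
	if (chunk.type === "text") block.text += chunk.text;
	else block.reasoning += chunk.reasoning;
	blocks.set(chunk.index, block);
}

const blocksByIndex = new Map<number, Block>();
const incoming: V2Chunk[] = [
	{ type: "reasoning", index: 0, reasoning: "Plan: " },
	{ type: "text", index: 1, text: "Hel" },
	{ type: "reasoning", index: 0, reasoning: "search first." },
	{ type: "text", index: 0, text: "Searching. " },
	{ type: "text", index: 1, text: "lo" },
];
for (const chunk of incoming) applyChunk(blocksByIndex, chunk);

// Render blocks in index order, regardless of arrival order.
const rendered = Array.from(blocksByIndex.entries())
	.sort((a, b) => a[0] - b[0])
	.map((entry) => entry[1]);
console.log(rendered);
```

Deltas that share an index are still expected to arrive in order; the index only decides which block a delta belongs to.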

EventManager: Progress Tracking

File: src/server/repositories/streamer/streamer.ts:1031

Manages UI progress events:

TypeScript
export class EventManager {
    eventsMap: Map<string, SSEProgressEvent>;
    metadata: TEventMetadata;
    version: "V1" | "V2";
    index: number;  // Current streaming index

    private constructor(
        index: number,
        version: "V1" | "V2" = "V1",
        public name?: string,
        public icon?: TIcon,
    ) {
        this.eventsMap = new Map();
        this.metadata = {};
        this.index = index;
        this.version = version;

        // Send initial heading
        if (this.version === "V1") {
            appendEvent("progress", {
                payload: { name: this.name, icon: this.icon },
                metadata: { type: "HEADING" },
            });
        }
    }

    static init(index: number, version?: "V1" | "V2") {
        return new EventManager(index, version);
    }

    public createEvent(name: TMessageProgressStep["name"], icon?: TIcon) {
        const currentEventIndex = this.eventsMap.size;
        const event = new SSEProgressEvent(
            name,
            this.index,
            currentEventIndex,
            icon,
            this.version,
        );
        this.eventsMap.set(event.id, event);
        return this.eventsMap.get(event.id) as SSEProgressEvent;
    }

    public getOrCreateLatestEvent(name: TMessageProgressStep["name"], icon?: TIcon) {
        let [latestEvent] = [...this.eventsMap.values()].filter(e => e.name === name);
        if (latestEvent) {
            latestEvent.status = "IN_PROGRESS";
            // Re-send to update UI
            appendEvent("progress", {
                payload: { id: latestEvent.id, name: latestEvent.name, status: latestEvent.status, ... },
            });
        } else {
            latestEvent = this.createEvent(name, icon);
        }
        return latestEvent;
    }

    public updateName(name: string, icon?: TIcon) {
        this.name = name;
        this.icon = icon;
        appendEvent("message", {
            data: { index: this.index, type: "progress", content: "", progress: { name: this.name, icon: this.icon } },
        });
    }
}

Usage Example:

TypeScript
const progress = EventManager.init(0, "V2");
const searchEvent = progress.createEvent("Searching web...", "GLOBE");

// Later...
searchEvent.send([{ type: "TEXT", value: "Found 10 results" }]);

// When done...
searchEvent.end(); // Status: DONE

SSEProgressEvent: Individual Steps

File: src/server/repositories/streamer/streamer.ts:794

TypeScript
export class SSEProgressEvent implements TmessageProgressV2Step {
    id: string;
    name: string;
    tags: string[];
    status: "IN_PROGRESS" | "DONE";
    values: TProgressV2EventType[];
    icon: TIcon;
    toolIndex: number;
    index: number;
    version: "V1" | "V2";

    constructor(name, toolIndex, index, icon?, version = "V1") {
        this.id = randomUUID();
        this.status = "IN_PROGRESS";
        this.name = name;
        this.tags = [];
        this.values = [];
        this.icon = icon ?? "DEFAULT";
        this.index = index;
        this.toolIndex = toolIndex;
        this.version = version;

        // Send to client immediately
        if (this.version === "V1") {
            appendEvent("progress", {
                payload: { id: this.id, name: this.name, status: this.status, values: this.values, icon: this.icon },
            });
        } else {
            appendEvent("message", {
                data: { index: this.toolIndex, type: "progress", content: "", progress: { steps: { index: this.index, id: this.id, ... } } },
            });
        }
    }

    public addTags(tags: string[]) {
        this.tags.push(...tags);
        // Re-send with updated tags
        this.send(tags.map(tag => ({ type: "TEXT", value: tag })));
    }

    public send(values: TProgressV2EventType[]) {
        this.values.push(...values);

        if (this.version === "V1") {
            appendEvent("progress", {
                payload: { id: this.id, name: this.name, status: this.status, values: this.values.map(v => this.serializeV1ValueFromV2(v)), icon: this.icon },
            });
        } else {
            appendEvent("message", {
                data: { index: this.toolIndex, type: "progress", content: "", progress: { steps: { ...this.serialize() } } },
            });
        }
    }

    public end() {
        this.status = "DONE";
        // Send final status
        if (this.version === "V1") {
            appendEvent("progress", { payload: { ...this.serializeAsV1() } });
        } else {
            appendEvent("message", {
                data: { index: this.toolIndex, type: "progress", content: "", progress: { steps: { ...this.serialize() } } },
            });
        }
    }

    public serialize() {
        return {
            tags: this.tags,
            id: this.id,
            name: this.name,
            status: this.status,
            values: this.values,
            icon: this.icon,
        };
    }
}
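
In V2, send and end re-emit the whole serialized step, so a client can plausibly treat every progress message as an upsert keyed by id. The sketch below shows that idea under stated assumptions: Step is a trimmed, hypothetical view of what serialize returns, and upsertStep is not taken from the project.

```typescript
// Hypothetical client-side view of a step, mirroring some of the fields serialize() returns.
type Step = {
	id: string;
	name: string;
	status: "IN_PROGRESS" | "DONE";
	values: Array<{ type: string; value: string }>;
};

// Each message carries the full step, so the client replaces by id instead of merging fields.
function upsertStep(current: Step[], incomingStep: Step): Step[] {
	const pos = current.findIndex((s) => s.id === incomingStep.id);
	if (pos === -1) return [...current, incomingStep];
	return current.map((s, i) => (i === pos ? incomingStep : s));
}

let progressSteps: Step[] = [];
// Constructor emits the step as IN_PROGRESS with no values.
progressSteps = upsertStep(progressSteps, {
	id: "a", name: "Searching web...", status: "IN_PROGRESS", values: [],
});
// send() re-emits the step with the accumulated values.
progressSteps = upsertStep(progressSteps, {
	id: "a", name: "Searching web...", status: "IN_PROGRESS",
	values: [{ type: "TEXT", value: "Found 10 results" }],
});
// end() re-emits it once more with status DONE.
progressSteps = upsertStep(progressSteps, {
	id: "a", name: "Searching web...", status: "DONE",
	values: [{ type: "TEXT", value: "Found 10 results" }],
});
console.log(progressSteps);
```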

Event Types

Returned to the caller (the accumulated result that stream and streamV2 resolve with):

TypeScript
type StreamedChunks = {
	content: string[]; // Text content chunks
	reasoningContent: Array<{
		// Model reasoning (Claude, etc.)
		text: string;
		index: number;
		id?: string;
		signature?: string;
	}>;
	citations: CitationBlocks[]; // Source citations
	toolCalls?: Array<{
		// Assembled tool calls
		type: "function";
		function: { name: string; arguments: string };
		id: string;
	}>;
	usage?: CompletionUsage; // Token usage stats
};
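
The toolCalls field holds calls assembled from streamed fragments. A minimal sketch of that assembly follows, mirroring the V1 loop and its final reduce. ToolCallDelta is an assumed shape based on the fields the loop reads, and assembleToolCalls is a hypothetical name.

```typescript
// Assumed delta shape: the first fragment of a call carries type, id, and name;
// later fragments carry only more of the arguments string.
type ToolCallDelta = {
	index: number;
	type?: "function";
	id?: string;
	function: { name?: string; arguments: string };
};

type AssembledCall = {
	type: "function";
	id: string;
	function: { name: string; arguments: string };
};

function assembleToolCalls(deltas: ToolCallDelta[]): AssembledCall[] {
	const agg: Array<{ name: string; arguments: string; id: string } | undefined> = [];
	for (const delta of deltas) {
		if (delta.type !== undefined) {
			// First fragment: start a new entry at this index.
			agg[delta.index] = {
				name: delta.function.name ?? "",
				arguments: delta.function.arguments,
				id: delta.id ?? "",
			};
		} else {
			// Continuation: append the argument fragment to the existing entry.
			const existing = agg[delta.index];
			if (!existing) throw new Error("Tool agg does not exist");
			existing.arguments += delta.function.arguments;
		}
	}
	const assembled: AssembledCall[] = [];
	for (const entry of agg) {
		// Skip holes and calls that never received any arguments.
		if (entry && entry.arguments) {
			assembled.push({
				type: "function",
				id: entry.id,
				function: { name: entry.name, arguments: entry.arguments },
			});
		}
	}
	return assembled;
}

const assembledCalls = assembleToolCalls([
	{ index: 0, type: "function", id: "call_1", function: { name: "search", arguments: '{"q":' } },
	{ index: 0, function: { arguments: '"sse"}' } },
]);
console.log(assembledCalls);
```

The arguments string is only valid JSON once every fragment has arrived, which is why parsing is deferred until the stream ends.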

Event Types Sent:

TypeScript
// Message content
type TStreamChunk =
	| { type: "text"; text: string; index: number }
	| { type: "reasoning"; reasoning: string; index: number }
	| { type: "toolCalls"; toolCalls: TToolCall[]; contentIndex: number }
	| {
			type: "toolCallResult";
			toolCallResult: {
				index: number;
				result: string;
				toolCallId: string;
				function: { name: string };
			};
			contentIndex: number;
	  }
	| { type: "citation"; citation: CitationBlocks }
	| { type: "progress"; progress: EventManager }
	| { type: "attachments"; attachments: TAttachment[] };

Stream End Handling

File: src/server/repositories/streamer/streamer.ts:771

TypeScript
const end = async (response: Response) => {
	const message = getMessage(END_MESSAGE);
	response.write(message);
	response.end();
};

const END_MESSAGE = { type: "DONE" };

Writes the final event: end message to signal completion, then closes the response.


Citation Handling

File: src/server/repositories/streamer/streamer.ts:151

TypeScript
function sendCitation(
	response: NodeJS.WritableStream,
	citation?: CitationBlocks,
): string {
	if (citation) {
		const { assistantMessageNode }: TUnifiedPrewareRequestContext =
			requestContext.get();

		let attachementIdx = 0;
		let citedTextIdx = 0;
		const payload: object[] = [];

		// Find which attachment this citation refers to
		assistantMessageNode.attachments
			.filter(isWebAccessGenLinkAttachment)
			.map((attachment: TWebAccessGeneratedLinkAttachment) => {
				if (attachementIdx === citation.document_index) {
					if (!attachment.citedText) {
						attachment.citedText = [];
					}

					// Check if already cited
					if (
						(citedTextIdx = attachment.citedText.indexOf(
							citation.cited_text,
						)) === -1
					) {
						citedTextIdx = attachment.citedText.length;
						attachment.citedText.push(citation.cited_text);
					}
				}

				payload.push({
					url: attachment.url,
					title: attachment.title,
					...(attachment.citedText ? { cited_text: attachment.citedText } : {}),
				});
				attachementIdx++;
			});

		appendEvent("attachments", { payload });

		// Format: [citation: documentIndex: citationIndex]
		const citationString = `[citation: ${citation.document_index + 1}: ${citedTextIdx + 1}]`;
		response.write(
			Buffer.from(getMessage({ data: { content: citationString } })),
		);
		return citationString;
	}
	return "";
}

Citation Format: [citation: 1: 2] means:

  • Document #1 (first attachment)
  • Citation #2 within that document
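
Since both numbers in the marker are 1-based, a consumer has to subtract one to get back to array positions. The sketch below shows one way a client could extract the markers; parseCitations and CitationRef are hypothetical names, and the regular expression assumes the exact spacing that sendCitation produces.

```typescript
type CitationRef = { documentIndex: number; citedTextIndex: number };

// Find every "[citation: D: C]" marker and convert both numbers back to zero-based indices.
function parseCitations(text: string): CitationRef[] {
	const pattern = /\[citation: (\d+): (\d+)\]/g;
	const found: CitationRef[] = [];
	let match: RegExpExecArray | null;
	while ((match = pattern.exec(text)) !== null) {
		found.push({
			documentIndex: Number(match[1]) - 1,
			citedTextIndex: Number(match[2]) - 1,
		});
	}
	return found;
}

const citationRefs = parseCitations(
	"SSE is one-way [citation: 1: 2] and text-based [citation: 3: 1].",
);
console.log(citationRefs);
```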

Index Management (CRITICAL)

File: src/server/endpoints/unified/orchestrator/helpers/baseUtils.ts

TypeScript
export const validateAndSyncStreamingIndex = (index: number): number => {
	const { currentContentIndex }: TUnifiedPostwareRequestContext =
		requestContext.get();

	// Never go below global index (prevents conflicts)
	if (currentContentIndex && index < currentContentIndex) {
		return currentContentIndex;
	}
	return index;
};

export const calculateInitialContentIndex = (
	agentName: TAgentName,
	globalContextIndex: number,
	eventManagerIndex: number,
	unifiedApiVersion: TUnifiedApiVersion,
): number => {
	if (unifiedApiVersion === "V2") {
		// For sub-agents, use the higher of global or event manager index
		return Math.max(globalContextIndex, eventManagerIndex);
	}
	return 0; // V1 doesn't use index-based streaming
};

Why This Matters:

  • Prevents multi-agent streaming conflicts
  • Ensures correct chunk ordering
  • Global index synchronized across all agents
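
The two rules can be exercised in isolation with pure versions of the helpers. In this sketch the global index is passed as a parameter instead of being read from requestContext, and the names syncIndex and initialIndex are hypothetical.

```typescript
// Pure version of validateAndSyncStreamingIndex: never return an index below the global one.
function syncIndex(index: number, currentContentIndex?: number): number {
	if (currentContentIndex && index < currentContentIndex) return currentContentIndex;
	return index;
}

// Pure version of calculateInitialContentIndex without the agent name parameter.
function initialIndex(
	globalContextIndex: number,
	eventManagerIndex: number,
	version: "V1" | "V2",
): number {
	return version === "V2" ? Math.max(globalContextIndex, eventManagerIndex) : 0;
}

console.log(syncIndex(2, 5)); // stale index is raised to the global index
console.log(syncIndex(7, 5)); // index already ahead is kept
console.log(syncIndex(2, undefined)); // no global index yet, so the input is kept
console.log(initialIndex(4, 6, "V2")); // sub-agent starts at the higher of the two
console.log(initialIndex(4, 6, "V1")); // V1 ignores indices
```

An agent that holds a stale index is pushed forward to the global position, so it cannot write into a slot another agent has already used.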

Integration with Orchestrator

TypeScript
// In toolOrchestrator.ts
const streamOutput = await streamer.streamV2(
	config.request,
	config.response,
	chatResponse.data,
	{
		contentIndex: state.currentContentIndex,
		streamAsToolResult: isSubAgent
			? { toolResultIndex: currentToolResultIndex }
			: undefined,
		suppressUIStreaming: isSubAgent && !shouldStream,
	},
);

// Stream tool results
streamMessage({
	type: "toolCallResult",
	contentIndex: state.currentContentIndex,
	toolCallResult: {
		index: currentToolResultIndex,
		result: content,
		toolCallId: chunk.id,
		function: { name: chunk.function.name },
	},
});

Summary

The streaming architecture:

  1. SSE Protocol: text/event-stream with structured JSON data
  2. Dual APIs: V1 (legacy) and V2 (indexed, current)
  3. EventManager: Hierarchical progress tracking
  4. Adaptive Pacing: V1 slows writes when the upstream buffer runs low and speeds up when a backlog builds
  5. Index Synchronization: Prevents multi-agent conflicts
  6. Graceful Errors: Resolves with partial results
  7. Citation Support: Automatic attachment linking

Key Principle: Users see progress in real-time, even during multi-step tool execution. Never make them wait for the final result.