TypeScript
const streamV2 = async (
_req: Request,
response: NodeJS.WritableStream,
inputStream: NodeJS.ReadableStream,
config: {
contentIndex: number;
streamAsToolResult?: {
toolResultIndex: number;
};
suppressUIStreaming?: boolean;
},
): Promise<StreamedChunks> => {
inputStream.setEncoding("utf8");
return new Promise<StreamedChunks>((resolve) => {
let chunkBuffer = "";
const contentChunks: string[] = [];
const reasoningContentChunks: Array<ChoiceDelta["reasoning"]> = [];
const citations: CitationBlocks[] = [];
const passThrough = new PassThrough();
const toolCallsAgg: Array<{ name: string; arguments: string; id: string }> =
[];
let usage: CompletionUsage | undefined;
inputStream.pipe(passThrough);
(async (): Promise<StreamedChunks> => {
outer: for await (const chunk of passThrough) {
if (shouldReturnEarly()) break outer;
const { events, bfr } = extractEventsFromChunk(chunk, chunkBuffer);
chunkBuffer = bfr;
for (let i = 0; i < events.length; i++) {
contents: {
const content = events?.[i]?.choices?.[0]?.delta?.content ?? "";
contentChunks.push(content);
if (content && !config.suppressUIStreaming) {
if (config.streamAsToolResult) {
response.write(
Buffer.from(
getMessage({
data: {
content: "",
index: config.contentIndex,
type: "toolCallResult",
toolCallResult: {
index: config.streamAsToolResult.toolResultIndex,
delta: content,
},
},
}),
),
);
} else {
response.write(
Buffer.from(
getMessage({
data: {
content: "",
index: config.contentIndex,
type: "text",
text: content,
},
}),
),
);
}
}
}
toolCalls: {
try {
const toolCalls = events?.[i]?.choices?.[0]?.delta?.tool_calls;
if (toolCalls && toolCalls.length > 0) {
const firstToolCall = toolCalls[0];
if ("type" in firstToolCall) {
toolCallsAgg[firstToolCall.index] = {
name: firstToolCall.function.name,
arguments: firstToolCall.function.arguments,
id: firstToolCall.id,
};
} else {
const toolAtIndex = toolCallsAgg[firstToolCall.index];
if (toolAtIndex) {
toolAtIndex.arguments += firstToolCall.function.arguments;
} else {
throw new Error("Tool agg does not exist");
}
}
}
} catch (e) {
logger.error(e, "ERROR/STREAMER");
}
}
reasoning_content: {
const reasoningContent =
events?.[i]?.choices?.[0]?.delta?.reasoning;
if (reasoningContent) {
if (shouldReturnEarly()) break outer;
reasoningContentChunks.push(reasoningContent);
if (reasoningContent.text) {
response.write(
Buffer.from(
getMessage({
data: {
content: "",
index: config.contentIndex,
type: "reasoning",
reasoning: reasoningContent.text,
},
}),
),
);
}
}
}
usage: {
const usageChunk = events?.[i]?.usage;
if (usageChunk) {
usage = { ...clone(usage), ...usageChunk };
}
}
}
}
const toolCalls = toolCallsAgg?.reduce(
(acc, curr) => {
if (!acc) acc = [];
if (curr.arguments) {
try {
acc.push({
type: "function",
function: { name: curr.name, arguments: curr.arguments },
id: curr.id,
});
} catch (e) {
logger.error(e, "ERROR/STREAMER");
}
}
return acc;
},
undefined as StreamedChunks["toolCalls"],
);
return {
content: contentChunks,
reasoningContent: reasoningContentChunks,
citations,
toolCalls,
usage,
};
})()
.then(resolve)
.catch((error) => {
logger.error(error, "ERROR/STREAMER");
passThrough.destroy();
resolve({
content: contentChunks,
reasoningContent: reasoningContentChunks,
citations,
toolCalls: undefined,
usage,
});
});
});
};