// TypeScript
/**
 * Runs the tool-orchestration loop for one request.
 *
 * Every pass of the loop:
 *  1. sends the current messages plus the in-loop trimmed messages to the provider,
 *  2. streams the model reply to the client,
 *  3. validates the tool calls the model produced,
 *  4. executes the requested tools and collects one result per call,
 *  5. records usage, trims the context through `engine`, and appends the
 *     text / progress / tool-result nodes to `assistantMessageNode`.
 *
 * The loop keeps going while `agentConfig.shouldContinueLoop(state)` is truthy,
 * and stops early when `agentConfig.processToolCalls` asks to break.
 *
 * @param config - Orchestrator input. It is forwarded to
 *   `initializeOrchestrationState` and `buildMessages`; its `request` and
 *   `response` are handed to the streamer.
 * @returns `result` is always `"OK"`; `finalIteration` carries the final
 *   content index (not the iteration counter); `toolCallCount` carries
 *   `state.currentIteration`; `usageConfigArray` is `this.usageConfigArray`.
 */
async run(config: TToolOrchestratorRunInput) {
  // Result recorded for a tool call the executor never reported an outcome for.
  const MISSING_TOOL_RESULT_ERROR = "TOOL_RESULT_MISSING";

  this.currentToolMetadata = {};
  const {
    state,
    agentContext,
    messages,
    systemPromptTokens,
    contextLimit,
    shouldDoParallelToolCalls,
    modelConfig,
  } = await this.initializeOrchestrationState(config);
  let currentMessages = messages;
  if (this.agentConfig.initializeAgentState) {
    this.agentConfig.initializeAgentState(state);
  }

  // Describe every registered tool in the function-tool shape sent to the provider.
  const openAiTools: ChatCompletionTool[] = this.registry.getAll().map(t => ({
    type: "function",
    function: {
      name: t.name,
      description: t.description,
      parameters: zodToJsonSchema(t.parameters, { target: "openApi3" }),
    },
  }));

  state.currentContentIndex = calculateInitialContentIndex(
    this.agentName,
    globalContextIndex ?? 0,
    eventManager.index,
    unifiedApiVersion,
  );
  requestContext.set({ currentContentIndex: state.currentContentIndex });
  eventManager.index = state.currentContentIndex;

  if (REASONING_MODELS.includes(schema.schema.model)) {
    // Emit an empty reasoning message at the initial content index for reasoning models.
    streamMessage({
      type: "reasoning",
      contentIndex: state.currentContentIndex,
      reasoning: "",
    });
  }

  let lastLoopProgressEvent: SSEProgressEvent | undefined;
  const toolIterationInfo: TToolIterationInfo[] = [];

  while (this.agentConfig.shouldContinueLoop(state)) {
    toolIterationInfo.push({
      iteration: state.currentIteration + 1,
      toolCalls: [],
      layout: {},
    });

    const shouldSendTools = this.agentConfig.shouldSendTools(state, openAiTools, schema.schema);
    const modelId = this.agentConfig.getModelId
      ? this.agentConfig.getModelId(state, modelConfig)
      : modelConfig.id;
    const llmInput = {
      model: modelId,
      messages: [...currentMessages, ...state.inLoopTrimmedMessages],
      params: {
        ...(shouldSendTools
          ? {
              tools: openAiTools,
              tool_choice: state.toolChoice,
              parallel_tool_calls: shouldUseParallelToolCalls(
                this.agentName,
                shouldDoParallelToolCalls || this.agentConfig.useParallelTools,
              ),
            }
          : {}),
        max_tokens: modelConfig.getMaxOutputTokens?.(false),
      },
    };

    const chatRequest = await provider.chat(llmInput);
    const chatResponse = await chatRequest.stream();

    // Reset per-iteration flags; a forced tool choice only applies to one request.
    state.hasToolCalls = false;
    state.toolChoice = "auto";
    lastLoopProgressEvent?.end();
    state.currentContentIndex = validateAndSyncStreamingIndex(state.currentContentIndex);

    const streamOutput = unifiedApiVersion === "V2"
      ? await streamer.streamV2(config.request, config.response, chatResponse.data, {
          contentIndex: state.currentContentIndex,
        })
      : await streamer.stream(config.request, config.response, chatResponse.data);

    const toolCallParsed = multipleToolCallSchema.safeParse(streamOutput?.toolCalls);
    if (!toolCallParsed.success) {
      // Feed the validation error back to the model and ask again. The token
      // count must describe the text that is actually pushed (the full prompt),
      // not just the raw parser message embedded in it.
      const invalidToolPrompt = prompts.getInvalidToolNamePrompt(toolCallParsed.error.message);
      state.inLoopTrimmedMessages.push(
        SystemMessage({
          content: [{
            text: invalidToolPrompt,
            type: "TEXT",
            tokens: (await tokenizer.encode(invalidToolPrompt)).length,
          }],
        }),
      );
      state.hasToolCalls = true;
      // NOTE(review): this retry neither advances state.currentIteration nor
      // records usage for the request just made — confirm shouldContinueLoop
      // bounds the number of retries some other way.
      continue;
    }

    const toolCalls = await getTokenizedToolCalls<TToolCall>(toolCallParsed.data);
    const llmResponseContent = streamOutput?.content.join("") ?? "";

    // Merge streamed reasoning fragments by their `index`. Each slot starts as
    // a copy of its first fragment so that streamOutput (still handed to
    // calculateUsage below) is not mutated by the concatenation.
    const reasoningContent = streamOutput?.reasoningContent.reduce((acc, current) => {
      const merged = acc[current.index];
      if (!merged) {
        acc[current.index] = { ...current };
      } else {
        // The first fragment may carry no text; avoid producing "undefined...".
        if (current.text) merged.text = (merged.text ?? "") + current.text;
        if (current.id) merged.id = current.id;
        if (current.signature) merged.signature = current.signature;
      }
      return acc;
    }, []);

    if (state.lastIterationErroredOutTools?.length) {
      // Report tools that failed last iteration and were not requested again.
      const notRetried = state.lastIterationErroredOutTools.filter(
        tool => !toolCallParsed.data?.some(t => t.function.name === tool),
      );
      if (notRetried.length) {
        logger.error({ toolsNotChosenAfterError: notRetried },
          `ERROR/TOOL/RUN-SINGLE-TOOL/TOOLS-NOT-CHOSEN-AFTER-ERROR`);
      }
      state.lastIterationErroredOutTools = [];
    }

    if (this.agentConfig.processToolCalls && toolCalls?.length) {
      const shouldBreak = await this.agentConfig.processToolCalls(
        toolCalls, state, agentContext,
      );
      if (shouldBreak) {
        // Persist the text produced so far and leave the loop without executing tools.
        // NOTE(review): usage for this request is not recorded on this path.
        assistantMessageNode.contentV2.push({
          type: "TEXT",
          text: llmResponseContent,
          tokens: (await tokenizer.encode(llmResponseContent)).length,
        });
        state.hasToolCalls = false;
        break;
      }
    }

    if (!toolCalls || toolCalls.length === 0) {
      if (this.agentConfig.onNoToolCalls) {
        const noCallsResult = await this.agentConfig.onNoToolCalls(
          state, agentContext, "NO_CALLS_GENERATED",
        );
        if (noCallsResult.shouldContinue) {
          if (state.messages) {
            currentMessages = state.messages;
          }
          // NOTE(review): like the invalid-tool-call retry, this path skips
          // usage accounting and the iteration increment.
          continue;
        }
      }
      state.hasToolCalls = false;
    }

    // Declared at iteration scope because the usage call after the tool branch
    // reads it even when no tool was requested.
    const successfulToolNames: TToolName[] = [];

    if (toolCalls?.length) {
      logger.info({
        currentIteration: state.currentIteration,
        effectiveToolCallsLimit: state.effectiveToolCallsLimit,
        toolCallsNames: toolCallParsed.data?.map(t => t.function.name),
      }, "DEBUG/TOOL_CALLS");
      const toolExecutionEvent = eventManager.createEvent("Selecting tools", "LIGHTBULB");
      state.hasToolCalls = true;
      if (this.agentConfig.onToolsSelected) {
        await this.agentConfig.onToolsSelected(toolCalls, state, agentContext);
      }

      const toolExecutionResults: TToolExecutionResults = {};
      streamMessage({ type: "toolCalls", contentIndex: state.currentContentIndex, toolCalls });
      for (const tool of toolCalls) {
        this.chatCtx.chatState.addUsedMode(toolNameMap[tool.function.name]);
      }
      // The toolCalls message consumed the current content index.
      state.currentContentIndex++;
      state.lastIterationErroredOutTools = [];
      state.toolsNotEnabledList = [];

      // Incremented on every "tool:start" chunk, so the first tool gets index 0.
      let currentToolResultIndex = -1;
      for await (const chunk of this.executeRequestedTools(
        toolCalls, state.currentContentIndex, agentContext,
      )) {
        // Never fall behind the index the event manager has already reached.
        if (eventManager.index > state.currentContentIndex) {
          state.currentContentIndex = eventManager.index;
        }
        if (this.agentConfig.enableGlobalContextUpdate) {
          requestContext.set({ currentContentIndex: state.currentContentIndex });
        }
        if (chunk.type !== "tool:progress" && this.agentConfig.onToolProgress) {
          this.agentConfig.onToolProgress(chunk.function.name, toolExecutionEvent, agentContext);
        }

        switch (chunk.type) {
          case "tool:progress": {
            toolExecutionProgress = chunk.progress;
            state.currentContentIndex++;
            break;
          }
          case "tool:error": {
            state.lastIterationErroredOutTools.push(chunk.function.name);
            if (state.currentIteration === state.effectiveToolCallsLimit) {
              logger.error({ error: chunk.error }, `ERROR/TOOL/RUN-SINGLE-TOOL`);
            }
            toolExecutionResults[chunk.id] = {
              type: chunk.type,
              result: chunk.error.slice(0, TOOL_ERROR_MESSAGE_MAX_LENGTH),
            };
            if (chunk.error === "TOOL_NOT_ENABLED") {
              state.toolsNotEnabledList.push(chunk.function.name);
            }
            this.currentToolMetadata[chunk.id] = { saveToDB: false, summary: chunk.error };
            break;
          }
          case "tool:stream": {
            lastLoopProgressEvent = toolExecutionProgress?.getOrCreateLatestEvent(
              getRandomGeneratingMessage(), "WAND",
            );
            // Announce the result slot with an empty result before streaming into it.
            streamMessage({
              type: "toolCallResult",
              contentIndex: state.currentContentIndex,
              toolCallResult: {
                index: currentToolResultIndex,
                result: "",
                toolCallId: chunk.id,
                function: { name: chunk.function.name },
              },
            });
            const safeContentIndex = validateAndSyncStreamingIndex(state.currentContentIndex);
            const content = unifiedApiVersion === "V2"
              ? await streamer.streamV2(config.request, config.response, chunk.content.stream, {
                  contentIndex: safeContentIndex,
                  streamAsToolResult: { toolResultIndex: currentToolResultIndex },
                })
              : await streamer.stream(config.request, config.response, chunk.content.stream);
            try {
              const toolStreamUsage = await chunk.content.chatResponse.usage(content.content, {
                usage: content.usage,
                tools: [chunk.function.name],
              });
              this.addUsage(toolStreamUsage);
            } catch (error) {
              // A failed usage calculation must not discard the streamed tool output.
              logger.warn({ error, toolId: chunk.id, toolName: chunk.function.name },
                "WARN/TOOL/STREAM_USAGE_CALCULATION_FAILED");
            }
            if (toolExecutionResults[chunk.id]) {
              toolExecutionResults[chunk.id].result += content.content.join("");
            } else {
              toolExecutionResults[chunk.id] = {
                type: chunk.type,
                result: content.content.join(""),
              };
            }
            this.currentToolMetadata[chunk.id] = { saveToDB: true };
            lastLoopProgressEvent?.end();
            break;
          }
          case "tool:start": {
            currentToolResultIndex++;
            break;
          }
          case "tool:done": {
            lastLoopProgressEvent = toolExecutionProgress?.getOrCreateLatestEvent(
              getRandomGeneratingMessage(), "WAND",
            );
            // Keep a streamed (or errored) result if one was already recorded.
            if (!toolExecutionResults[chunk.id]) {
              toolExecutionResults[chunk.id] = {
                type: chunk.type,
                result: chunk.result.result,
              };
            }
            this.currentToolMetadata[chunk.id] = chunk.result;
            if (chunk.result.toolMetadata) {
              this.currentToolMetadata = { ...this.currentToolMetadata, ...chunk.result.toolMetadata };
            }
            if (chunk.result.toolCallCount) {
              state.currentIteration += chunk.result.toolCallCount;
            }
            if (chunk.result.usageConfigArray?.length) {
              for (const usage of chunk.result.usageConfigArray) {
                this.addUsage(usage);
              }
            }
            lastLoopProgressEvent?.end();
            break;
          }
        }

        if (this.agentConfig.enableGlobalContextUpdate) {
          requestContext.set({ currentContentIndex: state.currentContentIndex });
        }
      }
      toolExecutionEvent.end();

      for (const [index, call] of toolCalls.entries()) {
        if (!toolExecutionResults[call.id]) {
          // The executor yielded no stream/done/error chunk for this call.
          // Record it as an error, mirroring the "tool:error" case above,
          // instead of dereferencing an undefined entry further down.
          logger.error({ toolCallId: call.id, toolName: call.function.name },
            `ERROR/TOOL/RUN-SINGLE-TOOL/RESULT-MISSING`);
          toolExecutionResults[call.id] = {
            type: "tool:error",
            result: MISSING_TOOL_RESULT_ERROR,
          };
          this.currentToolMetadata[call.id] = {
            saveToDB: false,
            summary: MISSING_TOOL_RESULT_ERROR,
          };
          state.lastIterationErroredOutTools.push(call.function.name);
        }
        const executedTool = toolExecutionResults[call.id];
        let result = executedTool.result;
        const metadata = this.currentToolMetadata[call.id];

        if (executedTool.type !== "tool:error") {
          successfulToolNames.push(call.function.name);
        }

        if (this.agentConfig.handleSpecialTools) {
          const specialResult = this.agentConfig.handleSpecialTools(
            call.function.name, result, call, agentContext,
          );
          if (specialResult) {
            if (specialResult.modifiedResult) result = specialResult.modifiedResult;
            if (specialResult.shouldForceToolChoice) state.toolChoice = specialResult.shouldForceToolChoice;
            if (specialResult.modifiedMessages) currentMessages = specialResult.modifiedMessages;
          }
        }

        // Streamed results were already sent chunk by chunk; errors are not sent here.
        if (executedTool.type !== "tool:stream" && executedTool.type !== "tool:error") {
          streamMessage({
            type: "toolCallResult",
            contentIndex: state.currentContentIndex,
            toolCallResult: {
              index,
              result: JSON.stringify(result),
              toolCallId: call.id,
              function: { name: call.function.name },
            },
          });
        }

        const serializedResult = JSON.stringify(result);
        const resultTokens = (await tokenizer.encode(serializedResult)).length;
        if (metadata?.saveToDB) {
          toolResults.push({
            toolCallId: call.id,
            content: serializedResult,
            tokens: resultTokens,
            shouldIncludeInHistory: true,
            function: { name: call.function.name },
          });
        } else {
          toolResults.push({
            toolCallId: call.id,
            content: serializedResult,
            tokens: resultTokens,
            shouldIncludeInHistory: false,
            summary: JSON.stringify(metadata?.summary),
            function: { name: call.function.name },
          });
        }

        toolIterationInfo[toolIterationInfo.length - 1].toolCalls.push({
          name: call.function.name,
          args: call.function.arguments,
          result: serializedResult,
        });
      }
    }

    // streamOutput is treated as optional everywhere above, so guard it here too.
    let streamUsage = await chatResponse.usage(streamOutput?.content ?? [], {
      usage: streamOutput?.usage,
      tools: successfulToolNames,
    });
    if (this.agentConfig.calculateUsage) {
      streamUsage = await this.agentConfig.calculateUsage(
        streamUsage, streamOutput, systemPromptTokens, openAiTools, shouldSendTools,
      );
    }
    this.addUsage(streamUsage);

    const trimmedMessages = await engine({
      messages: currentMessages,
      inLoop: state.inLoopTrimmedMessages,
      contextLimit: contextLimit,
      response: {
        tokens: streamUsage.tokens,
        toolCalls: toolCalls,
        toolResults: await formatToolResultsForLLM(toolResults),
        content: llmResponseContent,
        reasoning: reasoningContent,
      },
      agentName: this.agentName,
    }, toolIterationInfo[toolIterationInfo.length - 1]);
    state.inLoopTrimmedMessages = trimmedMessages.inLoop;

    if (toolResults.length > 2) {
      state.inLoopTrimmedMessages.push(
        SystemMessage({
          content: [{
            type: "TEXT",
            text: TOOL_RESULTS_CONTEXT_SUMMARY_PROMPT,
            tokens: TOOL_RESULTS_CONTEXT_SUMMARY_TOKENS,
          }],
        }),
      );
    }

    if (trimmedMessages.summary) {
      // The engine produced a summary: rebuild the base messages around it.
      const { messages: adjustedMessages } = await buildMessages(
        this.chatCtx, config, this.registry, this.agentName, trimmedMessages.summary,
      );
      currentMessages = adjustedMessages;
      if (trimmedMessages.shouldDumpSummaryInDB) {
        assistantMessageNode.isChatSummarizedSoFar = true;
        assistantMessageNode.summary = trimmedMessages.summary;
      }
    }
    this.chatCtx.messages = [...currentMessages, ...state.inLoopTrimmedMessages];

    const dataPolicy = this.agentConfig.getDataPolicy();
    if (dataPolicy.shouldStoreText) {
      // reasoningContent is undefined when the streamer returned nothing, and a
      // merged slot may have no text at all.
      const reasoningContentText =
        reasoningContent?.reduce((acc, current) => acc + (current.text ?? ""), "") ?? "";
      assistantMessageNode.contentV2.push({
        type: "TEXT",
        toolCalls: filterToolCallsByPolicy(dataPolicy, toolCalls),
        text: llmResponseContent,
        tokens: (await tokenizer.encode(llmResponseContent)).length,
        ...(reasoningContent
          ? {
              reasoning: {
                content: reasoningContentText,
                tokens: (await tokenizer.encode(reasoningContentText)).length,
              },
            }
          : {}),
      });
    }

    if (toolResults.length) {
      const progressEvents = toolExecutionProgress?.getEvents() || [];
      const steps = progressEvents.map(event => event.serialize());
      assistantMessageNode.contentV2.push({
        type: "PROGRESS",
        name: toolExecutionProgress?.name,
        icon: toolExecutionProgress?.icon,
        steps: steps,
        metadata: toolExecutionProgress?.metadata,
      });
      if (dataPolicy.shouldStoreToolResults) {
        assistantMessageNode.contentV2.push({
          type: "TOOL_RESULT",
          toolResults: await settleAllOrThrow(
            filterToolResultsByPolicy(toolResults, dataPolicy).map(result =>
              getFormattedAndTokenizedToolResult(result, this.agentName),
            ),
          ),
        });
      }
    }

    if (state.toolsNotEnabledList.length > 0 && this.agentConfig.onNoToolCalls) {
      await this.agentConfig.onNoToolCalls(state, agentContext, "TOOLS_DISABLED", toolCalls);
    }

    // Only calls whose name is not a NON_ITERATIVE_TOOLS entry count toward the
    // iteration budget; an iteration without tool calls counts as one.
    const iterativeToolCalls = toolCalls?.filter(call =>
      !NON_ITERATIVE_TOOLS.some(tool => call.function.name === `${tool.toLowerCase()}_tool`),
    ) || [];
    if (toolCalls?.length) {
      state.currentIteration += iterativeToolCalls.length;
    } else {
      state.currentIteration++;
    }

    if (state.currentIteration >= state.effectiveToolCallsLimit - 1 &&
        state.hasToolCalls && this.agentConfig.onNoToolCalls &&
        !state.isMainThreadFinalLoopPromptInjected) {
      const limitReachedResult = await this.agentConfig.onNoToolCalls(
        state, agentContext, "LIMIT_REACHED",
      );
      if (limitReachedResult.shouldContinue) {
        if (state.messages) {
          // Pin the counter one below the limit so exactly one more pass can run.
          currentMessages = state.messages;
          state.isMainThreadFinalLoopPromptInjected = true;
          state.currentIteration = state.effectiveToolCallsLimit - 1;
        }
      }
    }

    state.currentContentIndex++;
  }

  if (lastLoopProgressEvent) lastLoopProgressEvent.end();
  if (eventManager) {
    eventManager.index = state.currentContentIndex;
  }
  if (this.agentConfig.enableGlobalContextUpdate) {
    requestContext.set({
      currentContentIndex: state.currentContentIndex,
      toolIterationInfo: toolIterationInfo,
    });
  }

  return {
    result: "OK",
    finalIteration: state.currentContentIndex,
    toolCallCount: state.currentIteration,
    usageConfigArray: this.usageConfigArray,
  };
}