The orchestration layer is a custom-built, multi-agent tool orchestration system designed for AI assistant interactions. It manages tool execution, agent coordination, streaming responses, and context window optimization - all without relying on external frameworks like LangChain or AutoGPT.
User Request
↓
Unified API Endpoint
↓
ToolOrchestrator (with Agent Config)
↓
ToolRegistry (manages available tools)
↓
LLM Provider (Anthropic/OpenAI/Google)
↓
Tool Execution (with streaming)
↓
Token Engine (context optimization)
↓
Response Storage (with data policies)
Location: src/server/endpoints/unified/orchestrator/toolOrchestrator.ts
The ToolOrchestrator is the heart of the orchestration system. It's a 1065-line class that manages the entire orchestration lifecycle.
constructor(
chatCtx: TChatContext,
agentName: TAgentName = AGENT_NAMES.MAIN_THREAD,
toolRegistry: ToolRegistry,
agentConfig: TAgentConfig,
customToolCallLimit?: number,
)
Parameters:
chatCtx: Chat context containing conversation history, model config, and chat state
agentName: Which agent to run (MainThreadAgent, DeepResearchSupervisor, ResearcherAgent)
toolRegistry: Registry of available tools for this session
agentConfig: Configuration object defining agent behavior
customToolCallLimit: Optional override for tool call limits
private chatCtx: TChatContext;
private registry: ToolRegistry;
private agentName: TAgentName;
private customToolCallLimit?: number;
private agentConfig: TAgentConfig;
usageConfigArray: TUsageConfig[] = [];
private currentToolMetadata: TToolMetadata = {};
public currentExecutingTool?: {
toolCallId: string;
functionName: string;
toolIndex: number;
};
chatCtx: Holds conversation state, model config, attachments, etc.
registry: Manages which tools are available
agentName: Determines which agent behavior to use
agentConfig: Defines how this agent behaves (hooks, limits, policies)
usageConfigArray: Tracks token usage across all iterations
currentToolMetadata: Metadata about tools being executed
currentExecutingTool: Currently running tool for error reporting
run()
The run() method is the main entry point that executes the entire orchestration flow.
async run(config: TToolOrchestratorRunInput) {
// 1. Initialize orchestration state
const { state, agentContext, messages, ... } = await this.initializeOrchestrationState(config);
// 2. Convert tools to OpenAI format
const openAiTools = this.registry.getAll().map(t => ({
type: "function",
function: {
name: t.name,
description: t.description,
parameters: zodToJsonSchema(t.parameters),
},
}));
// 3. Main orchestration loop
while (this.agentConfig.shouldContinueLoop(state)) {
// 3a. Decide if tools should be sent
const shouldSendTools = this.agentConfig.shouldSendTools(state, openAiTools, schema);
// 3b. Call LLM with or without tools
const chatRequest = await provider.chat(llmInput);
const chatResponse = await chatRequest.stream();
// 3c. Stream the response to client
const streamOutput = await streamer.stream(...);
// 3d. Parse tool calls from response
const toolCalls = await getTokenizedToolCalls(toolCallParsed.data);
// 3e. Execute tools (if any)
for await (const chunk of this.executeRequestedTools(toolCalls, ...)) {
// Handle streaming tool results
}
// 3f. Run token engine for context optimization
const trimmedMessages = await engine({
messages: currentMessages,
inLoop: state.inLoopTrimmedMessages,
contextLimit: contextLimit,
response: { ... },
});
// 3g. Store results based on data policy
if (dataPolicy.shouldStoreToolResults) {
assistantMessageNode.contentV2.push({
type: "TOOL_RESULT",
toolResults: filterToolResultsByPolicy(toolResults, dataPolicy),
});
}
// 3h. Update iteration counter
state.currentIteration += iterativeToolCalls.length;
}
return {
result: "OK",
finalIteration: state.currentContentIndex,
toolCallCount: state.currentIteration,
usageConfigArray: this.usageConfigArray,
};
}
executeRequestedTools()
This is an async generator that executes tools and yields streaming results.
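As a minimal sketch of the async-generator pattern described here, the following shows one way chunks from several tool runs could be interleaved as they become ready. Async generators are lazy, so a loop that drains an array of generators one after another runs them in sequence. The names `Chunk`, `fakeTool`, and `mergeConcurrently` are hypothetical and do not come from the orchestrator source. The real implementation may merge streams differently.

```typescript
// Hypothetical chunk shape, reduced from the stream chunk types in this note.
type Chunk =
  | { type: "tool:start"; id: string }
  | { type: "tool:done"; id: string; result: string };

// Stand-in for a tool run: emits a start chunk, waits, then emits a result.
async function* fakeTool(id: string, delayMs: number): AsyncGenerator<Chunk> {
  yield { type: "tool:start", id };
  await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  yield { type: "tool:done", id, result: `result of ${id}` };
}

// Interleaves chunks from several generators in the order they become ready.
async function* mergeConcurrently<T>(gens: AsyncGenerator<T>[]): AsyncGenerator<T> {
  type Settled = { index: number; res: IteratorResult<T> };
  const pending = new Map<number, Promise<Settled>>();
  const advance = (index: number): void => {
    pending.set(index, gens[index].next().then((res) => ({ index, res })));
  };
  gens.forEach((_, index) => advance(index));

  while (pending.size > 0) {
    const { index, res } = await Promise.race(Array.from(pending.values()));
    if (res.done) {
      pending.delete(index); // this generator is exhausted
    } else {
      advance(index); // request the next chunk before yielding this one
      yield res.value;
    }
  }
}

// Records the order in which the simulated tools finish.
async function collectDoneOrder(): Promise<string[]> {
  const order: string[] = [];
  const merged = mergeConcurrently([fakeTool("slow", 30), fakeTool("fast", 5)]);
  for await (const chunk of merged) {
    if (chunk.type === "tool:done") order.push(chunk.id);
  }
  return order;
}
```

In this sketch the tool with the shorter delay reports `tool:done` first even though it was listed second. If one generator throws, the merged stream rejects and the remaining generators are left unfinished, so a production version would likely need explicit error handling per tool.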
private async *executeRequestedTools(
calls: { id: string; function: { name: TToolName; arguments: string } }[],
currentContentIndex: number,
agentContext: TAgentContext,
): AsyncIterable<TStreamChunk> {
// 1. Create progress event manager
const progress = EventManager.init(safeToolIndex, "V2");
yield { type: "tool:progress", progress };
// 2. Create tasks for all tools (parallel execution)
const tasks = calls.map((call, index) =>
this.runSingleTool(call.function.name, call.id, call.function.arguments, progress, agentContext, index),
);
// 3. Execute and stream results
for await (const work of tasks) {
for await (const chunk of work) yield chunk;
}
}
Stream Chunk Types:
tool:progress: Initial progress event with EventManager
tool:start: Tool execution begins
tool:stream: Streaming tool result (for tools that return streams)
tool:done: Tool execution completed with result
tool:error: Tool execution failed
runSingleTool()
private runSingleTool(
name: TToolName,
id: string,
args: string,
progress: EventManager,
agentContext: TAgentContext,
index: number,
) {
return async function* (this: ToolOrchestrator): AsyncIterable<TStreamChunk> {
yield { type: "tool:start", id, function: { name }, progress };
// 1. Check if tool should be used
if (tool.shouldUse && !(await tool.shouldUse(ctx))) {
yield { type: "tool:error", id, function: { name }, error: "TOOL_NOT_ENABLED" };
return;
}
// 2. Parse arguments (with agent-specific preprocessing)
const parsedArgs = this.agentConfig.preprocessToolArguments
? await this.agentConfig.preprocessToolArguments(toolCall, ctx, this.toolMetadata, agentContext)
: JSON.parse(args);
// 3. Set current executing tool context
this.currentExecutingTool = { toolCallId: id, functionName: name, toolIndex: index };
// 4. Execute tool
const resultOrStream = await tool.execute(parsedArgs, ctx, progress, toolContext);
// 5. Handle streaming vs non-streaming results
if (isAsyncIterable(resultOrStream)) {
const iterator = resultOrStream[Symbol.asyncIterator]();
while (true) {
const { value, done } = await iterator.next();
if (done) {
yield { type: "tool:done", id, function: { name }, result: value };
break;
}
if ("stream" in value) {
yield { type: "tool:stream", id, function: { name }, content: value };
}
}
} else {
yield { type: "tool:done", id, function: { name }, result: resultOrStream };
}
}.bind(this)();
}
Location: src/server/endpoints/unified/tools/toolRegistry.ts
The ToolRegistry manages the lifecycle of tools - registration, retrieval, and filtering.
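The registration, retrieval, and filtering lifecycle can be illustrated with a reduced, Map-backed registry. This is only a sketch: `MiniTool` and `MiniRegistry` are hypothetical stand-ins, and the tool names are illustrative. The real `Tool` type carries more fields (parameters, an execute function) that are not modelled here.

```typescript
// Hypothetical, reduced tool shape; the real Tool type is richer than this.
interface MiniTool {
  name: string;
  description: string;
}

class MiniRegistry {
  private tools = new Map<string, MiniTool>();

  constructor(initial: MiniTool[] = []) {
    initial.forEach((tool) => this.register(tool));
  }

  register(tool: MiniTool): void {
    this.tools.set(tool.name, tool); // re-registering a name overwrites it
  }

  unregister(name: string): void {
    this.tools.delete(name); // no-op when the tool is absent
  }

  clear(): void {
    this.tools.clear();
  }

  get(name: string): MiniTool | undefined {
    return this.tools.get(name);
  }

  getAll(): MiniTool[] {
    return Array.from(this.tools.values());
  }
}

// Start from a default set, then filter by a user setting.
const registry = new MiniRegistry([
  { name: "web_search_tool", description: "Search the web" },
  { name: "store_memory_tool", description: "Store a memory" },
]);
const memoryEnabled = false; // assumed setting for the example
if (!memoryEnabled) registry.unregister("store_memory_tool");
const remaining = registry.getAll().map((tool) => tool.name);
```

Keying the map by tool name makes registration idempotent and lets `unregister` be called safely for tools that were never added, which suits a "register defaults, then remove by settings" flow.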
export class ToolRegistry {
private tools: Map<TToolName, Tool> = new Map();
private basePrompt: string;
constructor(tools: Tool[] = [], basePrompt: string = MERLIN_TOOL_PROMPT) {
tools.forEach((tool) => this.register(tool));
this.basePrompt = basePrompt;
}
static defaultRegistry(
userAddedModes: TUserAddedChatModes[],
settingsV3: TSettingsV3,
requestClient: RequestClient,
) {
const registry = new ToolRegistry(DEFAULT_TOOLS);
// Add tools based on user added modes
userAddedModes.forEach((mode) => {
if (mode === "RAG" || mode === "CHATBOT") registry.register(ragTool);
if (mode === "CHATBOT") registry.register(chatbotTool);
if (mode === "DATA_ANALYSIS") registry.register(dataAnalysisTool);
});
// Clear all tools for deep research (has custom tools)
if (userAddedModes.includes("DEEP_RESEARCH")) {
registry.clear();
}
// Remove memory tools on mobile if disabled
if (!settingsV3.memory.isEnabled) {
registry.unregister(toolNames.Enum.store_memory_tool);
registry.unregister(toolNames.Enum.retrieve_memory_tool);
}
return registry;
}
register(tool: Tool): void
unregister(toolName: TToolName): void
clear(): void
get(name: TToolName): Tool | undefined
getAll(): Tool[]
getBasePrompt(): string
}
Default Tools:
memoryRetrievalTool: Retrieve user memories
memoryStorageTool: Store information to user memories
craftTool: Generate code/crafts
webSearchTool: Search the web
imageGenTool: Generate images
Location: src/server/endpoints/unified/orchestrator/configs/
Agent configs define the behavior of different agent types. They use a hook-based pattern for maximum flexibility.
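To make the hook-based pattern concrete, here is a small sketch of a loop whose control flow is delegated to a config object with one required hook and two optional ones. `MiniState`, `MiniAgentConfig`, `fakeTurn`, and `runLoop` are hypothetical and much smaller than the real types. The LLM call is replaced by a stub.

```typescript
// Hypothetical, reduced versions of the orchestrator state and agent config.
interface MiniState {
  iteration: number;
  limit: number;
  hasToolCalls: boolean;
  log: string[];
}

interface MiniAgentConfig {
  shouldContinueLoop: (state: MiniState) => boolean; // required hook
  initializeAgentState?: (state: MiniState) => void; // optional hook
  onNoToolCalls?: (state: MiniState) => { shouldContinue: boolean }; // optional hook
}

// Stand-in for one LLM turn: requests one tool on the first two turns only.
function fakeTurn(state: MiniState): number {
  return state.iteration < 2 ? 1 : 0;
}

function runLoop(config: MiniAgentConfig, limit: number): MiniState {
  const state: MiniState = { iteration: 0, limit, hasToolCalls: true, log: [] };
  config.initializeAgentState?.(state); // skipped when the hook is absent

  while (config.shouldContinueLoop(state)) {
    const toolCallCount = fakeTurn(state);
    if (toolCallCount === 0) {
      // Fall back to "stop" when the agent does not define the hook.
      const decision = config.onNoToolCalls?.(state) ?? { shouldContinue: false };
      state.hasToolCalls = decision.shouldContinue;
      state.log.push("no tool calls");
      continue;
    }
    state.log.push(`ran ${toolCallCount} tool(s)`);
    state.iteration += toolCallCount;
  }
  return state;
}

// An agent that only supplies the required hook.
const simpleConfig: MiniAgentConfig = {
  shouldContinueLoop: (s) => s.hasToolCalls && s.iteration < s.limit,
};
const finalState = runLoop(simpleConfig, 5);
```

The loop itself never branches on the agent's name. Each agent changes behavior by supplying or omitting hooks, which is the flexibility the pattern is meant to provide.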
export type TAgentConfig = {
// Agent properties
enableGlobalContextUpdate: boolean; // Whether to update global request context
useParallelTools: boolean; // Whether to use parallel tool calls
// Lifecycle hooks
initializeAgentState?: (state: TOrchestratorState) => void;
shouldContinueLoop: (state: TOrchestratorState) => boolean;
shouldSendTools: (state, openAiTools, schema) => boolean;
getModelId?: (state, baseModelConfig) => TLLMModels;
// Tool execution hooks
onNoToolCalls?: (state, context, reason, toolCalls?) => Promise<{ shouldContinue: boolean }>;
processToolCalls?: (toolCalls, state, context) => Promise<boolean>;
onToolsSelected?: (toolCalls, state, context) => Promise<void>;
onToolProgress?: (toolName, event, context) => void;
// Tool argument/result hooks
preprocessToolArguments?: (toolCall, context, toolMetadata, agentContext) => Promise<object>;
handleSpecialTools?: (toolName, result, toolCall, context) => { shouldForceToolChoice?, modifiedResult?, modifiedMessages? };
// Usage and data hooks
calculateUsage?: (baseUsage, streamOutput, systemPromptTokens, openAiTools, shouldSendTools) => Promise<TUsageConfig>;
getDataPolicy: () => TAgentDataPolicy;
};
Location: src/server/endpoints/unified/orchestrator/configs/mainThread.config.ts
This is the default, user-facing agent that saves all data and has standard behavior.
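The standard behavior includes two limit checks that are easy to misread, so the sketch below tabulates them for a tool call limit of 3. It copies only the comparisons from the main thread config's `shouldContinueLoop` and `shouldSendTools`. The `LimitState` type is a hypothetical reduction of the real state, and the `shouldReturnEarly()` check is left out.

```typescript
// Reduced state: only the fields the two predicates read.
interface LimitState {
  hasToolCalls: boolean;
  currentIteration: number;
  effectiveToolCallsLimit: number;
}

// Same comparison as the config's shouldContinueLoop, minus the early-return check.
const shouldContinueLoop = (s: LimitState): boolean =>
  s.hasToolCalls && s.currentIteration <= s.effectiveToolCallsLimit;

// Same comparison as the config's shouldSendTools.
const shouldSendTools = (s: LimitState, toolsRequired: boolean, toolCount: number): boolean => {
  const limitReached = s.currentIteration >= s.effectiveToolCallsLimit - 1;
  return toolsRequired && toolCount > 0 && !limitReached;
};

// Tabulate both predicates for a limit of 3 tool calls.
const rows = [0, 1, 2, 3, 4].map((currentIteration) => {
  const s: LimitState = { hasToolCalls: true, currentIteration, effectiveToolCallsLimit: 3 };
  return {
    currentIteration,
    loops: shouldContinueLoop(s),
    sendsTools: shouldSendTools(s, true, 2),
  };
});
```

With a limit of 3, tools are offered only while the counter is 0 or 1, yet the loop can still run at 2 and 3 without tools. That gap leaves room for a final text-only response after the tool budget is spent.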
export const MAIN_THREAD_CONFIG: TAgentConfig = {
enableGlobalContextUpdate: true,
useParallelTools: true,
shouldContinueLoop: (state) => {
return (
state.hasToolCalls &&
state.currentIteration <= state.effectiveToolCallsLimit &&
!shouldReturnEarly()
);
},
shouldSendTools: (state, openAiTools, schema) => {
let shouldSendTools = !!schema?.toolsRequired && openAiTools.length > 0;
const isMainThreadLimitReached = state.currentIteration >= state.effectiveToolCallsLimit - 1;
if (isMainThreadLimitReached) {
shouldSendTools = false; // Don't send tools on last iteration
}
return shouldSendTools;
},
onNoToolCalls: async (state, context, reason, toolCalls) => {
if (reason === "LIMIT_REACHED") {
// Clear tools and inject final response prompt
context.registry.clear();
const updatedMessagesObject = await buildMessages(...);
state.messages = updatedMessagesObject.messages;
state.inLoopTrimmedMessages.push(
SystemMessage({
content: finalResponseGenrationPrompt(state.effectiveToolCallsLimit),
}),
);
return { shouldContinue: true };
}
if (reason === "TOOLS_DISABLED" && toolCalls) {
// Inform user about disabled tools
const toolsNotEnabledPrompt = prompts.getEnableToolPrompt(state.toolsNotEnabledList[0]);
streamMessage({ type: "text", content: toolsNotEnabledPrompt, ... });
return { shouldContinue: false };
}
return { shouldContinue: false };
},
handleSpecialTools: (toolName, result, toolCall, context) => {
// Special handling for RAG tool - inject context into user message
if (toolName === toolNames.enum.rag_tool) {
const ragResults = result as TGetRAGResultsResponse;
if (!ragResults?.result?.pastedTextEmbeddings?.length) return;
const pastedText = ragResults.result.pastedTextEmbeddings.join("");
const modifiedMessages = JSON.parse(JSON.stringify(context.chatCtx?.messages || []));
const userMessageIndex = modifiedMessages.findLastIndex(
(msg) => msg.role === "user" && msg.content[0]?.type === "TEXT",
);
if (userMessageIndex !== -1) {
const targetMessageContent = modifiedMessages[userMessageIndex].content[0];
(targetMessageContent as { text: string }).text = `${pastedText}\nUser Query: ${(targetMessageContent as { text: string }).text}`;
}
return { modifiedMessages, modifiedResult: { ...ragResults, result: { ...ragResults.result, pastedTextEmbeddings: [""] } } };
}
// Special handling for chatbot tool - force RAG tool next
if (toolName === toolNames.Enum.chatbot_tool) {
const parsedArgs = JSON.parse(toolCall.function.arguments);
if (parsedArgs?.shouldUseKnowledgeBase) {
return {
shouldForceToolChoice: {
type: "function",
function: { name: toolNames.Enum.rag_tool },
},
};
}
}
},
getDataPolicy: () => ({
shouldStoreToolCalls: true,
shouldStoreToolResults: true,
shouldStoreText: true,
filterToolCalls: null,
filterToolResults: null,
}),
};
Location: src/server/endpoints/unified/orchestrator/configs/deepResearch.config.ts
This agent orchestrates deep research by spawning researcher sub-agents and managing report generation.
export const DEEP_RESEARCH_CONFIG: TAgentConfig = {
enableGlobalContextUpdate: true,
useParallelTools: true,
initializeAgentState: (state) => {
state.deepResearchRetryCount = 0;
},
shouldContinueLoop: (state) => {
// Check if we should force report generation
const isshouldForceReportGeneration = shouldForceReportGeneration(
assistantMessageNode,
state.currentIteration,
state.effectiveToolCallsLimit,
state.hasToolCalls,
state.deepResearchRetryCount,
);
if (isshouldForceReportGeneration) {
const reportConfig = setupForcedReportGeneration(...);
state.currentIteration = reportConfig.updatedIteration;
state.toolChoice = reportConfig.toolChoice;
state.inLoopTrimmedMessages.push(reportConfig.systemMessage);
state.hasToolCalls = true;
return true;
}
return (
state.hasToolCalls &&
state.currentIteration < state.effectiveToolCallsLimit &&
!shouldReturnEarly()
);
},
getModelId: (state, baseModelConfig) => {
const retryCount = state.deepResearchRetryCount || 0;
// Use cheaper model after retries
return retryCount > 1 ? getLLMModel("gpt-4.1-mini").id : baseModelConfig.id;
},
onNoToolCalls: async (state, context, reason) => {
const retryCount = state.deepResearchRetryCount || 0;
if (reason === "NO_CALLS_GENERATED") {
// Retry logic for deep research
if (shouldRetryDeepResearchSupervisor(...)) {
state.deepResearchRetryCount = retryCount + 1;
state.inLoopTrimmedMessages.push(
SystemMessage({ content: NO_TOOL_CALLS_RETURNED_DEEPRESEARCH_PROMPT }),
);
state.hasToolCalls = true;
return { shouldContinue: true };
}
// Force report generation if appropriate
if (shouldForceReportGeneration(...)) {
return { shouldContinue: true };
}
}
return { shouldContinue: false };
},
preprocessToolArguments: async (toolCall, context, toolMetadata) => {
// Special handling for report generation tool arguments
if (toolCall.function.name === toolNames.Enum.report_generation_tool) {
try {
const parsedArgs = JSON.parse(toolCall.function.arguments);
const isArgsValid = reportGenerationInputSchema.safeParse(parsedArgs);
if (!isArgsValid.success) {
throw new Error(`Invalid arguments: ${isArgsValid.error.message}`);
}
return parsedArgs;
} catch (error) {
// Fallback to extraction if direct parsing fails
return await extractReportGenerationInputSchema(toolCall.function.arguments, toolCall.id, context, toolMetadata);
}
}
// Default behavior
try {
return JSON.parse(toolCall.function.arguments);
} catch (error) {
if (!TOOLS_WITH_DEFAULT_ARGS.has(toolCall.function.name)) {
throw error;
}
return {}; // Tools with default args
}
},
getDataPolicy: () => {
const toolFilter = createDeepResearchToolFilter();
return {
shouldStoreToolCalls: true,
shouldStoreToolResults: true,
shouldStoreText: true,
filterToolCalls: toolFilter, // Only store research-specific tools
filterToolResults: toolFilter,
};
},
};
Tool Filter for Deep Research:
export const createDeepResearchToolFilter = () => {
return (toolName: TToolName) => {
switch (toolName) {
case toolNames.Enum.researcher_agent_tool:
case toolNames.Enum.feedback_questions_tool:
case toolNames.Enum.report_generation_tool:
return true; // Only store these
default:
return false; // Filter out everything else
}
};
};
Location: src/server/endpoints/unified/orchestrator/configs/researcher.config.ts
This is a sub-agent used by the deep research supervisor for individual research tasks.
export const RESEARCHER_CONFIG: TAgentConfig = {
enableGlobalContextUpdate: false, // Don't update global context (sub-agent)
useParallelTools: true,
shouldContinueLoop: MAIN_THREAD_CONFIG.shouldContinueLoop,
shouldSendTools: (_state, openAiTools) => {
return openAiTools.length > 0; // Always send tools if available
},
preprocessToolArguments: MAIN_THREAD_CONFIG.preprocessToolArguments,
getDataPolicy: () => {
const toolFilter = createResearcherAgentToolFilter();
return {
shouldStoreToolCalls: true,
shouldStoreToolResults: true,
shouldStoreText: true,
filterToolCalls: toolFilter,
filterToolResults: toolFilter,
};
},
};
Tool Filter for Researcher:
export const createResearcherAgentToolFilter = () => {
return (toolName: string) => toolName !== toolNames.Enum.get_search_history_tool;
};
Location: src/server/endpoints/unified/orchestrator/helpers/baseUtils.ts
This file contains utility functions used throughout the orchestration system.
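One of these utilities, buildMessages, truncates a user message that exceeds the context limit by encoding it, keeping the first tokens, and decoding the result. The sketch below isolates that step. `toyTokenizer` is a hypothetical stand-in that treats each whitespace-separated word as one token. The real tokenizer and its return type are not shown in this note, and `truncateToLimit` is not a function from the source.

```typescript
// Stand-in tokenizer: one token per whitespace-separated word (an assumption for this sketch).
const toyTokenizer = {
  encode: async (text: string): Promise<string[]> =>
    text.split(/\s+/).filter((token) => token.length > 0),
  decode: async (tokens: string[]): Promise<string> => tokens.join(" "),
};

// Mirrors the TEXT content part shape used in the message content arrays.
interface TextPart {
  type: "TEXT";
  text: string;
  tokens: number;
}

// Keeps at most `contextLimit` tokens from the start of the message.
async function truncateToLimit(text: string, contextLimit: number): Promise<TextPart> {
  const tokens = await toyTokenizer.encode(text);
  const kept = tokens.slice(0, contextLimit); // slice leaves the encoded array untouched
  return { type: "TEXT", text: await toyTokenizer.decode(kept), tokens: kept.length };
}
```

For example, `await truncateToLimit("one two three four five", 3)` keeps the first three words under this toy tokenizer. With a real subword tokenizer the cut can fall in the middle of a word, so the decoded text may end on a partial word.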
export const buildMessages = async (
chatCtx: TChatContext,
config: TToolOrchestratorRunInput,
registry: ToolRegistry,
agentName: TAgentName,
summary?: string,
) => {
const { schema, request } = requestContext.get();
const { userMessageNode, chatNode, user } = config;
const modelConfig = getLLMModel(schema.schema.model);
// Get context limit based on user plan and agent
const { contextLimit } = getLimitsBasedOnUserPlan(user, modelConfig, agentName);
// Handle edge case: user message too large
if (userMessageNode.tokens > contextLimit) {
logger.warn({ size: userMessageNode.tokens }, "USER_MESSAGE_TOO_LARGE");
const trimmedMessageAsTokens = (await tokenizer.encode(userMessageNode.content)).slice(0, contextLimit);
const decodedMessageContent = await tokenizer.decode(trimmedMessageAsTokens);
userMessageContent = [{ text: decodedMessageContent, type: "TEXT", tokens: trimmedMessageAsTokens.length }];
}
// Build system prompt
let systemPrompt = schema.schema.overiddenSystemPrompt && isMainThreadAgent(agentName)
? schema.schema.overiddenSystemPrompt
: registry.getBasePrompt();
// Add tool usage rules
systemPrompt += buildToolUsagePrompt(registry.getAll(), systemPrompt);
// Add personalization prompts
if (settingsV3.personalization.isEnabled) {
systemPrompt += await getPersonalizationPrompt(user);
}
// Add memory prompts
if (settingsV3.memory.isEnabled) {
systemPrompt += await getProfileMemoriesPrompt(user);
}
// Add project rules
if (projectWithAttachments) {
systemPrompt += getProjectRule(projectWithAttachments);
}
// Add attachment prompts
const attachmentPrompt = await getAttachmentPrompt(chatState);
if (attachmentPrompt) {
systemPrompt += attachmentPrompt;
}
// Add few-shot examples if model supports it
if (ALLOW_FEW_SHOT_MODELS.includes(modelConfig.id)) {
messages.push(...FEW_SHOT_EXAMPLES);
}
// Add chat history
const chatHistory = chatNode.thread.map((msg) => {
// Convert each message to prompt format
// ...
});
// Add summary if provided
if (summary) {
messages.push(
SystemMessage({
content: [
{
type: "TEXT",
text: prompts.getUserChatHistorySummaryWithPrompt(summary),
tokens: (await tokenizer.encode(summary)).length,
},
],
}),
);
}
// Add current user message
messages.push(
UserMessage({
content: userMessageContent,
}),
);
return {
messages,
systemPromptTokens: (await tokenizer.encode(systemPrompt)).length,
};
};
export const getToolChoiceConfig = (
chatCtx: TChatContext,
attachmentCount: number,
registry: ToolRegistry,
chatbotId?: string,
) => {
let toolChoice: TToolChoice = "auto";
// Force RAG tool if attachments present and RAG tool available
if (attachmentCount > 0 && registry.get(toolNames.enum.rag_tool)) {
toolChoice = {
type: "function",
function: { name: toolNames.Enum.rag_tool },
};
}
// Force chatbot tool if chatbot ID present and chatbot tool available
if (chatbotId && registry.get(toolNames.enum.chatbot_tool)) {
toolChoice = {
type: "function",
function: { name: toolNames.Enum.chatbot_tool },
};
}
return toolChoice;
};
export async function getTokenizedToolCalls<T extends { id: string; function: { name: string; arguments: string } }>(
toolCalls: T[] | undefined,
) {
if (!toolCalls) return undefined;
const toolTokens = await Promise.all(
toolCalls.map(async (toolCall) => {
const content = JSON.stringify({
tool_call_id: toolCall.id,
tool_name: toolCall.function.name,
tool_args: toolCall.function.arguments,
});
return tokenizer.encode(content);
}),
);
return toolCalls.map((toolCall, index) => ({
...toolCall,
tokens: toolTokens[index].length,
}));
}
export const getFormattedAndTokenizedToolResult = async (
result: TToolResultTypeWithMetadata,
agentName: TAgentName,
) => {
return {
toolCallId: result.toolCallId,
content:
result.shouldIncludeInHistory ||
isDeepResearchSupervisor(agentName) ||
isResearcherAgent(agentName)
? result.content // Full content for research agents
: prompts.getUseAndThrowToolReducedVersion(result.summary ?? result.content), // Reduced for main thread
tokens: result.shouldIncludeInHistory
? result.tokens
: (await tokenizer.encode(prompts.getUseAndThrowToolReducedVersion(result.summary ?? result.content))).length,
function: { name: result.function.name },
};
};
export const toolResultWithSummaryTokens = async (result: TToolResultTypeWithMetadata) => {
const tokens = (await tokenizer.encode(result.summary ?? prompts.getUseAndThrowToolReducedVersion(result.content))).length;
switch (result.function.name) {
case toolNames.Enum.craft_tool:
// Wrap craft result to prevent re-generation
const newToolResult = { ...result };
newToolResult.content = `The craft has been generated and sent to the user. You are not supposed to send the same result again.
<CRAFT_GENERATED_THAT_HAS_BEEN_SENT_TO_USER>
${result.content}
</CRAFT_GENERATED_THAT_HAS_BEEN_SENT_TO_USER>`;
newToolResult.summary = `A Craft was generated here, snippet: ${getTrimmedMessageWithWordCount(result.content, 100).trimmedText}`;
newToolResult.summaryTokens = tokens;
return newToolResult;
case toolNames.Enum.feedback_questions_tool:
// Similar wrapping for feedback questions
// ...
case toolNames.Enum.report_generation_tool:
// Similar wrapping for report generation
// ...
default:
return result;
}
};
export function validateAndSyncStreamingIndex(currentIndex: number) {
const { currentContentIndex } = requestContext.get();
if (currentIndex < currentContentIndex) {
logger.warn(
{ previousIndex: currentIndex, correctedIndex: currentContentIndex },
"WARN/TOOL_ORCHESTRATOR/INDEX_REGRESSION_PREVENTED",
);
return currentContentIndex; // Prevent index regression
}
return currentIndex;
}
export function createToolContext(orchestrator: ToolOrchestrator) {
return {
addUsage: (usageConfig: TUsageConfig) => orchestrator.addUsage(usageConfig),
currentExecutingTool: orchestrator.currentExecutingTool,
toolMetadata: orchestrator.toolMetadata,
};
}
When a user sends a request through the unified API:
// In the API endpoint
const toolRegistry = ToolRegistry.defaultRegistry(
userAddedModes,
settingsV3,
requestClient,
);
const agentConfig = AGENT_CONFIGS[agentName];
const orchestrator = new ToolOrchestrator(
chatCtx,
agentName,
toolRegistry,
agentConfig,
customToolCallLimit,
);
private async initializeOrchestrationState(config: TToolOrchestratorRunInput) {
const { unifiedApiVersion } = requestContext.get();
const { input, assistantMessageNode, user } = config;
const modelConfig: TModelConfig = this.chatCtx.model;
// Build messages with system prompt, history, attachments, etc.
const { messages, systemPromptTokens } = await buildMessages(
this.chatCtx,
config,
this.registry,
this.agentName,
);
// Get limits based on user plan and agent
const { contextLimit, toolCallsLimit, shouldDoParallelToolCalls } = getLimitsBasedOnUserPlan(
user,
modelConfig,
this.agentName,
);
// Initialize orchestrator state
const state: TOrchestratorState = {
currentIteration: 0,
currentContentIndex: 0,
hasToolCalls: true,
toolChoice: getToolChoiceConfig(this.chatCtx, input.attachments.length, this.registry, input.metadata.chatbot?.id),
effectiveToolCallsLimit: this.customToolCallLimit ?? toolCallsLimit,
usageConfigArray: this.usageConfigArray,
toolMetadata: this.currentToolMetadata,
inLoopTrimmedMessages: [],
messages,
systemPromptTokens,
lastCountedSubAgentTools: 0,
lastIterationErroredOutTools: [],
toolsNotEnabledList: [],
isMainThreadFinalLoopPromptInjected: false,
};
// Initialize agent-specific state if needed
if (this.agentConfig.initializeAgentState) {
this.agentConfig.initializeAgentState(state);
}
// Create agent context
const agentContext: TAgentContext = {
agentName: this.agentName,
registry: this.registry,
modelConfig,
unifiedApiVersion,
assistantMessageNode,
chatCtx: this.chatCtx,
config,
};
return { state, agentContext, messages, systemPromptTokens, contextLimit, toolCallsLimit, shouldDoParallelToolCalls, modelConfig };
}
const openAiTools: ChatCompletionTool[] = this.registry.getAll().map((t) => ({
type: "function",
function: {
name: t.name,
description: t.description,
parameters: zodToJsonSchema(t.parameters, { target: "openApi3" }),
},
}));
const { currentContentIndex: globalContextIndex } = requestContext.get();
state.currentContentIndex = calculateInitialContentIndex(
this.agentName,
globalContextIndex ?? 0,
eventManager.index,
unifiedApiVersion,
);
// Persist to global request context
requestContext.set({ currentContentIndex: state.currentContentIndex });
// Sync EventManager index
eventManager.index = state.currentContentIndex;
while (this.agentConfig.shouldContinueLoop(state)) {
// 5a. Track iteration info
toolIterationInfo.push({
iteration: state.currentIteration + 1,
toolCalls: [],
layout: {},
});
// 5b. Decide if tools should be sent
const shouldSendTools = this.agentConfig.shouldSendTools(state, openAiTools, schema);
// 5c. Get model ID (agent may override)
const modelId = this.agentConfig.getModelId
? this.agentConfig.getModelId(state, modelConfig)
: modelConfig.id;
// 5d. Prepare LLM input
const llmInput = {
model: modelId,
messages: [...currentMessages, ...state.inLoopTrimmedMessages],
params: {
...(shouldSendTools ? {
tools: openAiTools,
tool_choice: state.toolChoice,
parallel_tool_calls: shouldUseParallelToolCalls(this.agentName, shouldDoParallelToolCalls || this.agentConfig.useParallelTools),
} : {}),
max_tokens: modelConfig.getMaxOutputTokens?.(false),
},
};
// 5e. Call LLM
const chatRequest = await provider.chat(llmInput);
const chatResponse = await chatRequest.stream();
// 5f. Stream response to client
const streamOutput = unifiedApiVersion === "V2"
? await streamer.streamV2(config.request, config.response, chatResponse.data, { contentIndex: state.currentContentIndex })
: await streamer.stream(config.request, config.response, chatResponse.data);
// 5g. Parse tool calls
const toolCallParsed = multipleToolCallSchema.safeParse(streamOutput?.toolCalls);
if (!toolCallParsed.success) {
// Handle invalid tool names
state.inLoopTrimmedMessages.push(
SystemMessage({
content: [{ text: prompts.getInvalidToolNamePrompt(toolCallParsed.error.message), type: "TEXT", tokens: ... }],
}),
);
state.hasToolCalls = true;
continue;
}
const toolCalls = await getTokenizedToolCalls(toolCallParsed.data);
// 5h. Extract content
const llmResponseContent = streamOutput?.content.join("") || "";
const reasoningContent = streamOutput?.reasoningContent.reduce((acc, current) => {
if (!acc[current.index]) acc[current.index] = current;
else {
if (current.text) acc[current.index].text += current.text;
if (current.id) acc[current.index].id = current.id;
if (current.signature) acc[current.index].signature = current.signature;
}
return acc;
}, []);
// 5i. Process tool calls if agent has hook
if (this.agentConfig.processToolCalls && toolCalls?.length > 0) {
const shouldBreak = await this.agentConfig.processToolCalls(toolCalls, state, agentContext);
if (shouldBreak) {
assistantMessageNode.contentV2.push({
type: "TEXT",
text: llmResponseContent,
tokens: (await tokenizer.encode(llmResponseContent)).length,
});
state.hasToolCalls = false;
break;
}
}
// 5j. Handle no tool calls
if (!toolCalls || toolCalls.length === 0) {
if (this.agentConfig.onNoToolCalls) {
const noCallsGeneratedResult = await this.agentConfig.onNoToolCalls(state, agentContext, "NO_CALLS_GENERATED");
if (noCallsGeneratedResult.shouldContinue) {
if (state.messages) currentMessages = state.messages;
continue;
}
}
state.hasToolCalls = false;
}
// 5k. Execute tools
const toolResults: TToolResultTypeWithMetadata[] = [];
let toolExecutionProgress: EventManager | null = null;
const successfulToolNames: TToolName[] = [];
if (toolCalls?.length) {
const toolExecutionEvent = eventManager.createEvent("Selecting tools", "LIGHTBULB");
state.hasToolCalls = true;
if (this.agentConfig.onToolsSelected) {
await this.agentConfig.onToolsSelected(toolCalls, state, agentContext);
}
const toolExecutionResults: TToolExecutionResults = {};
// Stream tool calls to client
streamMessage({ type: "toolCalls", contentIndex: state.currentContentIndex, toolCalls });
// Mark tools as used
toolCalls.forEach((tool) => {
this.chatCtx.chatState.addUsedMode(toolNameMap[tool.function.name]);
});
if (toolCalls.length) state.currentContentIndex++;
state.lastIterationErroredOutTools = [];
state.toolsNotEnabledList = [];
let currentToolResultIndex = -1;
// Execute tools with streaming
for await (const chunk of this.executeRequestedTools(toolCalls, state.currentContentIndex, agentContext)) {
if (eventManager.index > state.currentContentIndex) {
state.currentContentIndex = eventManager.index;
}
if (this.agentConfig.enableGlobalContextUpdate) {
requestContext.set({ currentContentIndex: state.currentContentIndex });
}
if (chunk.type !== "tool:progress" && this.agentConfig.onToolProgress) {
this.agentConfig.onToolProgress(chunk.function.name, toolExecutionEvent, agentContext);
}
switch (chunk.type) {
case "tool:progress":
toolExecutionProgress = chunk.progress;
state.currentContentIndex++;
break;
case "tool:error":
state.lastIterationErroredOutTools.push(chunk.function.name);
toolExecutionResults[chunk.id] = {
type: chunk.type,
result: chunk.error.slice(0, TOOL_ERROR_MESSAGE_MAX_LENGTH),
};
if (chunk.error === "TOOL_NOT_ENABLED") {
state.toolsNotEnabledList.push(chunk.function.name);
}
this.currentToolMetadata[chunk.id] = {
saveToDB: false,
summary: toolExecutionResults[chunk.id].result,
};
break;
case "tool:stream":
// Handle streaming tool result
lastLoopProgressEvent = toolExecutionProgress?.getOrCreateLatestEvent(getRandomGeneratingMessage(), "WAND");
streamMessage({
type: "toolCallResult",
contentIndex: state.currentContentIndex,
toolCallResult: {
index: currentToolResultIndex,
result: "",
toolCallId: chunk.id,
function: { name: chunk.function.name },
},
});
const safeContentIndex = validateAndSyncStreamingIndex(state.currentContentIndex);
const content = unifiedApiVersion === "V2"
? await streamer.streamV2(config.request, config.response, chunk.content.stream, {
contentIndex: safeContentIndex,
streamAsToolResult: { toolResultIndex: currentToolResultIndex },
})
: await streamer.stream(config.request, config.response, chunk.content.stream);
// Calculate usage for tool stream
try {
const toolStreamUsage = await chunk.content.chatResponse.usage(content.content, {
usage: content.usage,
tools: [chunk.function.name],
});
this.addUsage(toolStreamUsage);
} catch (error) {
logger.warn({ error, toolId: chunk.id, toolName: chunk.function.name }, "WARN/TOOL/STREAM_USAGE_CALCULATION_FAILED");
}
if (toolExecutionResults[chunk.id]) {
toolExecutionResults[chunk.id].result += content.content.join("");
} else {
toolExecutionResults[chunk.id] = { type: chunk.type, result: content.content.join("") };
}
this.currentToolMetadata[chunk.id] = { saveToDB: true };
lastLoopProgressEvent?.end();
break;
case "tool:start":
currentToolResultIndex++;
break;
case "tool:done":
lastLoopProgressEvent = toolExecutionProgress?.getOrCreateLatestEvent(getRandomGeneratingMessage(), "WAND");
if (!toolExecutionResults[chunk.id]) {
toolExecutionResults[chunk.id] = {
type: chunk.type,
result: chunk.result.result,
};
}
this.currentToolMetadata[chunk.id] = chunk.result;
// Merge sub-agent tool metadata
if (chunk.result.toolMetadata) {
this.currentToolMetadata = { ...this.currentToolMetadata, ...chunk.result.toolMetadata };
}
// Add sub-agent tool call count
if (chunk.result.toolCallCount) {
state.currentIteration += chunk.result.toolCallCount;
}
// Merge sub-agent usage
if (chunk.result.usageConfigArray?.length) {
chunk.result.usageConfigArray.forEach((usageConfig) => {
this.addUsage(usageConfig);
});
}
lastLoopProgressEvent?.end();
break;
}
if (this.agentConfig.enableGlobalContextUpdate) {
requestContext.set({ currentContentIndex: state.currentContentIndex });
}
}
toolExecutionEvent.end();
// Process tool results
for (const [index, call] of toolCalls.entries()) {
const executedTool = toolExecutionResults[call.id];
let result = executedTool?.result;
const metadata = this.currentToolMetadata[call.id];
if (executedTool.type !== "tool:error") {
successfulToolNames.push(call.function.name);
}
// Handle special tools
if (this.agentConfig.handleSpecialTools) {
const specialToolResult = this.agentConfig.handleSpecialTools(call.function.name, result, call, agentContext);
if (specialToolResult) {
if (specialToolResult.modifiedResult) result = specialToolResult.modifiedResult;
if (specialToolResult.shouldForceToolChoice) state.toolChoice = specialToolResult.shouldForceToolChoice;
if (specialToolResult.modifiedMessages) currentMessages = specialToolResult.modifiedMessages;
}
}
// Stream tool result to client
if (toolExecutionResults[call.id].type !== "tool:stream" && toolExecutionResults[call.id].type !== "tool:error") {
streamMessage({
type: "toolCallResult",
contentIndex: state.currentContentIndex,
toolCallResult: {
index,
result: JSON.stringify(result),
toolCallId: call.id,
function: { name: call.function.name },
},
});
}
const content = JSON.stringify(result);
// Add to tool results with metadata
if (metadata.saveToDB) {
toolResults.push({
toolCallId: call.id,
content: content,
tokens: (await tokenizer.encode(content)).length,
shouldIncludeInHistory: true,
function: { name: call.function.name },
});
} else {
toolResults.push({
toolCallId: call.id,
content: content,
tokens: (await tokenizer.encode(content)).length,
shouldIncludeInHistory: false,
summary: JSON.stringify(metadata.summary),
function: { name: call.function.name },
});
}
// Track for tool iteration info
toolIterationInfo[toolIterationInfo.length - 1].toolCalls.push({
name: call.function.name,
args: call.function.arguments,
result: content,
});
}
}
// 5l. Calculate usage
let streamUsage = await chatResponse.usage(streamOutput.content, {
usage: streamOutput.usage,
tools: successfulToolNames,
});
if (this.agentConfig.calculateUsage) {
streamUsage = await this.agentConfig.calculateUsage(streamUsage, streamOutput, systemPromptTokens, openAiTools, shouldSendTools);
}
this.addUsage(streamUsage);
// 5m. Run token engine for context optimization
const trimmedMessages = await engine({
messages: currentMessages,
inLoop: state.inLoopTrimmedMessages,
contextLimit: contextLimit,
response: {
tokens: streamUsage.tokens,
toolCalls: toolCalls,
toolResults: await formatToolResultsForLLM(toolResults),
content: llmResponseContent,
reasoning: reasoningContent,
},
agentName: this.agentName,
}, toolIterationInfo[toolIterationInfo.length - 1]);
state.inLoopTrimmedMessages = trimmedMessages.inLoop;
// 5n. Add summary prompt if many tool results
if (toolResults.length > 2) {
state.inLoopTrimmedMessages.push(
SystemMessage({
content: [{ type: "TEXT", text: TOOL_RESULTS_CONTEXT_SUMMARY_PROMPT, tokens: TOOL_RESULTS_CONTEXT_SUMMARY_TOKENS }],
}),
);
}
// 5o. Handle summary from token engine
if (trimmedMessages.summary) {
const { messages: adjustedMessages } = await buildMessages(
this.chatCtx,
config,
this.registry,
this.agentName,
trimmedMessages.summary,
);
currentMessages = adjustedMessages;
if (trimmedMessages.shouldDumpSummaryInDB) {
assistantMessageNode.isChatSummarizedSoFar = true;
assistantMessageNode.summary = trimmedMessages.summary;
}
}
// 5p. Update chat context messages
this.chatCtx.messages = [...currentMessages, ...state.inLoopTrimmedMessages];
// 5q. Store data based on policy
const dataPolicy = this.agentConfig.getDataPolicy();
if (dataPolicy.shouldStoreText) {
const reasoningContentText = reasoningContent.reduce((acc, current) => acc + current.text, "");
assistantMessageNode.contentV2.push({
type: "TEXT",
toolCalls: filterToolCallsByPolicy(dataPolicy, toolCalls),
text: llmResponseContent,
tokens: (await tokenizer.encode(llmResponseContent)).length,
...(reasoningContent ? {
reasoning: {
content: reasoningContentText,
tokens: (await tokenizer.encode(reasoningContentText)).length,
},
} : {}),
});
}
if (toolResults.length) {
const progressEvents = toolExecutionProgress?.getEvents() || [];
const steps = progressEvents.map((event) => event.serialize());
assistantMessageNode.contentV2.push({
type: "PROGRESS",
name: toolExecutionProgress?.name,
icon: toolExecutionProgress?.icon,
steps: steps,
metadata: toolExecutionProgress?.metadata,
});
if (dataPolicy.shouldStoreToolResults) {
assistantMessageNode.contentV2.push({
type: "TOOL_RESULT",
toolResults: await settleAllOrThrow(
filterToolResultsByPolicy(toolResults, dataPolicy).map((result) =>
getFormattedAndTokenizedToolResult(result, this.agentName),
),
),
});
}
}
// 5r. Handle disabled tools
if (state.toolsNotEnabledList.length > 0 && this.agentConfig.onNoToolCalls) {
await this.agentConfig.onNoToolCalls(state, agentContext, "TOOLS_DISABLED", toolCalls);
}
// 5s. Update iteration counter
const iterativeToolCalls = toolCalls?.filter(
(call) => !NON_ITERATIVE_TOOLS.some((tool) => call.function.name === `${tool.toLowerCase()}_tool`),
) || [];
if (toolCalls?.length) {
state.currentIteration += iterativeToolCalls.length;
} else {
state.currentIteration++;
}
// 5t. Check for limit reached
if (
state.currentIteration >= state.effectiveToolCallsLimit - 1 &&
state.hasToolCalls &&
this.agentConfig.onNoToolCalls &&
!state.isMainThreadFinalLoopPromptInjected
) {
const limitReachedResult = await this.agentConfig.onNoToolCalls(state, agentContext, "LIMIT_REACHED");
if (limitReachedResult.shouldContinue) {
if (state.messages) {
currentMessages = state.messages;
state.isMainThreadFinalLoopPromptInjected = true;
state.currentIteration = state.effectiveToolCallsLimit - 1;
}
}
}
state.currentContentIndex++;
}
if (lastLoopProgressEvent) lastLoopProgressEvent.end();
if (eventManager) {
eventManager.index = state.currentContentIndex;
}
if (this.agentConfig.enableGlobalContextUpdate) {
requestContext.set({
currentContentIndex: state.currentContentIndex,
toolIterationInfo: toolIterationInfo,
});
}
return {
result: "OK",
finalIteration: state.currentContentIndex,
toolCallCount: state.currentIteration,
usageConfigArray: this.usageConfigArray,
};
export class Tool {
name: TToolName;
description: string;
parameters: z.ZodSchema;
selectionRule: string;
execute: (
args: any,
chatCtx: TChatContext,
progress: EventManager,
toolContext: any,
) => Promise<any | AsyncIterable<any>>;
shouldUse?: (chatCtx: TChatContext) => Promise<boolean>;
}
Tools are registered with the ToolRegistry:
const registry = new ToolRegistry(DEFAULT_TOOLS);
registry.register(customTool);
registry.unregister(toolName);
registry.clear();
shouldUse() is called if present
execute() is called
handleSpecialTools() hook
Tools can return metadata about their execution:
interface TToolMetadata {
[toolCallId: string]: {
saveToDB: boolean;
summary?: string;
// ... other metadata
};
}
This metadata controls:
export type TOrchestratorState = {
// Core state
currentIteration: number; // Current loop iteration
currentContentIndex: number; // Streaming index
hasToolCalls: boolean; // Whether tools were called
toolChoice: TToolChoice; // Tool choice for next LLM call
// Tool execution state
effectiveToolCallsLimit: number; // Actual tool call limit
usageConfigArray: TUsageConfig[]; // Token usage tracking
toolMetadata: TToolMetadata; // Tool execution metadata
// Messaging state
inLoopTrimmedMessages: TPromptMessage[]; // Messages from current loop
messages: TPromptMessage[]; // Full message history
systemPromptTokens?: number; // System prompt token count
// Error tracking
lastIterationErroredOutTools: TToolName[]; // Tools that errored last iteration
toolsNotEnabledList: TToolName[]; // Tools not enabled for user
// Multi-agent state
lastCountedSubAgentTools: number; // Tool count from sub-agents
// Main thread specific
isMainThreadFinalLoopPromptInjected: boolean; // Whether final prompt was injected
// Agent-specific state
deepResearchRetryCount?: number; // Retry count for deep research
};
initializeOrchestrationState()
initializeAgentState() hook is called
The orchestration system uses a global streaming index to ensure consistent ordering of streamed content across agents and sub-agents.
// Initialize
state.currentContentIndex = calculateInitialContentIndex(
this.agentName,
globalContextIndex ?? 0,
eventManager.index,
unifiedApiVersion,
);
// Validate before use
const safeContentIndex = validateAndSyncStreamingIndex(state.currentContentIndex);
// Update after streaming
if (eventManager.index > state.currentContentIndex) {
state.currentContentIndex = eventManager.index;
}
// Sync to global context
if (this.agentConfig.enableGlobalContextUpdate) {
requestContext.set({ currentContentIndex: state.currentContentIndex });
}
export type TStreamMessageOptions =
| TTextStreamOptions // Text content
| TToolCallsStreamOptions // Tool calls
| TReasoningStreamOptions // Reasoning content
| TToolCallResultStreamOptions; // Tool results
The EventManager manages progress events for tool execution:
const progress = EventManager.init(safeToolIndex, "V2");
// Create events
const event = progress.createEvent("Tool name", "ICON");
event.send([{ type: "TEXT", value: "Working with tool" }]);
event.end();
// Get serialized steps
const steps = progress.getEvents().map((event) => event.serialize());
MainThreadAgent (user-facing)
↓
DeepResearchSupervisor (orchestrates research)
↓
ResearcherAgent (sub-agent for individual tasks)
Agents communicate through:
The researcher_agent_tool is used to spawn researcher sub-agents:
const researcherResult = await researcherAgentTool.execute(
{ query: "research topic", ... },
chatCtx,
progress,
toolContext,
);
// Result includes:
// - result: Research findings
// - toolMetadata: Tool execution metadata
// - toolCallCount: Number of tools used by sub-agent
// - usageConfigArray: Token usage from sub-agent
Main agents update the global context; sub-agents do not:
// Main thread
enableGlobalContextUpdate: true,
// Researcher (sub-agent)
enableGlobalContextUpdate: false,
This prevents sub-agents from interfering with the main thread's streaming index.
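The gating described here can be illustrated with a minimal sketch. It assumes the shared store is a plain object; `SharedContext`, `AgentFlags`, and `syncContentIndex` are hypothetical names standing in for `requestContext` and the agent config, not the actual implementation.

```typescript
// Hypothetical stand-in for the request-scoped store (`requestContext` in the source).
type SharedContext = { currentContentIndex: number };

// Only the flag relevant to this sketch; the real agent config has more fields.
type AgentFlags = { enableGlobalContextUpdate: boolean };

// Publish the local streaming index only when the agent is allowed to.
function syncContentIndex(shared: SharedContext, flags: AgentFlags, localIndex: number): void {
  if (flags.enableGlobalContextUpdate) {
    shared.currentContentIndex = localIndex;
  }
}

const shared: SharedContext = { currentContentIndex: 3 };

// A sub-agent advances its own index, but the shared value stays untouched.
syncContentIndex(shared, { enableGlobalContextUpdate: false }, 9);
const afterSubAgent = shared.currentContentIndex;

// The main thread publishes its index to the shared store.
syncContentIndex(shared, { enableGlobalContextUpdate: true }, 4);
const afterMainThread = shared.currentContentIndex;
```

With the flag off, a sub-agent can count as far as it likes locally without moving the index the main thread streams against.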
export type TAgentDataPolicy = {
shouldStoreToolCalls: boolean; // Store tool calls to DB
shouldStoreToolResults: boolean; // Store tool results to DB
shouldStoreText: boolean; // Store LLM text to DB
filterToolCalls?: (toolName: TToolName) => boolean | null; // Filter which tools to store
filterToolResults?: (toolName: TToolName) => boolean | null; // Filter which results to store
};
Main Thread Policy (store everything):
{
shouldStoreToolCalls: true,
shouldStoreToolResults: true,
shouldStoreText: true,
filterToolCalls: null,
filterToolResults: null,
}
Deep Research Policy (only research tools):
{
shouldStoreToolCalls: true,
shouldStoreToolResults: true,
shouldStoreText: true,
filterToolCalls: (toolName) => [
toolNames.Enum.researcher_agent_tool,
toolNames.Enum.feedback_questions_tool,
toolNames.Enum.report_generation_tool,
].includes(toolName),
filterToolResults: (toolName) => [
toolNames.Enum.researcher_agent_tool,
toolNames.Enum.feedback_questions_tool,
toolNames.Enum.report_generation_tool,
].includes(toolName),
}
// Filter tool calls before storage
const filteredToolCalls = filterToolCallsByPolicy(dataPolicy, toolCalls);
// Filter tool results before storage
const filteredToolResults = filterToolResultsByPolicy(toolResults, dataPolicy);
// Store based on policy
if (dataPolicy.shouldStoreToolCalls) {
assistantMessageNode.contentV2.push({
type: "TEXT",
toolCalls: filteredToolCalls,
...
});
}
if (dataPolicy.shouldStoreToolResults) {
assistantMessageNode.contentV2.push({
type: "TOOL_RESULT",
toolResults: filteredToolResults,
...
});
}
export type TUsageConfig = {
model: string;
tokens: {
prompt: number;
completion: number;
cached: number;
reasoning: number;
};
tools: string[];
};
Usage is collected at multiple points:
let streamUsage = await chatResponse.usage(streamOutput.content, {
usage: streamOutput.usage,
tools: successfulToolNames,
});
const toolStreamUsage = await chunk.content.chatResponse.usage(content.content, {
usage: content.usage,
tools: [chunk.function.name],
});
this.addUsage(toolStreamUsage);
if (chunk.result.usageConfigArray?.length) {
chunk.result.usageConfigArray.forEach((usageConfig) => {
this.addUsage(usageConfig);
});
}
if (this.agentConfig.calculateUsage) {
streamUsage = await this.agentConfig.calculateUsage(
streamUsage,
streamOutput,
systemPromptTokens,
openAiTools,
shouldSendTools,
);
}
addUsage(usageConfig: TUsageConfig) {
this.usageConfigArray.push(usageConfig);
}
The final usage array is returned and used for billing.
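How the returned array is consumed for billing is not shown in this document. As a rough sketch, a downstream consumer might sum the entries per model. `totalUsageByModel` is a hypothetical helper, and the model names and token counts below are made-up sample data.

```typescript
// Same shape as TUsageConfig in this document.
type TUsageConfig = {
  model: string;
  tokens: { prompt: number; completion: number; cached: number; reasoning: number };
  tools: string[];
};

type TokenTotals = TUsageConfig["tokens"];

// Hypothetical helper: sums token counts per model across all collected entries.
function totalUsageByModel(usageConfigArray: TUsageConfig[]): Record<string, TokenTotals> {
  const totals: Record<string, TokenTotals> = {};
  for (const { model, tokens } of usageConfigArray) {
    const running = totals[model] ?? { prompt: 0, completion: 0, cached: 0, reasoning: 0 };
    running.prompt += tokens.prompt;
    running.completion += tokens.completion;
    running.cached += tokens.cached;
    running.reasoning += tokens.reasoning;
    totals[model] = running;
  }
  return totals;
}

// Sample data: two LLM turns on one model plus usage merged from a sub-agent.
const usageConfigArray: TUsageConfig[] = [
  { model: "model-a", tokens: { prompt: 100, completion: 20, cached: 0, reasoning: 5 }, tools: [] },
  { model: "model-a", tokens: { prompt: 50, completion: 10, cached: 30, reasoning: 0 }, tools: ["rag_tool"] },
  { model: "model-b", tokens: { prompt: 40, completion: 8, cached: 0, reasoning: 0 }, tools: ["researcher_agent_tool"] },
];

const totals = totalUsageByModel(usageConfigArray);
```

Because `addUsage` only appends, any aggregation like this can stay outside the orchestrator, and sub-agent entries that use a different model remain separately attributable.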
try {
const parsedArgs = this.agentConfig.preprocessToolArguments
? await this.agentConfig.preprocessToolArguments(toolCall, ctx, this.toolMetadata, agentContext)
: JSON.parse(args);
const resultOrStream = await tool.execute(parsedArgs, ctx, progress, toolContext);
// ... process result
} catch (e) {
logger.warn({ error: e, args, name, id }, `WARN/TOOL/RUN-SINGLE-TOOL`);
yield {
type: "tool:error",
id,
function: { name },
error: e instanceof Error ? e.message : String(e),
};
}
if (tool.shouldUse && !(await tool.shouldUse(ctx))) {
logger.info({ args, name, id }, `INFO/TOOL/TOOL_NOT_ENABLED`);
yield {
type: "tool:error",
id,
function: { name },
error: "TOOL_NOT_ENABLED",
};
return;
}
const toolCallParsed = multipleToolCallSchema.safeParse(streamOutput?.toolCalls);
if (!toolCallParsed.success) {
state.inLoopTrimmedMessages.push(
SystemMessage({
content: [{
text: prompts.getInvalidToolNamePrompt(toolCallParsed.error.message),
type: "TEXT",
tokens: (await tokenizer.encode(toolCallParsed.error.message)).length,
}],
}),
);
state.hasToolCalls = true;
continue;
}
// Error-tracking fields on the orchestrator state:
// state.lastIterationErroredOutTools: TToolName[]
// state.toolsNotEnabledList: TToolName[]
// When tool errors
state.lastIterationErroredOutTools.push(chunk.function.name);
// When tool not enabled
if (chunk.error === "TOOL_NOT_ENABLED") {
state.toolsNotEnabledList.push(chunk.function.name);
}
Tools can be executed in parallel:
const tasks = calls.map((call, index) =>
this.runSingleTool(call.function.name, call.id, call.function.arguments, progress, agentContext, index),
);
for await (const work of tasks) {
for await (const chunk of work) yield chunk;
}
Parallel execution is controlled by:
useParallelTools: boolean
shouldDoParallelToolCalls
ALWAYS_ALLOW_MULTI_TOOL_CALL_MODELS
Agents can force specific tools:
// In handleSpecialTools hook
if (toolName === toolNames.Enum.chatbot_tool) {
const parsedArgs = JSON.parse(toolCall.function.arguments);
if (parsedArgs?.shouldUseKnowledgeBase) {
return {
shouldForceToolChoice: {
type: "function",
function: { name: toolNames.Enum.rag_tool },
},
};
}
}
// Applied in next iteration
state.toolChoice = specialToolResult.shouldForceToolChoice;
Agents can modify messages based on tool results:
// In handleSpecialTools hook
if (toolName === toolNames.Enum.rag_tool) {
const modifiedMessages = JSON.parse(JSON.stringify(context.chatCtx?.messages || []));
const userMessageIndex = modifiedMessages.findLastIndex(
(msg) => msg.role === "user" && msg.content[0]?.type === "TEXT",
);
if (userMessageIndex !== -1) {
const targetMessageContent = modifiedMessages[userMessageIndex].content[0];
(targetMessageContent as { text: string }).text = `${pastedText}\nUser Query: ${(targetMessageContent as { text: string }).text}`;
}
return { modifiedMessages, modifiedResult };
}
// Applied
currentMessages = specialToolResult.modifiedMessages;
The token engine optimizes context usage:
const trimmedMessages = await engine({
messages: currentMessages,
inLoop: state.inLoopTrimmedMessages,
contextLimit: contextLimit,
response: {
tokens: streamUsage.tokens,
toolCalls: toolCalls,
toolResults: await formatToolResultsForLLM(toolResults),
content: llmResponseContent,
reasoning: reasoningContent,
},
agentName: this.agentName,
}, toolIterationInfo[toolIterationInfo.length - 1]);
The engine:
Some models receive few-shot examples:
if (ALLOW_FEW_SHOT_MODELS.includes(modelConfig.id)) {
messages.push(...FEW_SHOT_EXAMPLES);
}
These examples show the LLM how to use tools correctly.
Some tool results are wrapped to prevent re-generation:
case toolNames.Enum.craft_tool: {
// Braces scope the const declaration to this case.
// Wrap the result so the LLM treats the craft as already delivered.
const newToolResult = { ...result };
newToolResult.content = `The craft has been generated and sent to the user. You are not supposed to send the same result again.
<CRAFT_GENERATED_THAT_HAS_BEEN_SENT_TO_USER>
${result.content}
</CRAFT_GENERATED_THAT_HAS_BEEN_SENT_TO_USER>`;
return newToolResult;
}
This prevents the LLM from re-generating content already sent to the user.
Agents can have custom tool call limits:
const orchestrator = new ToolOrchestrator(
chatCtx,
agentName,
toolRegistry,
agentConfig,
customToolCallLimit, // Optional override
);
This is used in deep research to allow more iterations.
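To make the effect of a larger limit concrete, the counting rule from step 5s and the numeric part of the check from step 5t can be restated as pure functions. This is a sketch under stated assumptions: the contents of `NON_ITERATIVE_TOOLS` are not shown in this document, so the value below is hypothetical. `resolveLimit` is also hypothetical and simply assumes that an override, when given, replaces the default.

```typescript
// Hypothetical contents; the real NON_ITERATIVE_TOOLS list is not shown in this document.
const NON_ITERATIVE_TOOLS = ["CHATBOT"];

type ToolCall = { function: { name: string } };

// Step 5s: a turn without tool calls counts as one iteration;
// otherwise only tools outside NON_ITERATIVE_TOOLS are counted.
function nextIteration(current: number, toolCalls: ToolCall[]): number {
  if (toolCalls.length === 0) return current + 1;
  const iterative = toolCalls.filter(
    (call) => !NON_ITERATIVE_TOOLS.some((tool) => call.function.name === `${tool.toLowerCase()}_tool`),
  );
  return current + iterative.length;
}

// Numeric part of step 5t: the limit handling fires one iteration before the cap.
function isAtLimit(current: number, effectiveLimit: number): boolean {
  return current >= effectiveLimit - 1;
}

// Assumption: a custom limit, when provided, takes precedence over the default.
function resolveLimit(defaultLimit: number, customToolCallLimit?: number): number {
  return customToolCallLimit ?? defaultLimit;
}

const calls: ToolCall[] = [
  { function: { name: "rag_tool" } },
  { function: { name: "chatbot_tool" } }, // excluded from the count in this sketch
];

const afterToolTurn = nextIteration(0, calls);
const afterTextTurn = nextIteration(afterToolTurn, []);
const defaultHit = isAtLimit(9, resolveLimit(10));
const customHit = isAtLimit(9, resolveLimit(10, 25));
```

In this sketch the same iteration count that trips the default limit leaves plenty of headroom under the override, which is the behaviour a longer-running research agent needs.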
Agents can override the model:
getModelId: (state, baseModelConfig) => {
const retryCount = state.deepResearchRetryCount || 0;
return retryCount > 1 ? getLLMModel("gpt-4.1-mini").id : baseModelConfig.id;
}
This allows switching to cheaper models after retries.
The orchestration layer is a sophisticated, custom-built system that provides:
The system is production-ready and handles complex scenarios like deep research with sub-agents, streaming tool execution, and context window optimization - all without relying on external orchestration frameworks.