InkdownInkdown
Start writing

Study

59 files·8 subfolders

Shared Workspace

Study
core

01-Orchestration

Shared from "Study" on Inkdown

Orchestration Architecture

Overview

The orchestration layer is the brain of the Arcane system. It manages multi-agent workflows, handles iterative tool calling, maintains conversation state, and coordinates all AI-driven operations.


Architecture Flow

Plain text
REQUEST → ToolOrchestrator → Agent Config → Loop Iteration → LLM Stream
                                              ↓
                              ┌─────────────────────────────┐
                              │   WHILE LOOP (Iterative)    │
                              │  1. Build LLM Input         │
                              │  2. Stream Response         │
                              │  3. Parse Tool Calls        │
                              │  4. Execute Tools (Parallel)│
                              │  5. Process Results         │
                              │  6. Context Trimming        │
                              │  7. Update State            │
                              └─────────────────────────────┘
                                              ↓
                                    ToolRegistry → Individual Tools
programming-language-concepts.md
zero-language-explanation.md
DB
01-introduction.md
02-relational-databases.md
03-database-design.md
04-indexing.md
05-transactions-acid.md
06-nosql-databases.md
07-query-optimization.md
08-replication-ha.md
09-sharding-partitioning.md
10-caching-strategies.md
11-cap-theorem.md
12-connection-pooling.md
13-backup-recovery.md
14-monitoring.md
15-database-selection.md
README.md
JS
Event loop
Merlin Backend
01-Orchestration.md
02-DeepResearch.md
03-Search.md
04-Scraping.md
05-Streaming.md
06-MultiProviderLLM.md
07-MemoryAndContext.md
08-ErrorHandling.md
09-RateLimiting.md
10-TaskQueue.md
11-SecurityAndAuth.md
Orchestration-2nd-draft
OpenAI Agents Python
00_OVERVIEW.md
01_AGENT_SYSTEM.md
02_RUNNER_SYSTEM.md
03_TOOL_SYSTEM.md
04_ITEMS_SYSTEM.md
05_GUARDRAILS.md
06_HANDOFFS.md
07_MEMORY_SESSIONS.md
08_MODEL_PROVIDERS.md
09_SANDBOX_SYSTEM.md
10_TRACING.md
11_RUN_STATE.md
12_CONTEXT.md
13_LIFECYCLE_HOOKS.md
14_CONFIGURATION.md
15_ERROR_HANDLING.md
16_STREAMING.md
17_EXTENSIONS.md
18_MCP_INTEGRATION.md
19_BEST_PRACTICES.md
20_ARCHITECTURE_PATTERNS.md
opencode-study
context-handling
core
Python
Alembic
Basics
sqlalchemy - fastapi
SQLAlchemy overview
tweets
system_design_for_agentic_apps.md

Core: ToolOrchestrator Class

File: src/server/endpoints/unified/orchestrator/toolOrchestrator.ts:69

TypeScript
export class ToolOrchestrator {
	private chatCtx: TChatContext; // Conversation context
	private registry: ToolRegistry; // Available tools
	private agentName: TAgentName; // Which agent is running
	private customToolCallLimit?: number; // Override default limit
	private agentConfig: TAgentConfig; // Agent behavior config

	usageConfigArray: TUsageConfig[] = []; // Track token usage
	private currentToolMetadata: TToolMetadata = {}; // Tool result metadata

	// Tracks currently executing tool for error context
	public currentExecutingTool?: {
		toolCallId: string;
		functionName: string;
		toolIndex: number;
	};

	constructor(
		chatCtx: TChatContext,
		agentName: TAgentName = AGENT_NAMES.MAIN_THREAD,
		toolRegistry: ToolRegistry,
		agentConfig: TAgentConfig,
		customToolCallLimit?: number,
	) {
		this.chatCtx = chatCtx;
		this.registry = toolRegistry;
		this.agentName = agentName;
		this.agentConfig = agentConfig;
		this.customToolCallLimit =
			customToolCallLimit && customToolCallLimit > 0
				? customToolCallLimit
				: undefined;
	}
}

Purpose:

  • Maintains state across multiple tool-calling iterations
  • Coordinates between LLM, tools, and streaming
  • Manages token usage and context limits
  • Supports sub-agents (Deep Research spawns ResearcherAgent)

The Main Loop (Heart of the System)

File: src/server/endpoints/unified/orchestrator/toolOrchestrator.ts:237

This is where the magic happens - iterative reasoning with tools:

TypeScript
async run(config: TToolOrchestratorRunInput) {
    // Initialize state
    this.currentToolMetadata = {};
    const { state, agentContext, messages, systemPromptTokens, contextLimit,
            shouldDoParallelToolCalls, modelConfig } =
        await this.initializeOrchestrationState(config);
    let currentMessages = messages;

    // Initialize agent-specific state
    if (this.agentConfig.initializeAgentState) {
        this.agentConfig.initializeAgentState(state);
    }

    // Convert registry tools to OpenAI format
    const openAiTools: ChatCompletionTool[] = this.registry.getAll().map(t => ({
        type: "function",
        function: {
            name: t.name,
            description: t.description,
            parameters: zodToJsonSchema(t.parameters, { target: "openApi3" }),
        },
    }));

    // Calculate starting content index (for streaming)
    state.currentContentIndex = calculateInitialContentIndex(
        this.agentName,
        globalContextIndex ?? 0,
        eventManager.index,
        unifiedApiVersion,
    );

    // Sync with EventManager
    requestContext.set({ currentContentIndex: state.currentContentIndex });
    eventManager.index = state.currentContentIndex;

    // For reasoning models, start reasoning stream
    if (REASONING_MODELS.includes(schema.schema.model)) {
        streamMessage({
            type: "reasoning",
            contentIndex: state.currentContentIndex,
            reasoning: "",
        });
    }

    let lastLoopProgressEvent: SSEProgressEvent | undefined;
    const toolIterationInfo: TToolIterationInfo[] = [];

    // ═══════════════════════════════════════════════════════════
    // MAIN ORCHESTRATION LOOP
    // ═══════════════════════════════════════════════════════════
    while (this.agentConfig.shouldContinueLoop(state)) {

        // Track iteration info
        toolIterationInfo.push({
            iteration: state.currentIteration + 1,
            toolCalls: [],
            layout: {},
        });

        // 1. Should we send tools this iteration?
        const shouldSendTools = this.agentConfig.shouldSendTools(
            state, openAiTools, schema.schema
        );

        // 2. Get model ID (agent can override)
        const modelId = this.agentConfig.getModelId
            ? this.agentConfig.getModelId(state, modelConfig)
            : modelConfig.id;

        // 3. Build LLM input
        const llmInput = {
            model: modelId,
            messages: [...currentMessages, ...state.inLoopTrimmedMessages],
            params: {
                ...(shouldSendTools ? {
                    tools: openAiTools,
                    tool_choice: state.toolChoice,
                    parallel_tool_calls: shouldUseParallelToolCalls(
                        this.agentName,
                        shouldDoParallelToolCalls || this.agentConfig.useParallelTools
                    ),
                } : {}),
                max_tokens: modelConfig.getMaxOutputTokens?.(false),
            },
        };

        // 4. Request LLM (returns stream)
        const chatRequest = await provider.chat(llmInput);
        const chatResponse = await chatRequest.stream();

        // 5. Reset iteration state
        state.hasToolCalls = false;
        state.toolChoice = "auto";
        lastLoopProgressEvent?.end();
        state.currentContentIndex = validateAndSyncStreamingIndex(state.currentContentIndex);

        // 6. Stream to client and capture output
        const streamOutput = unifiedApiVersion === "V2"
            ? await streamer.streamV2(config.request, config.response, chatResponse.data, {
                contentIndex: state.currentContentIndex,
            })
            : await streamer.stream(config.request, config.response, chatResponse.data);

        // 7. Parse and validate tool calls
        const toolCallParsed = multipleToolCallSchema.safeParse(streamOutput?.toolCalls);

        if (!toolCallParsed.success) {
            // Invalid tool names - add error message and retry
            state.inLoopTrimmedMessages.push(
                SystemMessage({
                    content: [{
                        text: prompts.getInvalidToolNamePrompt(toolCallParsed.error.message),
                        type: "TEXT",
                        tokens: (await tokenizer.encode(toolCallParsed.error.message)).length,
                    }],
                })
            );
            state.hasToolCalls = true;
            continue; // Retry iteration
        }

        const toolCalls = await getTokenizedToolCalls<TToolCall>(toolCallParsed.data);
        const llmResponseContent = streamOutput?.content.join("") || "";

        // 8. Process reasoning content if present
        const reasoningContent = streamOutput?.reasoningContent.reduce((acc, current) => {
            if (!acc[current.index]) {
                acc[current.index] = current;
            } else {
                if (current.text) acc[current.index].text += current.text;
                if (current.id) acc[current.index].id = current.id;
                if (current.signature) acc[current.index].signature = current.signature;
            }
            return acc;
        }, []);

        // 9. Check for tools that errored in previous iteration
        if (state.lastIterationErroredOutTools?.length) {
            const notRetried = state.lastIterationErroredOutTools.filter(
                tool => !toolCallParsed.data?.some(t => t.function.name === tool)
            );
            if (notRetried.length) {
                logger.error({ toolsNotChosenAfterError: notRetried },
                    `ERROR/TOOL/RUN-SINGLE-TOOL/TOOLS-NOT-CHOSEN-AFTER-ERROR`);
            }
            state.lastIterationErroredOutTools = [];
        }

        // 10. Agent-specific tool call processing
        if (this.agentConfig.processToolCalls && toolCalls?.length > 0) {
            const shouldBreak = await this.agentConfig.processToolCalls(
                toolCalls, state, agentContext
            );
            if (shouldBreak) {
                assistantMessageNode.contentV2.push({
                    type: "TEXT",
                    text: llmResponseContent,
                    tokens: (await tokenizer.encode(llmResponseContent)).length,
                });
                state.hasToolCalls = false;
                break;
            }
        }

        // 11. Handle no tool calls case
        if (!toolCalls || toolCalls.length === 0) {
            if (this.agentConfig.onNoToolCalls) {
                const noCallsResult = await this.agentConfig.onNoToolCalls(
                    state, agentContext, "NO_CALLS_GENERATED"
                );
                if (noCallsResult.shouldContinue) {
                    if (state.messages) {
                        currentMessages = state.messages;
                    }
                    continue;
                }
            }
            state.hasToolCalls = false;
        }

        // 12. EXECUTE TOOLS (Parallel)
        if (toolCalls?.length) {
            logger.info({
                currentIteration: state.currentIteration,
                effectiveToolCallsLimit: state.effectiveToolCallsLimit,
                toolCallsNames: toolCallParsed.data?.map(t => t.function.name),
            }, "DEBUG/TOOL_CALLS");

            const toolExecutionEvent = eventManager.createEvent("Selecting tools", "LIGHTBULB");
            state.hasToolCalls = true;

            if (this.agentConfig.onToolsSelected) {
                await this.agentConfig.onToolsSelected(toolCalls, state, agentContext);
            }

            const toolExecutionResults: TToolExecutionResults = {};

            streamMessage({ type: "toolCalls", contentIndex: state.currentContentIndex, toolCalls });

            // Mark tools as used in chat state
            toolCalls.forEach(tool => {
                this.chatCtx.chatState.addUsedMode(toolNameMap[tool.function.name]);
            });

            if (toolCalls.length) {
                state.currentContentIndex++;
            }

            state.lastIterationErroredOutTools = [];
            state.toolsNotEnabledList = [];
            let currentToolResultIndex = -1;

            // Execute all tools in parallel and yield results as they complete
            for await (const chunk of this.executeRequestedTools(
                toolCalls, state.currentContentIndex, agentContext
            )) {
                // Update streaming index from EventManager if it advanced
                if (eventManager.index > state.currentContentIndex) {
                    state.currentContentIndex = eventManager.index;
                }

                // Update global context for sub-agents
                if (this.agentConfig.enableGlobalContextUpdate) {
                    requestContext.set({ currentContentIndex: state.currentContentIndex });
                }

                // Handle progress updates
                if (chunk.type !== "tool:progress" && this.agentConfig.onToolProgress) {
                    this.agentConfig.onToolProgress(chunk.function.name, toolExecutionEvent, agentContext);
                }

                // Process different chunk types
                switch (chunk.type) {
                    case "tool:progress": {
                        toolExecutionProgress = chunk.progress;
                        state.currentContentIndex++;
                        break;
                    }
                    case "tool:error": {
                        state.lastIterationErroredOutTools.push(chunk.function.name);
                        if (state.currentIteration === state.effectiveToolCallsLimit) {
                            logger.error({ error: chunk.error }, `ERROR/TOOL/RUN-SINGLE-TOOL`);
                        }
                        toolExecutionResults[chunk.id] = {
                            type: chunk.type,
                            result: chunk.error.slice(0, TOOL_ERROR_MESSAGE_MAX_LENGTH),
                        };
                        if (chunk.error === "TOOL_NOT_ENABLED") {
                            state.toolsNotEnabledList.push(chunk.function.name);
                        }
                        this.currentToolMetadata[chunk.id] = { saveToDB: false, summary: chunk.error };
                        break;
                    }
                    case "tool:stream": {
                        // Streaming tool (e.g., sub-agent)
                        lastLoopProgressEvent = toolExecutionProgress?.getOrCreateLatestEvent(
                            getRandomGeneratingMessage(), "WAND"
                        );
                        streamMessage({
                            type: "toolCallResult",
                            contentIndex: state.currentContentIndex,
                            toolCallResult: { index: currentToolResultIndex, result: "", toolCallId: chunk.id, function: { name: chunk.function.name } },
                        });

                        const safeContentIndex = validateAndSyncStreamingIndex(state.currentContentIndex);

                        // Stream sub-agent output
                        const content = unifiedApiVersion === "V2"
                            ? await streamer.streamV2(config.request, config.response, chunk.content.stream, {
                                contentIndex: safeContentIndex,
                                streamAsToolResult: { toolResultIndex: currentToolResultIndex },
                            })
                            : await streamer.stream(config.request, config.response, chunk.content.stream);

                        // Track sub-agent usage
                        try {
                            const toolStreamUsage = await chunk.content.chatResponse.usage(content.content, {
                                usage: content.usage,
                                tools: [chunk.function.name],
                            });
                            this.addUsage(toolStreamUsage);
                        } catch (error) {
                            logger.warn({ error, toolId: chunk.id, toolName: chunk.function.name },
                                "WARN/TOOL/STREAM_USAGE_CALCULATION_FAILED");
                        }

                        // Accumulate streaming result
                        if (toolExecutionResults[chunk.id]) {
                            toolExecutionResults[chunk.id].result += content.content.join("");
                        } else {
                            toolExecutionResults[chunk.id] = {
                                type: chunk.type,
                                result: content.content.join(""),
                            };
                        }
                        this.currentToolMetadata[chunk.id] = { saveToDB: true };
                        lastLoopProgressEvent?.end();
                        break;
                    }
                    case "tool:start": {
                        currentToolResultIndex++;
                        break;
                    }
                    case "tool:done": {
                        lastLoopProgressEvent = toolExecutionProgress?.getOrCreateLatestEvent(
                            getRandomGeneratingMessage(), "WAND"
                        );
                        if (!toolExecutionResults[chunk.id]) {
                            toolExecutionResults[chunk.id] = {
                                type: chunk.type,
                                result: chunk.result.result,
                            };
                        }
                        this.currentToolMetadata[chunk.id] = chunk.result;

                        // Merge sub-agent metadata
                        if (chunk.result.toolMetadata) {
                            this.currentToolMetadata = { ...this.currentToolMetadata, ...chunk.result.toolMetadata };
                        }
                        if (chunk.result.toolCallCount) {
                            state.currentIteration += chunk.result.toolCallCount;
                        }
                        // Merge sub-agent usage
                        if (chunk.result.usageConfigArray?.length) {
                            chunk.result.usageConfigArray.forEach(usage => this.addUsage(usage));
                        }
                        lastLoopProgressEvent?.end();
                        break;
                    }
                }

                if (this.agentConfig.enableGlobalContextUpdate) {
                    requestContext.set({ currentContentIndex: state.currentContentIndex });
                }
            }

            toolExecutionEvent.end();

            // Process all tool results
            const successfulToolNames: TToolName[] = [];
            for (const [index, call] of toolCalls.entries()) {
                const executedTool = toolExecutionResults[call.id];
                let result = executedTool?.result;
                const metadata = this.currentToolMetadata[call.id];

                if (executedTool.type !== "tool:error") {
                    successfulToolNames.push(call.function.name);
                }

                // Agent can modify or intercept results
                if (this.agentConfig.handleSpecialTools) {
                    const specialResult = this.agentConfig.handleSpecialTools(
                        call.function.name, result, call, agentContext
                    );
                    if (specialResult) {
                        if (specialResult.modifiedResult) result = specialResult.modifiedResult;
                        if (specialResult.shouldForceToolChoice) state.toolChoice = specialResult.shouldForceToolChoice;
                        if (specialResult.modifiedMessages) currentMessages = specialResult.modifiedMessages;
                    }
                }

                // Stream non-streaming results
                if (toolExecutionResults[call.id].type !== "tool:stream" &&
                    toolExecutionResults[call.id].type !== "tool:error") {
                    streamMessage({
                        type: "toolCallResult",
                        contentIndex: state.currentContentIndex,
                        toolCallResult: {
                            index,
                            result: JSON.stringify(result),
                            toolCallId: call.id,
                            function: { name: call.function.name },
                        },
                    });
                }

                const content = JSON.stringify(result);

                // Prepare tool results for context
                if (metadata.saveToDB) {
                    toolResults.push({
                        toolCallId: call.id,
                        content: content,
                        tokens: (await tokenizer.encode(content)).length,
                        shouldIncludeInHistory: true,
                        function: { name: call.function.name },
                    });
                } else {
                    toolResults.push({
                        toolCallId: call.id,
                        content: content,
                        tokens: (await tokenizer.encode(content)).length,
                        shouldIncludeInHistory: false,
                        summary: JSON.stringify(metadata.summary),
                        function: { name: call.function.name },
                    });
                }

                toolIterationInfo[toolIterationInfo.length - 1].toolCalls.push({
                    name: call.function.name,
                    args: call.function.arguments,
                    result: content,
                });
            }
        }

        // 13. Calculate token usage
        let streamUsage = await chatResponse.usage(streamOutput.content, {
            usage: streamOutput.usage,
            tools: successfulToolNames,
        });

        if (this.agentConfig.calculateUsage) {
            streamUsage = await this.agentConfig.calculateUsage(
                streamUsage, streamOutput, systemPromptTokens, openAiTools, shouldSendTools
            );
        }
        this.addUsage(streamUsage);

        // 14. CONTEXT TRIMMING via Engine
        const trimmedMessages = await engine({
            messages: currentMessages,
            inLoop: state.inLoopTrimmedMessages,
            contextLimit: contextLimit,
            response: {
                tokens: streamUsage.tokens,
                toolCalls: toolCalls,
                toolResults: await formatToolResultsForLLM(toolResults),
                content: llmResponseContent,
                reasoning: reasoningContent,
            },
            agentName: this.agentName,
        }, toolIterationInfo[toolIterationInfo.length - 1]);

        state.inLoopTrimmedMessages = trimmedMessages.inLoop;

        // Add summary prompt if many tool results
        if (toolResults.length > 2) {
            state.inLoopTrimmedMessages.push(
                SystemMessage({
                    content: [{
                        type: "TEXT",
                        text: TOOL_RESULTS_CONTEXT_SUMMARY_PROMPT,
                        tokens: TOOL_RESULTS_CONTEXT_SUMMARY_TOKENS,
                    }],
                })
            );
        }

        // If history was summarized, rebuild messages
        if (trimmedMessages.summary) {
            const { messages: adjustedMessages } = await buildMessages(
                this.chatCtx, config, this.registry, this.agentName, trimmedMessages.summary
            );
            currentMessages = adjustedMessages;
            if (trimmedMessages.shouldDumpSummaryInDB) {
                assistantMessageNode.isChatSummarizedSoFar = true;
                assistantMessageNode.summary = trimmedMessages.summary;
            }
        }

        // Update chat context
        this.chatCtx.messages = [...currentMessages, ...state.inLoopTrimmedMessages];

        // 15. Save assistant response to database
        const dataPolicy = this.agentConfig.getDataPolicy();
        if (dataPolicy.shouldStoreText) {
            const reasoningContentText = reasoningContent.reduce((acc, current) => acc + current.text, "");
            assistantMessageNode.contentV2.push({
                type: "TEXT",
                toolCalls: filterToolCallsByPolicy(dataPolicy, toolCalls),
                text: llmResponseContent,
                tokens: (await tokenizer.encode(llmResponseContent)).length,
                ...(reasoningContent ? {
                    reasoning: {
                        content: reasoningContentText,
                        tokens: (await tokenizer.encode(reasoningContentText)).length,
                    },
                } : {}),
            });
        }

        // Save progress events
        if (toolResults.length) {
            const progressEvents = toolExecutionProgress?.getEvents() || [];
            const steps = progressEvents.map(event => event.serialize());
            assistantMessageNode.contentV2.push({
                type: "PROGRESS",
                name: toolExecutionProgress?.name,
                icon: toolExecutionProgress?.icon,
                steps: steps,
                metadata: toolExecutionProgress?.metadata,
            });

            if (dataPolicy.shouldStoreToolResults) {
                assistantMessageNode.contentV2.push({
                    type: "TOOL_RESULT",
                    toolResults: await settleAllOrThrow(
                        filterToolResultsByPolicy(toolResults, dataPolicy).map(result =>
                            getFormattedAndTokenizedToolResult(result, this.agentName)
                        )
                    ),
                });
            }
        }

        // 16. Handle disabled tools
        if (state.toolsNotEnabledList.length > 0 && this.agentConfig.onNoToolCalls) {
            await this.agentConfig.onNoToolCalls(state, agentContext, "TOOLS_DISABLED", toolCalls);
        }

        // 17. Update iteration counter
        const iterativeToolCalls = toolCalls?.filter(call =>
            !NON_ITERATIVE_TOOLS.some(tool => call.function.name === `${tool.toLowerCase()}_tool`)
        ) || [];

        if (toolCalls?.length) {
            state.currentIteration += iterativeToolCalls.length;
        } else {
            state.currentIteration++;
        }

        // 18. Check limit reached
        if (state.currentIteration >= state.effectiveToolCallsLimit - 1 &&
            state.hasToolCalls && this.agentConfig.onNoToolCalls &&
            !state.isMainThreadFinalLoopPromptInjected) {
            const limitReachedResult = await this.agentConfig.onNoToolCalls(
                state, agentContext, "LIMIT_REACHED"
            );
            if (limitReachedResult.shouldContinue) {
                if (state.messages) {
                    currentMessages = state.messages;
                    state.isMainThreadFinalLoopPromptInjected = true;
                    state.currentIteration = state.effectiveToolCallsLimit - 1;
                }
            }
        }

        state.currentContentIndex++;
    }

    // Final cleanup
    if (lastLoopProgressEvent) lastLoopProgressEvent.end();
    if (eventManager) {
        eventManager.index = state.currentContentIndex;
    }

    if (this.agentConfig.enableGlobalContextUpdate) {
        requestContext.set({
            currentContentIndex: state.currentContentIndex,
            toolIterationInfo: toolIterationInfo,
        });
    }

    return {
        result: "OK",
        finalIteration: state.currentContentIndex,
        toolCallCount: state.currentIteration,
        usageConfigArray: this.usageConfigArray,
    };
}

Why This Loop Matters:

  • Iterative: LLM sees tool results, then decides next action
  • Stateful: Each iteration builds on previous results
  • Streaming: Real-time updates to client
  • Resilient: Tool errors don't crash the system

State Management

File: src/server/endpoints/unified/orchestrator/toolOrchestrator.ts:129

TypeScript
type TOrchestratorState = {
	currentIteration: number; // Tool loop count
	currentContentIndex: number; // SSE stream index (CRITICAL)
	hasToolCalls: boolean; // Continue looping?
	toolChoice: "auto" | "none" | specific;
	effectiveToolCallsLimit: number; // Plan-based limit
	usageConfigArray: TUsageConfig[]; // Token tracking
	toolMetadata: TToolMetadata; // Tool result metadata
	inLoopTrimmedMessages: TPromptMessage[]; // Context from Engine
	messages: TPromptMessage[]; // Current history
	systemPromptTokens: number;
	lastIterationErroredOutTools: string[]; // Failed tools
	toolsNotEnabledList: string[]; // Disabled this iteration
	isMainThreadFinalLoopPromptInjected: boolean;
};

Critical Field: currentContentIndex

This prevents streaming conflicts in multi-agent scenarios. Each chunk sent to client has an index:

  • Main thread uses indices 0, 1, 2, 3...
  • Sub-agent starts where parent left off
  • Prevents race conditions where chunks arrive out of order

Agent Configuration System

Location: src/server/endpoints/unified/orchestrator/configs/

TypeScript
type TAgentConfig = {
	shouldContinueLoop: (state) => boolean;
	shouldSendTools: (state, tools, schema) => boolean;
	processToolCalls?: (calls, state, context) => Promise<boolean>;
	onToolsSelected?: (calls, state, context) => Promise<void>;
	onToolProgress?: (name, event, context) => void;
	onNoToolCalls?: (
		state,
		context,
		reason,
	) => Promise<{ shouldContinue: boolean }>;
	handleSpecialTools?: (name, result, call, context) => SpecialResult;
	getModelId?: (state, config) => string;
	useParallelTools: boolean;
	enableGlobalContextUpdate: boolean;
	getDataPolicy: () => {
		shouldStoreText;
		shouldStoreToolResults;
		shouldStoreToolCalls;
	};
};

Three Built-in Agents:

  1. MainThreadAgent (mainThread.config.ts)

    • Standard chat
    • 8-15 iterations (plan-based)
    • Parallel tools enabled
    • Full data storage
  2. DeepResearchSupervisor (deepResearch.config.ts)

    • Complex research orchestration
    • Time-limited (e.g., 5 minutes)
    • Sequential execution
    • Custom tool filtering
  3. ResearcherAgent (researcher.config.ts)

    • Individual research tasks
    • Spawned by supervisor
    • Tool-specific focus

Parallel Tool Execution

File: src/server/endpoints/unified/orchestrator/toolOrchestrator.ts:929

TypeScript
private async *executeRequestedTools(
    calls: ToolCall[],
    currentContentIndex: number,
    agentContext: TAgentContext,
): AsyncIterable<TStreamChunk> {
    // Prevent index regression in multi-agent
    const safeToolIndex = validateAndSyncStreamingIndex(currentContentIndex);

    // Initialize progress tracking
    const progress = EventManager.init(safeToolIndex, "V2");
    yield { type: "tool:progress", progress };

    // Create parallel tasks
    const tasks = calls.map((call, index) =>
        this.runSingleTool(
            call.function.name,
            call.id,
            call.function.arguments,
            progress,
            agentContext,
            index,
        ),
    );

    // Execute concurrently, yield as each completes
    for await (const work of tasks) {
        for await (const chunk of work) yield chunk;
    }
}

Tools run in parallel, results stream as they arrive. No waiting for slow tools.


Individual Tool Lifecycle

File: src/server/endpoints/unified/orchestrator/toolOrchestrator.ts:962

TypeScript
private runSingleTool(name, id, args, progress, agentContext, index) {
    const tool = this.registry.get(name);
    if (!tool) throw new Error(`planner asked for unknown tool ${name}`);

    return async function* (): AsyncIterable<TStreamChunk> {
        yield { type: "tool:start", id, function: { name }, progress };

        // Check if tool enabled
        if (tool.shouldUse && !(await tool.shouldUse(ctx))) {
            yield { type: "tool:error", id, function: { name }, error: "TOOL_NOT_ENABLED" };
            return;
        }

        try {
            // Parse/preprocess arguments
            const parsedArgs = this.agentConfig.preprocessToolArguments
                ? await this.agentConfig.preprocessToolArguments({...}, ctx, this.toolMetadata, agentContext)
                : JSON.parse(args);

            this.currentExecutingTool = { toolCallId: id, functionName: name, toolIndex: index };
            const toolContext = createToolContext(this);

            // Execute
            const resultOrStream = await tool.execute(parsedArgs, ctx, progress, toolContext);
            let finalResult;

            if (isAsyncIterable(resultOrStream)) {
                // Streaming tool (sub-agent)
                const iterator = resultOrStream[Symbol.asyncIterator]();
                while (true) {
                    const { value, done } = await iterator.next();
                    if (done) { finalResult = value; break; }
                    if ("stream" in value) {
                        yield { type: "tool:stream", id, function: { name }, content: { stream: value.stream, chatResponse: value.chatResponse } };
                    }
                }
            } else {
                finalResult = resultOrStream;
            }

            yield { type: "tool:done", id, function: { name }, result: finalResult };
        } catch (e) {
            logger.warn({ error: e, args, name, id }, `WARN/TOOL/RUN-SINGLE-TOOL`);
            yield { type: "tool:error", id, function: { name }, error: e.message };
        }
    }.bind(this)();
}

Tool Result Types:

  • tool:start - Execution begins
  • tool:stream - Streaming partial results (sub-agents)
  • tool:done - Success with final result
  • tool:error - Failed (caught gracefully)
  • tool:progress - Overall batch tracking

Multi-Agent Coordination

Plain text
Main Thread Agent
       │
       ├─ Spawns ─→ DeepResearchSupervisor (up to 50 iterations)
                          │
                          ├─ Spawns ─→ ResearcherAgent (individual tasks)
                          │
                          └─ Spawns ─→ More sub-agents as needed

Key: Each has own ToolRegistry, state, and streaming index

Sub-Agent Spawning Example:

TypeScript
const deepResearchOrchestrator = new ToolOrchestrator(
    subChatContext,
    AGENT_NAMES.DEEP_RESEARCH_SUPERVISOR,
    deepResearchToolRegistry,  // Different tools available
    deepResearchSupervisorConfig,
    50,  // Custom limit: 50 tool calls
);

const result = await deepResearchOrchestrator.run({...});

// Merge sub-agent results back to parent
if (result.toolMetadata) {
    Object.assign(this.currentToolMetadata, result.toolMetadata);
}

Integration Points

  1. Engine (src/server/repositories/engine/engine.ts)

    • Context trimming after each iteration
    • Layout selection algorithm
    • Token counting
  2. Streamer (src/server/repositories/streamer/streamer.ts)

    • SSE streaming to client
    • EventManager integration
    • Index synchronization
  3. Provider (src/server/repositories/provider/provider.ts)

    • LLM API calls via Rune
    • Token cost calculation
    • Model selection
  4. ToolRegistry (src/server/endpoints/unified/tools/toolRegistry.ts)

    • Tool availability per agent
    • Dynamic tool loading

Summary

The orchestration system:

  • Iterative reasoning: Multi-step tool calling
  • Multi-agent: Different agents for different tasks
  • Parallel execution: Tools run concurrently
  • Streaming-first: Real-time progress
  • Resilient: Errors contained, not catastrophic
  • Token-aware: Context trimming prevents overflow

Key insight: This isn't single-pass Q&A. It's an iterative reasoning engine where the LLM can take multiple actions, see results, and continue thinking - just like a human problem-solving approach.