Runner System - Comprehensive Deep Dive

Overview

The Runner system is the execution engine of the OpenAI Agents SDK. It's responsible for orchestrating agent runs, managing the lifecycle of agent execution, handling tool execution, coordinating handoffs, managing sessions, and ensuring proper error handling. Think of the Runner as the "director" that brings together all the components (agents, tools, guardrails, etc.) and makes them work together in a coordinated way.

Core Classes

Runner

Runner is the main entry point for executing agents. It provides both async and sync interfaces for running agents.

Location: src/agents/run.py

Key Methods:

  • async def run(starting_agent, input, *, context, ...) - Main async method to run an agent
  • def run_sync(starting_agent, input, *, context, ...) - Sync wrapper around run()
  • def run_streamed(starting_agent, input, *, context, ...) - Starts a streamed run and returns a RunResultStreaming; events are consumed with its stream_events() async iterator

Parameters:

  • starting_agent: Agent[TContext] - The agent to start with
  • input: str | list[TResponseInputItem] | RunState - Input to the agent
  • context: TContext | None - User-provided context object
  • max_turns: int - Maximum number of turns (default: 10)
  • hooks: RunHooks | None - Lifecycle hooks for the run
  • run_config: RunConfig | None - Configuration for the run
  • session: Session | None - Session for conversation persistence
  • conversation_id: str | None - ID for server-managed conversations
  • previous_response_id: str | None - ID of previous response for chaining
  • auto_previous_response_id: bool - Enable automatic response chaining
  • error_handlers: RunErrorHandlers | None - Custom error handlers

AgentRunner

AgentRunner is the internal implementation class that does the actual work. The Runner class is a thin wrapper around AgentRunner that provides a simpler public API.

Location: src/agents/run.py

Key Responsibilities:

  • Turn management (tracking which turn we're on)
  • Tool execution coordination
  • Handoff delegation
  • Session persistence
  • Error handling and recovery
  • Streaming event emission
  • Tracing integration
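
The wrapper relationship described above can be pictured with a small sketch. It is not the SDK's code: ToyRunner, ToyAgentRunner, and ToyResult are hypothetical names, and the "run" simply echoes its input. It only illustrates a public facade that delegates to an implementation class and a sync method that wraps the async one.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyResult:
    final_output: str


class ToyAgentRunner:
    """Hypothetical stand-in for the internal implementation class."""

    async def run(self, agent_name: str, user_input: str, *, max_turns: int = 10) -> ToyResult:
        # A real implementation would drive the turn loop here.
        await asyncio.sleep(0)
        return ToyResult(final_output=f"{agent_name} handled: {user_input}")


class ToyRunner:
    """Thin public facade: every method delegates to ToyAgentRunner."""

    @classmethod
    async def run(cls, agent_name: str, user_input: str, **kwargs) -> ToyResult:
        return await ToyAgentRunner().run(agent_name, user_input, **kwargs)

    @classmethod
    def run_sync(cls, agent_name: str, user_input: str, **kwargs) -> ToyResult:
        # Blocks the calling thread; cannot be called from a running event loop.
        return asyncio.run(cls.run(agent_name, user_input, **kwargs))
```

Keeping the facade free of logic means the sync and async entry points cannot drift apart, since both end up in the same implementation method.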

Execution Flow

1. Initialization

When you call Runner.run(), the following initialization happens:

Python
result = await Runner.run(
    agent,
    "Hello, world!",
    context=my_context,
    max_turns=5,
)

Steps:

  1. Context Wrapper Creation - A RunContextWrapper is created to wrap your context object. This wrapper provides:

    • Approval management (for human-in-the-loop)
    • Usage tracking
    • Tool state management
    • Access to run configuration
  2. Agent Binding - The agent is "bound" to the run. This creates an AgentBindings object that:

    • Resolves the model to use (from agent, run config, or default)
    • Resolves model settings (merged from agent and run config)
    • Prepares the agent for execution
  3. Session Preparation - If a session is provided, the conversation history is loaded and prepared. The session's items are combined with the new input.

  4. Trace Creation - A trace is created for observability. This trace will:

    • Track all events during the run
    • Record timing information
    • Capture inputs and outputs (unless tracing is disabled)
  5. Sandbox Setup - If sandbox configuration is provided, the sandbox session is initialized.
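
To make the context wrapper from step 1 more concrete, here is a minimal sketch of a wrapper that carries the user's context alongside usage counters and approval decisions. ToyContextWrapper and ToyUsage are hypothetical names, and the fields are assumptions chosen for illustration rather than the SDK's actual attributes.

```python
from dataclasses import dataclass, field
from typing import Dict, Generic, Optional, TypeVar

TContext = TypeVar("TContext")


@dataclass
class ToyUsage:
    requests: int = 0
    total_tokens: int = 0


@dataclass
class ToyContextWrapper(Generic[TContext]):
    context: TContext
    usage: ToyUsage = field(default_factory=ToyUsage)
    approvals: Dict[str, bool] = field(default_factory=dict)

    def record_call(self, tokens: int) -> None:
        # Called once per model request to keep running totals.
        self.usage.requests += 1
        self.usage.total_tokens += tokens

    def approve_tool(self, call_id: str) -> None:
        self.approvals[call_id] = True

    def reject_tool(self, call_id: str) -> None:
        self.approvals[call_id] = False

    def is_tool_approved(self, call_id: str) -> Optional[bool]:
        # None means "no decision yet", which is the state that pauses a run.
        return self.approvals.get(call_id)
```

The user's object stays untouched inside `context`; everything the run needs to track lives on the wrapper, so the same context object can be reused across runs.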

2. Turn Execution

A "turn" is one complete cycle of:

  • Input preparation
  • Model call
  • Tool execution (if needed)
  • Output processing

The Runner manages multiple turns until:

  • The agent produces a final output
  • Max turns is exceeded
  • An error occurs
  • A guardrail tripwire is triggered

Turn Lifecycle:

Python
# Simplified sketch of turn execution (helper names are illustrative, not SDK functions)
for turn in range(max_turns):
    # 1. Prepare input
    input_items = prepare_input(current_state, new_input)

    # 2. Run input guardrails (first turn only, starting agent only)
    if turn == 0 and is_starting_agent:
        guardrail_results = await run_input_guardrails(input_items)
        if any(r.tripwire_triggered for r in guardrail_results):
            raise InputGuardrailTripwireTriggered

    # 3. Call the model
    response = await model.get_response(
        system_instructions=agent.instructions,
        input=input_items,
        tools=available_tools,
        ...
    )

    # 4. Process the response
    processed = process_response(response)

    # 5. Handle tool calls
    if processed.tool_calls:
        tool_results = await execute_tools(processed.tool_calls)
        # Add tool results to state for the next turn
        current_state.add_tool_results(tool_results)
        continue  # next turn

    # 6. Handle handoffs
    if processed.handoff:
        next_agent = processed.handoff.target_agent
        switch_to_agent(next_agent)
        continue  # next turn

    # 7. Final output
    if processed.final_output:
        # Run output guardrails
        guardrail_results = await run_output_guardrails(processed.final_output)
        if any(r.tripwire_triggered for r in guardrail_results):
            raise OutputGuardrailTripwireTriggered

        return RunResult(final_output=processed.final_output, ...)
else:
    # The loop used every turn without producing a final output
    raise MaxTurnsExceeded(f"Max turns ({max_turns}) exceeded")
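
The turn lifecycle can also be exercised end to end with a toy loop that needs no SDK at all. Everything here is hypothetical: fake_model is a scripted stand-in for a model, TOOLS is a one-entry tool registry, and MaxTurnsExceededError only mimics the role of the SDK's MaxTurnsExceeded. Guardrails and handoffs are left out to keep the loop readable.

```python
import asyncio


class MaxTurnsExceededError(Exception):
    """Hypothetical stand-in for the SDK's MaxTurnsExceeded."""


async def fake_model(history: list) -> dict:
    # Scripted model: request one tool call, then answer using its result.
    tool_outputs = [m for m in history if m["role"] == "tool"]
    if not tool_outputs:
        return {"tool_call": {"name": "add", "args": (2, 3)}}
    return {"final_output": f"The sum is {tool_outputs[-1]['content']}"}


TOOLS = {"add": lambda a, b: a + b}


async def run_turns(user_input: str, max_turns: int = 10) -> tuple:
    history = [{"role": "user", "content": user_input}]
    for turn in range(1, max_turns + 1):
        response = await fake_model(history)
        if "tool_call" in response:
            call = response["tool_call"]
            result = TOOLS[call["name"]](*call["args"])
            history.append({"role": "tool", "content": result})
            continue  # feed the tool result back on the next turn
        return response["final_output"], turn
    raise MaxTurnsExceededError(f"no final output after {max_turns} turns")
```

Calling `asyncio.run(run_turns("What is 2 + 3?"))` returns `("The sum is 5", 2)`: the first turn ends in a tool call, the second produces the final output. With `max_turns=1` the same call raises, because the only turn is spent on the tool.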

3. Input Preparation

Input preparation converts the user's input into the format expected by the model:

Input Types:

  1. String input - Simple text input

    Python
    await Runner.run(agent, "Hello")
    # Converted to: [{"role": "user", "content": "Hello"}]
  2. List input - Structured input with multiple items

    Python
    await Runner.run(agent, [
        {"role": "user", "content": "Hello"},
        {"role": "user", "content": [{"type": "input_image", "image_url": "..."}]},
    ])
  3. RunState input - Resume from a paused state

    Python
    state = previous_run.to_state()
    await Runner.run(agent, state)  # Resumes from where it left off

Session Integration:

If a session is provided, the input preparation:

  1. Loads conversation history from the session
  2. Applies the session's input callback (if configured)
  3. Combines history with new input
  4. Respects session limits (e.g., max items to retrieve)
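
The session steps above can be sketched as a single function. This is an illustration under assumptions, not the SDK's implementation: prepare_input, the limit parameter, and the (history, new_items) callback signature are all hypothetical choices made for the example.

```python
from typing import Callable, List, Optional

InputCallback = Callable[[List[dict], List[dict]], List[dict]]


def prepare_input(
    history: List[dict],
    new_input,
    *,
    limit: Optional[int] = None,
    input_callback: Optional[InputCallback] = None,
) -> List[dict]:
    # Normalize a bare string into a single user message.
    if isinstance(new_input, str):
        new_items = [{"role": "user", "content": new_input}]
    else:
        new_items = list(new_input)
    # Respect the retrieval limit by keeping only the most recent history
    # (None or 0 means "no limit" in this sketch).
    recent = history[-limit:] if limit else list(history)
    # A callback, when present, decides how history and new input are combined.
    if input_callback is not None:
        return input_callback(recent, new_items)
    return recent + new_items
```

Passing a callback that returns only the new items is a simple way to ignore stored history for one call without clearing the session.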

4. Model Call

The Runner coordinates the model call through the model provider:

Python
model_response = await model.get_response(
    system_instructions=agent.instructions,
    input=input_items,
    model_settings=resolved_model_settings,
    tools=available_tools,
    output_schema=agent.output_type,
    handoffs=agent.handoffs,
    tracing=tracing_config,
    previous_response_id=previous_response_id,
    conversation_id=conversation_id,
    prompt=agent.prompt,
)

Model Resolution:

The model is resolved in this priority order:

  1. RunConfig.model (if set)
  2. Agent.model (if set)
  3. Default model (gpt-4.1)

Settings Merging:

Model settings are merged:

  1. Start with agent.model_settings
  2. Override with run_config.model_settings (if provided)
  3. Apply any provider-specific defaults
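
Both rules, the model priority order and the settings merge, come down to "the first non-empty value wins". A minimal sketch follows, assuming a hypothetical ToySettings class with two fields; the real settings object has more fields and its merge logic may differ. The default model string is the one named above.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional

DEFAULT_MODEL = "gpt-4.1"  # default named in the text above


@dataclass(frozen=True)
class ToySettings:
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None

    def resolve(self, override: Optional["ToySettings"]) -> "ToySettings":
        # Non-None fields of the override win; everything else is inherited.
        if override is None:
            return self
        changes = {
            f.name: getattr(override, f.name)
            for f in fields(override)
            if getattr(override, f.name) is not None
        }
        return replace(self, **changes)


def resolve_model(run_config_model: Optional[str], agent_model: Optional[str]) -> str:
    # Priority: run config, then agent, then the global default.
    return run_config_model or agent_model or DEFAULT_MODEL
```

For example, an agent configured with `ToySettings(temperature=0.2, max_tokens=512)` and a run config that sets only `temperature=0.7` resolves to a temperature of 0.7 with max_tokens still 512.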

Hooks:

Before and after the model call, lifecycle hooks are invoked:

  • on_llm_start - Before the call
  • on_llm_end - After the call

5. Response Processing

The model's response is processed to extract:

  1. Message content - Text output from the model
  2. Tool calls - Requests to call tools
  3. Handoff calls - Requests to hand off to another agent
  4. Reasoning content - Model's reasoning (for reasoning models)
  5. Refusals - Model's refusal to respond

ProcessedResponse Structure:

Python
@dataclass
class ProcessedResponse:
    content: str | None
    tool_calls: list[ToolCall]
    handoff: Handoff | None
    reasoning: ReasoningItem | None
    refusal: str | None
    raw_response: ModelResponse
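
Response processing is essentially a sorting pass over the model's output items. The sketch below assumes a simplified item shape (plain dicts with a "type" key) and assumes that a handoff arrives as a function call whose name maps to an agent; ToyProcessed and process_output are hypothetical names, not SDK types.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ToyProcessed:
    content: Optional[str] = None
    tool_calls: List[dict] = field(default_factory=list)
    handoff: Optional[str] = None
    refusal: Optional[str] = None


def process_output(items: List[dict], handoff_tools: Dict[str, str]) -> ToyProcessed:
    """Sort raw output items into buckets; handoff_tools maps tool name to agent name."""
    processed = ToyProcessed()
    for item in items:
        kind = item["type"]
        if kind == "message":
            processed.content = item["text"]
        elif kind == "refusal":
            processed.refusal = item["text"]
        elif kind == "function_call" and item["name"] in handoff_tools:
            # A call to a handoff tool is routed to the handoff bucket.
            processed.handoff = handoff_tools[item["name"]]
        elif kind == "function_call":
            processed.tool_calls.append(item)
    return processed
```

Separating handoff calls from ordinary tool calls at this stage lets the later steps treat them differently: tool calls are executed, while a handoff changes which agent runs next.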

6. Tool Execution

When the model requests tool calls, the Runner coordinates their execution:

Tool Execution Flow:

Python
for tool_call in processed_response.tool_calls:
    # 1. Find the tool
    tool = find_tool(tool_call.name)
    
    # 2. Check if approval is needed
    if tool.needs_approval:
        approval = await request_approval(tool_call)
        if not approval:
            record_rejection(tool_call)
            continue
    
    # 3. Run tool guardrails (if configured)
    guardrail_result = await run_tool_input_guardrail(tool, tool_call.arguments)
    if guardrail_result.tripwire_triggered:
        handle_guardrail_tripwire(guardrail_result)
        continue
    
    # 4. Execute the tool
    try:
        result = await tool.execute(tool_call.arguments, context)
    except Exception as e:
        result = handle_tool_error(e, tool)
    
    # 5. Run tool output guardrails (if configured)
    guardrail_result = await run_tool_output_guardrail(tool, result)
    if guardrail_result.tripwire_triggered:
        handle_guardrail_tripwire(guardrail_result)
        result = guardrail_result.output_info
    
    # 6. Record the result
    record_tool_output(tool_call, result)

Parallel Execution:

When a single model response contains several tool calls, none of them can depend on another's result (the model has not seen any results yet), so they can be executed concurrently rather than one after another.
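
Concurrent execution of independent tool calls is commonly done with asyncio.gather. The sketch below is an assumption about the general technique, not a copy of the SDK's scheduling code; slow_tool and execute_tool_calls are hypothetical helpers.

```python
import asyncio
import time


async def slow_tool(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for network or disk I/O
    return f"{name} done"


async def execute_tool_calls(calls: list) -> list:
    # gather() runs the coroutines concurrently and preserves input order.
    return await asyncio.gather(*(slow_tool(name, delay) for name, delay in calls))


async def main() -> tuple:
    start = time.perf_counter()
    results = await execute_tool_calls([("search", 0.2), ("lookup", 0.2)])
    return results, time.perf_counter() - start
```

Two 0.2 second tools finish in roughly 0.2 seconds of wall time instead of 0.4, and the results come back in the order the calls were issued, which keeps each output matched to its call.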

Tool Use Behavior:

Based on the agent's tool_use_behavior setting:

  • "run_llm_again" - Tool results are fed back to the model for another turn
  • "stop_on_first_tool" - First tool result is the final output
  • StopAtTools - Stop if specific tools are called
  • Custom function - Custom logic to determine if tool results are final
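
The four behaviors can be read as one decision: given the tool results of a turn, either return a final output or return nothing and let the model run again. The sketch below models that decision with plain values; representing StopAtTools as a set of tool names and results as (tool_name, output) pairs is an assumption made for the example.

```python
from typing import Callable, List, Optional, Tuple, Union

ToolResult = Tuple[str, str]  # (tool_name, output)
Behavior = Union[str, set, Callable[[List[ToolResult]], Optional[str]]]


def final_output_from_tools(behavior: Behavior, results: List[ToolResult]) -> Optional[str]:
    """Return a final output, or None to send the results back to the model."""
    if behavior == "run_llm_again" or not results:
        return None
    if behavior == "stop_on_first_tool":
        return results[0][1]
    if isinstance(behavior, (set, frozenset)):
        # StopAtTools-style: finish on the first result from a listed tool.
        return next((out for name, out in results if name in behavior), None)
    if callable(behavior):
        return behavior(results)
    raise ValueError(f"unknown tool_use_behavior: {behavior!r}")
```

Returning None is what keeps the turn loop going, so "run_llm_again" is simply the behavior that never produces a final output from tool results.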

7. Handoff Execution

When the model requests a handoff:

Python
if processed_response.handoff:
    handoff = processed_response.handoff
    
    # 1. Invoke the handoff
    next_agent = await handoff.on_invoke_handoff(context, handoff_arguments)
    
    # 2. Apply input filter (if configured)
    if handoff.input_filter:
        handoff_input = build_handoff_input(current_state, handoff)
        filtered_input = await handoff.input_filter(handoff_input)
    else:
        filtered_input = default_handoff_input(current_state, handoff)
    
    # 3. Switch to the new agent
    current_agent = next_agent
    
    # 4. Continue with next turn
    continue

Handoff History Management:

Based on nest_handoff_history setting:

  • False (default) - Full conversation history is passed to the next agent
  • True - History is collapsed into a single summary message
  • Custom mapper - Custom function to transform history
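
An input filter and a history mapper are both functions from "what the previous agent saw and produced" to "what the next agent will see". The sketch below assumes a simplified ToyHandoffInput with two tuples of dict items; the names and item shapes are hypothetical.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyHandoffInput:
    history: tuple    # items from before the current agent ran
    new_items: tuple  # items produced by the current agent


TOOL_TYPES = {"function_call", "function_call_output"}


def _without_tools(items: tuple) -> tuple:
    return tuple(i for i in items if i["type"] not in TOOL_TYPES)


def drop_tool_items(data: ToyHandoffInput) -> ToyHandoffInput:
    """Input filter: the next agent sees messages but not tool traffic."""
    return replace(
        data,
        history=_without_tools(data.history),
        new_items=_without_tools(data.new_items),
    )


def collapse_history(data: ToyHandoffInput) -> ToyHandoffInput:
    """Nested-history style: fold everything so far into one summary message."""
    lines = [f"{i['type']}: {i.get('text', '')}" for i in data.history + data.new_items]
    summary = {"type": "message", "text": "Previous conversation:\n" + "\n".join(lines)}
    return ToyHandoffInput(history=(summary,), new_items=())
```

Filtering keeps the conversation verbatim but trims noise, while collapsing trades detail for a shorter prompt; which one fits depends on how much of the earlier exchange the receiving agent needs.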

8. Output Guardrails

When the agent produces a final output:

Python
if processed_response.final_output:
    # Run output guardrails
    guardrail_results = []
    for guardrail in agent.output_guardrails + (run_config.output_guardrails or []):
        result = await guardrail.run(context, agent, processed_response.final_output)
        guardrail_results.append(result)
        
        if result.output.tripwire_triggered:
            raise OutputGuardrailTripwireTriggered(
                guardrail_result=result,
                output=processed_response.final_output,
            )
    
    # Return the result
    return RunResult(
        final_output=processed_response.final_output,
        output_guardrail_results=guardrail_results,
        ...
    )

9. Session Persistence

After each turn, if a session is configured:

Python
# Append the turn's new items to the session
await session.add_items(current_turn_items)

Session Compaction:

For OpenAI Responses API, the SDK supports intelligent compaction:

  • Older items can be collapsed into summaries
  • Reduces token usage while preserving context
  • Configured via SessionSettings
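
The idea behind compaction can be shown with a few lines: keep the newest items verbatim and replace everything older with a single summary item. This is a sketch of the general technique, not the SDK's compaction algorithm; compact and naive_summary are hypothetical, and the summary here is a placeholder string rather than model-written text.

```python
def compact(items: list, keep_last: int, summarize) -> list:
    """Replace everything except the newest keep_last items with one summary item."""
    if keep_last <= 0 or len(items) <= keep_last:
        return list(items)
    older, recent = items[:-keep_last], items[-keep_last:]
    summary = {"role": "system", "content": summarize(older)}
    return [summary] + recent


def naive_summary(older: list) -> str:
    # Placeholder: a real system would ask a model to write this summary.
    return f"Summary of {len(older)} earlier items"
```

With six stored items and `keep_last=2`, the result has three items: one summary covering the first four, followed by the last two unchanged.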

10. Result Return

The Runner returns a RunResult (or RunResultStreaming for streamed runs):

Python
@dataclass
class RunResult:
    final_output: Any
    new_items: list[RunItem]
    raw_responses: list[ModelResponse]
    last_agent: Agent
    input_guardrail_results: list[InputGuardrailResult]
    output_guardrail_results: list[OutputGuardrailResult]
    tool_input_guardrail_results: list[ToolInputGuardrailResult]
    tool_output_guardrail_results: list[ToolOutputGuardrailResult]
    trace: Trace | None
    usage: Usage
    interruptions: list[ToolApprovalItem]
    ...

Streaming

Streamed Execution

For real-time updates, use run_streamed:

Python
# run_streamed() returns immediately; events are read from stream_events()
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, RunItemStreamEvent):
        print(f"Item: {event.item}")
    elif isinstance(event, AgentUpdatedStreamEvent):
        print(f"Agent updated: {event.new_agent.name}")
    elif isinstance(event, RawResponsesStreamEvent):
        print(f"Raw event: {event.data}")

Streaming Events:

  1. RunItemStreamEvent - Emitted when a new run item is created
  2. AgentUpdatedStreamEvent - Emitted when the current agent changes
  3. RawResponsesStreamEvent - Emitted for raw model stream events

Streaming Flow:

The streaming path mirrors the non-streaming path but yields events as they happen:

  • Model response chunks are yielded as they arrive
  • Tool call events are yielded when tools are called
  • Tool output events are yielded when tools complete
  • The final output is available on the result object once the stream is exhausted
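
One common way to build this kind of streaming result is a queue shared between a background producer and an async iterator that the caller consumes. The sketch below illustrates that pattern under assumptions: ToyStreamingResult and ToyEvent are hypothetical, and the "model" is a fixed list of text chunks.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class ToyEvent:
    type: str
    data: str


class ToyStreamingResult:
    """Hypothetical stand-in: a background task produces, the caller consumes."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self.final_output = None

    async def _produce(self, chunks: list) -> None:
        for chunk in chunks:
            await self._queue.put(ToyEvent("raw_response_event", chunk))
        self.final_output = "".join(chunks)
        await self._queue.put(None)  # sentinel: the run is complete

    async def stream_events(self) -> AsyncIterator[ToyEvent]:
        while (event := await self._queue.get()) is not None:
            yield event


async def demo() -> tuple:
    result = ToyStreamingResult()
    task = asyncio.create_task(result._produce(["Hel", "lo"]))
    seen = [event.data async for event in result.stream_events()]
    await task
    return seen, result.final_output
```

In this sketch the final output is read from the result object after the loop ends rather than arriving as a last event, which keeps the event stream limited to incremental updates.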

Error Handling

Error Handlers

The Runner supports custom error handlers:

Python
from agents import RunErrorHandlers

error_handlers = RunErrorHandlers(
    max_turns=lambda ctx, error: "Custom max turns message"
)

result = await Runner.run(
    agent,
    input,
    error_handlers=error_handlers,
)

Error Types

  1. MaxTurnsExceeded - Agent exceeded max turns without producing final output
  2. InputGuardrailTripwireTriggered - Input guardrail blocked execution
  3. OutputGuardrailTripwireTriggered - Output guardrail blocked execution
  4. ToolTimeoutError - Tool execution timed out
  5. ModelBehaviorError - Model behaved unexpectedly
  6. UserError - User configuration error
  7. AgentsException - Base SDK exception

Error Recovery

Some errors can be recovered from:

Python
try:
    result = await Runner.run(agent, input)
except MaxTurnsExceeded as e:
    # Can resume with increased max_turns
    state = e.run_state
    result = await Runner.run(agent, state, max_turns=20)
except InputGuardrailTripwireTriggered as e:
    # Can retry with modified input
    result = await Runner.run(agent, modified_input)

Run Configuration

RunConfig

RunConfig provides configuration for the entire run:

Python
from agents import RunConfig, ModelSettings

config = RunConfig(
    model="gpt-4o",
    model_settings=ModelSettings(temperature=0.7),
    tracing_disabled=False,
    workflow_name="My workflow",
)

# max_turns is an argument to Runner.run(), not a RunConfig setting
result = await Runner.run(agent, input, run_config=config, max_turns=20)

Key Settings:

  • model - Override model for all agents
  • model_provider - Custom model provider
  • model_settings - Global model settings
  • handoff_input_filter - Global filter for all handoffs
  • nest_handoff_history - Enable nested handoff history
  • input_guardrails - Global input guardrails
  • output_guardrails - Global output guardrails
  • tracing_disabled - Disable tracing
  • workflow_name - Name for tracing
  • trace_id - Custom trace ID
  • group_id - Group ID for linking traces
  • session_settings - Session configuration
  • call_model_input_filter - Filter model input before calling

Configuration Priority

Settings are applied in this priority (highest to lowest):

  1. RunConfig settings
  2. Agent settings
  3. Global defaults

Lifecycle Hooks

Run Hooks

Run hooks allow you to hook into the execution lifecycle:

Python
from agents import RunHooks

class MyRunHooks(RunHooks):
    async def on_llm_start(self, context, agent, system_prompt, input_items):
        print(f"LLM call starting for {agent.name}")
    
    async def on_llm_end(self, context, agent, response):
        print(f"LLM call completed for {agent.name}")
    
    async def on_agent_start(self, context, agent):
        print(f"Agent {agent.name} starting")
    
    async def on_agent_end(self, context, agent, output):
        print(f"Agent {agent.name} finished")
    
    async def on_handoff(self, context, from_agent, to_agent):
        print(f"Handoff from {from_agent.name} to {to_agent.name}")
    
    async def on_tool_start(self, context, agent, tool):
        print(f"Tool {tool.name} starting")
    
    async def on_tool_end(self, context, agent, tool, result):
        print(f"Tool {tool.name} finished")

result = await Runner.run(agent, input, hooks=MyRunHooks())

Server-Managed Conversations

The Runner supports OpenAI's server-managed conversations:

Python
result = await Runner.run(
    agent,
    input,
    conversation_id="my-conversation-id",
    previous_response_id="prev-response-id",
)

Benefits:

  • Automatic conversation history management on OpenAI servers
  • Reduced local storage needs
  • Better prompt caching
  • Improved performance

How it works:

  1. The Runner uses OpenAIServerConversationTracker to track conversation
  2. Only deltas (new items) are sent to the server
  3. The server maintains full conversation history
  4. Session persistence is disabled when using server-managed conversations
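
The "only deltas are sent" step can be illustrated with a small tracker that remembers which items the server already holds. This is a sketch of the idea, not the OpenAIServerConversationTracker itself; ToyConversationTracker and the "id" key on each item are assumptions.

```python
class ToyConversationTracker:
    """Hypothetical tracker: remembers what the server has and sends only new items."""

    def __init__(self) -> None:
        self._sent_ids: set = set()
        self.previous_response_id = None

    def delta(self, items: list) -> list:
        # Items are identified by an "id" key; anything already sent is skipped.
        return [item for item in items if item["id"] not in self._sent_ids]

    def mark_sent(self, items: list, response_id: str) -> None:
        # Record what the server now holds and which response to chain from.
        self._sent_ids.update(item["id"] for item in items)
        self.previous_response_id = response_id
```

On the second turn of a conversation, `delta()` returns only the items added since the last `mark_sent()`, so the request payload stays small no matter how long the conversation has become.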

Human-in-the-Loop

Approval Workflow

Tools can require human approval:

Python
@function_tool(needs_approval=True)
def sensitive_operation() -> str:
    return "Performed sensitive operation"

Approval Flow:

  1. Tool is called
  2. Runner pauses execution
  3. Returns RunState with interruption
  4. Human reviews and approves/rejects
  5. Run is resumed with approval decision

Python
# First run - pauses for approval
result = await Runner.run(agent, input)
if result.interruptions:
    state = result.to_state()
    
    # Human reviews
    for interruption in result.interruptions:
        if should_approve(interruption):
            state.context.approve_tool(interruption)
        else:
            state.context.reject_tool(interruption)
    
    # Resume
    result = await Runner.run(agent, state)

Tracing Integration

Trace Creation

The Runner automatically creates traces for observability:

Python
result = await Runner.run(agent, input)
print(result.trace)  # Access the trace

Trace Structure:

  • Trace - The entire run trace
  • Span - Individual operations (LLM call, tool execution, etc.)
  • SpanData - Data associated with spans

Trace Configuration:

Python
config = RunConfig(
    tracing_disabled=False,
    trace_include_sensitive_data=True,  # Include tool inputs/outputs
    workflow_name="My workflow",
    trace_id="custom-id",
    group_id="conversation-123",
    trace_metadata={"user_id": "123"},
)

Usage Tracking

Token Usage

The Runner tracks token usage:

Python
result = await Runner.run(agent, input)
print(result.usage)
# e.g. Usage(requests=1, input_tokens=100, output_tokens=50, total_tokens=150, ...)

Usage Breakdown:

  • requests - Number of model requests made
  • input_tokens - Tokens sent to the model
  • output_tokens - Tokens received from the model
  • total_tokens - Sum of input and output tokens

Per-Turn Usage:

Usage is tracked per turn and aggregated:

Python
for response in result.raw_responses:
    print(f"Turn usage: {response.usage}")
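
Aggregation itself is plain addition over the per-turn records. The sketch below uses a hypothetical ToyUsage dataclass whose field names are chosen for the example and may not match the SDK's usage object.

```python
from dataclasses import dataclass


@dataclass
class ToyUsage:
    requests: int = 0
    input_tokens: int = 0
    output_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens

    def add(self, other: "ToyUsage") -> None:
        self.requests += other.requests
        self.input_tokens += other.input_tokens
        self.output_tokens += other.output_tokens


def aggregate(per_turn: list) -> ToyUsage:
    # Fold every turn's usage into a single running total.
    total = ToyUsage()
    for usage in per_turn:
        total.add(usage)
    return total
```

Because each turn resends the growing conversation, input tokens usually rise from turn to turn, so the aggregate can be much larger than the size of the final prompt alone.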

Performance Considerations

Async Execution

The Runner is async-first for performance:

Python
# Good - async
result = await Runner.run(agent, input)

# Avoid in production - sync wrapper that blocks the calling thread
result = Runner.run_sync(agent, input)

Parallel Tool Execution

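Tool calls issued in the same model response can run concurrently. Async tools that await their I/O benefit most; a blocking call such as time.sleep() may hold up the event loop instead:

Python
import asyncio

from agents import Agent, function_tool

# Async tools yield to the event loop while waiting, so the two calls can overlap
@function_tool
async def tool1() -> str:
    await asyncio.sleep(1)
    return "tool1"

@function_tool
async def tool2() -> str:
    await asyncio.sleep(1)
    return "tool2"

agent = Agent(name="assistant", tools=[tool1, tool2])

Session Compaction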

For long conversations, use session compaction:

Python
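from agents import RunConfig, SessionSettings

settings = SessionSettings(
    max_items=50,  # Only keep last 50 items
)

# session_settings is a RunConfig option (see Key Settings), not a Runner.run() argument
result = await Runner.run(
    agent,
    input,
    session=session,
    run_config=RunConfig(session_settings=settings),
)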

Best Practices

1. Use Async

Always use the async interface for production:

Python
# Good
result = await Runner.run(agent, input)

# Avoid in production
result = Runner.run_sync(agent, input)

2. Set Reasonable Max Turns

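Make the turn limit explicit so that a looping agent fails fast instead of consuming tokens:

Python
# Good - explicit limit sized for the task
result = await Runner.run(agent, input, max_turns=10)

# Avoid - relies on the implicit default (10), which may not suit the task
result = await Runner.run(agent, input)

3. Use Sessions for Long Conversations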

Maintain conversation state:

Python
# session_id identifies the conversation; db_path makes it persistent on disk
session = SQLiteSession("user_123", db_path="conversations.db")
result = await Runner.run(agent, input, session=session)

4. Enable Tracing for Debugging

Get visibility into execution:

Python
config = RunConfig(
    tracing_disabled=False,
    trace_include_sensitive_data=False,  # Exclude sensitive data
)
result = await Runner.run(agent, input, run_config=config)

5. Handle Errors Gracefully

Provide good error handling:

Python
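from agents import InputGuardrailTripwireTriggered, MaxTurnsExceeded, Runner

async def answer(agent, user_input) -> str:
    try:
        result = await Runner.run(agent, user_input)
        return str(result.final_output)
    except MaxTurnsExceeded:
        # The agent ran out of turns before producing a final output
        return "I need more information to complete this task."
    except InputGuardrailTripwireTriggered:
        # An input guardrail blocked the request
        return "I cannot process that request."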

Common Patterns

1. Multi-Agent Workflow
Python
coordinator = Agent(
    name="coordinator",
    instructions="Coordinate tasks",
    handoffs=[handoff(agent1), handoff(agent2)],
)

result = await Runner.run(coordinator, input)

2. Streaming Response
Python
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, RunItemStreamEvent):
        if isinstance(event.item, MessageOutputItem):
            print(ItemHelpers.text_message_output(event.item))

3. Resume from State
Python
# First run - pauses
result1 = await Runner.run(agent, input)
state = result1.to_state()

# ... human intervention ...

# Resume
result2 = await Runner.run(agent, state)

4. Custom Error Handling
Python
error_handlers = RunErrorHandlers(
    max_turns=lambda ctx, error: "Let me try a different approach.",
)

result = await Runner.run(agent, input, error_handlers=error_handlers)

Summary

The Runner system is the execution engine that orchestrates everything. Key takeaways:

  1. Runner is the public API for running agents
  2. AgentRunner is the internal implementation
  3. Turns are cycles of model call → tool execution → output
  4. Input preparation converts user input to model format
  5. Model calls are coordinated through model providers
  6. Response processing extracts content, tool calls, handoffs
  7. Tool execution can be parallel for efficiency
  8. Handoffs switch to different agents
  9. Guardrails validate input and output
  10. Sessions maintain conversation history
  11. Streaming provides real-time updates
  12. Error handlers customize error behavior
  13. Lifecycle hooks allow custom logic
  14. Tracing provides observability
  15. Usage tracking monitors token consumption
  16. Human-in-the-loop enables approval workflows
  17. Server-managed conversations offload history to OpenAI

Understanding the Runner system is essential for building robust agent workflows.