The Runner system is the execution engine of the OpenAI Agents SDK. It's responsible for orchestrating agent runs, managing the lifecycle of agent execution, handling tool execution, coordinating handoffs, managing sessions, and ensuring proper error handling. Think of the Runner as the "director" that brings together all the components (agents, tools, guardrails, etc.) and makes them work together in a coordinated way.
Core Classes
Runner
Runner is the main entry point for executing agents. It provides both async and sync interfaces for running agents.
AgentRunner is the internal implementation class that does the actual work. The Runner class is a thin wrapper around AgentRunner that provides a simpler public API.
Location:src/agents/run.py
Key Responsibilities:
Turn management (tracking which turn we're on)
Tool execution coordination
Handoff delegation
Session persistence
Error handling and recovery
Streaming event emission
Tracing integration
Execution Flow
1. Initialization
When you call Runner.run(), the following initialization happens:
Python
result = await Runner.run(
agent,
"Hello, world!",
context=my_context,
max_turns=5,
)
Steps:
Context Wrapper Creation - A RunContextWrapper is created to wrap your context object. This wrapper provides:
Approval management (for human-in-the-loop)
Usage tracking
Tool state management
Access to run configuration
Agent Binding - The agent is "bound" to the run. This creates an AgentBindings object that:
Resolves the model to use (from agent, run config, or default)
Resolves model settings (merged from agent and run config)
Prepares the agent for execution
Session Preparation - If a session is provided, the conversation history is loaded and prepared. The session's items are combined with the new input.
Trace Creation - A trace is created for observability. This trace will:
Track all events during the run
Record timing information
Capture inputs and outputs (unless tracing is disabled)
Sandbox Setup - If sandbox configuration is provided, the sandbox session is initialized.
2. Turn Execution
A "turn" is one complete cycle of:
Input preparation
Model call
Tool execution (if needed)
Output processing
The Runner manages multiple turns until:
The agent produces a final output
Max turns is exceeded
An error occurs
A guardrail tripwire is triggered
Turn Lifecycle:
Python
# Pseudocode of turn executionfor turn inrange(max_turns):
# 1. Prepare input
input_items = prepare_input(current_state, new_input)
# 2. Run input guardrails (first turn only, starting agent only)if turn == 0and is_starting_agent:
guardrail_results = await run_input_guardrails(input_items)
ifany guardrail.tripwire_triggered:
raise InputGuardrailTripwireTriggered
# 3. Call the model
response = await model.get_response(
instructions=agent.instructions,
input=input_items,
tools=available_tools,
...
)
# 4. Process the response
processed = process_response(response)
# 5. Handle tool callsif processed.tool_calls:
tool_results = await execute_tools(processed.tool_calls)
# Add tool results to state for next turn
state.add_tool_results(tool_results)
continue to next turn
# 6. Handle handoffsif processed.handoff:
next_agent = processed.handoff.target_agent
switch_to_agent(next_agent)
continue to next turn
# 7. Final outputif processed.final_output:
# Run output guardrails
guardrail_results = await run_output_guardrails(processed.final_output)
ifany guardrail.tripwire_triggered:
raise OutputGuardrailTripwireTriggered
return RunResult(final_output=processed.final_output, ...)
3. Input Preparation
Input preparation converts the user's input into the format expected by the model:
Input Types:
String input - Simple text input
Python
await Runner.run(agent, "Hello")
# Converted to: [{"type": "user", "content": "Hello"}]
When the model requests tool calls, the Runner coordinates their execution:
Tool Execution Flow:
Python
for tool_call in processed_response.tool_calls:
# 1. Find the tool
tool = find_tool(tool_call.name)
# 2. Check if approval is neededif tool.needs_approval:
approval = await request_approval(tool_call)
ifnot approval:
record_rejection(tool_call)
continue# 3. Run tool guardrails (if configured)
guardrail_result = await run_tool_input_guardrail(tool, tool_call.arguments)
if guardrail_result.tripwire_triggered:
handle_guardrail_tripwire(guardrail_result)
continue# 4. Execute the tooltry:
result = await tool.execute(tool_call.arguments, context)
except Exception as e:
result = handle_tool_error(e, tool)
# 5. Run tool output guardrails (if configured)
guardrail_result = await run_tool_output_guardrail(tool, result)
if guardrail_result.tripwire_triggered:
handle_guardrail_tripwire(guardrail_result)
result = guardrail_result.output_info
# 6. Record the result
record_tool_output(tool_call, result)
Parallel Execution:
If multiple tools are called and they don't depend on each other, they can be executed in parallel for efficiency.
Tool Use Behavior:
Based on the agent's tool_use_behavior setting:
"run_llm_again" - Tool results are fed back to the model for another turn
"stop_on_first_tool" - First tool result is the final output
StopAtTools - Stop if specific tools are called
Custom function - Custom logic to determine if tool results are final
7. Handoff Execution
When the model requests a handoff:
Python
if processed_response.handoff:
handoff = processed_response.handoff
# 1. Invoke the handoff
next_agent = await handoff.on_invoke_handoff(context, handoff_arguments)
# 2. Apply input filter (if configured)if handoff.input_filter:
handoff_input = build_handoff_input(current_state, handoff)
filtered_input = await handoff.input_filter(handoff_input)
else:
filtered_input = default_handoff_input(current_state, handoff)
# 3. Switch to the new agent
current_agent = next_agent
# 4. Continue with next turncontinue
Handoff History Management:
Based on nest_handoff_history setting:
False (default) - Full conversation history is passed to the next agent
True - History is collapsed into a single summary message
Custom mapper - Custom function to transform history
8. Output Guardrails
When the agent produces a final output:
Python
if processed_response.final_output:
# Run output guardrails
guardrail_results = []
for guardrail in agent.output_guardrails + (run_config.output_guardrails or []):
result = await guardrail.run(context, agent, processed_response.final_output)
guardrail_results.append(result)
if result.output.tripwire_triggered:
raise OutputGuardrailTripwireTriggered(
guardrail_result=result,
output=processed_response.final_output,
)
# Return the resultreturn RunResult(
final_output=processed_response.final_output,
output_guardrail_results=guardrail_results,
...
)
9. Session Persistence
After each turn, if a session is configured:
Python
# Save the turn's items to the sessionawait session.save_items(
conversation_id=conversation_id,
items=current_turn_items,
)
Session Compaction:
For OpenAI Responses API, the SDK supports intelligent compaction:
Older items can be collapsed into summaries
Reduces token usage while preserving context
Configured via SessionSettings
10. Result Return
The Runner returns a RunResult (or RunResultStreaming for streamed runs):
try:
result = await Runner.run(agent, input)
except MaxTurnsExceeded as e:
# Can resume with increased max_turns
state = e.run_state
result = await Runner.run(agent, state, max_turns=20)
except InputGuardrailTripwireTriggered as e:
# Can retry with modified input
result = await Runner.run(agent, modified_input)
Run Configuration
RunConfig
RunConfig provides configuration for the entire run:
# First run - pauses for approval
result = await Runner.run(agent, input)
if result.interruptions:
state = result.to_state()
# Human reviewsfor interruption in result.interruptions:
if should_approve(interruption):
state.context.approve_tool(interruption)
else:
state.context.reject_tool(interruption)
# Resume
result = await Runner.run(agent, state)
Tracing Integration
Trace Creation
The Runner automatically creates traces for observability:
Python
result = await Runner.run(agent, input)
print(result.trace) # Access the trace
result = await Runner.run(agent, input)
print(result.usage)
# Usage(request_tokens=100, response_tokens=50, total_tokens=150)
Usage Breakdown:
request_tokens - Tokens sent to the model
response_tokens - Tokens received from the model
total_tokens - Sum of request and response
Per-Turn Usage:
Usage is tracked per turn and aggregated:
Python
for response in result.raw_responses:
print(f"Turn usage: {response.usage}")
Performance Considerations
Async Execution
The Runner is async-first for performance:
Python
# Good - async
result = await Runner.run(agent, input)
# Avoid - sync wrapper (for production use)
result = Runner.run_sync(agent, input)
Parallel Tool Execution
When tools don't depend on each other, they run in parallel:
Python
# These will execute in parallel@function_tooldeftool1() -> str:
time.sleep(1)
return"tool1"@function_tooldeftool2() -> str:
time.sleep(1)
return"tool2"
agent = Agent(tools=[tool1, tool2])
Session Compaction
For long conversations, use session compaction:
Python
from agents import SessionSettings
settings = SessionSettings(
max_items=50, # Only keep last 50 items
)
result = await Runner.run(agent, input, session_settings=settings)
Best Practices
1. Use Async
Always use the async interface for production:
Python
# Good
result = await Runner.run(agent, input)
# Avoid in production
result = Runner.run_sync(agent, input)
2. Set Reasonable Max Turns
Prevent infinite loops:
Python
# Good
result = await Runner.run(agent, input, max_turns=10)
# Avoid - could run forever
result = await Runner.run(agent, input)
3. Use Sessions for Long Conversations
Maintain conversation state:
Python
session = SQLiteSession(db_path="conversations.db")
result = await Runner.run(agent, input, session=session)
4. Enable Tracing for Debugging
Get visibility into execution:
Python
config = RunConfig(
tracing_disabled=False,
trace_include_sensitive_data=False, # Exclude sensitive data
)
result = await Runner.run(agent, input, run_config=config)
5. Handle Errors Gracefully
Provide good error handling:
Python
try:
result = await Runner.run(agent, input)
except MaxTurnsExceeded:
# Handle gracefullyreturn"I need more information to complete this task."except InputGuardrailTripwireTriggered:
# Handle gracefullyreturn"I cannot process that request."
asyncfor event in Runner.run_streamed(agent, input):
ifisinstance(event, RunItemStreamEvent):
ifisinstance(event.item, MessageOutputItem):
print(event.item.content[0].text)
3. Resume from State
Python
# First run - pauses
result1 = await Runner.run(agent, input)
state = result1.to_state()
# ... human intervention ...# Resume
result2 = await Runner.run(agent, state)
4. Custom Error Handling
Python
error_handlers = RunErrorHandlers(
max_turns=lambda ctx, error: "Let me try a different approach.",
)
result = await Runner.run(agent, input, error_handlers=error_handlers)
Summary
The Runner system is the execution engine that orchestrates everything. Key takeaways:
Runner is the public API for running agents
AgentRunner is the internal implementation
Turns are cycles of model call โ tool execution โ output
Input preparation converts user input to model format
Model calls are coordinated through model providers