Streaming in the OpenAI Agents SDK delivers agent execution events in real time. Think of streaming as "live updates" or "progressive rendering": instead of waiting for the entire agent run to complete before seeing results, you receive events as they happen. This is essential for responsive user interfaces, real-time feedback, and long-running operations.
Core Concepts
What is Streaming?
Streaming is the process of delivering events incrementally as they occur during agent execution:
Real-time updates - Receive events as they happen
Progressive output - Show results as they're generated
Event-driven - React to specific events
Non-blocking - Don't wait for completion
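The difference is easy to see without the SDK. In this stdlib-only sketch, `fake_run_streamed` is a hypothetical stand-in for a streamed run that emits text chunks with a small simulated delay. The consumer records when each chunk arrives, so the first chunk is available well before the run finishes.

```python
import asyncio
import time
from typing import AsyncIterator, List, Tuple


async def fake_run_streamed(chunks: List[str], delay: float = 0.05) -> AsyncIterator[str]:
    """Hypothetical stand-in for a streamed agent run: emits chunks over time."""
    for chunk in chunks:
        await asyncio.sleep(delay)  # simulate model latency per chunk
        yield chunk


async def consume(chunks: List[str]) -> List[Tuple[str, float]]:
    """Record each chunk together with its arrival time in seconds."""
    start = time.perf_counter()
    received = []
    async for chunk in fake_run_streamed(chunks):
        received.append((chunk, time.perf_counter() - start))
    return received


if __name__ == "__main__":
    for chunk, t in asyncio.run(consume(["Hello", ", ", "world"])):
        print(f"{t:.2f}s  {chunk!r}")
```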
Why Streaming Matters
Responsiveness - Provide immediate feedback to users
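AgentUpdatedStreamEvent
Emitted whenever the active agent changes, for example after a handoff.
Event Structure
```python
@dataclass
class AgentUpdatedStreamEvent:
    new_agent: Agent[Any]
    """The new current agent."""

    type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"
    """Event type identifier."""
```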
Handling Agent Updates
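```python
from agents import AgentUpdatedStreamEvent, Runner

# Inside an async function; `agent` and `input` are defined elsewhere.
# run_streamed() returns a streamed result; iterate its stream_events().
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, AgentUpdatedStreamEvent):
        print(f"Agent changed to: {event.new_agent.name}")
```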
Handoff Detection
Detect handoffs via agent updates:
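```python
current_agent = None
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, AgentUpdatedStreamEvent):
        # The first update has no previous agent, so it is not reported as a handoff
        if current_agent is not None and current_agent != event.new_agent:
            print(f"Handoff: {current_agent.name} -> {event.new_agent.name}")
        current_agent = event.new_agent
```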
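The handoff logic can be tested without calling a model. In this sketch, `FakeAgent` and `FakeAgentUpdated` are hypothetical stand-ins that model only what is needed here: an agent `name`, and a `new_agent` field like the one the SDK's `AgentUpdatedStreamEvent` carries. `detect_handoffs` returns the handoffs instead of printing them so the result can be checked.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass(frozen=True)
class FakeAgent:
    """Hypothetical stand-in for an Agent; only `name` is modeled."""
    name: str


@dataclass
class FakeAgentUpdated:
    """Hypothetical stand-in for AgentUpdatedStreamEvent."""
    new_agent: FakeAgent


def detect_handoffs(events: Iterable[object]) -> List[Tuple[str, str]]:
    """Return (from, to) agent-name pairs for every change of active agent."""
    handoffs = []
    current: Optional[FakeAgent] = None
    for event in events:
        if isinstance(event, FakeAgentUpdated):
            # The first update has no previous agent to compare against
            if current is not None and current != event.new_agent:
                handoffs.append((current.name, event.new_agent.name))
            current = event.new_agent
    return handoffs
```

Repeated updates for the same agent are ignored, so only real changes are reported.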
RawResponsesStreamEvent
Event Structure
```python
@dataclass
class RawResponsesStreamEvent:
    data: TResponseStreamEvent
    """The raw stream event from the model."""

    type: Literal["raw_response_event"] = "raw_response_event"
    """Event type identifier."""
```
Handling Raw Events
```python
from agents import RawResponsesStreamEvent, Runner

result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, RawResponsesStreamEvent):
        raw = event.data  # the underlying Responses API event
        if raw.type == "response.output_item.added":
            print("Output item added")
        elif raw.type == "response.output_item.done":
            print("Output item completed")
```
Delta Streaming
Print text deltas as the model generates them:
```python
from openai.types.responses import ResponseTextDeltaEvent

result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    # Text deltas arrive as "response.output_text.delta" events
    if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
        print(event.data.delta, end="", flush=True)
```
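Because deltas are fragments, applications usually accumulate them into the full message. The sketch below needs no SDK: `FakeRawEvent` is a hypothetical stand-in for a raw model event that carries only a `type` string and a `delta`, and only `response.output_text.delta` events contribute text.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class FakeRawEvent:
    """Hypothetical stand-in for a raw model event (the `data` of a raw response event)."""
    type: str
    delta: str = ""


def accumulate_text(events: Iterable[FakeRawEvent]) -> str:
    """Join text deltas in arrival order, skipping all other event types."""
    parts = []
    for event in events:
        if event.type == "response.output_text.delta":
            parts.append(event.delta)
    return "".join(parts)  # one join instead of repeated string concatenation
```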
Streaming Patterns
1. Text Streaming
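Print each assistant message as soon as it is complete (for token-level output, use the raw text deltas described above):
```python
from agents import MessageOutputItem, RunItemStreamEvent, Runner

async def stream_text(agent, input):
    """Print each completed assistant message."""
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent) and isinstance(event.item, MessageOutputItem):
            for content in event.item.raw_item.content:
                if content.type == "output_text":  # skip refusals
                    print(content.text, end="", flush=True)
    print()  # New line at end
```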
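Stream text to HTTP clients as Server-Sent Events (SSE):
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel

from agents import RawResponsesStreamEvent, Runner

app = FastAPI()

class StreamRequest(BaseModel):
    input: str

@app.post("/stream")
async def stream_endpoint(req: StreamRequest):
    """Stream text deltas via SSE (`agent` is defined elsewhere)."""
    async def event_generator():
        result = Runner.run_streamed(agent, req.input)
        async for event in result.stream_events():
            # SDK events are not JSON-serializable as-is, so send only the text
            if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
                yield f"data: {json.dumps({'delta': event.data.delta})}\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")
```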
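The endpoint only needs each event turned into `data: <json>` followed by a blank line. The sketch below is a hypothetical pair of helpers: `format_sse` encodes one JSON-serializable payload as a frame, and `sse_frames` wraps a stream of text deltas and appends a final `done` frame (an assumed convention, not part of SSE or the SDK). SDK event objects are dataclasses and are not JSON-serializable as-is, so extract plain values such as the delta text first.

```python
import asyncio
import json
from typing import Any, AsyncIterator


def format_sse(payload: Any) -> str:
    """Hypothetical helper: encode a JSON-serializable payload as one SSE frame."""
    # json.dumps escapes newlines inside strings, so the payload stays on a
    # single `data:` line; the blank line after it terminates the frame.
    return f"data: {json.dumps(payload)}\n\n"


async def sse_frames(deltas: AsyncIterator[str]) -> AsyncIterator[str]:
    """Wrap text deltas as SSE frames, then send an explicit end-of-stream frame."""
    async for delta in deltas:
        yield format_sse({"type": "delta", "text": delta})
    # Assumed convention: lets the client tell normal completion from a dropped connection
    yield format_sse({"type": "done"})


if __name__ == "__main__":
    async def demo() -> None:
        async def deltas():
            for piece in ["Hel", "lo"]:
                yield piece

        async for frame in sse_frames(deltas()):
            print(repr(frame))

    asyncio.run(demo())
```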
Streaming and UI
React Component
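Read the SSE stream in the browser and pass each parsed event to a callback, such as a React state setter:
```javascript
// The server chooses the agent, so only the user input is sent
async function streamAgentResponse(input, onEvent) {
  const response = await fetch('/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ input }),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // { stream: true } keeps multi-byte characters split across chunks intact
    buffer += decoder.decode(value, { stream: true });
    // A chunk can end mid-event: keep the incomplete tail for the next read
    const events = buffer.split('\n\n');
    buffer = events.pop();
    for (const event of events) {
      if (event.startsWith('data: ')) {
        onEvent(JSON.parse(event.slice(6)));
      }
    }
  }
}
```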
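A network chunk can end in the middle of an event, so a robust client keeps the incomplete tail in a buffer until the rest arrives. The same framing logic is sketched below as a small Python class, handy for a Python client or for unit-testing a server's output. `SSEParser` is hypothetical and, to stay short, only understands single-line `data:` fields separated by blank lines; it ignores `event:` and `id:` fields and `\r\n` line endings, which the full SSE format allows.

```python
import json
from typing import Any, List


class SSEParser:
    """Hypothetical incremental parser for `data:` frames split across chunks."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> List[Any]:
        """Add a chunk and return the JSON payloads of all completed frames."""
        self._buffer += chunk
        # Everything before the last "\n\n" is complete; the rest waits for more data
        *frames, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for frame in frames:
            if frame.startswith("data: "):
                payloads.append(json.loads(frame[len("data: "):]))
        return payloads
```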
Terminal UI
Stream to the terminal with rich output:
```python
from openai.types.responses import ResponseTextDeltaEvent
from rich.console import Console
from rich.live import Live
from rich.panel import Panel

from agents import RawResponsesStreamEvent, Runner

console = Console()

async def stream_to_terminal(agent, input):
    """Render the growing response in a live-updating panel."""
    result = Runner.run_streamed(agent, input)
    current_output = ""
    with Live(Panel(""), console=console, refresh_per_second=10) as live:
        async for event in result.stream_events():
            # Append each text delta and redraw the panel
            if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
                current_output += event.data.delta
                live.update(Panel(current_output))
```
Streaming Performance
Event Batching
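Batch events when per-event handling is expensive (for example, one database write or network send per batch instead of per event):
```python
async def batch_stream(agent, input, batch_size=10):
    """Stream with batching."""
    batch = []
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        batch.append(event)
        if len(batch) >= batch_size:
            process_batch(batch)  # process_batch: your own handler
            batch = []
    if batch:
        process_batch(batch)  # flush the final partial batch
```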
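Batching is easier to reuse as an async generator that wraps any event stream. In this stdlib sketch, `batched` is a hypothetical helper (not part of the SDK) that groups items by count and flushes the final partial batch; with the SDK you would pass it `result.stream_events()`.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def batched(stream: AsyncIterator[T], batch_size: int = 10) -> AsyncIterator[List[T]]:
    """Hypothetical helper: group items from an async stream into lists of up to `batch_size`."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    async for item in stream:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []  # new list; the yielded one now belongs to the caller
    if batch:
        yield batch  # flush the remainder when the stream ends


if __name__ == "__main__":
    async def demo() -> None:
        async def numbers():
            for i in range(5):
                yield i

        async for batch in batched(numbers(), batch_size=2):
            print(batch)

    asyncio.run(demo())
```

Size-only batching can hold events for a long time when the stream slows down; a common refinement is to also flush after a time limit.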
Event Filtering
Filter events so downstream code only handles what it needs:
```python
async def filtered_stream(agent, input):
    """Yield only message-output events."""
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        # Only pass message events through
        if isinstance(event, RunItemStreamEvent) and isinstance(event.item, MessageOutputItem):
            yield event
```
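Stop a stream that runs longer than a time limit (`asyncio.timeout` requires Python 3.11+):
```python
import asyncio

from agents import Runner

async def cancellable_stream(agent, input, timeout=30):
    """Stream with a timeout."""
    result = Runner.run_streamed(agent, input)
    try:
        async with asyncio.timeout(timeout):
            async for event in result.stream_events():
                process_event(event)  # process_event: your own handler
    except asyncio.TimeoutError:
        result.cancel()  # stop the run that is still in progress
        print("Streaming timed out")
```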
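A stdlib-only variant of the same idea that also runs on Python versions before 3.11: `asyncio.wait_for` bounds the whole consumption loop, and events received before the deadline are kept. `slow_stream` is a hypothetical source standing in for `result.stream_events()`; with the SDK you would also call `result.cancel()` when the deadline is hit.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def slow_stream(n: int, delay: float) -> AsyncIterator[int]:
    """Hypothetical event source that yields one item every `delay` seconds."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield i


async def consume_with_deadline(stream: AsyncIterator[int], timeout: float) -> Tuple[List[int], bool]:
    """Collect items until the stream ends or `timeout` seconds pass.

    Returns the items received so far and whether the deadline was hit.
    """
    received: List[int] = []

    async def drain() -> None:
        async for item in stream:
            received.append(item)

    try:
        await asyncio.wait_for(drain(), timeout=timeout)
        return received, False
    except asyncio.TimeoutError:
        # wait_for has cancelled drain(); keep what arrived before the deadline
        return received, True
```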
Graceful Cancellation
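Stop early on a condition of your own, and make sure the run is cancelled however the loop exits:
```python
from agents import Runner

async def graceful_cancel(agent, input):
    """Cancel gracefully."""
    result = Runner.run_streamed(agent, input)
    try:
        async for event in result.stream_events():
            if should_cancel():  # should_cancel: your own stop condition
                break
            process_event(event)
    finally:
        # Stops the run if it is still in progress
        result.cancel()
```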
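Why cleanup belongs in `finally` is easier to see in a model you can run. The stdlib sketch below represents a streamed run as a background producer task feeding a queue, a common design for streaming APIs; `BackgroundStream` is hypothetical and is not the SDK's implementation. Breaking out of the loop does not stop the producer by itself, so `take_first` stops it explicitly.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple


class BackgroundStream:
    """Hypothetical streamed run: a producer task feeds events into a queue."""

    def __init__(self, n: int) -> None:
        self._queue: "asyncio.Queue[Optional[int]]" = asyncio.Queue()
        self._task = asyncio.create_task(self._produce(n))  # starts immediately

    async def _produce(self, n: int) -> None:
        for i in range(n):
            await asyncio.sleep(0.01)  # simulate work per event
            await self._queue.put(i)
        await self._queue.put(None)  # sentinel: no more events

    async def stream_events(self) -> AsyncIterator[int]:
        while (item := await self._queue.get()) is not None:
            yield item

    async def stop(self) -> None:
        """Cancel the producer (no-op if it already finished) and wait for it."""
        self._task.cancel()
        # return_exceptions=True turns the task's CancelledError into a value
        await asyncio.gather(self._task, return_exceptions=True)

    @property
    def cancelled(self) -> bool:
        return self._task.cancelled()


async def take_first(n_total: int, limit: int) -> Tuple[List[int], BackgroundStream]:
    """Consume up to `limit` events, then stop early and clean up."""
    run = BackgroundStream(n_total)
    taken: List[int] = []
    try:
        async for item in run.stream_events():
            if len(taken) >= limit:  # the caller's stop condition
                break
            taken.append(item)
    finally:
        await run.stop()  # runs on break, normal completion, or an exception
    return taken, run
```

With the SDK, the equivalent cleanup is calling `result.cancel()` in the `finally` block.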
Streaming Best Practices
1. Use Async Iterators
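Process events as they arrive instead of collecting them first:
```python
# Good: handle each event as it arrives
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    process_event(event)

# Avoid: collecting every event first defeats the purpose of streaming
result = Runner.run_streamed(agent, input)
events = [event async for event in result.stream_events()]
```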
2. Handle All Event Types
Handle every event type the stream can produce:
```python
# Good: handle all types
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, RunItemStreamEvent):
        handle_run_item(event)
    elif isinstance(event, AgentUpdatedStreamEvent):
        handle_agent_update(event)
    elif isinstance(event, RawResponsesStreamEvent):
        handle_raw_event(event)

# Avoid: handling only one type silently drops agent updates and raw deltas
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    if isinstance(event, RunItemStreamEvent):
        handle_run_item(event)
```
3. Flush Output
Flush output for real-time display:
```python
# Good: flush so each chunk appears immediately
print(text, end="", flush=True)

# Avoid: without flush, stdout may buffer the text until a newline
print(text, end="")
```
4. Handle Backpressure
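When processing can be slower than the stream, put a bounded queue between receiving events and processing them:
```python
import asyncio

# Good: a bounded queue decouples receiving from processing;
# queue.put() waits while the queue is full
queue = asyncio.Queue(maxsize=100)
# ... manage queue

# Avoid: a slow process_event delays handling of every later event
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    process_event(event)
```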
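One way to flesh out the queue-based approach, as a stdlib-only sketch: a producer task reads events and `put`s them into a bounded `asyncio.Queue`, suspending whenever it is full, while a consumer task processes them at its own pace. `event_source` is a hypothetical fast source standing in for `result.stream_events()`, and the tiny `maxsize` makes the bound easy to check.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

_DONE = object()  # sentinel marking the end of the stream


async def event_source(n: int) -> AsyncIterator[int]:
    """Hypothetical fast source standing in for result.stream_events()."""
    for i in range(n):
        yield i  # no delay: produces faster than the consumer can process


async def run_with_backpressure(n: int, maxsize: int = 2) -> Tuple[List[int], int]:
    """Pipe n events through a bounded queue; return results and peak queue depth."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    processed: List[int] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        async for event in event_source(n):
            await queue.put(event)  # suspends here while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(_DONE)

    async def consumer() -> None:
        while (event := await queue.get()) is not _DONE:
            await asyncio.sleep(0.001)  # simulate slow processing
            processed.append(event)

    await asyncio.gather(producer(), consumer())
    return processed, peak
```

The bound caps how far reading can run ahead of processing, which keeps memory use predictable.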
5. Clean Up Resources
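Make sure the run is cancelled if the loop ends early or raises:
```python
# Good: cancel in `finally` so the run stops however the loop exits
result = Runner.run_streamed(agent, input)
try:
    async for event in result.stream_events():
        process_event(event)
finally:
    result.cancel()

# Avoid: if process_event raises, the run may keep going in the background
result = Runner.run_streamed(agent, input)
async for event in result.stream_events():
    process_event(event)
```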
Common Streaming Patterns
1. Chat Interface
Streaming chat interface that prints tokens as they arrive:
```python
from openai.types.responses import ResponseTextDeltaEvent

from agents import RawResponsesStreamEvent, Runner

async def chat_interface(agent, user_input):
    """Streaming chat interface."""
    print("Assistant: ", end="", flush=True)
    result = Runner.run_streamed(agent, user_input)
    async for event in result.stream_events():
        if isinstance(event, RawResponsesStreamEvent) and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
    print()  # New line
```
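Merge events from several concurrent runs into one stream:
```python
async def aggregate_streams(agents, inputs):
    """Aggregate multiple streams."""
    # One streamed run per (agent, input) pair
    results = [Runner.run_streamed(agent, input) for agent, input in zip(agents, inputs)]
    # merge_async_iterators is a helper you supply; it is not part of the SDK
    async for event in merge_async_iterators(*(r.stream_events() for r in results)):
        process_event(event)
```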
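`merge_async_iterators` is not provided by the SDK or the standard library, so here is one hypothetical implementation. Each source is drained by its own task into a shared queue, so items are yielded in arrival order across sources while each source keeps its own order. With the SDK you would pass each run's `stream_events()` iterator.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # per-source end marker


async def merge_async_iterators(*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    """Hypothetical helper: yield items from several async iterators as they arrive."""
    queue: asyncio.Queue = asyncio.Queue()

    async def drain(source: AsyncIterator[T]) -> None:
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(_DONE)  # signal completion even if the source fails

    tasks = [asyncio.create_task(drain(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
        await asyncio.gather(*tasks)  # re-raise the first source error, if any
    finally:
        for task in tasks:
            task.cancel()  # stop unfinished sources if the consumer exits early
```

In this design a failing source does not interrupt the others; its error is raised once every source has finished.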
4. Conditional Streaming
Stream based on conditions:
```python
async def conditional_stream(agent, input, stream_enabled=True):
    """Stream conditionally; return the final output either way."""
    if stream_enabled:
        result = Runner.run_streamed(agent, input)
        async for event in result.stream_events():
            process_event(event)
    else:
        result = await Runner.run(agent, input)
    return result.final_output
```
5. Streaming with Callbacks
Use callbacks for event handling:
```python
async def stream_with_callbacks(agent, input, callbacks):
    """Dispatch each event to an async callback keyed by event class name."""
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        event_type = type(event).__name__  # e.g. "RunItemStreamEvent"
        if event_type in callbacks:
            await callbacks[event_type](event)
```
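Keying callbacks by class name is simple, but events without a handler are dropped silently and every callback must be a coroutine. The sketch below accepts both plain functions and coroutine functions and counts unhandled events so gaps are visible; `dispatch`, `TextDelta`, and `AgentChanged` are hypothetical stand-ins, not SDK names.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable, Dict


@dataclass
class TextDelta:  # hypothetical stand-in event
    text: str


@dataclass
class AgentChanged:  # hypothetical stand-in event
    name: str


async def dispatch(events: AsyncIterator[Any], callbacks: Dict[str, Callable[[Any], Any]]) -> int:
    """Send each event to the handler registered under its class name.

    Handlers may be plain functions or coroutine functions. Returns how many
    events had no handler.
    """
    unhandled = 0
    async for event in events:
        handler = callbacks.get(type(event).__name__)
        if handler is None:
            unhandled += 1
            continue
        outcome = handler(event)
        if inspect.isawaitable(outcome):  # await coroutine handlers
            await outcome
    return unhandled


if __name__ == "__main__":
    async def demo() -> None:
        async def events():
            yield TextDelta("Hi")
            yield AgentChanged("Billing")

        handlers = {"TextDelta": lambda e: print(e.text), "AgentChanged": lambda e: print(e.name)}
        print("unhandled:", await dispatch(events(), handlers))

    asyncio.run(demo())
```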
Streaming and Testing
Testing Streaming
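Test streaming behavior (this calls the real model, so it needs an API key):
```python
import pytest

from agents import RunItemStreamEvent, Runner

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_streaming():
    """Test streaming; `agent` is defined elsewhere (e.g. a fixture)."""
    events = []
    result = Runner.run_streamed(agent, "Hello")
    async for event in result.stream_events():
        events.append(event)
    assert len(events) > 0
    assert any(isinstance(e, RunItemStreamEvent) for e in events)
```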
Mock Streaming
Mock streaming for tests:
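```python
import pytest

from agents import AgentUpdatedStreamEvent, RunItemStreamEvent

async def mock_stream():
    """Mock streaming for tests; mock_item and mock_agent are your own fixtures."""
    yield RunItemStreamEvent(name="message_output_created", item=mock_item)
    yield AgentUpdatedStreamEvent(new_agent=mock_agent)

@pytest.mark.asyncio
async def test_with_mock_stream():
    """Test with mock stream."""
    events = []
    async for event in mock_stream():
        events.append(event)
    assert len(events) == 2
```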
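A mock generator exercises a consumer loop, but SDK code usually calls `Runner.run_streamed(...)` and then iterates `result.stream_events()`. One option is to inject the run factory and swap in a fake result object with the same method. `FakeStreamedResult`, `fake_event`, and `count_event_types` are hypothetical; only the `stream_events()` and `final_output` names and the event `type` strings are borrowed from the SDK.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, AsyncIterator, Callable, Dict, List


@dataclass
class FakeStreamedResult:
    """Hypothetical test double mimicking a streamed run result."""
    events: List[Any]
    final_output: Any = None

    async def stream_events(self) -> AsyncIterator[Any]:
        for event in self.events:
            yield event


def fake_event(event_type: str) -> SimpleNamespace:
    """Build a minimal event that only carries a `type` string."""
    return SimpleNamespace(type=event_type)


async def count_event_types(run_streamed: Callable[[str], Any], prompt: str) -> Dict[str, int]:
    """Code under test: the run factory is injected so tests can pass a fake."""
    result = run_streamed(prompt)
    counts: Counter = Counter()
    async for event in result.stream_events():
        counts[event.type] += 1
    return dict(counts)


if __name__ == "__main__":
    fake = FakeStreamedResult([fake_event("raw_response_event")] * 3, final_output="done")
    print(asyncio.run(count_event_types(lambda prompt: fake, "hi")))
```

In production the same function would receive something like `lambda p: Runner.run_streamed(agent, p)`.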