Shared from "Study" on Inkdown

Streaming - Comprehensive Deep Dive

Overview

Streaming in the OpenAI Agents SDK enables real-time delivery of agent execution events. Think of Streaming as "live updates" or "progressive rendering" - instead of waiting for the entire agent run to complete before seeing results, you receive events as they happen. This is essential for responsive user interfaces, real-time feedback, and long-running operations.

Core Concepts

What is Streaming?

Streaming is the process of delivering events incrementally as they occur during agent execution:

  • Real-time updates - Receive events as they happen
  • Progressive output - Show results as they're generated
  • Event-driven - React to specific events
  • Non-blocking - Don't wait for completion

Why Streaming Matters
  1. Responsiveness - Provide immediate feedback to users
  2. User Experience - Show progress during long operations
  3. Interactivity - Allow users to see what's happening
  4. Efficiency - Start processing partial results early
  5. Transparency - Show agent reasoning and tool usage
  6. Real-time Applications - Enable live chat interfaces

Streaming Methods

    run_streamed()

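    The main streaming method. Unlike run(), it is not awaited. It returns a RunResultStreaming object immediately, and the events come from that object's stream_events() async iterator:

    Python
    from agents import Runner
    
    # Call without await; the agent run starts in the background
    result = Runner.run_streamed(agent, input)
    
    async for event in result.stream_events():
        print(f"Event: {event.type}")  # Handle each event
    
    print(result.final_output)  # set once the stream is exhausted

    Parameters:

    • Same as run() - agent, input, context, max_turns, hooks, run_config, etc.
    • Returns a RunResultStreaming object; iterate over result.stream_events() to receive the stream events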
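Why can run_streamed() return before any event exists? Below is a minimal stdlib sketch of the general producer/queue shape a streaming result can use. A background task pushes events into an asyncio.Queue, and stream_events() drains the queue until it reaches a sentinel. This is an illustration under assumptions, not the SDK's source. StreamingRun and fake_agent_run are hypothetical names, and plain strings stand in for real stream events.

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel marking the end of the stream


class StreamingRun:
    """Hypothetical stand-in for a streaming result object (not the SDK class)."""

    def __init__(self, producer):
        self._queue: asyncio.Queue = asyncio.Queue()
        self.final_output = None
        self.is_complete = False
        # Start the work in the background; the constructor returns immediately.
        self._task = asyncio.create_task(self._run(producer))

    async def _run(self, producer):
        try:
            self.final_output = await producer(self._queue)
        finally:
            self.is_complete = True
            await self._queue.put(_DONE)

    async def stream_events(self) -> AsyncIterator[str]:
        # Drain the queue until the sentinel, then surface any background error.
        while (event := await self._queue.get()) is not _DONE:
            yield event
        await self._task


async def fake_agent_run(queue: asyncio.Queue) -> str:
    """Hypothetical 'agent' that emits a few events, then returns its output."""
    for token in ["Hel", "lo", "!"]:
        await queue.put(f"delta:{token}")
        await asyncio.sleep(0)  # give the consumer a chance to run
    return "Hello!"


async def main():
    run = StreamingRun(fake_agent_run)  # not awaited, like Runner.run_streamed()
    events = [event async for event in run.stream_events()]
    return events, run.final_output


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The background task owns the run, so the consumer can stop reading at any time. That is why cleanup (result.cancel() in the SDK) matters when a consumer exits early.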
    Stream Events

    Types of stream events:

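    1. RunItemStreamEvent - A run item (message, tool call, tool output, handoff, etc.) has been fully generated
    2. AgentUpdatedStreamEvent - The current agent changed, e.g. after a handoff
    3. RawResponsesStreamEvent - A raw event passed through directly from the model, such as a text delta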

    RunItemStreamEvent

    Event Structure
    Python
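    @dataclass
    class RunItemStreamEvent:
        name: Literal["message_output_created", "handoff_requested", "handoff_occured",
                      "tool_called", "tool_output", "reasoning_item_created"]  # abridged
        """What happened to the item ("handoff_occured" is the SDK's own spelling)."""
        
        item: RunItem
        """The run item that was created."""
        
        type: Literal["run_item_stream_event"] = "run_item_stream_event"
        """Event type identifier."""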
    Handling RunItem Events
    Python
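    from agents import ItemHelpers, MessageOutputItem, RunItemStreamEvent, ToolCallItem, ToolCallOutputItem
    
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            item = event.item
            
            if isinstance(item, MessageOutputItem):
                print(f"Message: {ItemHelpers.text_message_output(item)}")
            
            elif isinstance(item, ToolCallItem):
                # raw_item is e.g. a ResponseFunctionToolCall; hosted tools may lack .name
                print(f"Tool call: {getattr(item.raw_item, 'name', type(item.raw_item).__name__)}")
            
            elif isinstance(item, ToolCallOutputItem):
                print(f"Tool output: {item.output}")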
    Message Output

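    Print each message once it is complete. A MessageOutputItem is only emitted after the model has finished the whole message. This shows full messages rather than token-by-token text (use raw text deltas, as in Delta Streaming, for that):

    Python
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            if isinstance(event.item, MessageOutputItem):
                # Message parts are ResponseOutputText ("output_text") or refusals
                for content in event.item.raw_item.content:
                    if content.type == "output_text":
                        print(content.text, end="", flush=True)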
    Tool Call Streaming

    Stream tool call events:

    Python
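    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            if isinstance(event.item, ToolCallItem):
                # .name/.arguments exist on function tool calls, not on every tool type
                raw = event.item.raw_item
                print(f"Calling tool: {getattr(raw, 'name', type(raw).__name__)}")
                print(f"Arguments: {getattr(raw, 'arguments', '')}")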
    Tool Output Streaming

    Stream tool results:

    Python
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            if isinstance(event.item, ToolCallOutputItem):
                print(f"Tool result: {event.item.output}")

    AgentUpdatedStreamEvent

    Event Structure
    Python
    @dataclass
    class AgentUpdatedStreamEvent:
        new_agent: Agent[Any]
        """The new current agent."""
        
        type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"
        """Event type identifier."""
    Handling Agent Updates
    Python
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, AgentUpdatedStreamEvent):
            print(f"Agent changed to: {event.new_agent.name}")
    Handoff Detection

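    Detect handoffs via agent updates. The first AgentUpdatedStreamEvent reports the starting agent, so only later changes are handoffs:

    Python
    current_agent = None
    
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, AgentUpdatedStreamEvent):
            if current_agent is not None and current_agent is not event.new_agent:
                print(f"Handoff: {current_agent.name} -> {event.new_agent.name}")
            current_agent = event.new_agent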

    RawResponsesStreamEvent

    Event Structure
    Python
    @dataclass
    class RawResponsesStreamEvent:
        data: TResponseStreamEvent
        """The raw streaming event from the model (a Responses API event)."""
        
        type: Literal["raw_response_event"] = "raw_response_event"
        """Event type identifier."""
    Handling Raw Events
    Python
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RawResponsesStreamEvent):
            raw = event.data
            
            if raw.type == "response.output_item.done":
                print("Output item completed")
            
            elif raw.type == "response.output_item.added":
                print("Output item added")
    Delta Streaming

    Stream text token by token as the model generates it. Text deltas arrive as raw events of type "response.output_text.delta" (ResponseTextDeltaEvent), whose delta field is a string:

    Python
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RawResponsesStreamEvent):
            if event.data.type == "response.output_text.delta":
                print(event.data.delta, end="", flush=True)

    Streaming Patterns

    1. Text Streaming

    Stream text output as it is generated, using raw text deltas:

    Python
    async def stream_text(agent, input):
        """Stream text output token by token and return the final output."""
        result = Runner.run_streamed(agent, input)
        async for event in result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                if event.data.type == "response.output_text.delta":
                    print(event.data.delta, end="", flush=True)
        print()  # New line at end
        return result.final_output
    2. Progress Indicator

    Show progress during execution:

    Python
    async def stream_with_progress(agent, input):
        """Stream with progress indicator."""
        tool_count = 0
        
        async for event in Runner.run_streamed(agent, input):
            if isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, ToolCallItem):
                    tool_count += 1
                    print(f"Tool {tool_count}: {event.item.raw_item.name}")
            
            elif isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, MessageOutputItem):
                    print("Agent thinking...")
    3. Structured Output Streaming

    Show the JSON as it is generated. The partial text is not valid JSON until the message is complete, so parse only at the end. When the agent has an output_type, the SDK already parses and validates the result into result.final_output:

    Python
    async def stream_structured_output(agent, input):
        """Stream structured output, then return the parsed result."""
        result = Runner.run_streamed(agent, input)
        
        async for event in result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                if event.data.type == "response.output_text.delta":
                    print(event.data.delta, end="", flush=True)  # partial JSON text
        print()
        
        # Parsed final output (an instance of the agent's output_type)
        return result.final_output
    4. Multi-Agent Streaming

    Track multiple agents:

    Python
    async def stream_multi_agent(agent, input):
        """Stream with agent tracking."""
        current_agent = None
        result = Runner.run_streamed(agent, input)
        
        async for event in result.stream_events():
            if isinstance(event, AgentUpdatedStreamEvent):
                if current_agent is not None:
                    print(f"{current_agent.name}: Finished")
                current_agent = event.new_agent
                print(f"{current_agent.name}: Started")
            
            elif isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, MessageOutputItem):
                    # item.agent is the agent that produced this message
                    text = ItemHelpers.text_message_output(event.item)
                    print(f"{event.item.agent.name}: {text}")
    5. Tool Execution Visualization

    Visualize tool execution:

    Python
    async def stream_tools(agent, input):
        """Stream tool execution."""
        result = Runner.run_streamed(agent, input)
        async for event in result.stream_events():
            if isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, ToolCallItem):
                    raw = event.item.raw_item
                    print(f"🔧 {getattr(raw, 'name', type(raw).__name__)}")
                    print(f"   Args: {getattr(raw, 'arguments', '')}")
                
                elif isinstance(event.item, ToolCallOutputItem):
                    output = str(event.item.output)  # output is not always a str
                    print(f"✓ Result: {output[:100]}{'...' if len(output) > 100 else ''}")

    Streaming and WebSockets

    WebSocket Streaming

    Stream to WebSocket clients:

    Python
    async def stream_to_websocket(websocket, agent, input):
        """Stream events to a WebSocket (e.g. a FastAPI/Starlette WebSocket)."""
        result = Runner.run_streamed(agent, input)
        async for event in result.stream_events():
            event_data = {
                "type": event.type,
                "data": serialize_event(event),  # hypothetical helper returning a JSON-safe dict
            }
            await websocket.send_json(event_data)
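The WebSocket and SSE examples call a serialize_event helper that this note does not define. One possible, hypothetical version keeps only a few fields per event type. The attribute names it reads (data, name, item, new_agent) follow the SDK's stream event classes. SimpleNamespace objects stand in for real events so the sketch runs without the SDK.

```python
import json
from types import SimpleNamespace as NS
from typing import Any


def serialize_event(event: Any) -> dict:
    """Hypothetical helper: reduce a stream event to a small JSON-safe dict."""
    if event.type == "raw_response_event":
        out = {"event": event.data.type}
        if event.data.type == "response.output_text.delta":
            out["delta"] = event.data.delta
        return out
    if event.type == "run_item_stream_event":
        # Send the event name and the item's class name, not the whole item
        return {"name": event.name, "item": type(event.item).__name__}
    if event.type == "agent_updated_stream_event":
        return {"agent": event.new_agent.name}
    return {"unknown": type(event).__name__}


demo_events = [
    NS(type="agent_updated_stream_event", new_agent=NS(name="Triage")),
    NS(type="raw_response_event", data=NS(type="response.output_text.delta", delta="Hi")),
    NS(type="run_item_stream_event", name="tool_called", item=NS()),
]
for e in demo_events:
    print(json.dumps({"type": e.type, "data": serialize_event(e)}))
```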
    Server-Sent Events

    Stream via Server-Sent Events:

    Python
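    import json
    
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    from pydantic import BaseModel
    
    app = FastAPI()
    
    class StreamRequest(BaseModel):
        input: str
    
    # POST so the browser client can send {"input": ...} as a JSON body
    @app.post("/stream")
    async def stream_endpoint(request: StreamRequest):
        """Stream via SSE."""
        async def event_generator():
            result = Runner.run_streamed(agent, request.input)  # `agent` defined elsewhere
            async for event in result.stream_events():
                # serialize_event: hypothetical helper returning a JSON-safe dict
                yield f"data: {json.dumps(serialize_event(event))}\n\n"
        
        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
        )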
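On the wire, each SSE message is one or more field lines followed by a blank line. A payload that contains newlines must be split across several data: lines, and the client joins them back with newlines. A single-line JSON payload fits in one data: line, because json.dumps without indent never emits raw newlines. For arbitrary text, a hypothetical sse_frame helper (not a FastAPI API) could look like this:

```python
import json
from typing import Optional


def sse_frame(data: str, event: Optional[str] = None) -> str:
    """Format one Server-Sent Events message.

    Each line of `data` becomes its own "data:" field, and the trailing
    blank line terminates the message.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"


print(repr(sse_frame("hello")))
print(repr(sse_frame("line one\nline two", event="delta")))
print(repr(sse_frame(json.dumps({"text": "x\ny"}))))
```

If you add `event:` fields, the browser reader in this note also has to parse them, because it only keeps blocks that start with "data: ".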

    Streaming and UI

    React Component

    Read the SSE stream in the browser (for example from a React component) and pass each parsed event to a callback:

    JavaScript
    async function streamAgentResponse(input, onEvent) {
      const response = await fetch('/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ input }),
      });
    
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = ''; // keeps a partial event until its closing blank line arrives
    
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
    
        buffer += decoder.decode(value, { stream: true });
        const events = buffer.split('\n\n');
        buffer = events.pop(); // the last piece may be incomplete
    
        for (const event of events) {
          if (event.startsWith('data: ')) {
            onEvent(JSON.parse(event.slice(6)));
          }
        }
      }
    }
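The detail that matters in a browser reader loop is that one network chunk can end in the middle of an event. The unfinished tail has to be kept until the next chunk arrives. The class below shows the same buffering idea in small, testable Python. SSEParser is a hypothetical helper. It handles only "\n\n"-separated events with "data: " lines, not the full SSE grammar (\r\n line endings, comments, id:/retry: fields).

```python
import json
from typing import List


class SSEParser:
    """Incrementally split a text stream into SSE `data:` payloads."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> List[str]:
        """Add a chunk and return the payload of every event it completes."""
        self._buffer += chunk
        # Everything before the last separator is complete; the rest waits.
        *complete, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for block in complete:
            data_lines = [
                line[len("data: "):] for line in block.split("\n") if line.startswith("data: ")
            ]
            if data_lines:
                payloads.append("\n".join(data_lines))
        return payloads


parser = SSEParser()
for chunk in ['data: {"delta": "Hel', 'lo"}\n\ndata: {"delta": "!"}\n\n']:
    for payload in parser.feed(chunk):
        print(json.loads(payload)["delta"])
```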
    Terminal UI

    Stream to terminal with rich output:

    Python
    from rich.console import Console
    from rich.live import Live
    from rich.panel import Panel
    
    console = Console()
    
    async def stream_to_terminal(agent, input):
        """Stream to rich terminal."""
        # Live's first positional argument is the renderable, so pass console by keyword
        with Live(console=console, refresh_per_second=10) as live:
            current_output = ""
            result = Runner.run_streamed(agent, input)
            
            async for event in result.stream_events():
                if isinstance(event, RawResponsesStreamEvent):
                    if event.data.type == "response.output_text.delta":
                        current_output += event.data.delta
                        live.update(Panel(current_output))

    Streaming Performance

    Event Batching

    Batch events for efficiency:

    Python
    async def batch_stream(agent, input, batch_size=10):
        """Stream with batching."""
        batch = []
        result = Runner.run_streamed(agent, input)
        
        async for event in result.stream_events():
            batch.append(event)
            
            if len(batch) >= batch_size:
                process_batch(batch)  # hypothetical handler
                batch = []
        
        if batch:
            process_batch(batch)  # flush the remainder
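A size-only batch can sit unsent for a long time when events slow down, for example while a tool runs. A common refinement flushes when the batch is full or when its oldest item has waited max_wait seconds. Below is a minimal asyncio sketch. batch_by_size_or_time and fake_source are hypothetical names. A helper task moves items into a queue, so waiting with a timeout never cancels the source iterator itself.

```python
import asyncio
import time
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")
_END = object()  # sentinel: the source is exhausted


async def batch_by_size_or_time(
    source: AsyncIterator[T], max_size: int, max_wait: float
) -> AsyncIterator[List[T]]:
    """Yield batches when full or when the oldest item has waited max_wait seconds."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump() -> None:
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(_END)

    pump_task = asyncio.create_task(pump())
    batch: List[T] = []
    deadline = None  # monotonic time at which the current batch must be flushed
    try:
        while True:
            timeout = None if deadline is None else max(0.0, deadline - time.monotonic())
            try:
                item = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                yield batch  # time is up: flush a partial batch
                batch, deadline = [], None
                continue
            if item is _END:
                break
            if not batch:
                deadline = time.monotonic() + max_wait
            batch.append(item)
            if len(batch) >= max_size:
                yield batch
                batch, deadline = [], None
        if batch:
            yield batch
        await pump_task  # re-raise an error from the source, if any
    finally:
        pump_task.cancel()


async def fake_source():
    """Hypothetical event source: five quick events, a pause, then one more."""
    for i in range(5):
        yield i
    await asyncio.sleep(0.3)
    yield 5


async def demo():
    return [b async for b in batch_by_size_or_time(fake_source(), max_size=2, max_wait=0.1)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Event 4 is flushed alone after about 0.1 s instead of waiting for the late event 5.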
    Event Filtering

    Forward only the events a consumer needs. The SDK still produces every event; filtering cuts the work done downstream:

    Python
    async def filtered_stream(agent, input):
        """Yield only message-output events."""
        result = Runner.run_streamed(agent, input)
        async for event in result.stream_events():
            if isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, MessageOutputItem):
                    yield event
    Backpressure Handling

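    Put a bounded queue between receiving and processing. A slow consumer then makes the producer wait instead of letting events pile up without limit:

    Python
    import asyncio
    
    async def stream_with_backpressure(agent, input, max_queue=100):
        """Stream with backpressure handling (asyncio.TaskGroup needs Python 3.11+)."""
        queue = asyncio.Queue(maxsize=max_queue)
        done = object()  # sentinel marking the end of the stream
        
        async def producer():
            result = Runner.run_streamed(agent, input)
            try:
                async for event in result.stream_events():
                    await queue.put(event)  # waits while the queue is full
            except asyncio.CancelledError:
                result.cancel()  # consumer failed: stop the agent run too
                raise
            await queue.put(done)
        
        async def consumer():
            while (event := await queue.get()) is not done:
                process_event(event)  # hypothetical handler
        
        # If either side fails, the task group cancels the other one
        async with asyncio.TaskGroup() as tg:
            tg.create_task(producer())
            tg.create_task(consumer())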
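The effect of a bounded queue can be checked without an agent. In the demo below, a plain counter stands in for the event stream (an assumption for illustration), and the consumer sleeps to simulate slow work. The queue never holds more than max_queue items, yet every event is still processed in order.

```python
import asyncio
from typing import List, Tuple

_DONE = object()  # sentinel marking the end of the stream


async def run_pipeline(n_events: int, max_queue: int) -> Tuple[List[int], int]:
    """Fast producer, slow consumer, bounded queue in between.

    Returns the processed events and the largest queue size observed.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue)
    processed: List[int] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(n_events):  # stands in for iterating result.stream_events()
            await queue.put(i)  # suspends while the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(_DONE)

    async def consumer() -> None:
        while (event := await queue.get()) is not _DONE:
            await asyncio.sleep(0.001)  # simulate slow processing
            processed.append(event)

    await asyncio.gather(producer(), consumer())
    return processed, peak


if __name__ == "__main__":
    events, peak = asyncio.run(run_pipeline(n_events=50, max_queue=5))
    print(len(events), peak)
```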

    Streaming and State

    Streaming with State

    Stream while maintaining state:

    Python
    async def stream_with_state(agent, input):
        """Stream while maintaining state."""
        state = {
            "tool_calls": [],
            "messages": [],
            "current_agent": None,
        }
        result = Runner.run_streamed(agent, input)
        
        async for event in result.stream_events():
            if isinstance(event, AgentUpdatedStreamEvent):
                state["current_agent"] = event.new_agent.name
            
            elif isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, ToolCallItem):
                    state["tool_calls"].append(getattr(event.item.raw_item, "name", "tool"))
                
                elif isinstance(event.item, MessageOutputItem):
                    state["messages"].append(ItemHelpers.text_message_output(event.item))
            
            yield event, state
    Streaming to State

    Accumulate streaming results:

    Python
    async def stream_to_result(agent, input):
        """Stream and accumulate result."""
        summary = {
            "messages": [],
            "tool_calls": [],
            "final_output": None,
        }
        result = Runner.run_streamed(agent, input)
        
        async for event in result.stream_events():
            if isinstance(event, RunItemStreamEvent):
                if isinstance(event.item, MessageOutputItem):
                    summary["messages"].append(ItemHelpers.text_message_output(event.item))
                
                elif isinstance(event.item, ToolCallItem):
                    raw = event.item.raw_item
                    summary["tool_calls"].append({
                        "name": getattr(raw, "name", None),
                        "arguments": getattr(raw, "arguments", None),
                    })
        
        # The SDK's own final output (parsed if the agent has an output_type)
        summary["final_output"] = result.final_output
        return summary

    Streaming Errors

    Handling Streaming Errors

    Handle errors during streaming:

    Python
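    import logging
    
    from agents import AgentsException
    
    logger = logging.getLogger(__name__)
    
    async def safe_stream(agent, input):
        """Stream with error handling."""
        result = Runner.run_streamed(agent, input)
        try:
            # Errors from the run (e.g. MaxTurnsExceeded) are raised here, during iteration
            async for event in result.stream_events():
                try:
                    process_event(event)
                except Exception:
                    logger.exception("Event processing error")
        except AgentsException as e:
            logger.error(f"Agent error during streaming: {e}")
            raise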
    Error Events

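    Not every failure is raised. By default, an exception inside a function tool is converted into an error message and sent back to the model as the tool output. It therefore shows up as an ordinary ToolCallOutputItem. A simple (heuristic) check:

    Python
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            if isinstance(event.item, ToolCallOutputItem):
                output = str(event.item.output)  # output is not always a str
                if "error" in output.lower():
                    print(f"Tool error: {output}")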

    Streaming and Cancellation

    Cancellation

    Cancel streaming operations:

    Python
    async def cancellable_stream(agent, input, timeout=30):
        """Stream with a total time limit (asyncio.timeout requires Python 3.11+)."""
        result = Runner.run_streamed(agent, input)
        try:
            async with asyncio.timeout(timeout):
                async for event in result.stream_events():
                    process_event(event)
        except TimeoutError:
            result.cancel()  # stop the background agent run
            print("Streaming timed out")
    Graceful Cancellation

    Cancel gracefully:

    Python
    async def graceful_cancel(agent, input):
        """Cancel gracefully."""
        result = Runner.run_streamed(agent, input)
        
        try:
            async for event in result.stream_events():
                if should_cancel():  # hypothetical stop condition
                    break
                process_event(event)
        finally:
            if not result.is_complete:
                result.cancel()  # stop the background agent run

    Streaming Best Practices

    1. Use Async Iterators

    Use async iterators properly:

    Python
    # Good - async for over stream_events()
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        process_event(event)
    
    # Avoid - collect all first
    result = Runner.run_streamed(agent, input)
    events = [event async for event in result.stream_events()]
    # Defeats purpose of streaming
    2. Handle All Event Types

    Handle all event types:

    Python
    # Good - handle all types
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            handle_run_item(event)
        elif isinstance(event, AgentUpdatedStreamEvent):
            handle_agent_update(event)
        elif isinstance(event, RawResponsesStreamEvent):
            handle_raw_event(event)
    
    # Avoid - only handle one type
    result = Runner.run_streamed(agent, input)
    async for event in result.stream_events():
        if isinstance(event, RunItemStreamEvent):
            handle_run_item(event)
        # Miss other events
    3. Flush Output

    Flush output for real-time display:

    Python
    # Good - flush
    print(text, end="", flush=True)
    
    # Avoid - no flush
    print(text, end="")  # Might buffer
    4. Handle Backpressure

    Handle backpressure:

    Python
    # Good - handle backpressure
    queue = asyncio.Queue(maxsize=100)
    # ... manage queue
    
    # Avoid - ignore backpressure
    async for event in Runner.run_streamed(agent, input):
        process_event(event)  # Might block if slow
    5. Clean Up Resources

    Clean up streaming resources:

    Python
    # Good - clean up
    async with Runner.run_streamed(agent, input) as stream:
        async for event in stream:
            process_event(event)
    
    # Avoid - leak resources
    stream = Runner.run_streamed(agent, input)
    async for event in stream:
        process_event(event)
    # Forgot to close

    Common Streaming Patterns

    1. Chat Interface

    Streaming chat interface:

    Python
    async def chat_interface(agent, user_input):
        """Streaming chat interface."""
        print("Assistant: ", end="", flush=True)
        result = Runner.run_streamed(agent, user_input)
        
        async for event in result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                if event.data.type == "response.output_text.delta":
                    print(event.data.delta, end="", flush=True)
        
        print()  # New line
    2. Progress Bar

    Show progress during tool execution:

    Python
    from tqdm import tqdm
    
    async def stream_with_progress_bar(agent, input):
        """Stream with progress bar."""
        tool_calls = []
        result = Runner.run_streamed(agent, input)
        
        with tqdm(desc="Tools", unit="call") as pbar:  # closes the bar even on errors
            async for event in result.stream_events():
                if isinstance(event, RunItemStreamEvent):
                    if isinstance(event.item, ToolCallItem):
                        name = getattr(event.item.raw_item, "name", "tool")
                        tool_calls.append(name)
                        pbar.update(1)
                        pbar.set_description(f"Calling {name}")
        return tool_calls
    3. Multi-Stream Aggregation

    Aggregate multiple streams:

    Python
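    async def aggregate_streams(agents, inputs):
        """Aggregate multiple streams."""
        results = [
            Runner.run_streamed(agent, input)
            for agent, input in zip(agents, inputs)
        ]
        
        # merge_async_iterators is a helper (not part of the SDK or stdlib) that
        # yields events from several async iterators in arrival order
        async for event in merge_async_iterators(*(r.stream_events() for r in results)):
            process_event(event)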
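The aggregation example relies on merge_async_iterators, which is not part of the SDK or the standard library. One way to write it is sketched below; it is not a library API. One task per source pushes items into a shared asyncio.Queue, and a sentinel per source tells the merger when everything is done. Items come out in arrival order, so events from different runs interleave. Tag them (for example with the run index) if the consumer needs to know where each event came from. In this sketch, a source's error is re-raised only after every source has finished.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")
_DONE = object()  # one of these is queued per finished source


async def merge_async_iterators(*sources: AsyncIterator[T]) -> AsyncIterator[T]:
    """Yield items from all sources in the order they arrive."""
    queue: asyncio.Queue = asyncio.Queue()

    async def drain(source: AsyncIterator[T]) -> None:
        try:
            async for item in source:
                await queue.put(item)
        finally:
            await queue.put(_DONE)

    tasks = [asyncio.create_task(drain(source)) for source in sources]
    try:
        remaining = len(tasks)
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
        # Every source has finished; re-raise the first error any of them hit.
        await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()  # no-op for finished tasks; stops the rest on early exit


async def ticks(name: str, delay: float, count: int):
    """Hypothetical source emitting `count` labelled events, `delay` seconds apart."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo():
    return [e async for e in merge_async_iterators(ticks("a", 0.01, 3), ticks("b", 0.025, 2))]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```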
    4. Conditional Streaming

    Stream based on conditions:

    Python
    async def conditional_stream(agent, input, stream_enabled=True):
        """Stream conditionally; return the final output in both modes."""
        if stream_enabled:
            result = Runner.run_streamed(agent, input)
            async for event in result.stream_events():
                process_event(event)
        else:
            result = await Runner.run(agent, input)
        return result.final_output
    5. Streaming with Callbacks

    Use callbacks for event handling:

    Python
    async def stream_with_callbacks(agent, input, callbacks):
        """Stream with async callbacks keyed by event class name, e.g. "RunItemStreamEvent"."""
        result = Runner.run_streamed(agent, input)
        async for event in result.stream_events():
            event_type = type(event).__name__
            if event_type in callbacks:
                await callbacks[event_type](event)

    Streaming and Testing

    Testing Streaming

    Test streaming behavior:

    Python
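    import pytest
    
    # Needs the pytest-asyncio plugin and makes a real model call (API key required)
    @pytest.mark.asyncio
    async def test_streaming():
        """Test streaming."""
        events = []
        result = Runner.run_streamed(agent, input)
        
        async for event in result.stream_events():
            events.append(event)
        
        assert len(events) > 0
        assert any(isinstance(e, RunItemStreamEvent) for e in events)
        assert result.is_complete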
    Mock Streaming

    Mock streaming for tests:

    Python
    # mock_item and mock_agent are placeholder test doubles
    async def mock_stream():
        """Mock streaming for tests."""
        yield RunItemStreamEvent(name="message_output_created", item=mock_item)
        yield AgentUpdatedStreamEvent(new_agent=mock_agent)
    
    @pytest.mark.asyncio
    async def test_with_mock_stream():
        """Test with mock stream."""
        events = []
        async for event in mock_stream():
            events.append(event)
        
        assert len(events) == 2
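Consumer code can also be tested against a fake result object instead of the SDK. Code that only uses stream_events() and final_output can be handed a test double with those two members, for example by passing the result object into the function under test. FakeStreamingResult and collect_deltas are hypothetical names, and SimpleNamespace objects stand in for real events:

```python
import asyncio
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, AsyncIterator, List


@dataclass
class FakeStreamingResult:
    """Hypothetical test double exposing stream_events() and final_output (not an SDK class)."""
    events: List[Any]
    final_output: Any = None

    async def stream_events(self) -> AsyncIterator[Any]:
        for event in self.events:
            await asyncio.sleep(0)  # behave like a real asynchronous stream
            yield event


def text_delta(text: str) -> SimpleNamespace:
    """Stand-in for a RawResponsesStreamEvent carrying a text delta."""
    return SimpleNamespace(
        type="raw_response_event",
        data=SimpleNamespace(type="response.output_text.delta", delta=text),
    )


async def collect_deltas(result) -> str:
    """Code under test: join the text deltas of a streaming result."""
    parts = []
    async for event in result.stream_events():
        if event.type == "raw_response_event" and event.data.type == "response.output_text.delta":
            parts.append(event.data.delta)
    return "".join(parts)


def test_collect_deltas() -> None:
    fake = FakeStreamingResult(
        events=[text_delta("Hi"), SimpleNamespace(type="agent_updated_stream_event"), text_delta(" there")],
        final_output="Hi there",
    )
    assert asyncio.run(collect_deltas(fake)) == fake.final_output
```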

    Summary

    Streaming enables real-time agent execution. Key takeaways:

    1. run_streamed() starts a streamed run; consume its events with result.stream_events()
    2. RunItemStreamEvent - a run item was fully generated
    3. AgentUpdatedStreamEvent - the current agent changed
    4. RawResponsesStreamEvent - raw model event, e.g. a text delta
    5. Text streaming - stream output token by token from raw text deltas
    6. Progress indicators - show execution progress
    7. Structured output - show partial JSON, then use the parsed final_output
    8. Multi-agent - track multiple agents
    9. Tool visualization - show tool execution
    10. WebSocket - stream to WebSocket clients
    11. SSE - stream via Server-Sent Events
    12. Browser clients - read the SSE stream from JavaScript (e.g. in React)
    13. Terminal UI - stream to rich terminal
    14. Batching - batch events for efficiency
    15. Filtering - forward only the events a consumer needs
    16. Backpressure - handle slow consumers
    17. State - maintain state during streaming
    18. Errors - handle streaming errors
    19. Cancellation - stop runs with timeouts and result.cancel()
    20. Best practices - async iterators, flush, cleanup

    Streaming is essential for building responsive, real-time agent interfaces.