OpenAI Agents Python 21 files · 0 subfolders
Copy to Workspace 17_EXTENSIONS Shared from "OpenAI Agents Python" on Inkdown
Extensions - Comprehensive Deep Dive
Overview
Extensions in the OpenAI Agents SDK provide a way to add additional functionality beyond the core library. Think of Extensions as "plugins" or "add-ons" that extend the SDK's capabilities - they can add new tools, integrate with external services, provide custom storage backends, and more. This is essential for customizing the SDK to fit specific use cases and integrating with existing systems.
Core Concepts
What are Extensions?
Extensions are optional components that:
Add new functionality to the SDK
Integrate with external services
Provide custom implementations
Extend core capabilities without modifying core code
Why Extensions Matter
Modularity - Keep core library focused
Extensibility - Add new features easily
01_AGENT_SYSTEM.md
Integration - Connect to external systems
Customization - Tailor SDK to specific needs
Community - Share extensions with others
Maintenance - Extensions can evolve independently
Extension Types The SDK supports several types of extensions:
Memory Extensions - Custom session storage backends
MCP Extensions - Model Context Protocol integrations
Tool Extensions - Custom tool implementations
Model Provider Extensions - Custom LLM providers
Sandbox Extensions - Custom sandbox implementations
Tracing Extensions - Custom tracing backends
Memory Extensions
Memory Backends The SDK includes several memory backends:
SQLiteSession - Local SQLite database
OpenAIConversationsSession - OpenAI server-managed conversations
OpenAIResponsesCompactionSession - OpenAI with compaction
RedisSession - Redis-based storage (optional)
PostgreSQLSession - PostgreSQL-based storage (optional)
Custom Memory Backend Implement a custom memory backend:
Python
from agents import SessionABC, TResponseInputItem
class CustomMemorySession (SessionABC ):
"""Custom memory backend."""
def __init__ (self, backend: Any ):
self .backend = backend
async def save_items (
self,
conversation_id: str ,
items: list [TResponseInputItem],
) -> None :
"""Save items to custom backend."""
await self .backend.save(conversation_id, items)
async def load_items (
self,
conversation_id: str ,
*,
max_items: int | None = None ,
) -> list [TResponseInputItem]:
"""Load items from custom backend."""
items = await self .backend.load(conversation_id)
if max_items:
return items[-max_items:]
return items
Using Custom Memory Python
backend = MyCustomBackend()
session = CustomMemorySession(backend)
result = await Runner.run(
agent,
input ,
session=session,
conversation_id="conv-123" ,
)
Redis Memory Using Redis as memory backend (if available):
Python
from agents.extensions.memory import RedisSession
session = RedisSession(
redis_url="redis://localhost:6379" ,
prefix="agents:" ,
)
result = await Runner.run(
agent,
input ,
session=session,
)
PostgreSQL Memory Using PostgreSQL as memory backend (if available):
Python
from agents.extensions.memory import PostgreSQLSession
session = PostgreSQLSession(
connection_string="postgresql://user:pass@localhost/db" ,
)
result = await Runner.run(
agent,
input ,
session=session,
)
MCP Extensions
MCP Servers The SDK includes MCP server implementations:
MCPServerStdio - Standard I/O based MCP server
MCPServerSse - Server-Sent Events based MCP server
MCPServerStreamableHttp - Streamable HTTP MCP server
Custom MCP Server Implement a custom MCP server:
Python
from agents.mcp import MCPServer
class CustomMCPServer (MCPServer ):
"""Custom MCP server implementation."""
async def list_tools (self ) -> list [MCPServerTool]:
"""List available tools."""
return [
MCPServerTool(
name="custom_tool" ,
description="Custom tool" ,
input_schema={"type" : "object" , "properties" : {}},
)
]
async def call_tool (self, name: str , arguments: str ) -> str :
"""Call a tool."""
return f"Tool {name} called with {arguments} "
Using Custom MCP Python
server = CustomMCPServer()
agent = Agent(
name="mcp_agent" ,
instructions="Use MCP tools" ,
mcp_servers=[server],
)
MCP Tool Filtering Python
from agents.mcp import ToolFilter, ToolFilterStatic
filter_config = ToolFilterStatic(
include=["tool1" , "tool2" ],
exclude=["tool3" ],
)
agent = Agent(
name="mcp_agent" ,
mcp_servers=[server],
mcp_config=MCPConfig(
tool_filter=filter_config,
),
)
Tool Extensions
Custom Tool Implementations Create custom tool implementations:
Python
from agents import Tool, ToolContext
class CustomTool (Tool ):
"""Custom tool implementation."""
name = "custom_tool"
description = "A custom tool"
async def on_invoke_tool (
self,
context: ToolContext,
args: str ,
) -> Any :
"""Implement the tool logic."""
parsed_args = json.loads(args)
result = perform_custom_operation(parsed_args)
return result
Custom Tool with Schema Python
class CustomToolWithSchema (Tool ):
"""Custom tool with JSON schema."""
name = "custom_tool"
description = "A custom tool with schema"
params_json_schema = {
"type" : "object" ,
"properties" : {
"param1" : {"type" : "string" },
"param2" : {"type" : "number" },
},
"required" : ["param1" ],
}
async def on_invoke_tool (
self,
context: ToolContext,
args: str ,
) -> Any :
"""Implement the tool logic."""
parsed = json.loads(args)
return process(parsed)
Tool Families Create families of related tools:
Python
class DatabaseTool (Tool ):
"""Base class for database tools."""
def __init__ (self, connection: Any ):
self .connection = connection
class QueryTool (DatabaseTool ):
"""Database query tool."""
name = "query"
description = "Query the database"
async def on_invoke_tool (self, context, args ):
query = json.loads(args)["query" ]
return self .connection.execute(query)
class InsertTool (DatabaseTool ):
"""Database insert tool."""
name = "insert"
description = "Insert into database"
async def on_invoke_tool (self, context, args ):
data = json.loads(args)
return self .connection.insert(data)
Model Provider Extensions
Custom Model Provider Implement a custom model provider:
Python
from agents.models.interface import ModelProvider, Model
class CustomModelProvider (ModelProvider ):
"""Custom model provider."""
def __init__ (self, api_key: str ):
self .api_key = api_key
self .client = CustomClient(api_key)
def get_model (self, model_name: str | None ) -> Model:
"""Get a model instance."""
return CustomModel(
model_name or "default" ,
self .client,
)
async def aclose (self ) -> None :
"""Release resources."""
await self .client.close()
Custom Model Implement a custom model:
Python
class CustomModel (Model ):
"""Custom model implementation."""
def __init__ (self, model_name: str , client: CustomClient ):
self .model_name = model_name
self .client = client
async def get_response (
self,
system_instructions: str | None ,
input : str | list [TResponseInputItem],
model_settings: ModelSettings,
tools: list [Tool],
output_schema: AgentOutputSchemaBase | None ,
handoffs: list [Handoff],
tracing: ModelTracing,
*,
previous_response_id: str | None ,
conversation_id: str | None ,
prompt: ResponsePromptParam | None ,
) -> ModelResponse:
"""Get response from custom API."""
custom_input = self .convert_input(input )
custom_tools = self .convert_tools(tools)
response = await self .client.chat(
model=self .model_name,
messages=custom_input,
tools=custom_tools,
**self .convert_settings(model_settings),
)
return self .convert_response(response)
def stream_response (
self,
system_instructions: str | None ,
input : str | list [TResponseInputItem],
model_settings: ModelSettings,
tools: list [Tool],
output_schema: AgentOutputSchemaBase | None ,
handoffs: list [Handoff],
tracing: ModelTracing,
*,
previous_response_id: str | None ,
conversation_id: str | None ,
prompt: ResponsePromptParam | None ,
) -> AsyncIterator[TResponseStreamEvent]:
"""Stream response from custom API."""
async for chunk in self .client.chat_stream(...):
yield self .convert_chunk(chunk)
Using Custom Provider Python
provider = CustomModelProvider(api_key="your-key" )
result = await Runner.run(
agent,
input ,
model_provider=provider,
model="custom-model" ,
)
Sandbox Extensions
Custom Sandbox Client Implement a custom sandbox client:
Python
from agents.sandbox.session.base_sandbox_client import BaseSandboxClient
class CustomSandboxClient (BaseSandboxClient ):
"""Custom sandbox client."""
async def create_session (
self,
manifest: Manifest,
options: Any | None ,
) -> BaseSandboxSession:
"""Create a sandbox session."""
session = await self .backend.create_session(manifest)
return CustomSandboxSession(session)
async def resume_session (
self,
session_state: SandboxSessionState,
) -> BaseSandboxSession:
"""Resume a sandbox session."""
session = await self .backend.resume(session_state)
return CustomSandboxSession(session)
Custom Sandbox Session Implement a custom sandbox session:
Python
class CustomSandboxSession (BaseSandboxSession ):
"""Custom sandbox session."""
async def exec (
self,
command: str ,
timeout: int | None = None ,
) -> ExecResult:
"""Execute a command in the sandbox."""
result = await self .session.exec (command, timeout)
return ExecResult(
return_code=result.return_code,
stdout=result.stdout,
stderr=result.stderr,
)
async def read_file (self, path: str ) -> str :
"""Read a file from the sandbox."""
return await self .session.read_file(path)
async def write_file (self, path: str , content: str ) -> None :
"""Write a file to the sandbox."""
await self .session.write_file(path, content)
Tracing Extensions
Custom Trace Processor Implement a custom trace processor:
Python
from agents.tracing import TracingProcessor, Trace
class CustomTraceProcessor (TracingProcessor ):
"""Custom trace processor."""
def __init__ (self, destination: Any ):
self .destination = destination
async def process_trace (self, trace: Trace ) -> None :
"""Process a completed trace."""
await self .destination.send(trace)
Custom Trace Export Export traces to custom system:
Python
class CustomTraceExporter :
"""Custom trace exporter."""
async def export (self, trace: Trace ) -> None :
"""Export trace to custom system."""
data = self .serialize_trace(trace)
await self .api.send_trace(data)
Using Custom Tracing Python
exporter = CustomTraceExporter()
processor = CustomTraceProcessor(exporter)
add_trace_processor(processor)
result = await Runner.run(agent, input )
Extension Registration
Registering Extensions Register extensions with the SDK:
Python
from agents.extensions import register_extension
register_extension("memory" , CustomMemorySession)
register_extension("model_provider" , CustomModelProvider)
register_extension("sandbox" , CustomSandboxClient)
Extension Discovery Discover available extensions:
Python
from agents.extensions import list_extensions
extensions = list_extensions()
print (f"Available extensions: {extensions} " )
Extension Configuration
Extension Configuration Python
from agents.extensions import configure_extension
configure_extension(
"memory" ,
backend_type="redis" ,
redis_url="redis://localhost:6379" ,
)
configure_extension(
"model_provider" ,
provider_type="custom" ,
api_key="your-key" ,
)
Extension Settings Python
from agents.extensions import get_extension_settings
settings = get_extension_settings("memory" )
backend = create_memory_backend(**settings)
Extension Best Practices
1. Follow Extension Interface Implement the correct interface:
Python
class CustomSession (SessionABC ):
async def save_items (self, conversation_id, items ):
...
async def load_items (self, conversation_id, max_items=None ):
...
class CustomSession :
def save (self, data ):
...
2. Handle Errors Gracefully Handle errors in extensions:
Python
async def save_items (self, conversation_id, items ):
try :
await self .backend.save(conversation_id, items)
except BackendError as e:
logger.error(f"Backend error: {e} " )
raise SessionError(f"Failed to save: {e} " )
async def save_items (self, conversation_id, items ):
await self .backend.save(conversation_id, items)
3. Document Extensions Document extension behavior:
Python
class CustomSession (SessionABC ):
"""
Custom memory backend using XYZ database.
Configuration:
- connection_string: Database connection string
- table_name: Table name for storage
"""
def __init__ (self, connection_string: str , table_name: str ):
...
4. Test Extensions Test extensions thoroughly:
Python
@pytest.mark.asyncio
async def test_custom_session ():
"""Test custom session."""
session = CustomSession("test://db" , "test_table" )
await session.save_items("conv-1" , [{"type" : "user" , "content" : "test" }])
items = await session.load_items("conv-1" )
assert len (items) == 1
assert items[0 ]["content" ] == "test"
5. Version Extensions Python
__version__ = "1.0.0"
class CustomSession (SessionABC ):
"""Custom session v1.0.0."""
...
Common Extension Patterns
1. Cloud Storage Extension Cloud storage for sessions:
Python
class CloudStorageSession (SessionABC ):
"""Session storage in cloud storage."""
def __init__ (self, bucket: str , prefix: str ):
self .bucket = bucket
self .prefix = prefix
self .client = StorageClient()
async def save_items (self, conversation_id, items ):
key = f"{self.prefix} /{conversation_id} .json"
data = json.dumps(items)
await self .client.upload(self .bucket, key, data)
2. Cache Extension Caching layer for sessions:
Python
class CachedSession (SessionABC ):
"""Session with caching."""
def __init__ (self, inner: SessionABC, cache: Cache ):
self .inner = inner
self .cache = cache
async def load_items (self, conversation_id, max_items=None ):
cached = await self .cache.get(conversation_id)
if cached:
return cached
items = await self .inner.load_items(conversation_id, max_items)
await self .cache.set (conversation_id, items)
return items
3. Encryption Extension Python
class EncryptedSession (SessionABC ):
"""Encrypted session storage."""
def __init__ (self, inner: SessionABC, key: str ):
self .inner = inner
self .cipher = Fernet(key)
async def save_items (self, conversation_id, items ):
encrypted = self .cipher.encrypt(json.dumps(items).encode())
await self .inner.save_items(conversation_id, encrypted)
async def load_items (self, conversation_id, max_items=None ):
encrypted = await self .inner.load_items(conversation_id, max_items)
decrypted = json.loads(self .cipher.decrypt(encrypted))
return decrypted
4. Compression Extension Python
class CompressedSession (SessionABC ):
"""Compressed session storage."""
def __init__ (self, inner: SessionABC ):
self .inner = inner
async def save_items (self, conversation_id, items ):
data = json.dumps(items).encode()
compressed = gzip.compress(data)
await self .inner.save_items(conversation_id, compressed)
async def load_items (self, conversation_id, max_items=None ):
compressed = await self .inner.load_items(conversation_id, max_items)
decompressed = gzip.decompress(compressed)
return json.loads(decompressed)
5. Multi-Backend Extension Multiple storage backends:
Python
class MultiBackendSession (SessionABC ):
"""Session with multiple backends."""
def __init__ (self, primary: SessionABC, backups: list [SessionABC] ):
self .primary = primary
self .backups = backups
async def save_items (self, conversation_id, items ):
await self .primary.save_items(conversation_id, items)
for backup in self .backups:
asyncio.create_task(backup.save_items(conversation_id, items))
async def load_items (self, conversation_id, max_items=None ):
try :
return await self .primary.load_items(conversation_id, max_items)
except Exception:
for backup in self .backups:
try :
return await backup.load_items(conversation_id, max_items)
except Exception:
continue
raise
Extension Distribution
Packaging Extensions Package extensions for distribution:
Plain text
my-extensions/
├── setup.py
├── my_extensions/
│ ├── __init__.py
│ ├── memory.py
│ ├── tools.py
│ └── models.py
setup.py Python
from setuptools import setup, find_packages
setup(
name="my-extensions" ,
version="1.0.0" ,
packages=find_packages(),
install_requires=[
"openai-agents" ,
],
entry_points={
"agents.extensions" : [
"memory = my_extensions.memory:CustomMemorySession" ,
"model = my_extensions.models:CustomModelProvider" ,
],
},
)
Publishing Extensions Bash
python -m build
twine upload dist/
Extension and Compatibility
Version Compatibility Ensure extension compatibility:
Python
def check_compatibility (sdk_version: str ) -> bool :
"""Check if extension is compatible with SDK version."""
sdk = parse_version(sdk_version)
required = parse_version("1.0.0" )
return sdk >= required
Graceful Degradation Degrade gracefully if extension unavailable:
Python
try :
from my_extensions import CustomSession
session = CustomSession()
except ImportError:
from agents import SQLiteSession
session = SQLiteSession()
Extension and Testing
Testing Extensions Test extensions in isolation:
Python
@pytest.mark.asyncio
async def test_custom_memory ():
"""Test custom memory extension."""
session = CustomMemorySession(test_backend)
await session.save_items("test" , [{"type" : "user" , "content" : "test" }])
items = await session.load_items("test" )
assert len (items) == 1
Extension Fixtures Use fixtures for extension testing:
Python
@pytest.fixture
def custom_session ():
"""Fixture for custom session."""
return CustomMemorySession(test_backend)
@pytest.mark.asyncio
async def test_with_fixture (custom_session ):
"""Test with fixture."""
await custom_session.save_items("test" , test_items)
items = await custom_session.load_items("test" )
assert items == test_items
Summary Extensions extend SDK capabilities. Key takeaways:
Extensions add optional functionality
Memory extensions - custom storage backends
MCP extensions - protocol integrations
Tool extensions - custom tool implementations
Model provider extensions - custom LLM providers
Sandbox extensions - custom sandbox implementations
Tracing extensions - custom tracing backends
Custom memory - implement SessionABC
Custom MCP - implement MCPServer
Custom tools - extend Tool class
Custom models - implement Model and ModelProvider
Custom sandbox - implement BaseSandboxClient
Custom tracing - implement TracingProcessor
Registration - register extensions with SDK
Configuration - configure extension behavior
Interface compliance - follow extension interfaces
Error handling - handle errors gracefully
Documentation - document extension behavior
Testing - test extensions thoroughly
Distribution - package and publish extensions
Extensions are essential for customizing and extending the SDK for specific use cases.