The Agent Runtime is the execution environment that manages agent lifecycle, message routing, and system-level operations. It provides the infrastructure for agents to send messages, publish events, and maintain state.
AgentRuntime Protocol
The AgentRuntime protocol defines the interface all runtime implementations must provide:
from typing import Any, Awaitable, Callable, Mapping, Protocol, TypeVar

from autogen_core import Agent, AgentId, AgentRuntime, AgentType, CancellationToken, Subscription, TopicId
T = TypeVar("T", bound=Agent)


class AgentRuntime(Protocol):
    """Interface that every agent runtime implementation provides.

    A runtime delivers direct messages and topic publications between agents,
    instantiates agents from registered factories, manages subscriptions, and
    saves/restores agent state.
    """

    async def send_message(
        self,
        message: Any,
        recipient: AgentId,
        *,
        sender: AgentId | None = None,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> Any:
        """Send a message to an agent and get a response.

        Args:
            message: The message to deliver.
            recipient: ID of the agent that handles the message.
            sender: ID of the sending agent; ``None`` for external (non-agent) callers.
            cancellation_token: Token used to cancel the operation.
            message_id: Unique message identifier; auto-generated when ``None``.

        Returns:
            The recipient's response.
        """
        ...

    async def publish_message(
        self,
        message: Any,
        topic_id: TopicId,
        *,
        sender: AgentId | None = None,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> None:
        """Publish a message to all subscribers of a topic.

        Fire-and-forget: no response is returned to the publisher.
        """
        ...

    async def register_factory(
        self,
        type: str | AgentType,
        agent_factory: Callable[[], T | Awaitable[T]],
        *,
        expected_class: type[T] | None = None,
    ) -> AgentType:
        """Register an agent factory for on-demand instantiation.

        Args:
            type: Agent type name (or ``AgentType``) the factory serves.
            agent_factory: Zero-argument callable returning an agent, or an
                awaitable that resolves to one.
            expected_class: Optional class used to validate created agents.

        Returns:
            The registered ``AgentType``.
        """
        ...

    async def register_agent_instance(
        self,
        agent_instance: Agent,
        agent_id: AgentId,
    ) -> AgentId:
        """Register a specific, already-created agent instance under ``agent_id``."""
        ...

    async def add_subscription(self, subscription: Subscription) -> None:
        """Add a subscription for message routing."""
        ...

    async def remove_subscription(self, id: str) -> None:
        """Remove a previously added subscription by its ID."""
        ...

    async def save_state(self) -> Mapping[str, Any]:
        """Save runtime and all agent states."""
        ...

    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Restore runtime and all agent states from ``save_state()`` output."""
        ...
SingleThreadedAgentRuntime
SingleThreadedAgentRuntime is a development-focused runtime that processes all messages in a single asyncio event loop.
SingleThreadedAgentRuntime is designed for development and standalone applications. For production deployments with high throughput or distributed requirements, use the distributed runtime.
Initialization
from autogen_core import SingleThreadedAgentRuntime
from opentelemetry.trace import TracerProvider

# Construct the runtime with every option spelled out by keyword.
runtime = SingleThreadedAgentRuntime(
    intervention_handlers=None,  # no message interceptors installed
    tracer_provider=None,  # no explicit OpenTelemetry TracerProvider
    ignore_unhandled_exceptions=True,  # defer event-handler exceptions instead of failing fast
)
Parameters:
intervention_handlers
List[InterventionHandler]
default: "None"
List of handlers that can intercept and modify messages before delivery. Useful for logging, validation, or message transformation.
tracer_provider
TracerProvider
default: "None"
OpenTelemetry tracer provider for distributed tracing. Set AUTOGEN_DISABLE_RUNTIME_TRACING=true to disable.
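ignore_unhandled_exceptions
bool
default: "True"
If True, an unhandled exception in an event handler does not stop message processing immediately. Instead, it is re-raised on the next process_next() call or from an awaited stop() call. Exceptions raised in RPC handlers are always propagated to the caller.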
Lifecycle Management
Starting the Runtime
import asyncio

from autogen_core import SingleThreadedAgentRuntime


async def main():
    runtime = SingleThreadedAgentRuntime()

    # Register agents here.

    # Begin processing queued messages in the background.
    runtime.start()

    # ... application logic runs while the runtime delivers messages ...

    # Let the queue drain, then shut down.
    await runtime.stop_when_idle()


asyncio.run(main())
Stopping the Runtime
Three ways to stop the runtime:
# 1. Stop immediately (discards queued messages)
await runtime.stop()
# 2. Stop when message queue is empty (recommended)
await runtime.stop_when_idle()
# 3. Stop when condition is met (legacy, not recommended)
await runtime.stop_when( lambda : task_complete)
Use stop_when_idle() in most cases. It ensures all queued messages are processed before shutdown.
Cleanup
# Close runtime and all agents
await runtime.close()
This calls stop() if the runtime is running, then calls close() on all instantiated agents.
Agent Registration
Factory Registration
Register a factory function that creates agents on-demand:
from autogen_core import SingleThreadedAgentRuntime, AgentType
runtime = SingleThreadedAgentRuntime()
# Using BaseAgent.register() (recommended)
await MyAgent.register(
runtime = runtime,
type = "my_agent" ,
factory = lambda : MyAgent( "Agent description" )
)
# Using runtime.register_factory() (low-level)
agent_type = await runtime.register_factory(
type = "my_agent" ,
agent_factory = lambda : MyAgent( "Agent description" ),
expected_class = MyAgent # Optional runtime validation
)
Factory function signature:
from autogen_core import AgentInstantiationContext


# Preferred form: the factory takes no arguments.
def factory() -> Agent:
    agent = MyAgent()
    return agent


# When the agent needs its runtime or ID, read both from the instantiation
# context rather than accepting them as parameters.
def factory() -> Agent:
    return MyAgent(
        AgentInstantiationContext.current_runtime(),
        AgentInstantiationContext.current_agent_id(),
    )
Two-argument factories of the form factory(runtime: AgentRuntime, agent_id: AgentId) are deprecated. Use AgentInstantiationContext instead.
Instance Registration
Register a pre-created agent instance:
from autogen_core import AgentId
agent = MyAgent( "My agent" )
# Using BaseAgent.register_instance() (recommended)
await agent.register_instance(
runtime = runtime,
agent_id = AgentId( "my_agent" , "instance-1" )
)
# Using runtime.register_agent_instance() (low-level)
await runtime.register_agent_instance(
agent_instance = agent,
agent_id = AgentId( "my_agent" , "instance-1" )
)
All instances of the same agent type must be of the same class. Mixing factories and instances for the same type is not allowed.
Message Operations
Sending Direct Messages
from autogen_core import AgentId, CancellationToken
from dataclasses import dataclass
@dataclass
class TaskRequest :
task_id: str
# Send message and await response
response = await runtime.send_message(
message = TaskRequest( task_id = "123" ),
recipient = AgentId( "worker" , "instance-1" ),
sender = AgentId( "coordinator" , "default" ), # Optional
cancellation_token = CancellationToken(), # Optional
message_id = "unique-msg-id" # Optional, auto-generated if None
)
Parameters:
message
Any
The message object to send. Must be serializable.
recipient
AgentId
The agent that should receive and process the message.
sender
AgentId | None
default: "None"
The sending agent’s ID. Should only be None for external (non-agent) senders.
cancellation_token
CancellationToken | None
default: "None"
Token to cancel the operation if needed.
message_id
str | None
default: "None"
Unique message identifier. A UUID is generated automatically if not provided.
Publishing to Topics
from autogen_core import TopicId
@dataclass
class StatusUpdate :
status: str
# Publish to all subscribers
await runtime.publish_message(
message = StatusUpdate( status = "processing" ),
topic_id = TopicId( type = "status" , source = "system" ),
sender = AgentId( "worker" , "instance-1" ) # Optional
)
publish_message() is fire-and-forget. It doesn’t wait for responses or confirm delivery to subscribers.
Subscription Management
from autogen_core import TypeSubscription, DefaultSubscription
# Subscribe agent to topic
await runtime.add_subscription(
TypeSubscription(
topic_type = "status" ,
agent_type = "logger"
)
)
# Remove subscription by ID
await runtime.remove_subscription( subscription_id = "sub-uuid" )
See Message Passing for details on subscription types.
State Persistence
Save State
# Save all agent states
state = await runtime.save_state()
print (state) # {'agent_type/key': {'counter': 42, ...}, ...}
# Save to file
import json
with open ( 'runtime_state.json' , 'w' ) as f:
json.dump(state, f)
Load State
import json
# Load from file
with open ( 'runtime_state.json' , 'r' ) as f:
state = json.load(f)
# Restore state
await runtime.load_state(state)
Per-Agent State Operations
from autogen_core import AgentId
agent_id = AgentId( "worker" , "instance-1" )
# Get agent metadata
metadata = await runtime.agent_metadata(agent_id)
print (metadata.type, metadata.key, metadata.description)
# Save single agent state
agent_state = await runtime.agent_save_state(agent_id)
# Load single agent state
await runtime.agent_load_state(agent_id, agent_state)
Advanced Operations
Manual Message Processing
# Process messages without start()
while has_more_work:
await runtime.process_next() # Process one message from queue
process_next() re-raises unhandled exceptions, and it cannot be called again once an exception has been raised. Use start() for automatic message processing.
Accessing Agent Instances
from autogen_core import AgentId
# Get underlying agent instance (discouraged)
try :
agent = await runtime.try_get_underlying_agent_instance(
id = AgentId( "worker" , "instance-1" ),
type = MyAgent # Expected type
)
# Direct access to agent
print (agent.internal_state)
except LookupError :
print ( "Agent not found" )
except TypeError :
print ( "Agent is wrong type" )
Accessing agent instances directly breaks the Actor model. Use only for debugging or special cases.
Agent Lookup
from autogen_core import AgentType
# Get AgentId for lazy or eager instantiation
agent_id = await runtime.get(
AgentType( "worker" ),
key = "instance-1" ,
lazy = True # Don't create agent yet
)
# Or from existing AgentId
agent_id = await runtime.get(existing_agent_id)
Message Serialization
import json

from autogen_core import MessageSerializer
from typing import Any


class CustomSerializer(MessageSerializer[MyMessage]):
    """Serialize ``MyMessage`` objects as JSON under a custom content type."""

    @property
    def data_content_type(self) -> str:
        # Content type associated with payloads produced by this serializer.
        return "application/x-custom"

    @property
    def type_name(self) -> str:
        # Required by the MessageSerializer protocol: the runtime looks up
        # serializers by type name together with content type, so this must
        # match the name the runtime uses for MyMessage (its class name).
        return MyMessage.__name__

    def serialize(self, message: MyMessage) -> bytes:
        return json.dumps(message.__dict__).encode()

    def deserialize(self, data: bytes) -> MyMessage:
        return MyMessage(**json.loads(data))


# Register serializer
runtime.add_message_serializer(CustomSerializer())
Complete Example
import asyncio
from dataclasses import dataclass

from autogen_core import (
    SingleThreadedAgentRuntime,
    RoutedAgent,
    MessageContext,
    AgentId,
    TopicId,
    TypeSubscription,
    rpc,
    event,
)


@dataclass
class WorkRequest:
    """Direct request asking the worker to process ``data``."""

    work_id: str
    data: str


@dataclass
class WorkResponse:
    """Reply returned by the worker for a ``WorkRequest``."""

    work_id: str
    result: str


@dataclass
class ProgressEvent:
    """Progress notification published while a request is processed."""

    work_id: str
    progress: float  # fraction complete; 0.25, 0.5, 0.75, 1.0 in this example


class Worker(RoutedAgent):
    """Handles WorkRequest RPCs and publishes progress while working."""

    def __init__(self):
        super().__init__("Worker agent")

    @rpc
    async def process_work(self, message: WorkRequest, ctx: MessageContext) -> WorkResponse:
        # Publish progress updates on a "progress" topic keyed by the work ID.
        for progress in [0.25, 0.5, 0.75, 1.0]:
            await self.publish_message(
                ProgressEvent(work_id=message.work_id, progress=progress),
                TopicId(type="progress", source=message.work_id),
            )
            await asyncio.sleep(0.1)  # simulate work between updates
        result = f"Processed: {message.data}"
        return WorkResponse(work_id=message.work_id, result=result)


class Monitor(RoutedAgent):
    """Records and prints every ProgressEvent it receives."""

    def __init__(self):
        super().__init__("Monitor agent")
        self.progress_log: list[tuple[str, float]] = []

    @event
    async def track_progress(self, message: ProgressEvent, ctx: MessageContext) -> None:
        self.progress_log.append((message.work_id, message.progress))
        print(f"Progress: {message.work_id} - {message.progress * 100}%")


async def main():
    # Create and configure runtime
    runtime = SingleThreadedAgentRuntime()

    # Register agents
    await Worker.register(runtime, "worker", lambda: Worker())
    await Monitor.register(runtime, "monitor", lambda: Monitor())

    # Add subscription for monitor to receive progress events
    await runtime.add_subscription(
        TypeSubscription(topic_type="progress", agent_type="monitor")
    )

    # Start runtime
    runtime.start()
    try:
        # Send work request
        response = await runtime.send_message(
            WorkRequest(work_id="task-1", data="sample data"),
            recipient=AgentId("worker", "default"),
        )
        print(f"Result: {response.result}")
    finally:
        # stop_when_idle() processes every queued message (including any
        # remaining progress events) before stopping, so no sleep is needed.
        # The nested finally guarantees agents are closed even if stopping raises.
        try:
            await runtime.stop_when_idle()
        finally:
            await runtime.close()


if __name__ == "__main__":
    asyncio.run(main())
Runtime Monitoring
# How many messages are queued but not yet processed?
queue_size = runtime.unprocessed_messages_count
print(f"Messages pending: {queue_size}")
Next Steps
Message Passing Learn about messages, contexts, topics, and subscriptions
Event-Driven Architecture Understand event handlers and message routing
Distributed Runtime Scale across processes and languages
Core Overview Return to Core API overview