Skip to main content
The Agent Runtime is the execution environment that manages agent lifecycle, message routing, and system-level operations. It provides the infrastructure for agents to send messages, publish events, and maintain state.

AgentRuntime Protocol

The AgentRuntime protocol defines the interface all runtime implementations must provide:
from autogen_core import AgentRuntime, AgentId, TopicId, Agent, Subscription
from typing import Any, Awaitable, Callable

class AgentRuntime(Protocol):
    async def send_message(
        self,
        message: Any,
        recipient: AgentId,
        *,
        sender: AgentId | None = None,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> Any:
        """Send a message to an agent and get a response."""
        ...
    
    async def publish_message(
        self,
        message: Any,
        topic_id: TopicId,
        *,
        sender: AgentId | None = None,
        cancellation_token: CancellationToken | None = None,
        message_id: str | None = None,
    ) -> None:
        """Publish a message to all subscribers of a topic."""
        ...
    
    async def register_factory(
        self,
        type: str | AgentType,
        agent_factory: Callable[[], T | Awaitable[T]],
        *,
        expected_class: type[T] | None = None,
    ) -> AgentType:
        """Register an agent factory for on-demand instantiation."""
        ...
    
    async def register_agent_instance(
        self,
        agent_instance: Agent,
        agent_id: AgentId,
    ) -> AgentId:
        """Register a specific agent instance."""
        ...
    
    async def add_subscription(self, subscription: Subscription) -> None:
        """Add a subscription for message routing."""
        ...
    
    async def save_state(self) -> Mapping[str, Any]:
        """Save runtime and all agent states."""
        ...
    
    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Restore runtime and all agent states."""
        ...

SingleThreadedAgentRuntime

SingleThreadedAgentRuntime is a development-focused runtime that processes all messages in a single asyncio event loop.
SingleThreadedAgentRuntime is designed for development and standalone applications. For production deployments with high throughput or distributed requirements, use the distributed runtime.

Initialization

from autogen_core import SingleThreadedAgentRuntime
from opentelemetry.trace import TracerProvider

runtime = SingleThreadedAgentRuntime(
    intervention_handlers=None,  # Optional message interceptors
    tracer_provider=None,        # Optional OpenTelemetry provider
    ignore_unhandled_exceptions=True  # Ignore exceptions in event handlers
)
Parameters:
intervention_handlers
List[InterventionHandler]
default:"None"
List of handlers that can intercept and modify messages before delivery. Useful for logging, validation, or message transformation.
tracer_provider
TracerProvider
default:"None"
OpenTelemetry tracer provider for distributed tracing. Set AUTOGEN_DISABLE_RUNTIME_TRACING=true to disable.
ignore_unhandled_exceptions
bool
default:"True"
If True, exceptions in event handlers won’t stop the runtime. Exceptions are raised on next process_next() or stop() call. RPC handler exceptions are always propagated.

Lifecycle Management

Starting the Runtime

import asyncio
from autogen_core import SingleThreadedAgentRuntime

async def main():
    runtime = SingleThreadedAgentRuntime()
    
    # Register agents...
    
    # Start background message processing
    runtime.start()
    
    # Runtime is now processing messages
    # Your application logic here...
    
    # Stop when done
    await runtime.stop_when_idle()

asyncio.run(main())

Stopping the Runtime

Three ways to stop the runtime:
# 1. Stop immediately (discards queued messages)
await runtime.stop()

# 2. Stop when message queue is empty (recommended)
await runtime.stop_when_idle()

# 3. Stop when condition is met (legacy, not recommended)
await runtime.stop_when(lambda: task_complete)
Use stop_when_idle() in most cases. It ensures all queued messages are processed before shutdown.

Cleanup

# Close runtime and all agents
await runtime.close()
This calls stop() if the runtime is running, then calls close() on all instantiated agents.

Agent Registration

Factory Registration

Register a factory function that creates agents on-demand:
from autogen_core import SingleThreadedAgentRuntime, AgentType

runtime = SingleThreadedAgentRuntime()

# Using BaseAgent.register() (recommended)
await MyAgent.register(
    runtime=runtime,
    type="my_agent",
    factory=lambda: MyAgent("Agent description")
)

# Using runtime.register_factory() (low-level)
agent_type = await runtime.register_factory(
    type="my_agent",
    agent_factory=lambda: MyAgent("Agent description"),
    expected_class=MyAgent  # Optional runtime validation
)
Factory function signature:
# Zero-argument factory (recommended)
def factory() -> Agent:
    return MyAgent()

# Access runtime and ID via context
from autogen_core import AgentInstantiationContext

def factory() -> Agent:
    runtime = AgentInstantiationContext.current_runtime()
    agent_id = AgentInstantiationContext.current_agent_id()
    return MyAgent(runtime, agent_id)
Two-argument factories factory(runtime: AgentRuntime, agent_id: AgentId) are deprecated. Use AgentInstantiationContext instead.

Instance Registration

Register a pre-created agent instance:
from autogen_core import AgentId

agent = MyAgent("My agent")

# Using BaseAgent.register_instance() (recommended)
await agent.register_instance(
    runtime=runtime,
    agent_id=AgentId("my_agent", "instance-1")
)

# Using runtime.register_agent_instance() (low-level)
await runtime.register_agent_instance(
    agent_instance=agent,
    agent_id=AgentId("my_agent", "instance-1")
)
All instances of the same agent type must be of the same class. Mixing factories and instances for the same type is not allowed.

Message Operations

Sending Direct Messages

from autogen_core import AgentId, CancellationToken
from dataclasses import dataclass

@dataclass
class TaskRequest:
    task_id: str

# Send message and await response
response = await runtime.send_message(
    message=TaskRequest(task_id="123"),
    recipient=AgentId("worker", "instance-1"),
    sender=AgentId("coordinator", "default"),  # Optional
    cancellation_token=CancellationToken(),  # Optional
    message_id="unique-msg-id"  # Optional, auto-generated if None
)
Parameters:
message
Any
required
The message object to send. Must be serializable.
recipient
AgentId
required
The agent that should receive and process the message.
sender
AgentId | None
default:"None"
The sending agent’s ID. Should only be None for external (non-agent) senders.
cancellation_token
CancellationToken | None
default:"None"
Token to cancel the operation if needed.
message_id
str | None
default:"None"
Unique message identifier. Auto-generated UUID if not provided.

Publishing to Topics

from autogen_core import TopicId

@dataclass
class StatusUpdate:
    status: str

# Publish to all subscribers
await runtime.publish_message(
    message=StatusUpdate(status="processing"),
    topic_id=TopicId(type="status", source="system"),
    sender=AgentId("worker", "instance-1")  # Optional
)
publish_message() is fire-and-forget. It doesn’t wait for responses or confirm delivery to subscribers.

Subscription Management

from autogen_core import TypeSubscription, DefaultSubscription

# Subscribe agent to topic
await runtime.add_subscription(
    TypeSubscription(
        topic_type="status",
        agent_type="logger"
    )
)

# Remove subscription by ID
await runtime.remove_subscription(subscription_id="sub-uuid")
See Message Passing for details on subscription types.

State Persistence

Save State

# Save all agent states
state = await runtime.save_state()
print(state)  # {'agent_type/key': {'counter': 42, ...}, ...}

# Save to file
import json
with open('runtime_state.json', 'w') as f:
    json.dump(state, f)

Load State

import json

# Load from file
with open('runtime_state.json', 'r') as f:
    state = json.load(f)

# Restore state
await runtime.load_state(state)

Per-Agent State Operations

from autogen_core import AgentId

agent_id = AgentId("worker", "instance-1")

# Get agent metadata
metadata = await runtime.agent_metadata(agent_id)
print(metadata.type, metadata.key, metadata.description)

# Save single agent state
agent_state = await runtime.agent_save_state(agent_id)

# Load single agent state
await runtime.agent_load_state(agent_id, agent_state)

Advanced Operations

Manual Message Processing

# Process messages without start()
while has_more_work:
    await runtime.process_next()  # Process one message from queue
process_next() raises unhandled exceptions. Cannot be called again after an exception is raised. Use start() for automatic message processing.

Accessing Agent Instances

from autogen_core import AgentId

# Get underlying agent instance (discouraged)
try:
    agent = await runtime.try_get_underlying_agent_instance(
        id=AgentId("worker", "instance-1"),
        type=MyAgent  # Expected type
    )
    # Direct access to agent
    print(agent.internal_state)
except LookupError:
    print("Agent not found")
except TypeError:
    print("Agent is wrong type")
Accessing agent instances directly breaks the Actor model. Use only for debugging or special cases.

Agent Lookup

from autogen_core import AgentType

# Get AgentId for lazy or eager instantiation
agent_id = await runtime.get(
    AgentType("worker"),
    key="instance-1",
    lazy=True  # Don't create agent yet
)

# Or from existing AgentId
agent_id = await runtime.get(existing_agent_id)

Message Serialization

from autogen_core import MessageSerializer
from typing import Any

class CustomSerializer(MessageSerializer[MyMessage]):
    @property
    def data_content_type(self) -> str:
        return "application/x-custom"
    
    def serialize(self, message: MyMessage) -> bytes:
        return json.dumps(message.__dict__).encode()
    
    def deserialize(self, data: bytes) -> MyMessage:
        return MyMessage(**json.loads(data))

# Register serializer
runtime.add_message_serializer(CustomSerializer())

Complete Example

import asyncio
from dataclasses import dataclass
from autogen_core import (
    SingleThreadedAgentRuntime,
    RoutedAgent,
    MessageContext,
    AgentId,
    TopicId,
    TypeSubscription,
    rpc,
    event
)

@dataclass
class WorkRequest:
    work_id: str
    data: str

@dataclass
class WorkResponse:
    work_id: str
    result: str

@dataclass
class ProgressEvent:
    work_id: str
    progress: float

class Worker(RoutedAgent):
    def __init__(self):
        super().__init__("Worker agent")
    
    @rpc
    async def process_work(self, message: WorkRequest, ctx: MessageContext) -> WorkResponse:
        # Publish progress updates
        for progress in [0.25, 0.5, 0.75, 1.0]:
            await self.publish_message(
                ProgressEvent(work_id=message.work_id, progress=progress),
                TopicId(type="progress", source=message.work_id)
            )
            await asyncio.sleep(0.1)
        
        result = f"Processed: {message.data}"
        return WorkResponse(work_id=message.work_id, result=result)

class Monitor(RoutedAgent):
    def __init__(self):
        super().__init__("Monitor agent")
        self.progress_log = []
    
    @event
    async def track_progress(self, message: ProgressEvent, ctx: MessageContext) -> None:
        self.progress_log.append((message.work_id, message.progress))
        print(f"Progress: {message.work_id} - {message.progress*100}%")

async def main():
    # Create and configure runtime
    runtime = SingleThreadedAgentRuntime()
    
    # Register agents
    await Worker.register(runtime, "worker", lambda: Worker())
    await Monitor.register(runtime, "monitor", lambda: Monitor())
    
    # Add subscription for monitor to receive progress events
    await runtime.add_subscription(
        TypeSubscription(topic_type="progress", agent_type="monitor")
    )
    
    # Start runtime
    runtime.start()
    
    try:
        # Send work request
        response = await runtime.send_message(
            WorkRequest(work_id="task-1", data="sample data"),
            recipient=AgentId("worker", "default")
        )
        print(f"Result: {response.result}")
        
        # Give time for progress events to process
        await asyncio.sleep(0.5)
        
    finally:
        # Clean shutdown
        await runtime.stop_when_idle()
        await runtime.close()

if __name__ == "__main__":
    asyncio.run(main())

Runtime Monitoring

# Check message queue size
queue_size = runtime.unprocessed_messages_count
print(f"Messages pending: {queue_size}")

Next Steps

Message Passing

Learn about messages, contexts, topics, and subscriptions

Event-Driven Architecture

Understand event handlers and message routing

Distributed Runtime

Scale across processes and languages

Core Overview

Return to Core API overview