Documentation Index Fetch the complete documentation index at: https://mintlify.com/microsoft/autogen/llms.txt
Use this file to discover all available pages before exploring further.
Message passing is the foundation of communication in AutoGen Core. Agents communicate exclusively through asynchronous messages, following the Actor model with no shared state.
Message Types
Any Python object can be a message. Common patterns:
Dataclass Messages
from dataclasses import dataclass
@dataclass
class TaskRequest :
task_id: str
priority: int
data: dict
@dataclass
class TaskResponse :
task_id: str
result: str
success: bool
@dataclass
class StatusEvent :
status: str
timestamp: float
Use dataclasses for messages. They’re simple, type-safe, and work well with serialization.
Pydantic Models
from pydantic import BaseModel, Field
class ValidationRequest ( BaseModel ):
data: dict
rules: list[ str ] = Field( default_factory = list )
class ValidationResponse ( BaseModel ):
valid: bool
errors: list[ str ] = Field( default_factory = list )
MessageContext
Every message handler receives a MessageContext with metadata about the message:
from dataclasses import dataclass
from autogen_core import MessageContext, AgentId, TopicId, CancellationToken
@dataclass
class MessageContext :
sender: AgentId | None
"""The agent that sent the message, or None if sent externally."""
topic_id: TopicId | None
"""The topic this message was published to, or None for direct messages."""
is_rpc: bool
"""True if this is an RPC (expects response), False for events."""
cancellation_token: CancellationToken
"""Token to check if operation was cancelled."""
message_id: str
"""Unique identifier for this message."""
Using MessageContext
from autogen_core import RoutedAgent, MessageContext, event, rpc
class MyAgent ( RoutedAgent ):
@rpc
async def handle_request ( self , message : Request, ctx : MessageContext) -> Response:
# Check who sent the message
if ctx.sender:
print ( f "Request from: { ctx.sender.type } / { ctx.sender.key } " )
# Check if operation was cancelled
if ctx.cancellation_token.is_cancelled():
raise asyncio.CancelledError()
# Reply with response
return Response( data = "processed" )
@event
async def handle_event ( self , message : StatusEvent, ctx : MessageContext) -> None :
# Check if published to topic
if ctx.topic_id:
print ( f "Event on topic: { ctx.topic_id.type } / { ctx.topic_id.source } " )
# Events don't return responses
assert not ctx.is_rpc
MessageHandlerContext
MessageHandlerContext provides access to the current agent’s ID from within a message handler:
from autogen_core import MessageHandlerContext, AgentId
class MyAgent ( RoutedAgent ):
@rpc
async def handle_request ( self , message : Request, ctx : MessageContext) -> Response:
# Get current agent's ID
my_id: AgentId = MessageHandlerContext.agent_id()
print ( f "I am: { my_id.type } / { my_id.key } " )
return Response()
MessageHandlerContext.agent_id() must be called from within a message handler. It raises RuntimeError if called outside a handler context.
Topics and Publishing
TopicId
TopicId identifies a publish-subscribe topic:
from autogen_core import TopicId
# Create topic ID
topic = TopicId(
type = "task.status" , # Event type (CloudEvents spec)
source = "worker-1" # Event source context
)
print ( str (topic)) # "task.status/worker-1"
# Parse from string
topic = TopicId.from_str( "task.status/worker-1" )
TopicId components:
Event type following CloudEvents spec. Must match pattern: ^[\w\-\.\:\=]+$ Examples: task.completed, user.login, system.error
Context where the event happened. Can be any string. Examples: worker-1, api-gateway, user-123
DefaultTopicId
DefaultTopicId is a predefined topic for simple pub-sub:
from autogen_core import DefaultTopicId
# Use default topic
await runtime.publish_message(
message = StatusEvent( status = "ready" ),
topic_id = DefaultTopicId()
)
Publishing Messages
From an agent:
class Publisher ( RoutedAgent ):
@rpc
async def do_work ( self , message : WorkRequest, ctx : MessageContext) -> WorkResponse:
# Publish progress event
await self .publish_message(
StatusEvent( status = "working" ),
TopicId( type = "status" , source = self .id.key)
)
# Do work...
return WorkResponse( result = "done" )
From the runtime:
# External publish (no sender)
await runtime.publish_message(
message = SystemEvent( event = "startup" ),
topic_id = TopicId( type = "system" , source = "runtime" )
)
Subscriptions
Subscriptions define which agents receive messages published to topics.
Subscription Protocol
from autogen_core import Subscription, TopicId, AgentId
from typing import Protocol
class Subscription ( Protocol ):
@ property
def id ( self ) -> str :
"""Unique subscription ID (usually UUID)."""
...
def is_match ( self , topic_id : TopicId) -> bool :
"""Check if this subscription matches the topic."""
...
def map_to_agent ( self , topic_id : TopicId) -> AgentId:
"""Map topic to agent ID that should handle it."""
...
TypeSubscription
TypeSubscription matches topics by type and creates agent instances per source:
from autogen_core import TypeSubscription, TopicId, AgentId
# Subscribe to topic type
subscription = TypeSubscription(
topic_type = "task.status" ,
agent_type = "monitor"
)
# How it works:
# TopicId(type="task.status", source="worker-1") -> AgentId(type="monitor", key="worker-1")
# TopicId(type="task.status", source="worker-2") -> AgentId(type="monitor", key="worker-2")
await runtime.add_subscription(subscription)
TypeSubscription creates separate agent instances for each source. Use when you want per-source state isolation.
Example:
class Monitor ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Monitor" )
self .events = [] # Each source gets its own monitor instance
@event
async def handle_status ( self , message : StatusEvent, ctx : MessageContext) -> None :
self .events.append(message)
print ( f "Monitor { self .id.key } got event: { message.status } " )
# Register
await Monitor.register(runtime, "monitor" , lambda : Monitor())
# Subscribe to status updates
await runtime.add_subscription(
TypeSubscription( topic_type = "status" , agent_type = "monitor" )
)
# Publish from different sources
await runtime.publish_message(
StatusEvent( status = "ready" ),
TopicId( type = "status" , source = "worker-1" )
) # Creates Monitor(key="worker-1")
await runtime.publish_message(
StatusEvent( status = "busy" ),
TopicId( type = "status" , source = "worker-2" )
) # Creates Monitor(key="worker-2")
DefaultSubscription
DefaultSubscription subscribes an agent to the default topic:
from autogen_core import DefaultSubscription, default_subscription
# Manual subscription
await runtime.add_subscription(
DefaultSubscription( agent_type = "logger" )
)
# Or use decorator
@default_subscription
class Logger ( RoutedAgent ):
@event
async def log_event ( self , message : Any, ctx : MessageContext) -> None :
print ( f "Logged: { message } " )
TypePrefixSubscription
TypePrefixSubscription matches topics by type prefix (internal use):
from autogen_core import TypePrefixSubscription
# Subscribe to all topics starting with "task."
subscription = TypePrefixSubscription(
topic_type_prefix = "task." , # Must end with separator
agent_type = "task_handler"
)
TypePrefixSubscription is mainly used internally for direct message routing. For application-level subscriptions, use TypeSubscription or DefaultSubscription.
Subscription Decorators
from autogen_core import default_subscription, type_subscription, RoutedAgent
# Subscribe to default topic
@default_subscription
class DefaultHandler ( RoutedAgent ):
pass
# Subscribe to specific type
@type_subscription ( topic_type = "events" , agent_type = "event_handler" )
class EventHandler ( RoutedAgent ):
pass
# Register agent (subscriptions are added automatically)
await EventHandler.register(runtime, "event_handler" , lambda : EventHandler( "Handler" ))
Message Routing Patterns
Pattern 1: Direct RPC
# One-to-one request-response
response = await runtime.send_message(
message = Request( data = "process this" ),
recipient = AgentId( "processor" , "instance-1" )
)
Pattern 2: Broadcast Events
# One-to-many fire-and-forget
await runtime.publish_message(
message = Event( data = "something happened" ),
topic_id = TopicId( type = "events" , source = "system" )
)
Pattern 3: Per-Source Routing
# TypeSubscription creates separate agent per source
await runtime.add_subscription(
TypeSubscription( topic_type = "user.action" , agent_type = "user_handler" )
)
# Each user gets their own handler instance
await runtime.publish_message(
UserAction( action = "login" ),
TopicId( type = "user.action" , source = "user-123" )
) # Routes to AgentId("user_handler", "user-123")
await runtime.publish_message(
UserAction( action = "logout" ),
TopicId( type = "user.action" , source = "user-456" )
) # Routes to AgentId("user_handler", "user-456")
Cancellation
from autogen_core import CancellationToken
import asyncio
# Create cancellation token
token = CancellationToken()
# Send with token
task = asyncio.create_task(
runtime.send_message(
message = LongRunningRequest(),
recipient = AgentId( "worker" , "default" ),
cancellation_token = token
)
)
# Cancel after timeout
await asyncio.sleep( 5 )
token.cancel()
try :
await task
except asyncio.CancelledError:
print ( "Operation cancelled" )
Checking Cancellation in Handler
class Worker ( RoutedAgent ):
@rpc
async def long_task ( self , message : Request, ctx : MessageContext) -> Response:
for i in range ( 100 ):
# Check if cancelled
if ctx.cancellation_token.is_cancelled():
raise asyncio.CancelledError()
# Do work
await asyncio.sleep( 0.1 )
return Response( result = "completed" )
Message Serialization
Messages must be serializable for distributed runtime:
Custom Serializer
from autogen_core import MessageSerializer
import json
from typing import Any
class MyMessageSerializer (MessageSerializer[MyMessage]):
@ property
def data_content_type ( self ) -> str :
return "application/json"
def serialize ( self , message : MyMessage) -> bytes :
return json.dumps({
'field1' : message.field1,
'field2' : message.field2
}).encode( 'utf-8' )
def deserialize ( self , data : bytes ) -> MyMessage:
obj = json.loads(data.decode( 'utf-8' ))
return MyMessage( field1 = obj[ 'field1' ], field2 = obj[ 'field2' ])
# Register serializer
runtime.add_message_serializer(MyMessageSerializer())
Known Serializers
from autogen_core import try_get_known_serializers_for_type
# Get serializers for type
serializers = try_get_known_serializers_for_type(MyMessage)
if serializers:
print ( f "Found { len (serializers) } serializers" )
else :
print ( "No serializers found" )
Complete Example
import asyncio
from dataclasses import dataclass
from autogen_core import (
SingleThreadedAgentRuntime,
RoutedAgent,
MessageContext,
AgentId,
TopicId,
TypeSubscription,
event,
rpc
)
@dataclass
class TaskRequest :
task_id: str
data: str
@dataclass
class TaskResponse :
task_id: str
result: str
@dataclass
class TaskEvent :
task_id: str
event: str
class Worker ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Worker" )
@rpc
async def process ( self , message : TaskRequest, ctx : MessageContext) -> TaskResponse:
# Announce start
await self .publish_message(
TaskEvent( task_id = message.task_id, event = "started" ),
TopicId( type = "task.events" , source = message.task_id)
)
# Process
result = f "Processed: { message.data } "
# Announce completion
await self .publish_message(
TaskEvent( task_id = message.task_id, event = "completed" ),
TopicId( type = "task.events" , source = message.task_id)
)
return TaskResponse( task_id = message.task_id, result = result)
class Monitor ( RoutedAgent ):
def __init__ ( self ):
super (). __init__ ( "Monitor" )
self .events = []
@event
async def track ( self , message : TaskEvent, ctx : MessageContext) -> None :
self .events.append((message.task_id, message.event))
print ( f "Task { message.task_id } : { message.event } " )
async def main ():
runtime = SingleThreadedAgentRuntime()
# Register agents
await Worker.register(runtime, "worker" , lambda : Worker())
await Monitor.register(runtime, "monitor" , lambda : Monitor())
# Subscribe monitor to task events
await runtime.add_subscription(
TypeSubscription( topic_type = "task.events" , agent_type = "monitor" )
)
runtime.start()
# Send task
response = await runtime.send_message(
TaskRequest( task_id = "123" , data = "sample" ),
recipient = AgentId( "worker" , "default" )
)
print ( f "Response: { response.result } " )
await asyncio.sleep( 0.1 ) # Let events process
await runtime.stop_when_idle()
if __name__ == "__main__" :
asyncio.run(main())
Next Steps
Event-Driven Architecture Learn about event handlers and message routing
Distributed Runtime Scale message passing across processes
Agent Runtime Understand runtime operations
Core Overview Return to Core API overview