AutoGen Core implements an event-driven architecture where agents react to messages asynchronously. Message handlers are decorated methods that process specific message types.

Handler Decorators

AutoGen Core provides three decorators for defining message handlers:

@event: one-way messages (no response)
@rpc: request-response messages
@message_handler: generic handler (both event and RPC)

@event Decorator

Use @event for handlers that process messages without returning a response (fire-and-forget).

Basic Usage

from autogen_core import RoutedAgent, event, MessageContext
from dataclasses import dataclass

@dataclass
class LogEntry:
    level: str
    message: str

@dataclass
class StatusUpdate:
    status: str
    progress: float

class Logger(RoutedAgent):
    def __init__(self):
        super().__init__("Logger agent")
        self.logs = []
    
    @event
    async def handle_log(self, message: LogEntry, ctx: MessageContext) -> None:
        """Event handlers must return None."""
        self.logs.append((message.level, message.message))
        print(f"[{message.level}] {message.message}")
    
    @event
    async def handle_status(self, message: StatusUpdate, ctx: MessageContext) -> None:
        print(f"Status: {message.status} ({message.progress:.0%})")
Event handlers must be annotated with return type None. They are called only for messages that do not expect a response, that is, when ctx.is_rpc is False.

Event Handler Signature

@event
async def handler_name(
    self,
    message: MessageType,  # Type-hinted message parameter
    ctx: MessageContext     # Context parameter
) -> None:                 # Must return None
    # Handler implementation
    pass
Requirements:
  1. Must be an async method
  2. Must have exactly 3 parameters: self, message, ctx
  3. message must be type-hinted with the message type to handle
  4. ctx must be type MessageContext
  5. Return type must be None

Event Parameters

strict (bool, default: True)
    If True, raises an exception on type mismatches. If False, logs a warning instead.
match (Callable[[MessageType, MessageContext], bool], default: None)
    Secondary routing function. Handlers are tried in alphabetical order of method name, and the first handler whose match function returns True is called.
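
The difference between strict and non-strict checking can be illustrated with a small plain-Python decorator. This is only a sketch of the idea, not AutoGen's implementation: `typed_handler`, `strict_double`, and `lenient_double` are hypothetical names, and the exception type raised here (`TypeError`) is an assumption made for the example.

```python
import logging
from functools import wraps

logger = logging.getLogger("toy_handlers")


def typed_handler(expected: type, *, strict: bool = True):
    """Toy decorator: check the message type before calling the handler."""
    def decorator(func):
        @wraps(func)
        def wrapper(message):
            if not isinstance(message, expected):
                problem = (
                    f"{func.__name__} expected {expected.__name__}, "
                    f"got {type(message).__name__}"
                )
                if strict:
                    raise TypeError(problem)  # strict: refuse the message
                logger.warning(problem)       # lenient: warn and carry on
            return func(message)
        return wrapper
    return decorator


@typed_handler(int)
def strict_double(message):
    return message * 2


@typed_handler(int, strict=False)
def lenient_double(message):
    return message * 2


print(strict_double(4))      # type matches, handler runs normally
print(lenient_double("ab"))  # type mismatch: a warning is logged, handler still runs
```

Strict checking surfaces routing mistakes immediately, while the lenient mode trades that safety for tolerance of unexpected message types.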

With Match Function

@dataclass
class Alert:
    severity: str
    message: str

class AlertHandler(RoutedAgent):
    @event(match=lambda msg, ctx: msg.severity == "critical")
    async def handle_critical(self, message: Alert, ctx: MessageContext) -> None:
        print(f"CRITICAL: {message.message}")
        # Send notification
    
    @event(match=lambda msg, ctx: msg.severity == "warning")
    async def handle_warning(self, message: Alert, ctx: MessageContext) -> None:
        print(f"Warning: {message.message}")
    
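    # Catch-all. "handle_info" sorts before "handle_warning", so it needs its
    # own match function to avoid intercepting warnings.
    @event(match=lambda msg, ctx: msg.severity not in ("critical", "warning"))
    async def handle_info(self, message: Alert, ctx: MessageContext) -> None:
        print(f"Info: {message.message}")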
Handlers for the same message type are tried in alphabetical order of method name. The first handler whose match function returns True (or that has no match function) is called, and the remaining handlers are skipped.
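
One consequence of this ordering is that a handler without a match function shadows every handler whose name sorts after it. The sketch below models first-match-wins dispatch in plain Python to make that visible. It is a simplified model rather than AutoGen's code: `ToyRouter`, `with_match`, `Shadowed`, and `Fixed` are hypothetical names, and it assumes handlers are discovered in `dir()` order, which is sorted by name.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Alert:
    severity: str
    message: str


def with_match(match: Optional[Callable[[Alert], bool]] = None):
    """Tag a method as a handler with an optional match predicate (toy decorator)."""
    def decorator(func):
        func.is_handler = True
        func.match = match or (lambda msg: True)  # no predicate: always matches
        return func
    return decorator


class ToyRouter:
    """Simplified model: try handlers in name order and call the first match."""

    def dispatch(self, msg: Alert) -> str:
        for name in dir(self):  # dir() returns names sorted alphabetically
            handler = getattr(self, name)
            if getattr(handler, "is_handler", False) and handler.match(msg):
                return handler(msg)
        return "unhandled"


class Shadowed(ToyRouter):
    @with_match(lambda msg: msg.severity == "critical")
    def handle_critical(self, msg: Alert) -> str:
        return "critical"

    @with_match()  # "handle_info" sorts before "handle_warning"
    def handle_info(self, msg: Alert) -> str:
        return "info"

    @with_match(lambda msg: msg.severity == "warning")
    def handle_warning(self, msg: Alert) -> str:
        return "warning"


class Fixed(ToyRouter):
    @with_match(lambda msg: msg.severity == "critical")
    def handle_critical(self, msg: Alert) -> str:
        return "critical"

    @with_match(lambda msg: msg.severity == "warning")
    def handle_warning(self, msg: Alert) -> str:
        return "warning"

    @with_match()  # name chosen so the catch-all sorts last
    def handle_zz_default(self, msg: Alert) -> str:
        return "info"


print(Shadowed().dispatch(Alert("warning", "disk almost full")))  # info
print(Fixed().dispatch(Alert("warning", "disk almost full")))     # warning
```

Either naming the catch-all so that it sorts last or giving it an explicit match function that excludes the other cases avoids the shadowing.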

@rpc Decorator

Use @rpc for handlers that process requests and return responses.

Basic Usage

from autogen_core import RoutedAgent, rpc, MessageContext
from dataclasses import dataclass

@dataclass
class CalculateRequest:
    operation: str
    a: float
    b: float

@dataclass
class CalculateResponse:
    result: float

@dataclass
class ErrorResponse:
    error: str

class Calculator(RoutedAgent):
    def __init__(self):
        super().__init__("Calculator agent")
    
    @rpc
    async def calculate(
        self,
        message: CalculateRequest,
        ctx: MessageContext
    ) -> CalculateResponse | ErrorResponse:
        """RPC handlers must return a value."""
        try:
            if message.operation == "add":
                result = message.a + message.b
            elif message.operation == "multiply":
                result = message.a * message.b
            elif message.operation == "divide":
                if message.b == 0:
                    return ErrorResponse(error="Division by zero")
                result = message.a / message.b
            else:
                return ErrorResponse(error=f"Unknown operation: {message.operation}")
            
            return CalculateResponse(result=result)
        except Exception as e:
            return ErrorResponse(error=str(e))
RPC handlers must return a value (not None). They’re called when ctx.is_rpc == True.

RPC Handler Signature

@rpc
async def handler_name(
    self,
    message: RequestType,    # Type-hinted request parameter
    ctx: MessageContext      # Context parameter
) -> ResponseType:           # Type-hinted return type (not None)
    # Handler implementation
    return response
Requirements:
  1. Must be an async method
  2. Must have exactly 3 parameters: self, message, ctx
  3. message must be type-hinted with the request type to handle
  4. ctx must be type MessageContext
  5. Return type must be type-hinted (not None)
  6. Must return a value

RPC Parameters

strict (bool, default: True)
    If True, raises an exception on type mismatches. If False, logs a warning instead.
match (Callable[[MessageType, MessageContext], bool], default: None)
    Secondary routing function for selecting between multiple RPC handlers.

With Match Function

@dataclass
class Query:
    query_type: str
    params: dict

@dataclass
class Result:
    data: list

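class QueryHandler(RoutedAgent):
    def __init__(self, db):
        super().__init__("Query handler agent")
        # Assumed for illustration: a client exposing async
        # query_users() and query_products() methods.
        self.db = db

    @rpc(match=lambda msg, ctx: msg.query_type == "user")
    async def query_users(self, message: Query, ctx: MessageContext) -> Result:
        # Handle user queries
        users = await self.db.query_users(message.params)
        return Result(data=users)

    @rpc(match=lambda msg, ctx: msg.query_type == "product")
    async def query_products(self, message: Query, ctx: MessageContext) -> Result:
        # Handle product queries
        products = await self.db.query_products(message.params)
        return Result(data=products)

    # Default handler. "query_default" sorts before the other two handlers,
    # so it excludes their query types explicitly.
    @rpc(match=lambda msg, ctx: msg.query_type not in ("user", "product"))
    async def query_default(self, message: Query, ctx: MessageContext) -> Result:
        return Result(data=[])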

@message_handler Decorator

Use @message_handler for generic handlers that can process both events and RPCs.

Basic Usage

from autogen_core import RoutedAgent, message_handler, MessageContext
from dataclasses import dataclass
from typing import Any

@dataclass
class Command:
    action: str
    params: dict

@dataclass
class CommandResult:
    success: bool
    output: str

class Executor(RoutedAgent):
    def __init__(self):
        super().__init__("Executor agent")
    
    @message_handler
    async def execute(
        self,
        message: Command,
        ctx: MessageContext
    ) -> CommandResult | None:
        """Handles both RPC and event messages."""
        result = await self.run_command(message.action, message.params)
        
        # Return result for RPC, None for events
        if ctx.is_rpc:
            return CommandResult(success=True, output=result)
        else:
            # Event - just log
            print(f"Command {message.action} executed: {result}")
            return None
    
    async def run_command(self, action: str, params: dict) -> str:
        # Execute command
        return f"Executed {action}"

Message Handler Signature

@message_handler
async def handler_name(
    self,
    message: MessageType,    # Type-hinted message parameter
    ctx: MessageContext      # Context parameter
) -> ResponseType | None:    # Can return value or None
    # Handler implementation
    if ctx.is_rpc:
        return response
    else:
        return None

Message Handler Parameters

strict (bool, default: True)
    If True, raises an exception on type mismatches. If False, logs a warning instead.
match (Callable[[MessageType, MessageContext], bool], default: None)
    Secondary routing function.

Message Routing

Type-Based Routing

Messages are routed to handlers based on the type hint:
from dataclasses import dataclass
from autogen_core import MessageContext, RoutedAgent, event

@dataclass
class MessageA:
    data: str

@dataclass
class MessageB:
    value: int

@dataclass
class MessageC:
    items: list

class MultiHandler(RoutedAgent):
    @event
    async def handle_a(self, message: MessageA, ctx: MessageContext) -> None:
        print(f"Got A: {message.data}")
    
    @event
    async def handle_b(self, message: MessageB, ctx: MessageContext) -> None:
        print(f"Got B: {message.value}")
    
    @event
    async def handle_ab(self, message: MessageA | MessageB, ctx: MessageContext) -> None:
        """Handles both MessageA and MessageB."""
        if isinstance(message, MessageA):
            print(f"A or B (A): {message.data}")
        else:
            print(f"A or B (B): {message.value}")
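If multiple handlers accept the same message type, they are tried in alphabetical order of method name, and only the first one that matches (including its match function, if any) is called. In the example above, MessageA goes to handle_a and MessageB goes to handle_ab, because "handle_ab" sorts before "handle_b"; handle_b is never called.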

Handler Selection Order

  1. Type match: Handler’s type annotation must match message type
  2. Alphabetical order: Handlers are sorted alphabetically by method name
  3. Match function: If provided, must return True
  4. First match wins: First handler that passes all checks is called
@dataclass
class Task:
    priority: int  # Assumed field, inferred from the match functions below

class OrderDemo(RoutedAgent):
    @event(match=lambda msg, ctx: msg.priority > 5)
    async def handle_high_priority(self, message: Task, ctx: MessageContext) -> None:
        print("High priority")
    
    @event(match=lambda msg, ctx: msg.priority <= 5)
    async def handle_low_priority(self, message: Task, ctx: MessageContext) -> None:
        print("Low priority")
    
    @event  # Catch-all (evaluated last)
    async def handle_task(self, message: Task, ctx: MessageContext) -> None:
        print("Default handler")

Unhandled Messages

class MyAgent(RoutedAgent):
    @event
    async def handle_known(self, message: KnownMessage, ctx: MessageContext) -> None:
        print("Handled")
    
    async def on_unhandled_message(self, message: Any, ctx: MessageContext) -> None:
        """Called when no handler matches."""
        print(f"Unhandled message type: {type(message).__name__}")
        # Log, raise exception, or handle gracefully
By default, on_unhandled_message logs an info message. Override it to customize behavior.

Advanced Patterns

Chained Handlers

class ChainedAgent(RoutedAgent):
    @rpc
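    async def process_step1(self, message: Step1Request, ctx: MessageContext) -> Step2Response:
        # Process step 1
        result = await self.do_step1(message.data)

        # Forward to the step 2 agent and return its reply. send_message
        # resolves to the recipient's response, so the return annotation is
        # the step 2 response type (called Step2Response here for illustration).
        return await self.send_message(
            Step2Request(data=result),
            recipient=AgentId("processor", "step2")
        )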

Publish from Handler

class Publisher(RoutedAgent):
    @rpc
    async def handle_request(self, message: Request, ctx: MessageContext) -> Response:
        # Process request
        result = await self.process(message)
        
        # Notify subscribers
        await self.publish_message(
            ProcessedEvent(request_id=message.id, result=result),
            TopicId(type="processed", source=message.id)
        )
        
        return Response(status="done")

Error Handling

class RobustAgent(RoutedAgent):
    @rpc
    async def handle_with_errors(
        self,
        message: Request,
        ctx: MessageContext
    ) -> Response | ErrorResponse:
        try:
            # Check cancellation
            if ctx.cancellation_token.is_cancelled():
                return ErrorResponse(error="Cancelled")
            
            # Process
            result = await self.process(message)
            return Response(result=result)
            
        except ValueError as e:
            return ErrorResponse(error=f"Validation error: {e}")
        except Exception as e:
            # Log error
            print(f"Unexpected error: {e}")
            return ErrorResponse(error="Internal error")

State Management in Handlers

class StatefulAgent(RoutedAgent):
    def __init__(self):
        super().__init__("Stateful agent")
        self.counter = 0
        self.history = []
    
    @event
    async def increment(self, message: IncrementEvent, ctx: MessageContext) -> None:
        self.counter += message.amount
        self.history.append((message.amount, self.counter))
    
    @rpc
    async def get_count(self, message: GetCountRequest, ctx: MessageContext) -> CountResponse:
        return CountResponse(count=self.counter)
    
    async def save_state(self) -> dict:
        return {
            "counter": self.counter,
            "history": self.history
        }
    
    async def load_state(self, state: dict) -> None:
        self.counter = state["counter"]
        self.history = state["history"]
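
If the saved state is persisted as JSON, tuples in `history` come back as lists. The following sketch shows a round trip that restores them. It assumes JSON is the storage format and uses a plain class, `CounterState`, as a stand-in for the agent so that it runs without a runtime; both are choices made for this example.

```python
import asyncio
import json


class CounterState:
    """Plain-Python stand-in for the agent's state methods (not a RoutedAgent)."""

    def __init__(self) -> None:
        self.counter = 0
        self.history: list[tuple[int, int]] = []

    def increment(self, amount: int) -> None:
        self.counter += amount
        self.history.append((amount, self.counter))

    async def save_state(self) -> dict:
        return {"counter": self.counter, "history": self.history}

    async def load_state(self, state: dict) -> None:
        self.counter = state["counter"]
        # JSON has no tuple type, so rebuild tuples from the decoded lists.
        self.history = [tuple(item) for item in state["history"]]


async def round_trip() -> CounterState:
    original = CounterState()
    original.increment(2)
    original.increment(3)
    payload = json.dumps(await original.save_state())  # e.g. written to disk

    restored = CounterState()
    await restored.load_state(json.loads(payload))
    return restored


restored = asyncio.run(round_trip())
print(restored.counter, restored.history)  # 5 [(2, 2), (3, 5)]
```

Keeping the state dictionary limited to JSON-friendly values makes it straightforward to store and reload.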

Complete Example

import asyncio
from dataclasses import dataclass
from autogen_core import (
    SingleThreadedAgentRuntime,
    RoutedAgent,
    MessageContext,
    AgentId,
    TopicId,
    TypeSubscription,
    event,
    rpc
)

@dataclass
class TaskRequest:
    task_id: str
    operation: str
    data: dict

@dataclass
class TaskResponse:
    task_id: str
    result: str
    success: bool

@dataclass
class TaskEvent:
    task_id: str
    status: str
    timestamp: float

# Defined before Worker: handler annotations are evaluated when the class
# body runs, so these types must already exist at that point.
@dataclass
class TaskStatusRequest:
    task_id: str

@dataclass
class TaskStatusResponse:
    task_id: str
    completed: bool

class Worker(RoutedAgent):
    def __init__(self):
        super().__init__("Worker")
        self.completed_tasks = set()

    @rpc
    async def handle_task(self, message: TaskRequest, ctx: MessageContext) -> TaskResponse:
        # Publish start event
        await self.publish_message(
            TaskEvent(task_id=message.task_id, status="started", timestamp=asyncio.get_running_loop().time()),
            TopicId(type="task.events", source=message.task_id)
        )

        # Process
        result = f"Processed {message.operation} with {message.data}"
        self.completed_tasks.add(message.task_id)

        # Publish completion event
        await self.publish_message(
            TaskEvent(task_id=message.task_id, status="completed", timestamp=asyncio.get_running_loop().time()),
            TopicId(type="task.events", source=message.task_id)
        )

        return TaskResponse(task_id=message.task_id, result=result, success=True)

    @rpc
    async def get_status(self, message: TaskStatusRequest, ctx: MessageContext) -> TaskStatusResponse:
        completed = message.task_id in self.completed_tasks
        return TaskStatusResponse(task_id=message.task_id, completed=completed)

class Monitor(RoutedAgent):
    def __init__(self):
        super().__init__("Monitor")
        self.events = []
    
    @event
    async def track_event(self, message: TaskEvent, ctx: MessageContext) -> None:
        self.events.append((message.task_id, message.status, message.timestamp))
        print(f"[{message.timestamp:.2f}] Task {message.task_id}: {message.status}")

async def main():
    runtime = SingleThreadedAgentRuntime()
    
    # Register agents
    await Worker.register(runtime, "worker", lambda: Worker())
    await Monitor.register(runtime, "monitor", lambda: Monitor())
    
    # Subscribe monitor to events
    await runtime.add_subscription(
        TypeSubscription(topic_type="task.events", agent_type="monitor")
    )
    
    runtime.start()
    
    # Send task
    response = await runtime.send_message(
        TaskRequest(task_id="task-1", operation="analyze", data={"key": "value"}),
        recipient=AgentId("worker", "default")
    )
    print(f"Result: {response.result}")
    
    # Check status
    status = await runtime.send_message(
        TaskStatusRequest(task_id="task-1"),
        recipient=AgentId("worker", "default")
    )
    print(f"Completed: {status.completed}")
    
    await asyncio.sleep(0.1)
    await runtime.stop_when_idle()

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

Use @event when the sender doesn’t need a response. Perfect for logging, notifications, and status updates.
Use @rpc when the sender needs a response. Perfect for queries, calculations, and operations that produce results.
Always type-hint message parameters. This enables type checking and automatic routing.
RPC handlers should return error responses rather than raising exceptions when possible. This provides better error information to callers.
For long-running handlers, periodically check ctx.cancellation_token.is_cancelled().
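
The cancellation advice can be sketched as a loop that checks the token between units of work. The example uses a stand-in token so that it runs without a runtime: `ToyCancellationToken` and `process_items` are hypothetical names, and only the `is_cancelled()` check mirrors what the handler examples on this page call.

```python
import asyncio


class ToyCancellationToken:
    """Stand-in exposing only cancel() and is_cancelled() (hypothetical class)."""

    def __init__(self) -> None:
        self._cancelled = False

    def cancel(self) -> None:
        self._cancelled = True

    def is_cancelled(self) -> bool:
        return self._cancelled


async def process_items(items: list[int], token: ToyCancellationToken) -> list[int]:
    done = []
    for item in items:
        if token.is_cancelled():  # check between units of work
            break
        await asyncio.sleep(0)    # yield so other tasks (such as the canceller) can run
        done.append(item * 2)
    return done


async def demo() -> list[int]:
    token = ToyCancellationToken()
    task = asyncio.create_task(process_items(list(range(1000)), token))
    await asyncio.sleep(0)  # let the worker start
    token.cancel()
    return await task


partial = asyncio.run(demo())
print(f"processed {len(partial)} of 1000 items before cancellation")
```

Checking once per item keeps the overhead small while still letting the handler stop soon after cancellation is requested.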

Next Steps

Message Passing

Learn about messages, contexts, and subscriptions

Distributed Runtime

Scale event-driven agents across processes

Agent Runtime

Understand runtime operations

Core Overview

Return to Core API overview