Building Custom Agents
While AutoGen provides built-in agents like AssistantAgent and CodeExecutorAgent, you can create custom agents with specialized behaviors by extending the BaseChatAgent class.
Understanding BaseChatAgent
All agents in AutoGen inherit from BaseChatAgent and must implement:
on_messages: Defines agent behavior in response to messages
on_reset: Resets the agent to its initial state
produced_message_types: List of message types the agent can produce
Optionally, implement on_messages_stream for streaming responses.
Simple Custom Agent Example
Here’s a countdown agent that produces a stream of messages:
from typing import AsyncGenerator, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseAgentEvent, BaseChatMessage, TextMessage
from autogen_core import CancellationToken
class CountDownAgent(BaseChatAgent):
    """Agent that streams a countdown and then reports completion.

    Every invocation yields one ``TextMessage`` per step, from ``count`` down
    to 1, followed by a ``Response`` whose chat message is ``"Done!"``.
    Incoming messages are ignored.
    """

    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Chat message types this agent can return."""
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Drain the stream and return only its terminal ``Response``."""
        final: Response | None = None
        stream = self.on_messages_stream(messages, cancellation_token)
        async for item in stream:
            # Intermediate countdown messages are skipped; the last Response wins.
            if isinstance(item, Response):
                final = item
        assert final is not None
        return final

    async def on_messages_stream(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]:
        """Yield each countdown step, then the final ``Response``."""
        emitted: List[BaseAgentEvent | BaseChatMessage] = []
        for step in reversed(range(1, self._count + 1)):
            tick = TextMessage(content=f"{step}...", source=self.name)
            emitted.append(tick)
            yield tick

        # The closing Response carries every step that was streamed above.
        done = TextMessage(content="Done!", source=self.name)
        yield Response(chat_message=done, inner_messages=emitted)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Nothing to reset: the agent keeps no per-conversation state."""
        pass
# Usage
async def main():
    """Run a five-step countdown and print each streamed item."""
    agent = CountDownAgent("countdown", count=5)
    stream = agent.on_messages_stream([], CancellationToken())
    async for item in stream:
        if not isinstance(item, Response):
            # Intermediate countdown message.
            print(item.content)
            continue
        print(f"Final: {item.chat_message.content}")
Arithmetic Agent Example
A more practical example is an agent that applies an arithmetic operation to the number in the most recent message:
from typing import Callable, List, Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage
from autogen_core import CancellationToken
class ArithmeticAgent(BaseChatAgent):
    """Agent that applies a fixed integer operation to the latest number.

    The agent keeps its own message history. On each call it parses the text
    of the most recent message in that history as an integer, applies
    ``operator_func`` to it, and replies with the result as text. Because the
    reply is appended to the history, a later call with no new messages
    operates on the agent's own previous result.
    """

    def __init__(
        self,
        name: str,
        description: str,
        operator_func: Callable[[int], int]
    ) -> None:
        super().__init__(name, description=description)
        self._operator_func = operator_func
        self._message_history: List[BaseChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Chat message types this agent can return."""
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Apply the operation to the number in the latest message.

        Raises:
            ValueError: If there is no message to operate on, or the latest
                message's text is not an integer.
            TypeError: If the latest message is not a ``TextMessage``.
        """
        # Update message history
        self._message_history.extend(messages)

        # ``messages`` may be empty; with an empty history there is nothing
        # to parse, so fail with an explicit error instead of an IndexError.
        if not self._message_history:
            raise ValueError(
                f"{self.name} was called with no messages and has no history to operate on"
            )

        # Explicit checks instead of ``assert``, which is removed under ``-O``.
        latest = self._message_history[-1]
        if not isinstance(latest, TextMessage):
            raise TypeError(
                f"{self.name} expects a TextMessage, got {type(latest).__name__}"
            )
        try:
            number = int(latest.content)
        except ValueError as err:
            raise ValueError(
                f"{self.name} expects integer text, got {latest.content!r}"
            ) from err

        # Apply operation
        result = self._operator_func(number)

        # Create response and remember it so a follow-up call can build on it
        response_message = TextMessage(content=str(result), source=self.name)
        self._message_history.append(response_message)
        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Forget every message seen so far."""
        self._message_history.clear()
# Build one agent per arithmetic operation.
add_agent = ArithmeticAgent(
    name="add_agent",
    description="Adds 1 to the number",
    operator_func=lambda x: x + 1,
)
multiply_agent = ArithmeticAgent(
    name="multiply_agent",
    description="Multiplies by 2",
    operator_func=lambda x: x * 2,
)
divide_agent = ArithmeticAgent(
    name="divide_agent",
    description="Divides by 2, rounds down",
    operator_func=lambda x: x // 2,
)
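The on_messages method may be called with an empty list of messages. This means the agent is being called again without any new messages from the caller since its previous call. Always keep a history of received messages inside the agent so it can still generate a response in that case.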
Custom Agent with LLM
Create custom agents that use LLMs with custom behavior:
import asyncio
from typing import Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage
from autogen_core import CancellationToken
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    SystemMessage,
    UserMessage,
)
class CustomLLMAgent(BaseChatAgent):
    """Agent that answers by sending its text history to a chat model.

    The agent stores every message it receives and every reply it produces.
    On each call it sends the system prompt plus all stored text messages to
    the model client and returns the model's text as a ``TextMessage``.
    """

    def __init__(
        self,
        name: str,
        model_client: ChatCompletionClient,
        system_prompt: str = "You are a helpful assistant."
    ):
        super().__init__(name, "Custom LLM-powered agent")
        self._model_client = model_client
        self._system_prompt = system_prompt
        self._history: list[BaseChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Chat message types this agent can return."""
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Query the model with the accumulated history and return its reply.

        Raises:
            ValueError: If the model returns something other than plain text.
        """
        self._history.extend(messages)

        # Prepare messages for model. The system prompt goes in a real
        # SystemMessage rather than a user turn with a fake "system" source.
        model_messages: list[SystemMessage | UserMessage | AssistantMessage] = [
            SystemMessage(content=self._system_prompt)
        ]
        for msg in self._history:
            if not isinstance(msg, TextMessage):
                # Only text messages are forwarded to the model.
                continue
            if msg.source == self.name:
                # The agent's own earlier replies are assistant turns.
                model_messages.append(
                    AssistantMessage(content=msg.content, source=msg.source)
                )
            else:
                model_messages.append(
                    UserMessage(content=msg.content, source=msg.source)
                )

        # Get model response; forward the token so the call can be cancelled.
        result = await self._model_client.create(
            model_messages, cancellation_token=cancellation_token
        )
        response_text = result.content
        if not isinstance(response_text, str):
            raise ValueError(
                f"{self.name} expected text from the model, "
                f"got {type(response_text).__name__}"
            )

        response_message = TextMessage(content=response_text, source=self.name)
        self._history.append(response_message)
        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Forget the whole conversation."""
        self._history.clear()
# Usage
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def main():
    """Ask the custom LLM agent one question and print its answer."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    try:
        agent = CustomLLMAgent(
            "custom_assistant",
            model_client,
            system_prompt="You are a helpful coding assistant."
        )
        response = await agent.on_messages(
            [TextMessage(content="Write a hello world in Python", source="user")],
            CancellationToken()
        )
        print(response.chat_message.content)
    finally:
        # Close the client even when the request fails, so it is not leaked.
        await model_client.close()
State Management
Implement state persistence for your custom agents:
from typing import Any, Mapping
class StatefulAgent(BaseChatAgent):
    """Agent that counts processed messages and can save and restore its state.

    State consists of a running message count and the list of received
    messages. ``save_state`` and ``load_state`` convert that state to and
    from a plain mapping.
    """

    def __init__(self, name: str, initial_count: int = 0):
        super().__init__(name, "Agent with state")
        # Remembered so on_reset can restore the configured starting value.
        self._initial_count = initial_count
        self._count = initial_count
        self._history: list[BaseChatMessage] = []

    async def save_state(self) -> Mapping[str, Any]:
        """Return the count and the dumped message history."""
        return {
            "count": self._count,
            "history": [msg.model_dump() for msg in self._history]
        }

    async def load_state(self, state: Mapping[str, Any]) -> None:
        """Restore the count and history from a mapping made by ``save_state``."""
        self._count = state["count"]
        # Reconstruct messages from state.
        # NOTE(review): every entry is rebuilt as a TextMessage; this assumes
        # the history only ever holds text messages. Confirm against callers.
        self._history = [
            TextMessage.model_validate(msg) for msg in state["history"]
        ]

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Chat message types this agent can return."""
        return (TextMessage,)

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Record the new messages and report the running total."""
        self._history.extend(messages)
        self._count += len(messages)

        response_message = TextMessage(
            content=f"Processed {self._count} total messages",
            source=self.name
        )
        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Return to the state the agent had right after construction."""
        # Restore the configured starting count instead of a hard-coded 0.
        self._count = self._initial_count
        self._history.clear()
Using Custom Agents in Teams
Custom agents work seamlessly with AutoGen’s team patterns:
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def main():
    """Run the arithmetic agents as a selector team and print the last message."""
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    try:
        # Create custom agents
        add_agent = ArithmeticAgent("add_agent", "Adds 1", lambda x: x + 1)
        multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies by 2", lambda x: x * 2)
        divide_agent = ArithmeticAgent("divide_agent", "Divides by 2", lambda x: x // 2)

        # Create team with custom agents
        team = SelectorGroupChat(
            [add_agent, multiply_agent, divide_agent],
            model_client=model_client,
            termination_condition=MaxMessageTermination(10),
            allow_repeated_speaker=True
        )

        # Run the team. ArithmeticAgent parses the text of the most recent
        # message with int(), so the starting number must be the last task
        # message on its own; the instruction goes in a separate message.
        result = await team.run(
            task=[
                TextMessage(
                    content="Apply the operations to turn the given number into 25.",
                    source="user",
                ),
                TextMessage(content="10", source="user"),
            ]
        )
        print(f"Final result: {result.messages[-1].content}")
    finally:
        # Close the client even when the run fails, so it is not leaked.
        await model_client.close()
Best Practices
Always track message history internally, as on_messages may be called with empty lists.
Respect the CancellationToken for long-running operations:
async def on_messages(self, messages, cancellation_token):
    """Do 100 short units of work, stopping as soon as cancellation is seen."""
    remaining = 100
    while remaining > 0:
        # Check before each unit of work so cancellation takes effect promptly.
        if cancellation_token.is_cancelled():
            raise asyncio.CancelledError()
        # Do work
        await asyncio.sleep(0.1)
        remaining -= 1
Clear all state in on_reset to ensure clean restarts:
async def on_reset(self, cancellation_token: CancellationToken) -> None:
    """Drop all accumulated state: counter, history and cache."""
    self._counter = 0
    for container in (self._history, self._cache):
        container.clear()
Properly specify message types your agent produces:
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
    """List every chat message type this agent may return."""
    emitted: tuple[type[BaseChatMessage], ...] = (
        TextMessage,
        ToolCallSummaryMessage,
    )
    return emitted
Custom agents are not thread-safe or coroutine-safe. Do not share agents between multiple tasks or call methods concurrently.
Advanced: Nested Agents
Create agents that contain other agents or teams:
class NestedAgent(BaseChatAgent):
    """Agent that delegates each call to an inner team.

    The incoming messages are passed to the inner team as its task. The last
    chat message the team produced becomes this agent's reply, and the rest
    of the team's messages are returned as inner messages.
    """

    def __init__(self, name: str, inner_team: RoundRobinGroupChat):
        super().__init__(name, "Agent with nested team")
        self._inner_team = inner_team

    async def on_messages(
        self,
        messages: Sequence[BaseChatMessage],
        cancellation_token: CancellationToken
    ) -> Response:
        """Run the inner team on ``messages`` and wrap its result.

        Raises:
            RuntimeError: If the inner team's result has no chat message.
        """
        # Run inner team with messages
        result = await self._inner_team.run(
            task=messages,
            cancellation_token=cancellation_token
        )

        # Pick the last chat message as the reply. The result list may also
        # hold agent events, which cannot be used as Response.chat_message.
        produced = list(result.messages)
        for index in range(len(produced) - 1, -1, -1):
            if isinstance(produced[index], BaseChatMessage):
                final_message = produced.pop(index)
                break
        else:
            raise RuntimeError(
                f"{self.name}: inner team produced no chat message to return"
            )

        # Everything else the team produced is reported as inner messages.
        return Response(chat_message=final_message, inner_messages=produced)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the inner team."""
        await self._inner_team.reset()

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:
        """Chat message types this agent declares it can return."""
        # NOTE(review): the reply is whatever the inner team produced last;
        # confirm the team's agents only emit TextMessage, or widen this tuple.
        return (TextMessage,)