Creating Custom Extensions
Learn how to create custom extensions that integrate seamlessly with AutoGen’s architecture.

Component Model

All AutoGen extensions implement the Component interface, which provides:
  • Configuration serialization: Save and load component state
  • Lifecycle management: Start, stop, and restart components
  • Type safety: Pydantic-based configuration validation

Creating a Custom Model Client

Model clients implement the ChatCompletionClient protocol.

Basic Structure

from typing import AsyncGenerator, List, Sequence
from autogen_core import Component, CancellationToken
from autogen_core.models import (
    ChatCompletionClient,
    CreateResult,
    LLMMessage,
    ModelCapabilities,
    RequestUsage,
)
from pydantic import BaseModel
from typing_extensions import Self

class MyModelConfig(BaseModel):
    """Configuration for MyModelClient.

    Serialized and restored by the Component machinery via the client's
    _to_config / _from_config methods.
    """
    model: str  # Model identifier passed to the backend API
    api_key: str  # API credential (plain str here; consider SecretStr in production)
    temperature: float = 0.7  # Sampling temperature
    max_tokens: int = 1024  # Output budget; also used by remaining_tokens()

class MyModelClient(ChatCompletionClient, Component[MyModelConfig]):
    """Custom model client implementation."""
    
    component_config_schema = MyModelConfig
    component_provider_override = "mypackage.MyModelClient"
    
    def __init__(
        self,
        model: str,
        api_key: str,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ):
        self._model = model
        self._api_key = api_key
        self._temperature = temperature
        self._max_tokens = max_tokens
    
    async def create(
        self,
        messages: Sequence[LLMMessage],
        cancellation_token: CancellationToken,
        **kwargs,
    ) -> CreateResult:
        """Generate a completion."""
        # Implement your API call here
        response = await self._call_api(messages)
        
        return CreateResult(
            content=response["content"],
            usage=RequestUsage(
                prompt_tokens=response["usage"]["prompt_tokens"],
                completion_tokens=response["usage"]["completion_tokens"],
            ),
            finish_reason="stop",
            cached=False,
        )
    
    async def create_stream(
        self,
        messages: Sequence[LLMMessage],
        cancellation_token: CancellationToken,
        **kwargs,
    ) -> AsyncGenerator[str | CreateResult, None]:
        """Generate a streaming completion."""
        async for chunk in self._stream_api(messages):
            if chunk["type"] == "content":
                yield chunk["data"]
            elif chunk["type"] == "done":
                yield CreateResult(
                    content="",
                    usage=RequestUsage(
                        prompt_tokens=chunk["usage"]["prompt_tokens"],
                        completion_tokens=chunk["usage"]["completion_tokens"],
                    ),
                    finish_reason="stop",
                    cached=False,
                )
    
    @property
    def capabilities(self) -> ModelCapabilities:
        """Return model capabilities."""
        return ModelCapabilities(
            vision=False,
            function_calling=True,
            json_output=True,
        )
    
    def count_tokens(self, messages: Sequence[LLMMessage]) -> int:
        """Count tokens in messages."""
        # Implement token counting
        return sum(len(str(msg.content).split()) for msg in messages)
    
    def remaining_tokens(self, messages: Sequence[LLMMessage]) -> int:
        """Calculate remaining tokens."""
        return self._max_tokens - self.count_tokens(messages)
    
    @property
    def model(self) -> str:
        """Return model name."""
        return self._model
    
    def _to_config(self) -> MyModelConfig:
        """Export configuration."""
        return MyModelConfig(
            model=self._model,
            api_key=self._api_key,
            temperature=self._temperature,
            max_tokens=self._max_tokens,
        )
    
    @classmethod
    def _from_config(cls, config: MyModelConfig) -> Self:
        """Create from configuration."""
        return cls(
            model=config.model,
            api_key=config.api_key,
            temperature=config.temperature,
            max_tokens=config.max_tokens,
        )
    
    async def _call_api(self, messages: Sequence[LLMMessage]) -> dict:
        """Make API call to your LLM service."""
        # Implement your API logic
        raise NotImplementedError
    
    async def _stream_api(self, messages: Sequence[LLMMessage]):
        """Stream API responses."""
        # Implement your streaming logic
        raise NotImplementedError

Message Conversion

Convert AutoGen messages to your API format:
from autogen_core.models import (
    SystemMessage,
    UserMessage,
    AssistantMessage,
    FunctionExecutionResultMessage,
)

def convert_messages(messages: Sequence[LLMMessage]) -> List[dict]:
    """Convert AutoGen messages to API format.

    Messages of unrecognized types are silently skipped.
    """
    converted: List[dict] = []

    for message in messages:
        if isinstance(message, SystemMessage):
            converted.append({"role": "system", "content": message.content})
        elif isinstance(message, UserMessage):
            converted.append({"role": "user", "content": message.content})
        elif isinstance(message, AssistantMessage):
            converted.append({
                "role": "assistant",
                "content": message.content,
                # Forward tool calls when present; explicit None otherwise.
                "tool_calls": message.function_calls if message.function_calls else None,
            })
        elif isinstance(message, FunctionExecutionResultMessage):
            # Function-execution results map onto the API's "tool" role.
            converted.append({"role": "tool", "content": message.content})

    return converted

Tool Support

Handle function calling:
from autogen_core import FunctionCall
from autogen_core.tools import Tool, ToolSchema

async def create(
    self,
    messages: Sequence[LLMMessage],
    cancellation_token: CancellationToken,
    tools: Sequence[Tool] | None = None,
    **kwargs,
) -> CreateResult:
    """Request a completion, advertising any supplied tools to the API."""
    # Translate tools into the wire format only when some were supplied.
    api_tools = [self._convert_tool(tool) for tool in tools] if tools else None

    response = await self._call_api(messages, tools=api_tools)

    # Lift any tool invocations out of the raw response.
    function_calls = []
    for call in response.get("tool_calls", []):
        function_calls.append(
            FunctionCall(
                id=call["id"],
                name=call["function"]["name"],
                arguments=call["function"]["arguments"],
            )
        )

    reason = "tool_calls" if function_calls else "stop"
    return CreateResult(
        content=response["content"],
        usage=RequestUsage(
            prompt_tokens=response["usage"]["prompt_tokens"],
            completion_tokens=response["usage"]["completion_tokens"],
        ),
        finish_reason=reason,
        cached=False,
        function_calls=function_calls,
    )

def _convert_tool(self, tool: Tool) -> dict:
    """Translate an AutoGen tool into the API's function-spec dict."""
    spec = tool.schema
    # Description and parameters are optional in a ToolSchema, so default them.
    return {
        "type": "function",
        "function": {
            "name": spec["name"],
            "description": spec.get("description", ""),
            "parameters": spec.get("parameters", {}),
        },
    }

Creating a Custom Code Executor

Code executors implement the CodeExecutor protocol.

Basic Structure

from pathlib import Path
from typing import List, Optional
from autogen_core import Component, CancellationToken
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from pydantic import BaseModel
from typing_extensions import Self

class MyExecutorConfig(BaseModel):
    """Configuration for MyCodeExecutor."""
    timeout: int = 60  # Seconds allowed per code-block execution
    work_dir: Optional[str] = None  # Working directory; None means use the CWD

class MyCodeResult(BaseModel):
    """Execution result."""
    exit_code: int  # 0 on success; first failing block's code, or 124 on timeout
    output: str  # Newline-joined output of the executed blocks
    code_file: Optional[str] = None  # Path to the code file, when retained

class MyCodeExecutor(CodeExecutor, Component[MyExecutorConfig]):
    """Custom code executor implementation."""

    component_config_schema = MyExecutorConfig
    component_provider_override = "mypackage.MyCodeExecutor"

    # Languages this executor knows how to run.
    SUPPORTED_LANGUAGES = ["python", "bash"]

    def __init__(
        self,
        timeout: int = 60,
        work_dir: Optional[Path] = None,
    ):
        self._timeout = timeout
        # Fall back to the current directory when no workspace is given.
        self._work_dir = work_dir or Path.cwd()
        self._started = False

    async def execute_code_blocks(
        self,
        code_blocks: List[CodeBlock],
        cancellation_token: CancellationToken,
    ) -> MyCodeResult:
        """Run the blocks in order, stopping at the first failure.

        Raises:
            RuntimeError: if start() has not been called yet.
        """
        if not self._started:
            raise RuntimeError("Executor not started")

        collected: List[str] = []
        status = 0

        for block in code_blocks:
            if block.language not in self.SUPPORTED_LANGUAGES:
                # Short-circuit: report the unsupported language as a failure.
                return MyCodeResult(
                    exit_code=1,
                    output=f"Unsupported language: {block.language}",
                )

            outcome = await self._execute_block(block, cancellation_token)
            collected.append(outcome["output"])

            if outcome["exit_code"] != 0:
                # Keep the failing exit code and skip the remaining blocks.
                status = outcome["exit_code"]
                break

        return MyCodeResult(
            exit_code=status,
            output="\n".join(collected),
        )

    async def _execute_block(
        self,
        block: CodeBlock,
        cancellation_token: CancellationToken,
    ) -> dict:
        """Run one block; concrete executors must implement this."""
        raise NotImplementedError

    async def start(self) -> None:
        """Mark the executor ready and acquire any resources."""
        self._started = True
        # Initialize resources

    async def stop(self) -> None:
        """Mark the executor stopped and release any resources."""
        self._started = False
        # Clean up resources

    async def restart(self) -> None:
        """Stop, then immediately start again."""
        await self.stop()
        await self.start()

    def _to_config(self) -> MyExecutorConfig:
        """Snapshot the executor's settings into its config model."""
        return MyExecutorConfig(
            timeout=self._timeout,
            work_dir=str(self._work_dir),
        )

    @classmethod
    def _from_config(cls, config: MyExecutorConfig) -> Self:
        """Rebuild an executor from a previously exported config."""
        return cls(
            timeout=config.timeout,
            work_dir=Path(config.work_dir) if config.work_dir else None,
        )

Sandboxing

Implement safe execution:
import asyncio
import subprocess
from pathlib import Path

async def _execute_block(
    self,
    block: "CodeBlock",
    cancellation_token: "CancellationToken",
) -> dict:
    """Execute a single code block in a subprocess, enforcing a timeout.

    Returns a dict with:
      - "exit_code": process return code (0 on success, 124 on timeout)
      - "output": combined stdout + stderr text
    The temporary code file is always removed, even on failure.
    """
    import sys  # local import keeps the snippet self-contained

    # Write the code to a content-derived filename in the working directory.
    code_file = self._work_dir / f"code_{hash(block.code)}.py"
    code_file.write_text(block.code)

    process = None  # so the except path can safely reference it
    try:
        # Run with the same interpreter as the host process rather than
        # whatever "python" happens to be first on PATH.
        process = await asyncio.create_subprocess_exec(
            sys.executable,
            str(code_file),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=self._work_dir,
        )

        # Allow external cancellation to interrupt the wait.
        cancellation_token.link_future(asyncio.create_task(process.wait()))

        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=self._timeout,
        )

        return {
            "exit_code": process.returncode or 0,
            "output": stdout.decode() + stderr.decode(),
        }
    except asyncio.TimeoutError:
        if process is not None:
            # Kill the runaway process AND reap it so it does not linger
            # as a zombie after the timeout.
            process.kill()
            await process.wait()
        return {
            "exit_code": 124,  # conventional "timed out" exit code
            "output": "Execution timed out",
        }
    finally:
        # Cleanup the temporary code file in every case.
        code_file.unlink(missing_ok=True)

Creating a Custom Tool

Tools implement the BaseTool interface.

Basic Structure

from typing import Any
from autogen_core import CancellationToken, Component
from autogen_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing_extensions import Self

class MyToolArgs(BaseModel):
    """Arguments for MyTool.

    Field descriptions feed into the tool's generated JSON schema.
    """
    query: str = Field(..., description="Search query")  # required
    max_results: int = Field(10, description="Maximum results to return")

class MyToolResult(BaseModel):
    """Result from MyTool."""
    results: list[dict]  # Hits returned by the search backend
    count: int  # len(results), precomputed for convenience

class MyToolConfig(BaseModel):
    """Configuration for MyTool."""
    api_key: str  # API credential (consider SecretStr in production)
    timeout: float = 30.0  # Request timeout; presumably seconds — confirm against backend

class MyTool(BaseTool[MyToolArgs, MyToolResult], Component[MyToolConfig]):
    """Custom tool implementation."""

    component_config_schema = MyToolConfig
    component_provider_override = "mypackage.MyTool"

    def __init__(self, api_key: str, timeout: float = 30.0):
        self._api_key = api_key
        self._timeout = timeout

    async def run(self, args: MyToolArgs, cancellation_token: CancellationToken) -> MyToolResult:
        """Search for args.query and wrap the hits in a MyToolResult."""
        hits = await self._search(args.query, args.max_results)

        return MyToolResult(
            results=hits,
            count=len(hits),
        )

    async def _search(self, query: str, max_results: int) -> list[dict]:
        """Backend search call; concrete tools must implement this."""
        raise NotImplementedError

    def _to_config(self) -> MyToolConfig:
        """Snapshot the tool's settings into its config model."""
        return MyToolConfig(
            api_key=self._api_key,
            timeout=self._timeout,
        )

    @classmethod
    def _from_config(cls, config: MyToolConfig) -> Self:
        """Rebuild a tool from a previously exported config."""
        return cls(
            api_key=config.api_key,
            timeout=config.timeout,
        )

Tool Schema

The schema is automatically generated from your Args model:
# Inspect the schema AutoGen derives from the tool's Args model.
tool = MyTool(api_key="...")
schema = tool.schema

# Schema includes:
# - name: Tool name
# - description: Tool description
# - parameters: JSON schema from MyToolArgs

Using with Agents

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Register the custom tool with an agent; the model decides when to call it.
tool = MyTool(api_key="...")

agent = AssistantAgent(
    name="assistant",
    model_client=OpenAIChatCompletionClient(model="gpt-4o"),
    tools=[tool],
)

Best Practices

Type Safety

Use Pydantic models for configuration:
from pydantic import BaseModel, Field, SecretStr

class MyConfig(BaseModel):
    """Illustrates Pydantic-based type safety for component configs."""
    api_key: SecretStr  # Secure string handling: value is masked in repr/logs
    timeout: float = Field(30.0, gt=0, le=300)  # Validated: 0 < timeout <= 300
    max_retries: int = Field(3, ge=0)  # Validated: non-negative

Error Handling

Provide clear error messages:
async def create(self, messages, cancellation_token, **kwargs):
    """Call the backend, translating transport failures into RuntimeError
    with actionable guidance for the user."""
    try:
        return await self._call_api(messages)
    except TimeoutError as e:
        raise RuntimeError(
            f"API request timed out after {self._timeout}s. "
            "Consider increasing the timeout value."
        ) from e
    except ConnectionError as e:
        # Chain the original exception so the root cause stays visible.
        raise RuntimeError(
            f"Failed to connect to API: {e}. "
            "Check your network connection and API endpoint."
        ) from e

Logging

Use AutoGen’s logging infrastructure:
import logging
from autogen_core import EVENT_LOGGER_NAME, TRACE_LOGGER_NAME

logger = logging.getLogger(EVENT_LOGGER_NAME)
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)

class MyClient:
    """Demonstrates event-level and trace-level logging around a call."""

    async def create(self, messages, cancellation_token):
        # High-level lifecycle events go to the event logger.
        logger.info(f"Creating completion with {len(messages)} messages")
        # Full payloads go to the (more verbose) trace logger.
        trace_logger.debug(f"Messages: {messages}")

        result = await self._call_api(messages)

        logger.info(f"Completion created: {result.usage}")
        return result

Resource Management

Implement proper lifecycle methods:
class MyExecutor:
    """Shows explicit lifecycle methods plus async-context-manager support."""

    def __init__(self):
        # Resources are created in start() and torn down in stop().
        self._process = None
        self._temp_dir = None

    async def start(self) -> None:
        """Acquire resources: a scratch directory and a worker process."""
        self._temp_dir = tempfile.mkdtemp()
        self._process = await self._start_process()

    async def stop(self) -> None:
        """Release whatever start() acquired; safe when nothing was started."""
        if self._process:
            self._process.terminate()
            await self._process.wait()

        if self._temp_dir:
            shutil.rmtree(self._temp_dir)

    async def __aenter__(self):
        # Enables: async with MyExecutor() as ex: ...
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.stop()

Testing

Write comprehensive tests:
import pytest
from autogen_core import CancellationToken

@pytest.mark.asyncio
async def test_my_client():
    """A minimal round-trip should yield content, usage, and a stop reason."""
    client = MyModelClient(model="test", api_key="test")

    history = [UserMessage(content="Hello", source="user")]
    completion = await client.create(history, CancellationToken())

    assert completion.content
    assert completion.usage.prompt_tokens > 0
    assert completion.finish_reason == "stop"

@pytest.mark.asyncio
async def test_my_executor():
    """Executing a trivial print should succeed and capture its stdout."""
    executor = MyCodeExecutor()
    await executor.start()
    try:
        outcome = await executor.execute_code_blocks(
            [CodeBlock(language="python", code="print('test')")],
            CancellationToken(),
        )
        assert outcome.exit_code == 0
        assert "test" in outcome.output
    finally:
        # Release executor resources even when an assertion fails.
        await executor.stop()

Publishing Extensions

Package Structure

my-autogen-extension/
├── pyproject.toml
├── README.md
├── src/
│   └── autogen_my_ext/
│       ├── __init__.py
│       ├── models/
│       │   └── my_client.py
│       └── tools/
│           └── my_tool.py
└── tests/
    ├── test_client.py
    └── test_tool.py

pyproject.toml

[project]
name = "autogen-my-ext"
version = "0.1.0"
description = "My AutoGen extension"
requires-python = ">=3.10"
dependencies = [
    "autogen-core>=0.4.0",
    "autogen-ext>=0.4.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.0",
    "pytest-asyncio>=0.21",
]

Next Steps

Model Clients

See existing model client implementations

Code Executors

See existing code executor implementations

Tools

See existing tool implementations

Extensions Overview

Back to extensions overview