Documentation Index: Fetch the complete documentation index at https://mintlify.com/microsoft/autogen/llms.txt
Use this file to discover all available pages before exploring further.
GraphFlow orchestration uses directed graphs to define complex agent workflows with conditional logic, parallel execution, and sophisticated control flow.
What You’ll Learn
How to design graph-based workflows
Conditional agent transitions
Parallel agent execution
State management across agents
Building complex orchestration patterns
Prerequisites
Install AutoGen
pip install -U "autogen-agentchat" "autogen-ext[openai]"
Set your OpenAI API key
export OPENAI_API_KEY="sk-..."
What is GraphFlow?
GraphFlow is an orchestration pattern where:
Agents are nodes in a directed graph
Edges define possible transitions between agents
Conditions determine which path to take
Multiple paths can execute in parallel
State flows through the graph
Basic Graph Workflow
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import BaseGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
class WorkflowGraph(BaseGroupChat):
    """Custom graph-based workflow.

    The workflow is a plain dict: the ``"start"`` key maps to the name of the
    first agent, and every other key maps an agent name to the list of agent
    names that may speak next. ``None`` in a transition list means "stop".

    NOTE(review): this class assumes ``BaseGroupChat`` accepts the participant
    list as its only required constructor argument and calls
    ``select_speaker`` to pick each turn -- confirm against the installed
    autogen-agentchat version.

    Args:
        participants: Agents taking part in the workflow.
        model_client: Model client kept for transition selection.
        workflow_graph: Mapping described above.
        max_iterations: Upper bound on the number of speaker selections.
            Graphs with a loop-back edge (e.g. tester -> implementer) would
            otherwise cycle forever, because the simplified
            ``_select_transition`` always takes the first edge.
    """

    def __init__(self, participants, model_client, workflow_graph, max_iterations=50):
        super().__init__(participants)
        self._model_client = model_client
        self._graph = workflow_graph
        self._current_agent = None
        self._max_iterations = max_iterations
        self._iterations = 0

    async def select_speaker(self, thread):
        """Select next agent based on graph transitions.

        Args:
            thread: Messages exchanged so far; the last one (if any) is passed
                to ``_select_transition`` as context.

        Returns:
            The name of the next agent, or ``None`` when the workflow ends
            (no outgoing transitions, a ``None`` transition was chosen, or
            the iteration budget is used up).

        Raises:
            KeyError: If the graph has no ``"start"`` entry.
        """
        # Stop once the budget is spent so a cyclic graph cannot run unbounded.
        if self._iterations >= self._max_iterations:
            return None
        self._iterations += 1

        if self._current_agent is None:
            # Start with first agent
            self._current_agent = self._graph["start"]
            return self._current_agent

        transitions = self._graph.get(self._current_agent, [])
        if not transitions:
            return None  # End of workflow

        # Select next agent based on context
        last_message = thread[-1] if thread else None
        self._current_agent = await self._select_transition(
            last_message, transitions
        )
        return self._current_agent

    async def _select_transition(self, message, transitions):
        """Use LLM to select best transition.

        Simplified placeholder: ignores ``message`` and returns the first
        entry of ``transitions``.
        """
        # Implementation details...
        return transitions[0]  # Simplified
async def main() -> None:
    """Run the analyze -> design -> implement -> test workflow on one task.

    Creates a single OpenAI chat client shared by four assistant agents,
    wires them into a ``WorkflowGraph`` and streams the run to the console.
    The client is closed in a ``finally`` block so it is released even when
    the run raises.
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    try:
        # Define agents
        analyzer = AssistantAgent(
            "analyzer",
            model_client=model_client,
            system_message="Analyze the problem and identify key requirements.",
        )
        designer = AssistantAgent(
            "designer",
            model_client=model_client,
            system_message="Design a solution based on requirements.",
        )
        implementer = AssistantAgent(
            "implementer",
            model_client=model_client,
            system_message="Implement the designed solution.",
        )
        tester = AssistantAgent(
            "tester",
            model_client=model_client,
            system_message="Test the implementation and report issues.",
        )

        # Define workflow graph
        workflow = {
            "start": "analyzer",
            "analyzer": ["designer"],
            "designer": ["implementer"],
            "implementer": ["tester"],
            "tester": ["implementer", None],  # Can loop back or end
        }

        # Create workflow
        team = WorkflowGraph(
            participants=[analyzer, designer, implementer, tester],
            model_client=model_client,
            workflow_graph=workflow,
        )

        await Console(
            team.run_stream(
                task="Build a function to validate email addresses"
            )
        )
    finally:
        # Always release the client, including on errors or cancellation.
        await model_client.close()


if __name__ == "__main__":
    asyncio.run(main())
Conditional Workflow
This example shows conditional branching based on agent output:
import asyncio
from typing import Dict, List, Optional
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import BaseGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
class ConditionalWorkflow(BaseGroupChat):
    """Workflow with conditional branching.

    ``transitions`` maps a state name to either the next state (a plain
    value) or a callable that receives the content of the last message and
    returns the next state. The workflow begins in the ``"start"`` state.
    """

    def __init__(self, participants, transitions, model_client):
        super().__init__(participants)
        self._current = "start"
        self._model_client = model_client
        self._transitions = transitions

    async def select_speaker(self, thread):
        """Advance to the next state and return it.

        Returns ``None`` when the current state has no entry in the
        transition table; otherwise returns whatever the entry (or the
        entry's callable) yields, which may itself be ``None``.
        """
        try:
            rule = self._transitions[self._current]
        except KeyError:
            return None

        if not callable(rule):
            # Static edge: the table entry is the next state itself.
            self._current = rule
            return rule

        # Dynamic edge: let the rule inspect the most recent message text.
        latest_text = ""
        if thread:
            latest_text = thread[-1].content
        self._current = rule(latest_text)
        return self._current
async def main() -> None:
    """Classify a request and route it to the matching handler agent.

    Builds a classifier plus three handler agents on one shared OpenAI
    client, runs them through a ``ConditionalWorkflow`` and streams the
    conversation to the console. The client is closed in a ``finally``
    block so it is released even when the run raises.
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    try:
        # Create agents
        classifier = AssistantAgent(
            "classifier",
            model_client=model_client,
            system_message=(
                "Classify the request type: 'bug', 'feature', or 'question'.\n"
                "Respond with just the classification."
            ),
        )
        bug_handler = AssistantAgent(
            "bug_handler",
            model_client=model_client,
            system_message="You handle bug reports. Analyze and provide fix suggestions.",
        )
        feature_handler = AssistantAgent(
            "feature_handler",
            model_client=model_client,
            system_message="You handle feature requests. Analyze feasibility and design.",
        )
        qa_handler = AssistantAgent(
            "qa_handler",
            model_client=model_client,
            system_message="You answer questions. Provide clear, helpful information.",
        )

        # Define conditional transitions
        def route_by_classification(message: str) -> Optional[str]:
            """Map the classifier's reply to a handler name.

            Matching is a case-insensitive substring check, tested in the
            order bug, feature, question. Returns ``None`` when no keyword
            is present.
            """
            message_lower = message.lower()
            if "bug" in message_lower:
                return "bug_handler"
            elif "feature" in message_lower:
                return "feature_handler"
            elif "question" in message_lower:
                return "qa_handler"
            return None

        transitions = {
            "start": "classifier",
            "classifier": route_by_classification,
            # Handlers are terminal: a None entry ends the workflow.
            "bug_handler": None,
            "feature_handler": None,
            "qa_handler": None,
        }

        team = ConditionalWorkflow(
            participants=[classifier, bug_handler, feature_handler, qa_handler],
            transitions=transitions,
            model_client=model_client,
        )

        await Console(
            team.run_stream(
                task="I found a bug where the login page crashes on mobile devices"
            )
        )
    finally:
        # Always release the client, including on errors or cancellation.
        await model_client.close()


if __name__ == "__main__":
    asyncio.run(main())
Parallel Execution
Execute multiple agents in parallel:
import asyncio
from typing import List
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def parallel_agent_execution(
    agents: List[AssistantAgent],
    task: str
) -> List[TextMessage]:
    """Execute multiple agents in parallel on the same task.

    Every agent's ``run(task=...)`` coroutine is awaited concurrently via
    ``asyncio.gather``. The returned list holds the last message of each
    agent's result, in the same order as ``agents``.
    """
    pending = (member.run(task=task) for member in agents)
    outcomes = await asyncio.gather(*pending)

    final_messages = []
    for outcome in outcomes:
        # Only the closing message of each run is kept.
        final_messages.append(outcome.messages[-1])
    return final_messages
async def main() -> None:
    """Run three analysts in parallel, then synthesize their findings.

    Security, performance and cost analysts each receive the same task via
    ``parallel_agent_execution``; their last messages are printed and then
    fed to a synthesizer agent. The shared OpenAI client is closed in a
    ``finally`` block so it is released even when a step raises.
    """
    model_client = OpenAIChatCompletionClient(model="gpt-4o")
    try:
        # Create specialized analyzers
        security_analyzer = AssistantAgent(
            "security_analyst",
            model_client=model_client,
            system_message="Analyze security implications.",
        )
        performance_analyzer = AssistantAgent(
            "performance_analyst",
            model_client=model_client,
            system_message="Analyze performance implications.",
        )
        cost_analyzer = AssistantAgent(
            "cost_analyst",
            model_client=model_client,
            system_message="Analyze cost implications.",
        )

        # Run parallel analysis
        task = "We're considering migrating to a microservices architecture."
        print("Running parallel analysis...")
        results = await parallel_agent_execution(
            [security_analyzer, performance_analyzer, cost_analyzer],
            task
        )
        print("\nResults:")
        for result in results:
            print(f"\n{result.source}: {result.content}")

        # Synthesizer combines results
        synthesizer = AssistantAgent(
            "synthesizer",
            model_client=model_client,
            system_message="Synthesize multiple analyses into recommendations.",
        )
        # results keeps the order of the agent list passed above:
        # security, performance, cost.
        synthesis_task = (
            "Synthesize these analyses:\n"
            f"Security: {results[0].content}\n"
            f"Performance: {results[1].content}\n"
            f"Cost: {results[2].content}\n"
            "Provide unified recommendations."
        )
        final = await synthesizer.run(task=synthesis_task)
        print(f"\nFinal Recommendation:\n{final.messages[-1].content}")
    finally:
        # Always release the client, including on errors or cancellation.
        await model_client.close()


if __name__ == "__main__":
    asyncio.run(main())
Key Concepts
Graph Structure: Defines the workflow as nodes (agents) and edges (transitions).
Conditional Routing: Chooses the next agent based on the current state or output.
Parallel Execution: Runs multiple agents simultaneously for efficiency.
State Management: Maintains context as it flows through the graph.
Workflow Patterns
Sequential Pipeline
Input → Agent A → Agent B → Agent C → Output
Conditional Branching
Input → Classifier → Bug Handler
                   → Feature Handler
                   → QA Handler
Loop with Exit
Develop → Test → [Pass: Exit, Fail: Develop]
Parallel-Merge
       → Agent A →
Input →→ Agent B →→ Synthesizer → Output
       → Agent C →
Best Practices
Clear Graph Design: Document the workflow before implementation
Error Handling: Define what happens when agents fail
Termination Conditions: Prevent infinite loops
State Validation: Ensure state is valid at each transition
Monitoring: Log transitions for debugging
Visualization
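Visualize your workflow (requires the `graphviz` Python package and the Graphviz system binaries, which are not part of the prerequisites above):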
import graphviz
def visualize_workflow(
    graph: dict,
    filename: str = "workflow",
    *,
    view: bool = True,
    fmt: str = "png",
) -> str:
    """Generate workflow diagram.

    Args:
        graph: Mapping of node name to its outgoing edges. A value may be a
            list of target names or a single target name; ``None`` or empty
            targets mean "workflow ends here" and produce no edge.
        filename: Output path stem passed to Graphviz.
        view: Open the rendered file in the system viewer. Defaults to
            ``True`` (the previous hard-coded behavior); pass ``False`` in
            headless or CI environments.
        fmt: Output format handed to Graphviz, ``"png"`` by default.

    Returns:
        The path of the rendered file as reported by ``Digraph.render``.
    """
    dot = graphviz.Digraph(comment='Agent Workflow')

    for node, edges in graph.items():
        dot.node(node, node)
        # Treat a single target like a one-element list so both shapes
        # share the same edge-drawing loop.
        targets = edges if isinstance(edges, list) else [edges]
        for target in targets:
            if target:
                dot.edge(node, target)

    return dot.render(filename, format=fmt, view=view)
# Usage: a linear pipeline whose last stage can loop back or finish.
workflow = dict(
    analyzer=["designer"],
    designer=["implementer"],
    implementer=["tester"],
    tester=["implementer", None],  # None marks the exit; no edge is drawn for it
)
visualize_workflow(workflow)
Troubleshooting
Infinite Loops
Add max iteration limit:
class WorkflowGraph(BaseGroupChat):
    """Graph workflow that stops after a fixed number of speaker selections.

    Args:
        *args: Forwarded unchanged to ``BaseGroupChat``.
        max_iterations: Maximum number of selections ``select_speaker`` will
            make before it starts returning ``None``.
        **kwargs: Forwarded unchanged to ``BaseGroupChat``.
    """

    def __init__(self, *args, max_iterations=50, **kwargs):
        super().__init__(*args, **kwargs)
        self._max_iterations = max_iterations
        self._iterations = 0

    async def select_speaker(self, thread):
        """Return ``None`` once the iteration budget is used up."""
        # Check the budget before counting this call, so exactly
        # max_iterations selections are allowed. Incrementing first and
        # comparing with >= would allow one fewer.
        if self._iterations >= self._max_iterations:
            return None
        self._iterations += 1
        # ... rest of selection logic
State Loss
Preserve state between transitions:
class StatefulWorkflow(BaseGroupChat):
    """Group chat that carries a key/value store across transitions.

    The store starts empty and lives on the instance, so values written
    during one transition remain readable in later ones.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._state = dict()

    def update_state(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        self._state.update({key: value})

    def get_state(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        return self._state[key] if key in self._state else None
Next Steps
Research Assistant: Build a research assistant with complex workflows
Data Analysis: Create data analysis pipelines