Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created December 28, 2025 23:24
Show Gist options
  • Select an option

  • Save kausmeows/52f963ac3e280c7c84cd33dd6ce2e66a to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/52f963ac3e280c7c84cd33dd6ce2e66a to your computer and use it in GitHub Desktop.
Shows how a Router step can return one or multiple steps.
from typing import List
from agno.agent.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow
# Agents used by the workflow steps: one per role, all on the same model id.
research_agent = Agent(
    name="Researcher",
    instructions="You are a research specialist.",
    model=OpenAIChat(id="gpt-4o-mini"),
)
writer_agent = Agent(
    name="Writer",
    instructions="You are a content writer.",
    model=OpenAIChat(id="gpt-4o-mini"),
)
analyst_agent = Agent(
    name="Analyst",
    instructions="You are a data analyst.",
    model=OpenAIChat(id="gpt-4o-mini"),
)
# Wrap each agent in a named Step; these are the units the router chooses from.
research_step = Step(
    name="research",
    agent=research_agent,
)
writer_step = Step(
    name="write",
    agent=writer_agent,
)
analyst_step = Step(
    name="analyze",
    agent=analyst_agent,
)
# Entrypoint function that decides what to run
def entrypoint_router(step_input: StepInput) -> List[Step]:
    """Pick the step(s) to run from keywords found in the workflow input.

    Matching is a case-insensitive substring search. The multi-step
    "full pipeline" phrase is tested before the single-step keywords so
    that a request such as "full pipeline on the sales data" is not
    captured by the "data" keyword and reduced to a single step.

    Args:
        step_input: Input handed to the router; only its ``input``
            attribute is read.

    Returns:
        A list of steps to run sequentially:
        - ``[research_step, analyst_step, writer_step]`` for "full pipeline"
        - a one-element list for a single keyword match
        - an empty list ``[]`` to end without running anything
    """
    raw_input = step_input.input
    # NOTE(review): the input is presumably not guaranteed to be a string
    # (confirm against StepInput); coerce so that .lower() cannot fail.
    if not isinstance(raw_input, str):
        raw_input = str(raw_input or "")
    user_input = raw_input.lower()

    # Most specific phrase first: it may co-occur with single-route keywords.
    if "full pipeline" in user_input:
        # Return multiple steps to run sequentially
        print("Running full pipeline...")
        return [research_step, analyst_step, writer_step]

    if "research" in user_input:
        print("Routing to research step...")
        return [research_step]

    if "write" in user_input or "article" in user_input:
        print("Routing to writer step...")
        return [writer_step]

    if "analyze" in user_input or "data" in user_input:
        print("Routing to analyst step...")
        return [analyst_step]

    # Return empty list to end without running any step
    print("No matching route - ending workflow")
    return []
# The workflow consists of a single Router step; entrypoint_router picks
# which of the listed choices actually run.
workflow = Workflow(
    name="Dynamic Routing Workflow",
    steps=[
        Router(
            name="entrypoint",
            description="Routes to appropriate step based on input",
            choices=[research_step, writer_step, analyst_step],
            selector=entrypoint_router,
        ),
    ],
)
if __name__ == "__main__":
    # Sample prompts for the single-step routes (uncomment to try them):
    # workflow.print_response("I need to research AI trends", stream=True)
    # workflow.print_response("Write an article about Python", stream=True)
    # workflow.print_response("Analyze the sales data", stream=True)

    # Active example: exercises the multi-step "full pipeline" route.
    workflow.print_response(
        "Run full pipeline on climate change",
        stream=True,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment