Created
December 28, 2025 23:24
-
-
Save kausmeows/52f963ac3e280c7c84cd33dd6ce2e66a to your computer and use it in GitHub Desktop.
Showing how a router step returns one/multiple steps.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import List | |
| from agno.agent.agent import Agent | |
| from agno.models.openai import OpenAIChat | |
| from agno.workflow.router import Router | |
| from agno.workflow.step import Step | |
| from agno.workflow.types import StepInput | |
| from agno.workflow.workflow import Workflow | |
| # Define your agents/steps | |
| research_agent = Agent( | |
| name="Researcher", | |
| model=OpenAIChat(id="gpt-4o-mini"), | |
| instructions="You are a research specialist.", | |
| ) | |
| writer_agent = Agent( | |
| name="Writer", | |
| model=OpenAIChat(id="gpt-4o-mini"), | |
| instructions="You are a content writer.", | |
| ) | |
| analyst_agent = Agent( | |
| name="Analyst", | |
| model=OpenAIChat(id="gpt-4o-mini"), | |
| instructions="You are a data analyst.", | |
| ) | |
| # Define the steps | |
| research_step = Step(name="research", agent=research_agent) | |
| writer_step = Step(name="write", agent=writer_agent) | |
| analyst_step = Step(name="analyze", agent=analyst_agent) | |
| # Entrypoint function that decides what to run | |
| def entrypoint_router(step_input: StepInput) -> List[Step]: | |
| """ | |
| Entrypoint that decides which step(s) to run. | |
| Can return: | |
| - A single Step | |
| - A list of Steps to run sequentially | |
| - An empty list [] to end without running anything | |
| """ | |
| user_input = (step_input.input or "").lower() | |
| if "research" in user_input: | |
| print("Routing to research step...") | |
| return [research_step] | |
| elif "write" in user_input or "article" in user_input: | |
| print("Routing to writer step...") | |
| return [writer_step] | |
| elif "analyze" in user_input or "data" in user_input: | |
| print("Routing to analyst step...") | |
| return [analyst_step] | |
| elif "full pipeline" in user_input: | |
| # Return multiple steps to run sequentially | |
| print("Running full pipeline...") | |
| return [research_step, analyst_step, writer_step] | |
| else: | |
| # Return empty list to end without running any step | |
| print("No matching route - ending workflow") | |
| return [] | |
| # Workflow with Router as entrypoint | |
| workflow = Workflow( | |
| name="Dynamic Routing Workflow", | |
| steps=[ | |
| Router( | |
| name="entrypoint", | |
| selector=entrypoint_router, | |
| choices=[research_step, writer_step, analyst_step], | |
| description="Routes to appropriate step based on input", | |
| ), | |
| ], | |
| ) | |
| if __name__ == "__main__": | |
| # Test different routes | |
| # workflow.print_response("I need to research AI trends", stream=True) | |
| # workflow.print_response("Write an article about Python", stream=True) | |
| # workflow.print_response("Analyze the sales data", stream=True) | |
| workflow.print_response("Run full pipeline on climate change", stream=True) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment