Created
December 28, 2025 23:33
-
-
Save kausmeows/1801f939f65c338dcc7bccc649dac0bb to your computer and use it in GitHub Desktop.
Shows how a Router can return a step whose custom function executor returns stop=True, which ends the workflow.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import List | |
| from agno.agent.agent import Agent | |
| from agno.models.openai import OpenAIChat | |
| from agno.workflow.router import Router | |
| from agno.workflow.step import Step | |
| from agno.workflow.types import StepInput, StepOutput | |
| from agno.workflow.workflow import Workflow | |
# --- Agents -----------------------------------------------------------------
# Every agent uses the same OpenAI chat model id; only the name and the
# instructions differ, so they are built through one small helper.
_MODEL_ID = "gpt-4o-mini"


def _build_agent(name: str, instructions: str) -> Agent:
    """Create an Agent with its own OpenAIChat model instance."""
    return Agent(
        name=name,
        model=OpenAIChat(id=_MODEL_ID),
        instructions=instructions,
    )


research_agent = _build_agent("Researcher", "You are a research specialist.")
writer_agent = _build_agent("Writer", "You are a content writer.")
analyst_agent = _build_agent("Analyst", "You are a data analyst.")

# --- Steps ------------------------------------------------------------------
# One step per agent; these are the candidates the router can select from.
research_step = Step(name="research", agent=research_agent)
writer_step = Step(name="write", agent=writer_agent)
analyst_step = Step(name="analyze", agent=analyst_agent)
def end_no_action(step_input: StepInput) -> StepOutput:
    """Produce the fixed "No action taken" output that stops the workflow.

    ``step_input`` is accepted because step executors receive it, but it is
    not read. The returned output carries ``stop=True``.
    """
    halt_output = StepOutput(content="No action taken", stop=True)
    return halt_output


# Fallback step: wraps the function above so the router can select it when
# the input matches none of the routing keywords.
no_action_step = Step(name="no_action", executor=end_no_action)
def entrypoint_router(step_input: StepInput) -> List[Step]:
    """Choose which step(s) the workflow runs, based on keywords in the input.

    Matching is a case-insensitive substring search, checked in this order:

    1. ``"full pipeline"`` -> research, analyze, then write, run in sequence.
    2. ``"research"``      -> the research step only.
    3. ``"write"``         -> the writer step only.
    4. ``"analyze"``       -> the analyst step only.
    5. anything else       -> ``no_action_step``, whose executor returns
       ``stop=True`` so the workflow ends without running an agent.

    The multi-step route is tested first: a full-pipeline request that also
    mentions one of the single keywords (for example "run the full pipeline
    to research X") must not be captured by the narrower rule.

    Args:
        step_input: The step input; only its ``input`` attribute is read.

    Returns:
        A non-empty list of the steps to execute, in order.
    """
    raw_input = step_input.input or ""
    # NOTE(review): the workflow input is not guaranteed to be a string
    # (structured input is possible); stringify it so `.lower()` cannot raise.
    if not isinstance(raw_input, str):
        raw_input = str(raw_input)
    user_input = raw_input.lower()

    # Most specific route first: several steps run in sequence.
    if "full pipeline" in user_input:
        print("Running full pipeline: research -> analyze -> write")
        return [research_step, analyst_step, writer_step]

    # Single-step routes.
    if "research" in user_input:
        print("Routing to research step...")
        return [research_step]

    if "write" in user_input:
        print("Routing to writer step...")
        return [writer_step]

    if "analyze" in user_input:
        print("Routing to analyst step...")
        return [analyst_step]

    # Fallback: a no-op step that reports a message and stops the workflow.
    print("No matching route - returning no_action step")
    return [no_action_step]
# The router is the workflow's only step: it is the entrypoint and decides,
# via `entrypoint_router`, which of the listed choices actually run.
entry_router = Router(
    name="entrypoint",
    selector=entrypoint_router,
    choices=[research_step, writer_step, analyst_step, no_action_step],
    description="Routes to appropriate step based on input",
)

workflow = Workflow(
    name="Dynamic Entrypoint Workflow",
    description="Entrypoint router that can return steps or end the workflow",
    steps=[entry_router],
)
if __name__ == "__main__":
    # Other prompts to try (each hits a different route):
    #   workflow.print_response("I need to research AI trends", stream=True)
    #   workflow.print_response("Run full pipeline on climate change", stream=True)

    # This prompt contains none of the routing keywords, so the router picks
    # the no_action step and the workflow should end.
    demo_prompt = "What is happening in France?"
    workflow.print_response(demo_prompt)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment