Skip to content

Instantly share code, notes, and snippets.

@kausmeows
Created December 28, 2025 23:33
Show Gist options
  • Select an option

  • Save kausmeows/1801f939f65c338dcc7bccc649dac0bb to your computer and use it in GitHub Desktop.

Select an option

Save kausmeows/1801f939f65c338dcc7bccc649dac0bb to your computer and use it in GitHub Desktop.
To show how router can return a step (with stop=True in the custom function step that will end the workflow)
from typing import List
from agno.agent.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
# Define your agents/steps
research_agent = Agent(
name="Researcher",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a research specialist.",
)
writer_agent = Agent(
name="Writer",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a content writer.",
)
analyst_agent = Agent(
name="Analyst",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a data analyst.",
)
# Define the steps that can be dynamically selected
research_step = Step(name="research", agent=research_agent)
writer_step = Step(name="write", agent=writer_agent)
analyst_step = Step(name="analyze", agent=analyst_agent)
def end_no_action(step_input: StepInput) -> StepOutput:
"""Returns no action message"""
return StepOutput(content="No action taken", stop=True)
no_action_step = Step(name="no_action", executor=end_no_action)
def entrypoint_router(step_input: StepInput) -> List[Step]:
"""
Entrypoint that can either:
1. Return a Step to execute (dynamic routing)
2. Return an empty list [] to end without running anything
3. Return multiple steps to run sequentially
"""
user_input = (step_input.input or "").lower()
# Option 1: Return a Step to execute
if "research" in user_input:
print("Routing to research step...")
return [research_step]
if "write" in user_input:
print("Routing to writer step...")
return [writer_step]
if "analyze" in user_input:
print("Routing to analyst step...")
return [analyst_step]
# Option 2: Return multiple steps to run in sequence
if "full pipeline" in user_input:
print("Running full pipeline: research -> analyze -> write")
return [research_step, analyst_step, writer_step]
# Option 3: Return empty list to end without running any step
# (or return a no-op step with a message)
print("No matching route - returning no_action step")
return [no_action_step]
# Workflow with Router as the entrypoint
workflow = Workflow(
name="Dynamic Entrypoint Workflow",
description="Entrypoint router that can return steps or end the workflow",
steps=[
Router(
name="entrypoint",
selector=entrypoint_router,
choices=[research_step, writer_step, analyst_step, no_action_step],
description="Routes to appropriate step based on input",
),
],
)
if __name__ == "__main__":
# workflow.print_response("I need to research AI trends", stream=True)
# workflow.print_response("Run full pipeline on climate change", stream=True)
# This should end the workflow
workflow.print_response("What is happening in France?")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment