@cnolanminich
Created February 12, 2026 20:12
Ways of depending on assets with graph-backed assets

import random

import dagster as dg
from dagster import Nothing

# NOTE: validate_records, enrich_records, and generate_report are referenced below
# but not defined in this snippet; they are assumed to be @dg.op functions defined elsewhere.
# ---------------------------------------------------------------------------
# 1. Upstream asset — plain @asset
# ---------------------------------------------------------------------------
@dg.asset(
    group_name="pipeline",
    kinds={"python"},
    tags={"domain": "orders"},
)
def raw_orders(context: dg.OpExecutionContext) -> list[dict]:
    """Simulated raw order data from source system."""
    orders = [
        {"order_id": i, "customer": f"CUST-{i % 50}", "amount": round(random.uniform(-5, 500), 2)}
        for i in range(100)
    ]
    context.log.info(f"Loaded {len(orders)} raw orders")
    return orders
# ---------------------------------------------------------------------------
# 2. @graph_asset with function argument (standard pattern)
# Data flows from raw_orders → validate → enrich
# ---------------------------------------------------------------------------
@dg.graph_asset(
    group_name="pipeline",
    kinds={"python"},
)
def processed_orders(raw_orders: list[dict]) -> list[dict]:
    """Validate and enrich orders.

    The upstream `raw_orders` asset is received as a function argument;
    this is the standard pattern for @graph_asset when you need the actual data.
    """
    validated = validate_records(raw_orders)
    return enrich_records(validated)
# ---------------------------------------------------------------------------
# 3. @graph_asset with Nothing-typed input (ordering dep, no data flow)
# Depends on processed_orders but reads from warehouse instead of piped data
# ---------------------------------------------------------------------------
@dg.graph_asset(
    group_name="pipeline",
    kinds={"python"},
    ins={"processed_orders": dg.AssetIn(dagster_type=Nothing)},
)
def order_report(processed_orders) -> dict:
    """Generate a report after processed_orders completes.

    Uses `AssetIn(dagster_type=Nothing)` so the parameter exists for wiring
    but NO data is actually passed. The op inside reads from its own source.
    This is the closest @graph_asset gets to @asset(deps=[...]).
    """
    # generate_report must itself declare a Nothing-typed input for this wiring to resolve.
    return generate_report(processed_orders)