Created
February 12, 2026 20:12
-
-
Save cnolanminich/64eec32f4c48b224012f9990220427ca to your computer and use it in GitHub Desktop.
Ways of depending on assets with graph-backed assets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ---------------------------------------------------------------------------
# 1. Upstream asset — plain @asset
# ---------------------------------------------------------------------------
@dg.asset(
    group_name="pipeline",
    kinds={"python"},
    tags={"domain": "orders"},
)
def raw_orders(context: dg.AssetExecutionContext) -> list[dict]:
    """Simulate raw order data coming from a source system.

    Builds 100 order records, logs how many were produced, and returns them.

    Args:
        context: Dagster execution context; only its logger is used here.

    Returns:
        A list of 100 dicts, each with the keys ``order_id`` (0-99),
        ``customer`` (``"CUST-<order_id % 50>"``) and ``amount`` (a random
        float drawn from [-5, 500], rounded to two decimals).
    """
    # The lower bound is negative, so some simulated amounts can be below
    # zero — presumably to give downstream validation something to reject
    # (TODO confirm against validate_records).
    orders = [
        {
            "order_id": i,
            "customer": f"CUST-{i % 50}",
            "amount": round(random.uniform(-5, 500), 2),
        }
        for i in range(100)
    ]
    context.log.info(f"Loaded {len(orders)} raw orders")
    return orders
# ---------------------------------------------------------------------------
# 2. @graph_asset with function argument (standard pattern)
#    Data flows from raw_orders → validate → enrich
# ---------------------------------------------------------------------------
@dg.graph_asset(
    kinds={"python"},
    group_name="pipeline",
)
def processed_orders(raw_orders: list[dict]) -> list[dict]:
    """Run the raw orders through validation, then enrichment.

    The upstream `raw_orders` asset arrives as a function argument, which is
    the standard @graph_asset pattern when the data itself is needed. It is
    handed to `validate_records`, and that result is handed to
    `enrich_records`, whose output becomes this asset.
    """
    return enrich_records(validate_records(raw_orders))
# ---------------------------------------------------------------------------
# 3. @graph_asset with Nothing-typed input (ordering dep, no data flow)
#    Depends on processed_orders but reads from warehouse instead of piped data
# ---------------------------------------------------------------------------
@dg.graph_asset(
    group_name="pipeline",
    kinds={"python"},
    # `dg.Nothing` is the same type as a bare `Nothing` import; it is spelled
    # via the `dg` namespace to match every other Dagster name in this file.
    ins={"processed_orders": dg.AssetIn(dagster_type=dg.Nothing)},
)
def order_report(processed_orders) -> dict:
    """Generate a report after `processed_orders` completes.

    The input is declared with `AssetIn(dagster_type=dg.Nothing)`, so the
    parameter exists for wiring the dependency but carries no data. This is
    the closest @graph_asset gets to `@asset(deps=[...])`.

    Args:
        processed_orders: Ordering-only handle for the upstream asset; it is
            forwarded to `generate_report` purely to wire the dependency.

    Returns:
        Whatever `generate_report` produces for this asset.
    """
    # NOTE(review): `generate_report` is defined outside this view. It
    # presumably declares a matching Nothing-typed input and loads its data
    # from its own source (the warehouse, per the banner above) — confirm.
    return generate_report(processed_orders)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment