@cnolanminich
Created January 28, 2026 17:05
Dynamic Dagster asset pool limit of 1

Per-Asset Concurrency with Dagster Pools

This document explains how to limit concurrent executions of individual assets using Dagster's pool-based concurrency system.

Overview

By default, Dagster allows unlimited concurrent executions of the same asset across multiple runs. This can cause issues when:

  • An asset writes to a shared resource (database table, file, API)
  • An asset has rate limits or resource constraints
  • You want to prevent duplicate work

Solution: Assign each asset to its own unique pool with a limit of 1.

Configuration

1. dagster.yaml

Set a default limit for all pools:

concurrency:
  pools:
    default_limit: 1
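Depending on your Dagster version, the `pools` block may also accept a `granularity` setting controlling whether pool limits apply per op or per run; check your version's docs before relying on this schema:

```yaml
concurrency:
  pools:
    default_limit: 1
    granularity: op  # apply pool limits at the op level (assumed schema)
```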

Important: DAGSTER_HOME must be set for Dagster to load this config:

export DAGSTER_HOME=/path/to/your/project

2. Assigning Pools to Assets

Option A: Direct assignment

from dagster import asset

@asset(
    key=["my_group", "my_asset"],
    pool="my_group_my_asset",  # Unique pool per asset
)
def my_asset():
    pass

Option B: Programmatic assignment (recommended)

Transform all assets to automatically get pools based on their keys:

from dagster import AssetsDefinition, Definitions, OpDefinition

def _add_pool_to_asset(asset_def: AssetsDefinition) -> AssetsDefinition:
    """Transform an asset to add a pool based on its key."""
    original_op = asset_def.op
    key = list(asset_def.specs)[0].key
    pool_name = "_".join(key.path)

    # NOTE: rebuilds the op from private attributes (_compute_fn,
    # _config_schema, and _partition_mappings below), which are not
    # public API and may change between Dagster versions.
    new_op = OpDefinition(
        compute_fn=original_op._compute_fn,
        name=original_op.name,
        ins=dict(original_op.ins),
        outs=dict(original_op.outs),
        description=original_op.description,
        config_schema=original_op._config_schema,
        required_resource_keys=original_op.required_resource_keys,
        tags=original_op.tags,
        retry_policy=original_op.retry_policy,
        pool=pool_name,
    )

    return AssetsDefinition(
        keys_by_input_name=asset_def.keys_by_input_name,
        keys_by_output_name=asset_def.keys_by_output_name,
        node_def=new_op,
        partitions_def=asset_def.partitions_def,
        partition_mappings=asset_def._partition_mappings,
        asset_deps=asset_def.asset_deps,
        can_subset=asset_def.can_subset,
        group_names_by_key=asset_def.group_names_by_key,
        metadata_by_key=dict(asset_def.metadata_by_key),
        tags_by_key=dict(asset_def.tags_by_key),
    )


def apply_per_asset_concurrency(defs: Definitions) -> Definitions:
    """Apply per-asset concurrency pools to all assets."""
    new_assets = [_add_pool_to_asset(a) for a in defs.assets]
    return Definitions(
        assets=new_assets,
        jobs=defs.jobs,
        schedules=defs.schedules,
        sensors=defs.sensors,
        resources=defs.resources,
    )

Usage:

defs = Definitions(assets=[asset_a, asset_b, asset_c])
defs = apply_per_asset_concurrency(defs)  # All assets now have unique pools

Behavior

With Pool (limit 1)

Asset: my_group/my_asset
Pool: my_group_my_asset (limit: 1)

Run 1: [=== EXECUTING ===]
Run 2:                     [=== QUEUED ===] → [=== EXECUTING ===]
Run 3:                     [=== QUEUED ===]   [=== QUEUED ===] → [=== EXECUTING ===]
  • Only 1 execution at a time
  • Additional runs queue until the current one completes
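Conceptually, a pool with a limit of 1 behaves like a one-slot semaphore: each run must acquire the slot before the asset executes, and releases it when done. A minimal stdlib analogy (not Dagster code):

```python
import threading
import time

# A pool with limit 1 behaves like a one-slot semaphore: each "run"
# must acquire the slot before executing the asset.
pool = threading.BoundedSemaphore(1)
order = []

def run(run_id):
    with pool:                 # queued here until the slot frees up
        order.append(f"start {run_id}")
        time.sleep(0.05)       # simulated asset work
        order.append(f"end {run_id}")

threads = [threading.Thread(target=run, args=(i,)) for i in (1, 2, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Executions never overlap: every "end" comes before the next "start".
print(order)
```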

Without Pool

Asset: unlimited_asset
Pool: None

Run 1: [=== EXECUTING ===]
Run 2: [=== EXECUTING ===]
Run 3: [=== EXECUTING ===]
  • All executions run simultaneously
  • No queuing or limits

Excluding Assets from Concurrency Limits

To allow an asset to run with unlimited concurrency, simply don't assign it a pool:

# Assets that should have concurrency limits
assets_with_limits = [asset_a, asset_b, asset_c]

# Assets that can run unlimited
assets_unlimited = [unlimited_asset]

# Apply pools only to limited assets
defs = Definitions(assets=assets_with_limits)
defs_with_pools = apply_per_asset_concurrency(defs)

# Combine with unlimited assets
final_defs = Definitions(
    assets=list(defs_with_pools.assets) + assets_unlimited,
    jobs=defs_with_pools.jobs,
    ...
)

Verifying Pool Assignment

Check which assets have pools:

from your_project.definitions import defs

# defs may be a factory function or a Definitions object, depending on
# how your project defines it.
definitions = defs() if callable(defs) else defs
for asset_def in definitions.assets:
    pool = getattr(asset_def.op, "pool", None)
    key = list(asset_def.specs)[0].key
    print(f"{key}: pool={pool}")

Common Patterns

Use Case                   | Pool Strategy
---------------------------|-------------------------------
Database writes            | One pool per table
API calls with rate limits | One pool per API
File writes                | One pool per file path
Per-asset isolation        | One pool per asset (this doc)
Shared resource            | Multiple assets share one pool

Troubleshooting

Pools not working?

  1. Check DAGSTER_HOME is set:

    echo $DAGSTER_HOME
  2. Verify dagster.yaml is loaded:

    cat $DAGSTER_HOME/dagster.yaml
  3. Confirm pools are assigned:

    print(asset.op.pool)  # Should not be None

Want to set a specific pool limit?

Use the CLI:

dagster instance concurrency set my_pool_name 5

Or in Dagster+ UI: Deployment → Concurrency → Add pool limit
