This document explains how to limit concurrent executions of individual assets using Dagster's pool-based concurrency system.
By default, Dagster allows unlimited concurrent executions of the same asset across multiple runs. This can cause issues when:
- An asset writes to a shared resource (database table, file, API)
- An asset has rate limits or resource constraints
- You want to prevent duplicate work
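To see why this matters, here is a minimal, Dagster-free sketch of a lost update: two concurrent runs read the same shared value before either writes back (all names here are illustrative):

```python
# Two concurrent "runs" of the same asset both read-modify-write a shared
# resource. Because both read before either writes, one update is lost.
shared = {"rows": 0}

snapshot_run_1 = shared["rows"]      # run 1 reads 0
snapshot_run_2 = shared["rows"]      # run 2 reads 0 before run 1 commits
shared["rows"] = snapshot_run_1 + 1  # run 1 writes 1
shared["rows"] = snapshot_run_2 + 1  # run 2 also writes 1, clobbering run 1

print(shared["rows"])  # 1 — with a pool limit of 1, the runs would serialize and yield 2
```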
Solution: Assign each asset to its own unique pool with a limit of 1.
Set a default limit for all pools:
```yaml
concurrency:
  pools:
    default_limit: 1
```

Important: DAGSTER_HOME must be set for Dagster to load this config:

```shell
export DAGSTER_HOME=/path/to/your/project
```

Then assign each asset its own unique pool:

```python
@asset(
    key=["my_group", "my_asset"],
    pool="my_group_my_asset",  # Unique pool per asset
)
def my_asset():
    pass
```

Alternatively, transform all assets to automatically get pools based on their keys:
```python
from dagster import AssetsDefinition, Definitions, OpDefinition


def _add_pool_to_asset(asset_def: AssetsDefinition) -> AssetsDefinition:
    """Transform an asset to add a pool based on its key.

    Note: this rebuilds the underlying op and reaches into private
    attributes (_compute_fn, _config_schema, _partition_mappings),
    which may change between Dagster versions.
    """
    original_op = asset_def.op
    key = list(asset_def.specs)[0].key
    pool_name = "_".join(key.path)  # e.g. my_group/my_asset -> my_group_my_asset
    new_op = OpDefinition(
        compute_fn=original_op._compute_fn,
        name=original_op.name,
        ins=dict(original_op.ins),
        outs=dict(original_op.outs),
        description=original_op.description,
        config_schema=original_op._config_schema,
        required_resource_keys=original_op.required_resource_keys,
        tags=original_op.tags,
        retry_policy=original_op.retry_policy,
        pool=pool_name,
    )
    return AssetsDefinition(
        keys_by_input_name=asset_def.keys_by_input_name,
        keys_by_output_name=asset_def.keys_by_output_name,
        node_def=new_op,
        partitions_def=asset_def.partitions_def,
        partition_mappings=asset_def._partition_mappings,
        asset_deps=asset_def.asset_deps,
        can_subset=asset_def.can_subset,
        group_names_by_key=asset_def.group_names_by_key,
        metadata_by_key=dict(asset_def.metadata_by_key),
        tags_by_key=dict(asset_def.tags_by_key),
    )
```
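The pool-name derivation used above can be checked in isolation (a stand-in for `AssetKey.path`, no Dagster required):

```python
def pool_name_from_key_path(path):
    """Join asset-key path segments with underscores, as _add_pool_to_asset does."""
    return "_".join(path)

print(pool_name_from_key_path(["my_group", "my_asset"]))  # my_group_my_asset
```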
```python
def apply_per_asset_concurrency(defs: Definitions) -> Definitions:
    """Apply per-asset concurrency pools to all assets."""
    new_assets = [_add_pool_to_asset(a) for a in defs.assets]
    # Note: copy over any other Definitions fields your project uses
    # (e.g. asset_checks, loggers, executor) as well.
    return Definitions(
        assets=new_assets,
        jobs=defs.jobs,
        schedules=defs.schedules,
        sensors=defs.sensors,
        resources=defs.resources,
    )
```

Usage:
```python
defs = Definitions(assets=[asset_a, asset_b, asset_c])
defs = apply_per_asset_concurrency(defs)  # All assets now have unique pools
```

```
Asset: my_group/my_asset
Pool: my_group_my_asset (limit: 1)

Run 1: [=== EXECUTING ===]
Run 2: [=== QUEUED ===] → [=== EXECUTING ===]
Run 3: [=== QUEUED ===] [=== QUEUED ===] → [=== EXECUTING ===]
```

- Only 1 execution at a time
- Additional runs queue until the current one completes

```
Asset: unlimited_asset
Pool: None

Run 1: [=== EXECUTING ===]
Run 2: [=== EXECUTING ===]
Run 3: [=== EXECUTING ===]
```

- All executions run simultaneously
- No queuing or limits
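The limit-of-1 behavior above can be modeled with a counting semaphore — a rough, Dagster-free analogy for how a pool grants one execution slot at a time:

```python
import threading
import time

pool = threading.Semaphore(1)  # pool limit of 1
completed = []

def run(run_id):
    with pool:                  # blocks (queues) until a slot is free
        completed.append(run_id)
        time.sleep(0.01)        # simulate the asset executing

threads = [threading.Thread(target=run, args=(i,)) for i in (1, 2, 3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(completed))  # [1, 2, 3] — every run executes, one at a time
```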
To allow an asset to run with unlimited concurrency, simply don't assign it a pool:
```python
# Assets that should have concurrency limits
assets_with_limits = [asset_a, asset_b, asset_c]

# Assets that can run unlimited
assets_unlimited = [unlimited_asset]

# Apply pools only to limited assets
defs = Definitions(assets=assets_with_limits)
defs_with_pools = apply_per_asset_concurrency(defs)

# Combine with unlimited assets
final_defs = Definitions(
    assets=list(defs_with_pools.assets) + assets_unlimited,
    jobs=defs_with_pools.jobs,
    ...
)
```

Check which assets have pools:
```python
from your_project.definitions import defs

definitions = defs()
for asset in definitions.assets:
    pool = getattr(asset.op, "pool", None)
    key = list(asset.specs)[0].key
    print(f"{key}: pool={pool}")
```

| Use Case | Pool Strategy |
|---|---|
| Database writes | One pool per table |
| API calls with rate limits | One pool per API |
| File writes | One pool per file path |
| Per-asset isolation | One pool per asset (this doc) |
| Shared resource | Multiple assets share one pool |
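The last two rows differ only in how pool names are chosen; a hypothetical helper makes the distinction concrete (names are illustrative, not a Dagster API):

```python
def assign_pools(asset_to_resource):
    """Assets that declare a shared resource get that resource's pool;
    the rest get a per-asset pool named after themselves."""
    return {
        asset: resource if resource is not None else asset
        for asset, resource in asset_to_resource.items()
    }

pools = assign_pools({
    "orders_writer": "warehouse_db",     # shared resource -> shared pool
    "customers_writer": "warehouse_db",  # same pool, so their writes serialize
    "standalone_report": None,           # per-asset isolation
})
print(pools["orders_writer"] == pools["customers_writer"])  # True
```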
- Check DAGSTER_HOME is set:

  ```shell
  echo $DAGSTER_HOME
  ```

- Verify dagster.yaml is loaded:

  ```shell
  cat $DAGSTER_HOME/dagster.yaml
  ```

- Confirm pools are assigned:

  ```python
  print(asset.op.pool)  # Should not be None
  ```
To set a limit for a specific pool, use the CLI:

```shell
dagster instance concurrency set my_pool_name 5
```

Or in Dagster+ UI: Deployment → Concurrency → Add pool limit