# Claude Skill: Airflow → Temporal Migration Assistant (Python)
## Summary
A semi-automated migration skill that converts an Apache Airflow DAG (Python) into a **Temporal Python SDK** workflow + activities **safely**, producing:
- a migration plan
- generated scaffolding code (workflow/activity stubs + typed interfaces)
- test skeletons (replay/golden)
- a checklist for determinism, retries, idempotency, and scheduling
> This skill prioritizes **correctness and maintainability** over “1-click conversion.”
---
## When to use
Use this skill when you have:
- Airflow DAGs you want to migrate to Temporal
- custom operators/sensors/XCom usage that need explicit mapping
- desire to generate a consistent Temporal project structure quickly
---
## Inputs
Provide one or more:
1. **Airflow DAG source files** (full Python code)
2. **Custom operators/hooks** referenced by the DAG (if any)
3. A short description of:
- external dependencies (DB, S3, APIs)
- backfill/catchup expectations
- concurrency limits and SLAs
4. Target details:
- Temporal SDK language: **Python**
- Task queue name conventions (if any)
- Desired schedule strategy: Temporal Schedules vs external scheduler
---
## Outputs
### 1) Migration Plan (Markdown)
- DAG summary (schedule, catchup, retries, concurrency)
- Task graph (tasks + dependencies)
- Mapping table: **Airflow task/operator → Temporal activity**
- Sensors, branching, XCom mapping decisions
- Retry + timeout strategy per activity
- Idempotency strategy per activity
- Risks / unknowns / TODOs
### 2) Generated Code (scaffold)
- `temporal/workflows/<dag_name>_workflow.py`
- `temporal/activities/<dag_name>_activities.py`
- `temporal/shared/types.py`
- `temporal/shared/constants.py`
- `tests/test_<dag_name>_replay.py`
- `README_migration_<dag_name>.md`
### 3) Quality Gates
- Determinism checklist
- Versioning strategy points
- Recommended alerts/metrics for key activities
---
## Non-negotiable conversion rules
### Determinism
- Workflow code must be deterministic:
- NO `datetime.now()`, `random`, network/DB calls, file I/O, env reads in workflow logic.
- All side effects belong in **Activities**.
- Use Temporal time APIs as needed (SDK-appropriate), not system time.
### Mapping
- Each Airflow task maps to **one Temporal Activity** unless explicitly asked to group tasks.
- XCom must be replaced with **typed return values** passed through the workflow.
- If a payload is large, return a reference (e.g., an S3 key), not raw bytes.
### Retries and timeouts
- Convert Airflow `retries` and `retry_delay` into Temporal activity retry policy.
- Specify activity timeouts:
- `start_to_close_timeout` for execution time
- optionally `schedule_to_close_timeout` for end-to-end bounds
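One subtlety worth encoding: Airflow's `retries` counts re-runs after the first attempt, while Temporal's `maximum_attempts` counts every attempt. The helper below (a hypothetical name, shown as a plain-dict sketch) captures the translation; in real code the resulting kwargs feed `temporalio.common.RetryPolicy`:

```python
from datetime import timedelta


def airflow_retry_to_temporal(retries: int, retry_delay: timedelta) -> dict:
    """Translate Airflow retry settings into RetryPolicy-shaped kwargs.

    Airflow's `retries` counts re-runs after the first attempt; Temporal's
    `maximum_attempts` counts every attempt, so we add one.
    """
    return {
        "initial_interval": retry_delay,
        # Airflow's default delay is fixed (no exponential backoff
        # unless retry_exponential_backoff=True), hence coefficient 1.0.
        "backoff_coefficient": 1.0,
        "maximum_attempts": retries + 1,
    }


policy_kwargs = airflow_retry_to_temporal(retries=3, retry_delay=timedelta(minutes=5))
# In workflow code: RetryPolicy(**policy_kwargs) on the activity call.
```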
### Sensors
- Replace sensors with:
- polling activity + workflow sleep backoff loop, OR
- signals (preferred when event-driven is possible)
### Branching / trigger rules
- BranchPythonOperator becomes workflow branching **only if deterministic**.
- If branch depends on external state, compute decision in an **Activity**.
- Trigger rules should be approximated explicitly (all_success, all_done, etc.).
### Scheduling
- Do not embed cron logic inside workflow code.
- Recommend:
- Temporal Schedules, OR
- external scheduler triggering workflow starts
---
## Standard file layout
```text
temporal/
  workflows/
  activities/
  shared/
tests/
README_migration_<dag>.md
```
---
## Code conventions
- Use clear activity names and constants (no stringly-typed call sites).
- Use `dataclass(slots=True)` or Pydantic models for inputs/outputs (pick one consistently).
- Add docstrings for each workflow/activity.
- Prefer small, composable activities over giant “do everything” functions.
- Include a `WorkflowInput` model with `run_id`, `organization_id` (if multi-tenant), and any required params.
- Include explicit `version` markers (Temporal workflow versioning strategy) where branching logic is likely to change.
---
## Process
1. Parse & summarize the DAG(s):
   - schedule, default_args, retries, catchup, concurrency
   - task list and dependency graph
2. Identify special constructs:
   - sensors, branching, task groups, XCom, dynamic task mapping, custom operators
3. Build the mapping table:
   - operator → activity, with required inputs/outputs
4. Generate Temporal scaffolding:
   - workflow orchestrating typed activities
   - activity stubs with TODOs for business logic
5. Generate tests:
   - replay test skeleton
   - optional golden test strategy notes
6. Produce a migration README:
   - how to run the worker
   - how to start the workflow
   - how to validate outputs
7. List assumptions + TODOs clearly.
---
## Prompt template
### Role
You are a senior workflow orchestration engineer who has migrated multiple pipelines from Apache Airflow to Temporal. You are strict about Temporal determinism/replay safety and idempotency.
### Inputs
- One or more Airflow DAG Python files (full source)
- Any custom operators/hooks used by the DAG (if applicable)
- A short description of runtime dependencies (DB, S3, APIs, etc.)
- Target Temporal SDK: Python
- Target execution model: one Workflow per DAG run (logical `run_id`)
### Expected outputs
Produce:
1. **Migration Plan (Markdown)**
   - DAG summary (schedule, catchup, concurrency, retries)
   - Task graph (ordered list + dependencies)
   - Operator-by-operator mapping table: Airflow operator → Temporal activity + notes
   - Sensors/branching/XCom mapping decisions
   - Idempotency strategy per activity
   - Retry/timeout strategy per activity
2. **Generated Code**
   - `temporal/workflows/<dag_name>_workflow.py`
   - `temporal/activities/<dag_name>_activities.py`
   - `temporal/shared/types.py` (input/output dataclasses or Pydantic models)
   - `temporal/shared/constants.py` (task names, queue names)
   - `tests/test_<dag_name>_replay.py` (replay test skeleton)
   - `README_migration_<dag_name>.md` (how to run locally)
3. **Quality gates**
   - Determinism checklist: ensure no time/random/network/DB calls in workflow code
   - Activity boundaries: all side effects in activities
   - Explicit versioning points for future changes
   - Error budget: which failures cause retry vs fail-fast
### Rules
- Workflow code must be deterministic:
  - No `datetime.now()`, `random`, env reads, network/DB calls, file I/O, etc. in workflow logic.
  - Use Temporal time APIs only if needed.
- Each Airflow task maps to exactly one Temporal Activity unless I explicitly request grouping.
- Retries:
  - Convert Airflow `retries` + `retry_delay` into a Temporal `RetryPolicy` on the activity call.
  - Use activity timeouts (start-to-close and/or schedule-to-close).
- XCom:
  - Replace XCom with typed return values passed between activities in workflow code.
  - If the DAG uses large XCom payloads, store data externally and pass references.
- Sensors:
  - Replace sensors with an activity polling + workflow sleep backoff loop, or signals when feasible.
- Branching:
  - Convert `BranchPythonOperator` to workflow branching only if deterministic; otherwise compute the decision in an activity.
- Trigger rules:
  - Implement minimal equivalent semantics (all_success/all_done/one_failed, etc.) in workflow control flow.
- Scheduling:
  - Recommend Temporal Schedules or an external scheduler; do not embed cron logic inside the workflow.
### Process
- Read and summarize the DAG(s) and custom operators.
- Ask questions ONLY if absolutely required; otherwise make the best safe assumptions and document them.
- Generate plan + code + tests in one pass.
### Output format
- Start with “Migration Plan”
- Then “File tree”
- Then code blocks for each file (each file labeled with its path)
- End with “Assumptions & TODOs”

Now begin. Here is the DAG source: <PASTE DAG FILE(S) HERE>
---
## Optional add-ons (if you want)
- **Batch mode**: convert multiple DAGs and generate a portfolio summary (risk score, complexity score).
- **Operator library**: maintain a mapping catalog (e.g., PythonOperator → Activity, HttpOperator → Activity+timeouts, sensors → polling loop template).
- **Linting**: enforce “no side effects in workflow” via static checks.
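The linting add-on can start as a plain `ast` walk before reaching for a real plugin. Temporal's Python sandbox catches much of this at runtime, but a static pre-check fails CI earlier; the forbidden-call list below is a starting point, not exhaustive:

```python
import ast

# Calls that signal non-determinism if they appear in workflow modules.
FORBIDDEN_CALLS = {"datetime.now", "time.time", "random.random", "os.getenv", "open"}


def find_forbidden_calls(source: str) -> list[str]:
    """Static sketch of a 'no side effects in workflow code' check."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Attribute) and isinstance(func.value, ast.Name):
                name = f"{func.value.id}.{func.attr}"
            elif isinstance(func, ast.Name):
                name = func.id
            else:
                continue
            if name in FORBIDDEN_CALLS:
                hits.append(name)
    return hits


bad = find_forbidden_calls("from datetime import datetime\nts = datetime.now()\n")
```

Run it over every file in `temporal/workflows/` in CI and fail the build on any hit.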
---