# Claude Skill: Airflow → Temporal Migration Assistant (Python)

## Summary
A semi-automated migration skill that converts an Apache Airflow DAG (Python) into a **Temporal Python SDK** workflow + activities **safely**, producing:
- a migration plan
- generated scaffolding code (workflow/activity stubs + typed interfaces)
- test skeletons (replay/golden)
- a checklist for determinism, retries, idempotency, and scheduling

> This skill prioritizes **correctness and maintainability** over “1-click conversion.”

---

## When to use
Use this skill when you have:
- Airflow DAGs you want to migrate to Temporal
- custom operators, sensors, or XCom usage that needs explicit mapping
- a desire to generate a consistent Temporal project structure quickly

---

## Inputs
Provide one or more:
1. **Airflow DAG source files** (full Python code)
2. **Custom operators/hooks** referenced by the DAG (if any)
3. A short description of:
   - external dependencies (DB, S3, APIs)
   - backfill/catchup expectations
   - concurrency limits and SLAs
4. Target details:
   - Temporal SDK language: **Python**
   - Task queue name conventions (if any)
   - Desired schedule strategy: Temporal Schedules vs external scheduler

---

## Outputs
### 1) Migration Plan (Markdown)
- DAG summary (schedule, catchup, retries, concurrency)
- Task graph (tasks + dependencies)
- Mapping table: **Airflow task/operator → Temporal activity**
- Sensors, branching, XCom mapping decisions
- Retry + timeout strategy per activity
- Idempotency strategy per activity
- Risks / unknowns / TODOs

### 2) Generated Code (scaffold)
- `temporal/workflows/<dag_name>_workflow.py`
- `temporal/activities/<dag_name>_activities.py`
- `temporal/shared/types.py`
- `temporal/shared/constants.py`
- `tests/test_<dag_name>_replay.py`
- `README_migration_<dag_name>.md`

### 3) Quality Gates
- Determinism checklist
- Versioning strategy points
- Recommended alerts/metrics for key activities

---

## Non-negotiable conversion rules
### Determinism
- Workflow code must be deterministic:
  - NO `datetime.now()`, `random`, network/DB calls, file I/O, or env reads in workflow logic.
  - All side effects belong in **Activities**.
- Use Temporal time APIs as needed (SDK-appropriate), not system time.
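
A minimal sketch of this boundary, using the `temporalio` Python SDK: `workflow.now()` stands in for system time, and the side-effecting call lives in an activity (`fetch_watermark` is a hypothetical name):

```python
from datetime import timedelta

from temporalio import activity, workflow


@activity.defn
async def fetch_watermark(source: str) -> str:
    # Side effects (DB reads, API calls, wall-clock access) belong here.
    return f"wm-{source}"  # TODO: real lookup


@workflow.defn
class ExampleWorkflow:
    @workflow.run
    async def run(self, source: str) -> str:
        started_at = workflow.now()  # replay-safe; datetime.now() is not
        watermark = await workflow.execute_activity(
            fetch_watermark,
            source,
            start_to_close_timeout=timedelta(minutes=5),
        )
        return f"{started_at.isoformat()}:{watermark}"
```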

### Mapping
- Each Airflow task maps to **one Temporal Activity** unless explicitly asked to group tasks.
- XCom must be replaced with **typed return values** passed through the workflow.
  - If a payload is large, return a reference (e.g., an S3 key), not raw bytes.
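
A sketch of the XCom rule with the `temporalio` SDK; the activity names and the `ExtractResult` type are hypothetical:

```python
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow


@dataclass(slots=True)
class ExtractResult:
    row_count: int
    s3_key: str  # reference to the large payload, not the payload itself


@activity.defn
async def extract_orders(run_date: str) -> ExtractResult:
    # TODO: real extraction; write the payload to S3 and return its key.
    return ExtractResult(row_count=0, s3_key=f"orders/{run_date}.parquet")


@activity.defn
async def transform_orders(s3_key: str) -> int:
    # TODO: read the payload from S3 by reference and transform it.
    return 0


@workflow.defn
class OrdersWorkflow:
    @workflow.run
    async def run(self, run_date: str) -> int:
        extracted = await workflow.execute_activity(
            extract_orders,
            run_date,
            start_to_close_timeout=timedelta(minutes=10),
        )
        # The former XCom pull is now an ordinary typed value.
        return await workflow.execute_activity(
            transform_orders,
            extracted.s3_key,
            start_to_close_timeout=timedelta(minutes=10),
        )
```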

### Retries and timeouts
- Convert Airflow `retries` and `retry_delay` into Temporal activity retry policy.
- Specify activity timeouts:
  - `start_to_close_timeout` for execution time
  - optionally `schedule_to_close_timeout` for end-to-end bounds
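
A sketch of the conversion, assuming an Airflow task with `retries=3, retry_delay=timedelta(minutes=5)` and no `exponential_backoff`; note that Temporal's `maximum_attempts` counts the first attempt too, so 3 Airflow retries become 4 attempts:

```python
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@activity.defn
async def load_table(table_name: str) -> None:
    ...  # TODO: idempotent load


@workflow.defn
class LoadWorkflow:
    @workflow.run
    async def run(self, table_name: str) -> None:
        await workflow.execute_activity(
            load_table,
            table_name,
            start_to_close_timeout=timedelta(minutes=30),  # per-attempt bound
            schedule_to_close_timeout=timedelta(hours=2),  # end-to-end bound
            retry_policy=RetryPolicy(
                initial_interval=timedelta(minutes=5),  # Airflow retry_delay
                backoff_coefficient=1.0,  # fixed delay, like Airflow without exponential_backoff
                maximum_attempts=4,  # Airflow retries=3 means 3 retries + 1 first attempt
            ),
        )
```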

### Sensors
- Replace sensors with:
  - polling activity + workflow sleep backoff loop, OR
  - signals (preferred when event-driven is possible)
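
Both options sketched with the `temporalio` SDK (activity and workflow names are hypothetical). In the Python SDK, `asyncio.sleep()` inside a workflow is durably timed and replay-safe:

```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow


@activity.defn
async def s3_key_exists(key: str) -> bool:
    return False  # TODO: real HEAD request against S3


@workflow.defn
class WaitForFilePollingWorkflow:
    @workflow.run
    async def run(self, key: str) -> None:
        # Poke-style sensor becomes: poll in an activity, sleep in the workflow.
        while not await workflow.execute_activity(
            s3_key_exists,
            key,
            start_to_close_timeout=timedelta(seconds=30),
        ):
            await asyncio.sleep(60)  # poke interval


@workflow.defn
class WaitForFileSignalWorkflow:
    def __init__(self) -> None:
        self._arrived = False

    @workflow.signal
    def file_arrived(self) -> None:
        self._arrived = True

    @workflow.run
    async def run(self) -> None:
        # Preferred when the producer can notify the workflow directly.
        await workflow.wait_condition(lambda: self._arrived)
```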

### Branching / trigger rules
- BranchPythonOperator becomes workflow branching **only if deterministic**.
  - If branch depends on external state, compute decision in an **Activity**.
- Trigger rules should be approximated explicitly (all_success, all_done, etc.).
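
A sketch of the branching rule: the decision over external state runs in an activity, so the workflow's `if` is deterministic on replay (names and branch labels are hypothetical):

```python
from datetime import timedelta

from temporalio import activity, workflow


@activity.defn
async def pick_branch(run_date: str) -> str:
    # External state (row counts, feature flags, ...) is read here, so the
    # result is recorded in history and replayed identically.
    return "full_load"  # TODO: real decision logic


@workflow.defn
class BranchingWorkflow:
    @workflow.run
    async def run(self, run_date: str) -> None:
        decision = await workflow.execute_activity(
            pick_branch,
            run_date,
            start_to_close_timeout=timedelta(minutes=1),
        )
        # Branching on a recorded activity result is deterministic.
        if decision == "full_load":
            ...  # execute full-load activities
        else:
            ...  # execute incremental activities
```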

### Scheduling
- Do not embed cron logic inside workflow code.
- Recommend:
  - Temporal Schedules, OR
  - external scheduler triggering workflow starts
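
For example, an Airflow `schedule_interval="0 2 * * *"` could become a Temporal Schedule created once at deploy time; the schedule id, workflow name, and task queue below are placeholders:

```python
from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleSpec,
)


async def create_daily_schedule() -> None:
    client = await Client.connect("localhost:7233")
    await client.create_schedule(
        "orders-daily",  # hypothetical schedule id
        Schedule(
            action=ScheduleActionStartWorkflow(
                "OrdersWorkflow",       # workflow name
                "2024-01-01",           # workflow argument
                id="orders-daily-run",
                task_queue="etl-task-queue",
            ),
            spec=ScheduleSpec(cron_expressions=["0 2 * * *"]),
        ),
    )
    # Note: Airflow catchup has no implicit equivalent here; backfills
    # on a Temporal Schedule are an explicit operation.
```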

---

## Standard file layout
```text
temporal/
  workflows/
  activities/
  shared/
tests/
README_migration_<dag>.md
```

---

## Implementation style guidelines (Python)
- Use clear activity names and constants (no stringly-typed call sites).
- Use `dataclass(slots=True)` or Pydantic models for inputs/outputs (pick one consistently).
- Add docstrings for each workflow/activity.
- Prefer small, composable activities over giant “do everything” functions.
- Include a `WorkflowInput` model (see the sketch after this list) with:
  - `run_id`, `organization_id` (if multi-tenant), and any required params
- Include explicit version markers (Temporal workflow versioning strategy) where branching logic is likely to change.
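
A minimal sketch of such an input model; the field set beyond `run_id`/`organization_id` is illustrative:

```python
from dataclasses import dataclass


@dataclass(slots=True)
class WorkflowInput:
    """Single typed argument for the migrated workflow."""

    run_id: str  # Airflow's logical run_id / data interval key
    organization_id: str | None = None  # multi-tenant routing, if applicable
    run_date: str | None = None  # example required param, hypothetical
```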


---

## Step-by-step procedure (what Claude should do)
1. **Parse & summarize the DAG(s)**:
   - schedule, `default_args`, retries, catchup, concurrency
   - task list and dependency graph
2. **Identify special constructs**:
   - sensors, branching, task groups, XCom, dynamic task mapping, custom operators
3. **Build the mapping table**:
   - operator → activity, with required inputs/outputs
4. **Generate Temporal scaffolding**:
   - a workflow orchestrating typed activities
   - activity stubs with TODOs for business logic
5. **Generate tests** (see the replay skeleton after this list):
   - replay test skeleton
   - optional golden test strategy notes
6. **Produce a migration README**:
   - how to run the worker
   - how to start the workflow
   - how to validate outputs
7. **List assumptions + TODOs clearly.**
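
For step 5, a replay test skeleton could look like this, assuming `pytest-asyncio` and a workflow history JSON exported from the Temporal Web UI or `temporal workflow show`; the history path and imported workflow are placeholders:

```python
from pathlib import Path

import pytest
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

from temporal.workflows.orders_workflow import OrdersWorkflow  # hypothetical


@pytest.mark.asyncio
async def test_replay_orders_workflow() -> None:
    history_json = Path("tests/histories/orders_workflow.json").read_text()
    replayer = Replayer(workflows=[OrdersWorkflow])
    # Raises if the current workflow code is no longer deterministic
    # against the recorded history.
    await replayer.replay_workflow(
        WorkflowHistory.from_json("orders-workflow-id", history_json)
    )
```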


---

## Copy/paste prompt (the “Skill Prompt”)

You are a senior workflow orchestration engineer who has migrated multiple pipelines from Apache Airflow to Temporal. You are strict about Temporal determinism/replay safety and idempotency.

**Inputs I will provide**
- One or more Airflow DAG Python files (full source)
- Any custom operators/hooks used by the DAG (if applicable)
- A short description of runtime dependencies (DB, S3, APIs, etc.)
- Target Temporal SDK: Python
- Target execution model: one Workflow per DAG run (logical `run_id`)

**Output requirements.** Produce:
1. **Migration Plan** (markdown)
   - DAG summary (schedule, catchup, concurrency, retries)
   - Task graph (ordered list + dependencies)
   - Operator-by-operator mapping table: Airflow operator → Temporal activity + notes
   - Sensors/branching/XCom mapping decisions
   - Idempotency strategy per activity
   - Retry/timeout strategy per activity
2. **Generated Code**
   - `temporal/workflows/<dag_name>_workflow.py`
   - `temporal/activities/<dag_name>_activities.py`
   - `temporal/shared/types.py` (inputs/outputs dataclasses or Pydantic models)
   - `temporal/shared/constants.py` (task names, queue names)
   - `tests/test_<dag_name>_replay.py` (replay test skeleton)
   - `README_migration_<dag_name>.md` (how to run locally)
3. **Quality gates**
   - Determinism checklist: ensure no time/random/network/DB calls in workflow code
   - Activity boundaries: all side effects in activities
   - Explicit versioning points for future changes
   - Error budget: which failures trigger retries vs fail fast

**Conversion rules (non-negotiable)**
- Workflow code must be deterministic:
  - No `datetime.now()`, `random`, env reads, network/DB calls, file I/O, etc. in workflow logic.
  - Use Temporal time APIs only if needed.
- Each Airflow task maps to exactly one Temporal Activity unless I explicitly request grouping.
- Retries:
  - Convert Airflow `retries` + `retry_delay` into a Temporal `RetryPolicy` on the activity call.
  - Set activity timeouts (start-to-close and/or schedule-to-close).
- XCom:
  - Replace XCom with typed return values passed between activities in workflow code.
  - If the DAG pushes large XCom payloads, store them externally and pass references.
- Sensors:
  - Replace sensors with activity polling + workflow sleep backoff loops, or signals when feasible.
- Branching:
  - Convert `BranchPythonOperator` to workflow branching only if deterministic; otherwise compute the decision in an activity.
- Trigger rules:
  - Implement minimal equivalent semantics (`all_success`/`all_done`/`one_failed`, etc.) in workflow control flow.
- Scheduling:
  - Recommend Temporal Schedules or an external scheduler; do not embed cron inside the workflow.

**What you should do first**
1. Read and summarize the DAG(s) and custom operators.
2. Ask questions ONLY if absolutely required; otherwise make the best safe assumptions and document them.
3. Generate plan + code + tests in one pass.

**Output format**
- Start with “Migration Plan”
- Then “File tree”
- Then code blocks for each file (each file labeled with its path)
- End with “Assumptions & TODOs”

Now begin. Here is the DAG source: `<PASTE DAG FILE(S) HERE>`


---

## Optional add-ons (if you want)
- **Batch mode**: convert multiple DAGs and generate a portfolio summary (risk score, complexity score).
- **Operator library**: maintain a mapping catalog (e.g., PythonOperator → Activity, HttpOperator → Activity+timeouts, sensors → polling loop template).
- **Linting**: enforce “no side effects in workflow” via static checks.
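
The Python SDK's workflow sandbox already intercepts many non-deterministic calls at runtime; a static pre-check could be as simple as this AST sketch (the banned-call list is illustrative, not exhaustive):

```python
import ast
import sys

# Calls that must never appear in workflow modules (illustrative list).
BANNED = {"datetime.now", "time.time", "random.random", "os.getenv", "open"}


def banned_calls(source: str) -> list[str]:
    """Return human-readable locations of banned calls in the source."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call) and ast.unparse(node.func) in BANNED:
            hits.append(f"line {node.lineno}: {ast.unparse(node.func)}")
    return hits


if __name__ == "__main__":
    # Usage: python lint_workflows.py temporal/workflows/*.py
    for path in sys.argv[1:]:
        with open(path) as f:
            for hit in banned_calls(f.read()):
                print(f"{path}: {hit}")
```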

---