ClaudeWave
Skill · 389 repo stars · updated 3d ago

airflow-hitl

The airflow-hitl skill enables pausing Apache Airflow DAGs to collect human decisions through the UI or REST API, using deferrable operators that release worker slots while waiting. Use this skill when building workflows that require approval gates, form submissions, branching based on human choices, or other human-in-the-loop patterns in Airflow 3.1 and later. Consult the live Airflow registry to verify current class signatures before implementation.

Install in Claude Code
git clone --depth 1 https://github.com/astronomer/agents /tmp/airflow-hitl && cp -r /tmp/airflow-hitl/skills/airflow-hitl ~/.claude/skills/airflow-hitl
Then open a new Claude Code session; the skill loads automatically.

SKILL.md

# Airflow Human-in-the-Loop Operators

Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.

> **Requires Airflow 3.1+** (`af config version`).
>
> **UI location**: Browse → Required Actions. Respond from the task instance page's Required Actions tab.
>
> **Cross-references**: `airflow-ai` for AI/LLM task decorators; `airflow` for registry and API discovery commands used below.

---

## Step 1 — Pick the capability you need

| Capability | Class (verify in Step 2) |
|---|---|
| Approve or reject; downstream skips on reject | `ApprovalOperator` |
| Present N options and return which were chosen | `HITLOperator` |
| Branch to one or more downstream tasks based on a choice | `HITLBranchOperator` |
| Collect a form (no approve/select step) | `HITLEntryOperator` |
| Use the HITL trigger directly (advanced / custom operators) | `HITLTrigger` |

This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.

---

## Step 2 — Discover the current signatures from the Airflow Registry

Before writing HITL code, run these to see the live roster and constructor params (see the `airflow` skill for the full `af registry` reference):

```bash
# Every HITL-related module in the standard provider
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Constructor signatures: name, type, default, required, description
af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Pin to the exact installed provider version
af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>
```

If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.
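
The "prefer the registry" rule can be applied mechanically before a DAG is written. The sketch below is a minimal pre-flight check. It assumes the JSON from `af registry parameters standard` has the shape the jq filter implies: a `classes` object keyed by fully qualified name, each entry holding a `parameters` list whose items carry `name` and `required`. The helper name `check_kwargs`, the `example.hitl.ApprovalOperator` key, and the sample payload are hypothetical and exist only for illustration.

```python
import json


def check_kwargs(registry: dict, fqn: str, kwargs: dict) -> dict:
    """Compare planned constructor kwargs against registry output.

    Assumes registry["classes"][fqn]["parameters"] is a list of
    {"name": ..., "required": ...} objects, as the jq filter implies.
    """
    params = registry["classes"][fqn]["parameters"]
    known = {p["name"] for p in params}
    required = {p["name"] for p in params if p.get("required")}
    return {
        "unknown": sorted(set(kwargs) - known),
        "missing_required": sorted(required - set(kwargs)),
    }


# Illustrative payload only; real data comes from `af registry parameters standard`.
SAMPLE = json.loads("""
{"classes": {"example.hitl.ApprovalOperator": {"parameters": [
  {"name": "subject", "required": true},
  {"name": "body", "required": false},
  {"name": "assigned_users", "required": false}
]}}}
""")

report = check_kwargs(
    SAMPLE,
    "example.hitl.ApprovalOperator",
    {"body": "Please review", "respondents": ["alice"]},
)
print(report)
```

With the sample payload, `respondents` is reported as unknown and `subject` as missing. If the real registry output lists only class-specific parameters, inherited kwargs such as `task_id` would also show up as unknown and need to be allow-listed.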

---

## Step 3 — Canonical example (approval gate)

Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.

```python
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",              # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()
```

For the other classes in Step 1, the shape is the same (`task_id`, `subject`, plus class-specific params). Verify each constructor through Step 2 — for example, `HITLBranchOperator` requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.
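
The branching requirement above can be checked in plain Python before the DAG is parsed. This is a sketch, not provider code: `unresolved_options` is a hypothetical helper, and its `mapping` argument stands in for whatever option-to-task-id parameter the registry reports, so no real parameter name is assumed.

```python
def unresolved_options(options, downstream_task_ids, mapping=None):
    """Return the options that resolve to no downstream task id.

    An option resolves if it equals a downstream task id, or if
    `mapping` translates it to one. `mapping` is a stand-in for the
    mapping param surfaced in the registry.
    """
    mapping = mapping or {}
    downstream = set(downstream_task_ids)
    return [opt for opt in options if mapping.get(opt, opt) not in downstream]


# Hypothetical option labels and task ids, for illustration only.
options = ["ship", "Hold for review", "cancel"]
downstream = ["ship", "hold", "notify_owner"]

without_mapping = unresolved_options(options, downstream)
with_mapping = unresolved_options(
    options,
    downstream,
    {"Hold for review": "hold", "cancel": "notify_owner"},
)
print(without_mapping)
print(with_mapping)
```

Without a mapping, only `ship` matches a task id directly, so the other two options are reported. Once both are mapped, the list is empty.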

---

## Step 4 — Behavior contracts (stable across versions)

### Timeout
- With `defaults` set: task succeeds on timeout, default option(s) selected.
- Without `defaults`: task fails on timeout.
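
The two timeout rules can be restated as a small plain-Python model, which may help when writing tests for downstream logic. This is not provider code: `outcome_on_timeout` is a hypothetical function that only mirrors the contract described above, and the `state` strings are illustrative labels.

```python
def outcome_on_timeout(defaults=None):
    """Model the documented contract: defaults present means success, absent means failure."""
    if defaults:
        # Accept a single option or a list of options.
        chosen = [defaults] if isinstance(defaults, str) else list(defaults)
        return {"state": "success", "chosen_options": chosen}
    return {"state": "failed", "chosen_options": []}


with_default = outcome_on_timeout("Approve")
without_default = outcome_on_timeout()
print(with_default)
print(without_default)
```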

### Markdown + Jinja in `body`
`body` supports Markdown and is Jinja-templatable. Render XCom context directly:

```python
body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""
```

### Callbacks
All HITL operators accept the standard Airflow callback kwargs (`on_success_callback`, `on_failure_callback`, etc.).

### Notifiers
HITL operators accept a `notifiers` list. Inside a notifier's `notify(context)` method, build a link to the pending task with `HITLOperator.generate_link_to_ui_from_context(context, base_url=...)`.

### Restricting who can respond
The parameter name and accepted identifier format depend on the active auth manager. Do **not** hardcode — check which one is active and which kwarg the current provider exposes:

```bash
af config show | jq '.auth_manager // .core.auth_manager'
```

Then look up the current kwarg in Step 2 (at the time of writing it is `assigned_users`, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).

---

## Step 5 — Responding from external integrations

For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:

```bash
af api ls --filter hitl           # live endpoint list
af api spec \
  | jq '.paths | to_entries[] | select(.key | test("hitl"))'   # request/response schemas
```
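
When the spec is already loaded in a script, the same discovery can be done in Python. The sketch below assumes `af api spec` emits an OpenAPI document with a top-level `paths` object, as the jq filter implies. The function name `hitl_endpoints` and the sample paths are hypothetical, not real Airflow routes.

```python
def hitl_endpoints(spec: dict) -> list[tuple[str, str]]:
    """List (METHOD, path) pairs whose path contains 'hitl'."""
    http_methods = {"get", "put", "post", "patch", "delete"}
    found = []
    for path, item in spec.get("paths", {}).items():
        if "hitl" not in path:
            continue
        for key in item:
            # Path items may also hold non-method keys such as "parameters".
            if key in http_methods:
                found.append((key.upper(), path))
    return sorted(found)


# Illustrative spec fragment only; load the real one from `af api spec`.
sample_spec = {
    "paths": {
        "/example/hitl-items": {"get": {}},
        "/example/hitl-items/{id}": {"patch": {}, "parameters": []},
        "/example/dags": {"get": {}},
    }
}

endpoints = hitl_endpoints(sample_spec)
print(endpoints)
```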

The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:

```python
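import os

import requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# Placeholder identifiers for the task instance awaiting a response
dag_id, run_id, task_id = "approval_example", "<run_id>", "approve_report"

# List pending: use the path from `af api ls --filter hitl`
pending = requests.get(
    f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"}, timeout=30
)
pending.raise_for_status()

# Respond: same discovered path family, PATCH
response = requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
    timeout=30,
)
response.raise_for_status()  # surface 4xx/5xx instead of failing silently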
```
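
An integration such as a Slack bot usually builds the PATCH body from user input, so it can help to validate that input before sending. The sketch below builds a body with the two keys shown above (`chosen_options`, `params_input`). The helper `build_response` and its `allowed_options` check are hypothetical additions, and the authoritative request schema is whatever `af api spec` reports.

```python
def build_response(chosen_options, params_input=None, allowed_options=None):
    """Build the JSON body for a HITL response.

    If `allowed_options` is given, reject choices the task never offered
    so the error surfaces before any HTTP call is made.
    """
    chosen = list(chosen_options)
    if allowed_options is not None:
        bad = [opt for opt in chosen if opt not in allowed_options]
        if bad:
            raise ValueError(f"options not offered by the task: {bad}")
    return {"chosen_options": chosen, "params_input": dict(params_input or {})}


payload = build_response(
    ["Approve"],
    {"comments": "ok"},
    allowed_options=["Approve", "Reject"],
)
print(payload)
```

The resulting dict can be passed as the `json=` argument of the PATCH request.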

---

## Step 6 — Safety checks

- [ ] Airflow version ≥ 3.1 (`af config version`).
- [ ] Constructor kwargs match the current registry output from Step 2 — no `respondents`-vs-`assigned_users` style drift.
- [ ] For branching: every option resolves