Skip to main content
ClaudeWave
Skill389 estrellas del repoactualizado 3d ago

testing-dags

This skill executes iterative test-debug-fix cycles for Airflow DAGs using the `af` CLI. Use it when users request complex testing workflows like "test this DAG and fix it if it fails" or "run the pipeline and troubleshoot issues." It's designed for multi-step scenarios requiring debugging and code modifications, whereas simple test requests go directly to the airflow entrypoint skill.

Instalar en Claude Code
Copiar
git clone --depth 1 https://github.com/astronomer/agents /tmp/testing-dags && cp -r /tmp/testing-dags/skills/testing-dags ~/.claude/skills/testing-dags
Después abre una sesión nueva de Claude Code; el skill carga automáticamente.

SKILL.md

# DAG Testing Skill

Use `af` commands to test, debug, and fix DAGs in iterative cycles.

## Running the CLI

These commands assume `af` is on PATH. Run via `astro otto` to get it automatically, or install standalone with `uv tool install astro-airflow-mcp`.

---

## Quick Validation with Astro CLI

If the user has the Astro CLI available, these commands provide fast feedback without needing a running Airflow instance:

```bash
# Parse DAGs to catch import errors, syntax issues, and DAG-level problems
astro dev parse

# Run pytest against DAGs (runs tests in tests/ directory)
astro dev pytest
```

Use these for quick validation during development. For full end-to-end testing against a live Airflow instance, continue to the trigger-and-wait workflow below.

---

## FIRST ACTION: Just Trigger the DAG

When the user asks to test a DAG, your **FIRST AND ONLY action** should be:

```bash
af runs trigger-wait <dag_id>
```

**DO NOT:**
- Call `af dags list` first
- Call `af dags get` first
- Call `af dags errors` first
- Use `grep` or `ls` or any other bash command
- Do any "pre-flight checks"

**Just trigger the DAG.** If it fails, THEN debug.

---

## Testing Workflow Overview

```
┌─────────────────────────────────────┐
│ 1. TRIGGER AND WAIT                 │
│    Run DAG, wait for completion     │
└─────────────────────────────────────┘
                 ↓
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCESS │    │ FAILED   │
   │ Done!   │    │ Debug... │
   └─────────┘    └──────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 2. DEBUG (only if failed)           │
        │    Get logs, identify root cause    │
        └─────────────────────────────────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 3. FIX AND RETEST                   │
        │    Apply fix, restart from step 1   │
        └─────────────────────────────────────┘
```

**Philosophy: Try first, debug on failure.** Don't waste time on pre-flight checks — just run the DAG and diagnose if something goes wrong.

---

## Phase 1: Trigger and Wait

Use `af runs trigger-wait` to test the DAG:

### Primary Method: Trigger and Wait

```bash
af runs trigger-wait <dag_id> --timeout 300
```

**Example:**

```bash
af runs trigger-wait my_dag --timeout 300
```

**Why this is the preferred method:**
- Single command handles trigger + monitoring
- Returns immediately when DAG completes (success or failure)
- Includes failed task details if run fails
- No manual polling required

### Response Interpretation

**Success:**
```json
{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}
```

**Failure:**
```json
{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}
```

**Timeout:**
```json
{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}
```

### Alternative: Trigger and Monitor Separately

Use this only when you need more control:

```bash
# Step 1: Trigger
af runs trigger my_dag
# Returns: {"dag_run_id": "manual__...", "state": "queued"}

# Step 2: Check status
af runs get my_dag manual__2025-01-14T...
# Returns current state
```

---

## Handling Results

### If Success

The DAG ran successfully. Summarize for the user:
- Total elapsed time
- Number of tasks completed
- Any notable outputs (if visible in logs)

**You're done!**

### If Timed Out

The DAG is still running. Options:
1. Check current status: `af runs get <dag_id> <dag_run_id>`
2. Ask user if they want to continue waiting
3. Increase timeout and try again

### If Failed

Move to Phase 2 (Debug) to identify the root cause.

---

## Phase 2: Debug Failures (Only If Needed)

When a DAG run fails, use these commands to diagnose:

### Get Comprehensive Diagnosis

```bash
af runs diagnose <dag_id> <dag_run_id>
```

Returns in one call:
- Run metadata (state, timing)
- All task instances with states
- Summary of failed tasks
- State counts (success, failed, skipped, etc.)

### Get Task Logs

```bash
af tasks logs <dag_id> <dag_run_id> <task_id>
```

**Example:**

```bash
af tasks logs my_dag manual__2025-01-14T... extract_data
```

**For specific retry attempt:**

```bash
af tasks logs my_dag manual__2025-01-14T... extract_data --try 2
```

**Look for:**
- Exception messages and stack traces
- Connection errors (database, API, S3)
- Permission errors
- Timeout errors
- Missing dependencies

### Check Upstream Tasks

If a task shows `upstream_failed`, the root cause is in an upstream task. Use `af runs diagnose` to find which task actually failed.

### Check Import Errors (If DAG Didn't Run)

If the trigger failed because the DAG doesn't exist:

```bash
af dags errors
```

This reveals syntax errors or missing dependencies that prevented the DAG from loading.

---

## Phase 3: Fix and Retest

Once you identify the issue:

### Common Fixes

| Issue | Fix |
|-------|-----|
| Missing import | Add to DAG file |
| Missing package | Add to `requirements.txt` |
| Connection error | Check `af config connections`, verify credentials |
| Variable missing | Check `af config variables`, create if needed |
| Timeout | Increase task timeout or optimize query |
| Permission error | Check credentials in connection |

### After Fixing

1. Save the file
2. **Retest:** `af runs trigger-wait <dag_id>`

**Repeat the test → debug → fix loop until the DAG succeeds.**

---

## CLI Quick Reference

| Phase | Command | Purpose |
|-------|---------|---------|
| Test | `af runs trigger-wait <dag_id>` | **Primary test method — start here*
add-adapter-methodSlash Command

Add a new method to both Airflow adapters

add-toolSlash Command

Add a new MCP tool to server.py

check-airflow-compatSlash Command

Verify code works with both Airflow 2.x and 3.x

airflow-adapterSkill

Airflow adapter pattern for v2/v3 API compatibility. Use when working with adapters, version detection, or adding new API methods that need to work across Airflow 2.x and 3.x.

airflow-hitlSkill

Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).

airflow-pluginsSkill

Build Airflow 3.1+ plugins that embed FastAPI apps, custom UI pages, React components, middleware, macros, and operator links directly into the Airflow UI. Use this skill whenever the user wants to create an Airflow plugin, add a custom UI page or nav entry to Airflow, build FastAPI-backed endpoints inside Airflow, serve static assets from a plugin, embed a React app in the Airflow UI, add middleware to the Airflow API server, create custom operator extra links, or call the Airflow REST API from inside a plugin. Also trigger when the user mentions AirflowPlugin, fastapi_apps, external_views, react_apps, plugin registration, or embedding a web app in Airflow 3.1+. If someone is building anything custom inside Airflow 3.1+ that involves Python and a browser-facing interface, this skill almost certainly applies.

airflowSkill

Queries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead.

analyzing-dataSkill

Queries data warehouse and answers business questions about data. Handles questions requiring database/warehouse queries including "who uses X", "how many Y", "show me Z", "find customers", "what is the count", data lookups, metrics, trends, or SQL analysis.