Skip to main content
ClaudeWave
Skill389 estrellas del repoactualizado 3d ago

cosmos-dbt-fusion

cosmos-dbt-fusion is a Claude Code skill for configuring and implementing dbt Fusion projects with Astronomer Cosmos 1.11 or later. Use it when running Fusion on supported warehouses (Snowflake, Databricks, BigQuery, Redshift) with local execution mode, after confirming your dbt engine is Fusion rather than Core and verifying that async execution and virtualenv are not required for your setup.

Instalar en Claude Code
Copiar
git clone --depth 1 https://github.com/astronomer/agents /tmp/cosmos-dbt-fusion && cp -r /tmp/cosmos-dbt-fusion/skills/cosmos-dbt-fusion ~/.claude/skills/cosmos-dbt-fusion
Después abre una sesión nueva de Claude Code; el skill carga automáticamente.

SKILL.md

# Cosmos + dbt Fusion: Implementation Checklist

Execute steps in order. This skill covers Fusion-specific constraints only.

> **Version note**: dbt Fusion support was introduced in Cosmos 1.11.0. Requires Cosmos ≥1.11.
>
> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md)** for ProfileConfig, operator_args, and Airflow 3 compatibility details.

> **Before starting**, confirm: (1) dbt engine = Fusion (not Core → use **cosmos-dbt-core**), (2) warehouse = Snowflake, Databricks, Bigquery and Redshift only.

### Fusion-Specific Constraints

| Constraint | Details |
|------------|---------|
| No async | `AIRFLOW_ASYNC` not supported |
| No virtualenv | Fusion is a binary, not a Python package |
| Warehouse support | Snowflake, Databricks, Bigquery and Redshift support [while in preview](https://github.com/dbt-labs/dbt-fusion) |

---

## 1. Confirm Cosmos Version

> **CRITICAL**: Cosmos 1.11.0 introduced dbt Fusion compatibility.

```bash
# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"
```

**Validate**: `pip show astronomer-cosmos` reports version ≥ 1.11.0

---

## 2. Install the dbt Fusion Binary (REQUIRED)

dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.

Determine where to install the Fusion binary (Dockerfile / base image / runtime).

### Example Dockerfile Install

```dockerfile
USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro
```

### Common Install Paths

| Environment | Typical path |
|-------------|--------------|
| Astro Runtime | `/home/astro/.local/bin/dbt` |
| System-wide | `/usr/local/bin/dbt` |

**Validate**: The `dbt` binary exists at the chosen path and `dbt --version` succeeds.

---

## 3. Choose Parsing Strategy (RenderConfig)

Parsing strategy is the same as dbt Core. Pick ONE:

| Load mode | When to use | Required inputs |
|-----------|-------------|-----------------|
| `dbt_manifest` | Large projects; fastest parsing | `ProjectConfig.manifest_path` |
| `dbt_ls` | Complex selectors; need dbt-native selection | Fusion binary accessible to scheduler |
| `automatic` | Simple setups; let Cosmos pick | (none) |

```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)
```

---

## 4. Configure Warehouse Connection (ProfileConfig)

> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#profileconfig-warehouse-connection)** for full ProfileConfig options and examples.


```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)
```

---

## 5. Configure ExecutionConfig (LOCAL Only)

> **CRITICAL**: dbt Fusion with Cosmos requires `ExecutionMode.LOCAL` with `dbt_executable_path` pointing to the Fusion binary.

```python
from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)
```

---

## 6. Configure Project (ProjectConfig)

```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```

---

## 7. Assemble DAG / TaskGroup

### Option A: DbtDag (Standalone)

```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```

### Option B: DbtTaskGroup (Inside Existing DAG)

```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()
```

---

## 8. Final Validation

Before finalizing, verify:

- [ ] **Cosmos version**: ≥1.11.0
- [ ] **Fusion binary installed**: Path exists and is executable
- [ ] **Warehouse supported**: Snowflake, Databricks, Bigquery
add-adapter-methodSlash Command

Add a new method to both Airflow adapters

add-toolSlash Command

Add a new MCP tool to server.py

check-airflow-compatSlash Command

Verify code works with both Airflow 2.x and 3.x

airflow-adapterSkill

Airflow adapter pattern for v2/v3 API compatibility. Use when working with adapters, version detection, or adding new API methods that need to work across Airflow 2.x and 3.x.

airflow-hitlSkill

Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).

airflow-pluginsSkill

Build Airflow 3.1+ plugins that embed FastAPI apps, custom UI pages, React components, middleware, macros, and operator links directly into the Airflow UI. Use this skill whenever the user wants to create an Airflow plugin, add a custom UI page or nav entry to Airflow, build FastAPI-backed endpoints inside Airflow, serve static assets from a plugin, embed a React app in the Airflow UI, add middleware to the Airflow API server, create custom operator extra links, or call the Airflow REST API from inside a plugin. Also trigger when the user mentions AirflowPlugin, fastapi_apps, external_views, react_apps, plugin registration, or embedding a web app in Airflow 3.1+. If someone is building anything custom inside Airflow 3.1+ that involves Python and a browser-facing interface, this skill almost certainly applies.

airflowSkill

Queries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead.

analyzing-dataSkill

Queries data warehouse and answers business questions about data. Handles questions requiring database/warehouse queries including "who uses X", "how many Y", "show me Z", "find customers", "what is the count", data lookups, metrics, trends, or SQL analysis.