cosmos-dbt-fusion
cosmos-dbt-fusion is a Claude Code skill for configuring and implementing dbt Fusion projects with Astronomer Cosmos 1.11 or later. Use it to run Fusion on a supported warehouse (Snowflake, Databricks, BigQuery, Redshift) in local execution mode. Before starting, confirm that your dbt engine is Fusion rather than Core and that your setup does not need async execution or virtualenv.
git clone --depth 1 https://github.com/astronomer/agents /tmp/cosmos-dbt-fusion && cp -r /tmp/cosmos-dbt-fusion/skills/cosmos-dbt-fusion ~/.claude/skills/cosmos-dbt-fusion
# Cosmos + dbt Fusion: Implementation Checklist
Execute steps in order. This skill covers Fusion-specific constraints only.
> **Version note**: dbt Fusion support was introduced in Cosmos 1.11.0, so this skill requires Cosmos ≥ 1.11.0.
>
> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md)** for ProfileConfig, operator_args, and Airflow 3 compatibility details.
>
> **Before starting**, confirm: (1) dbt engine = Fusion (not Core → use **cosmos-dbt-core**), (2) warehouse = Snowflake, Databricks, BigQuery, or Redshift.
### Fusion-Specific Constraints
| Constraint | Details |
|------------|---------|
| No async | `ExecutionMode.AIRFLOW_ASYNC` is not supported |
| No virtualenv | Fusion is a binary, not a Python package |
| Warehouse support | Snowflake, Databricks, BigQuery, and Redshift only, [while Fusion is in preview](https://github.com/dbt-labs/dbt-fusion) |
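
The constraints in the table above can be encoded as a small pre-flight check. This is a minimal sketch only: the function name, its parameters, and the lowercase mode strings are hypothetical and not part of Cosmos.

```python
# Hypothetical pre-flight helper; nothing here is a Cosmos API.
SUPPORTED_WAREHOUSES = {"snowflake", "databricks", "bigquery", "redshift"}


def fusion_constraint_problems(warehouse, execution_mode="local", uses_virtualenv=False):
    """Return a list of human-readable violations of the Fusion constraints."""
    problems = []
    if warehouse.lower() not in SUPPORTED_WAREHOUSES:
        problems.append(f"warehouse '{warehouse}' is not in the supported list")
    if execution_mode.lower() != "local":
        problems.append(f"execution mode '{execution_mode}' is not supported; use local")
    if uses_virtualenv:
        problems.append("virtualenv execution is unavailable: Fusion is a binary")
    return problems


if __name__ == "__main__":
    # An empty list means no constraint is violated.
    print(fusion_constraint_problems("Snowflake"))
    print(fusion_constraint_problems("postgres", execution_mode="airflow_async"))
```

An empty result means the planned setup fits within the constraints; each string in a non-empty result names one row of the table that is violated.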
---
## 1. Confirm Cosmos Version
> **CRITICAL**: Cosmos 1.11.0 introduced dbt Fusion compatibility.
```bash
# Check installed version
pip show astronomer-cosmos
# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"
```
**Validate**: `pip show astronomer-cosmos` reports version ≥ 1.11.0
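
The same check can be scripted, for example in CI. The sketch below reads the installed version through the standard library's `importlib.metadata` and compares it numerically; the helper names are hypothetical, and pre-release suffixes such as `a1` are simply ignored, which is an assumption of this sketch.

```python
from importlib.metadata import PackageNotFoundError, version

MINIMUM = (1, 11, 0)  # first Cosmos release with dbt Fusion support


def parse_release(version_string):
    """Turn '1.11.0' (or '1.11.0a1') into a tuple of three leading integers."""
    parts = []
    for piece in version_string.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break  # stop at the first non-digit, e.g. the 'a' in '0a1'
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)  # pad '1.11' to (1, 11, 0)
    return tuple(parts)


def cosmos_supports_fusion(installed=None):
    """Return True if the given (or installed) Cosmos version is >= 1.11.0."""
    if installed is None:
        try:
            installed = version("astronomer-cosmos")
        except PackageNotFoundError:
            return False  # Cosmos is not installed in this environment
    return parse_release(installed) >= MINIMUM


if __name__ == "__main__":
    print(cosmos_supports_fusion())
```

Comparing integer tuples rather than raw strings matters here, because the string `"1.9"` sorts after `"1.11"`.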
---
## 2. Install the dbt Fusion Binary (REQUIRED)
dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.
Determine where to install the Fusion binary (Dockerfile / base image / runtime).
### Example Dockerfile Install
```dockerfile
USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro
```
### Common Install Paths
| Environment | Typical path |
|-------------|--------------|
| Astro Runtime | `/home/astro/.local/bin/dbt` |
| System-wide | `/usr/local/bin/dbt` |
**Validate**: The `dbt` binary exists at the chosen path and `dbt --version` succeeds.
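
This validation can also be run from Python, for instance in an image smoke test. The following is a minimal sketch using only the standard library; `check_dbt_binary` is a hypothetical helper, and the path in the usage line is the Astro Runtime path from the table above.

```python
import os
import subprocess


def check_dbt_binary(path, timeout=30):
    """Return (ok, detail) for the dbt executable expected at `path`."""
    if not os.path.isfile(path):
        return False, f"no file at {path}"
    if not os.access(path, os.X_OK):
        return False, f"{path} is not executable"
    try:
        result = subprocess.run(
            [path, "--version"], capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        return False, f"could not run {path}: {exc}"
    if result.returncode != 0:
        return False, result.stderr.strip() or f"exit code {result.returncode}"
    return True, result.stdout.strip()


if __name__ == "__main__":
    ok, detail = check_dbt_binary("/home/astro/.local/bin/dbt")
    print(ok, detail)
```

The sketch only confirms that the binary runs. It does not inspect the version output to tell Fusion from Core, so review the printed text yourself.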
---
## 3. Choose Parsing Strategy (RenderConfig)
Parsing strategy is the same as dbt Core. Pick ONE:
| Load mode | When to use | Required inputs |
|-----------|-------------|-----------------|
| `dbt_manifest` | Large projects; fastest parsing | `ProjectConfig.manifest_path` |
| `dbt_ls` | Complex selectors; need dbt-native selection | Fusion binary accessible to scheduler |
| `automatic` | Simple setups; let Cosmos pick | (none) |
```python
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or LoadMode.DBT_MANIFEST, LoadMode.DBT_LS
)
```
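
The decision table above can be read as a short rule. The sketch below is one possible reading, not Cosmos behavior: `suggest_load_mode` is a hypothetical helper, and preferring an existing manifest over `dbt_ls` is an assumption. The returned strings match the load mode names in the table.

```python
import os


def suggest_load_mode(manifest_path=None, needs_dbt_selectors=False):
    """Suggest a load mode name from the table: dbt_manifest, dbt_ls, or automatic."""
    # Assumption: a manifest that already exists is the fastest option, so prefer it.
    if manifest_path and os.path.isfile(manifest_path):
        return "dbt_manifest"
    # dbt-native selection requires running the (Fusion) binary at parse time.
    if needs_dbt_selectors:
        return "dbt_ls"
    return "automatic"


if __name__ == "__main__":
    print(suggest_load_mode())
    print(suggest_load_mode(needs_dbt_selectors=True))
```

Map the suggested name to the corresponding `LoadMode` member when building `RenderConfig`.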
---
## 4. Configure Warehouse Connection (ProfileConfig)
> **Reference**: See **[reference/cosmos-config.md](reference/cosmos-config.md#profileconfig-warehouse-connection)** for full ProfileConfig options and examples.
```python
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",  # Airflow connection ID for the warehouse
    ),
)
```
---
## 5. Configure ExecutionConfig (LOCAL Only)
> **CRITICAL**: dbt Fusion with Cosmos requires `ExecutionMode.LOCAL` with `dbt_executable_path` pointing to the Fusion binary.
```python
from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)
```
---
## 6. Configure Project (ProjectConfig)
```python
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)
```
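
Before passing paths to `ProjectConfig`, it can help to confirm they exist in the Airflow image. The sketch below is a hypothetical standalone check, not something Cosmos provides. It relies only on the fact that a dbt project root contains a `dbt_project.yml` file.

```python
from pathlib import Path


def project_path_problems(dbt_project_path, manifest_path=None):
    """Return a list of problems with the paths intended for ProjectConfig."""
    problems = []
    project_dir = Path(dbt_project_path)
    if not project_dir.is_dir():
        problems.append(f"{project_dir} is not a directory")
    elif not (project_dir / "dbt_project.yml").is_file():
        problems.append(f"{project_dir} has no dbt_project.yml")
    # The manifest is only needed for the dbt_manifest load mode.
    if manifest_path is not None and not Path(manifest_path).is_file():
        problems.append(f"manifest not found at {manifest_path}")
    return problems


if __name__ == "__main__":
    print(project_path_problems("/path/to/dbt/project"))
```

An empty list means both paths look usable; otherwise each entry names the path that needs attention.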
---
## 7. Assemble DAG / TaskGroup
### Option A: DbtDag (Standalone)
```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)
```
### Option B: DbtTaskGroup (Inside Existing DAG)
```python
from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
# ProfileConfig needs a profile_mapping (or profiles_yml_filepath); same mapping as Option A
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(conn_id="snowflake_default"),
)
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")  # Fusion binary


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    # All dbt models render as tasks inside this group
    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    # pre_dbt -> dbt task group -> post_dbt
    chain(pre_dbt(), dbt, post_dbt())


my_dag()
```
---
## 8. Final Validation
Before finalizing, verify:
- [ ] **Cosmos version**: ≥1.11.0
- [ ] **Fusion binary installed**: Path exists and is executable
- [ ] **Warehouse supported**: Snowflake, Databricks, BigQuery, or Redshift