Skip to main content
ClaudeWave
Skill389 repo starsupdated 3d ago

annotating-task-lineage

This skill provides guidance for manually adding data lineage metadata to Apache Airflow tasks using inlets and outlets. Use it when operators lack built-in OpenLineage extraction methods and you need to annotate input and output datasets for lineage tracking visible in Astro's UI. The approach supports both OpenLineage Dataset objects and Airflow Asset types for defining data dependencies.

Install in Claude Code
Copy
git clone --depth 1 https://github.com/astronomer/agents /tmp/annotating-task-lineage && cp -r /tmp/annotating-task-lineage/skills/annotating-task-lineage ~/.claude/skills/annotating-task-lineage
Then start a new Claude Code session; the skill loads automatically.

SKILL.md

# Annotating Task Lineage with Inlets & Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using `inlets` and `outlets`.

> **Reference:** See the [OpenLineage provider developer guide](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/developer.html) for the latest supported operators and patterns.

### On Astro

Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced **Lineage tab**, which provides cross-DAG and cross-deployment lineage views. This means your annotations are immediately visible in the Astro UI, giving you a unified view of data flow across your entire Astro organization.

## When to Use This Approach

| Scenario | Use Inlets/Outlets? |
|----------|---------------------|
| Operator has OpenLineage methods (`get_openlineage_facets_on_*`) | ❌ Modify the OL method directly |
| Operator has no built-in OpenLineage extractor | ✅ Yes |
| Simple table-level lineage is sufficient | ✅ Yes |
| Quick lineage setup without custom code | ✅ Yes |
| Need column-level lineage | ❌ Use OpenLineage methods or custom extractor |
| Complex extraction logic needed | ❌ Use OpenLineage methods or custom extractor |

> **Note:** Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.

---

## Supported Types for Inlets/Outlets

You can use **OpenLineage Dataset** objects or **Airflow Assets** for inlets and outlets:

### OpenLineage Datasets (Recommended)

```python
from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)
```

### Airflow Assets (Airflow 3+)

```python
from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
```

### Airflow Datasets (Airflow 2.4+)

```python
from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
```

---

## Basic Usage

### Setting Inlets and Outlets on Operators

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],           # What this task reads
        outlets=[target_table],          # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],           # Reads from previous output
        outlets=[output_file],           # Writes to S3
    )

    transform >> export
```

### Multiple Inputs and Outputs

Tasks often read from multiple sources and write to multiple destinations:

```python
from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],      # All inputs
    outlets=[daily_summary, customer_metrics], # All outputs
)
```

---

## Setting Lineage in Custom Operators

When building custom operators, you have two options:

### Option 1: Implement OpenLineage Methods (Recommended)

This is the preferred approach as it gives you full control over lineage extraction:

```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )
```

### Option 2: Set Inlets/Outlets Dynamically

For simpler cases, set lineage within the `execute` method (non-deferrable operators only):

```python
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.so
add-adapter-methodSlash Command

Add a new method to both Airflow adapters

add-toolSlash Command

Add a new MCP tool to server.py

check-airflow-compatSlash Command

Verify code works with both Airflow 2.x and 3.x

airflow-adapterSkill

Airflow adapter pattern for v2/v3 API compatibility. Use when working with adapters, version detection, or adding new API methods that need to work across Airflow 2.x and 3.x.

airflow-hitlSkill

Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).

airflow-pluginsSkill

Build Airflow 3.1+ plugins that embed FastAPI apps, custom UI pages, React components, middleware, macros, and operator links directly into the Airflow UI. Use this skill whenever the user wants to create an Airflow plugin, add a custom UI page or nav entry to Airflow, build FastAPI-backed endpoints inside Airflow, serve static assets from a plugin, embed a React app in the Airflow UI, add middleware to the Airflow API server, create custom operator extra links, or call the Airflow REST API from inside a plugin. Also trigger when the user mentions AirflowPlugin, fastapi_apps, external_views, react_apps, plugin registration, or embedding a web app in Airflow 3.1+. If someone is building anything custom inside Airflow 3.1+ that involves Python and a browser-facing interface, this skill almost certainly applies.

airflowSkill

Queries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead.

analyzing-dataSkill

Queries data warehouse and answers business questions about data. Handles questions requiring database/warehouse queries including "who uses X", "how many Y", "show me Z", "find customers", "what is the count", data lookups, metrics, trends, or SQL analysis.