ClaudeWave
Skill · 389 repo stars · updated 3d ago

tracing-upstream-lineage

This skill traces upstream data lineage by identifying target objects, finding their producing Airflow DAGs, and recursively mapping source tables and external systems. Use it when users need to understand data origins, identify what feeds a specific table or column, or discover upstream dependencies in their data pipeline.

Install in Claude Code
git clone --depth 1 https://github.com/astronomer/agents /tmp/tracing-upstream-lineage && cp -r /tmp/tracing-upstream-lineage/skills/tracing-upstream-lineage ~/.claude/skills/tracing-upstream-lineage
Then open a new Claude Code session; the skill loads automatically.

SKILL.md

# Upstream Lineage: Sources

Trace the origins of data to answer "Where does this data come from?"

## Lineage Investigation

### Step 1: Identify the Target Type

Determine what we're tracing:
- **Table**: Trace what populates this table
- **Column**: Trace where this specific column comes from
- **DAG**: Trace what data sources this DAG reads from

### Step 2: Find the Producing DAG

Tables are typically populated by Airflow DAGs. Find the connection:

1. **Search DAGs by name**: Use `af dags list` and look for DAG names matching the table name
   - `load_customers` -> `customers` table
   - `etl_daily_orders` -> `orders` table

2. **Explore DAG source code**: Use `af dags source <dag_id>` to read the DAG definition
   - Look for INSERT, MERGE, CREATE TABLE statements
   - Find the target table in the code

3. **Check DAG tasks**: Use `af tasks list <dag_id>` to see what operations the DAG performs
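The name matching and statement search above can be sketched in Python. This is a rough heuristic, not part of the `af` CLI: `candidate_dags` and `write_targets` are hypothetical helpers, and the sketch assumes you have already collected the DAG ids and the DAG source text (for example from `af dags list` and `af dags source <dag_id>`).

```python
import re

# Captures the object named after INSERT INTO, MERGE INTO, or
# CREATE [OR REPLACE] TABLE [IF NOT EXISTS]. Heuristic only: it does not
# understand templated names, quoted identifiers, or dynamically built SQL.
WRITE_TARGET = re.compile(
    r"\b(?:INSERT\s+INTO|MERGE\s+INTO"
    r"|CREATE\s+(?:OR\s+REPLACE\s+)?TABLE(?:\s+IF\s+NOT\s+EXISTS)?)"
    r"\s+([\w.]+)",
    re.IGNORECASE,
)


def candidate_dags(table: str, dag_ids: list[str]) -> list[str]:
    """Return DAG ids whose name contains the unqualified table name."""
    name = table.split(".")[-1].lower()
    return [dag_id for dag_id in dag_ids if name in dag_id.lower()]


def write_targets(dag_source: str) -> set[str]:
    """Return lower-cased table names the DAG source appears to write to."""
    return {m.group(1).lower() for m in WRITE_TARGET.finditer(dag_source)}


if __name__ == "__main__":
    dag_ids = ["load_customers", "etl_daily_orders"]
    print(candidate_dags("analytics.customers", dag_ids))
    print(write_targets("INSERT INTO analytics.orders_daily SELECT * FROM raw.orders"))
```

Treat a name match as a lead only; confirm it by checking that the table actually appears among the write targets in that DAG's source.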

### On Astro

If you're running on Astro, the **Lineage tab** in the Astro UI provides visual lineage exploration across DAGs and datasets. Use it to quickly trace upstream dependencies without manually searching DAG source code.

### On OSS Airflow

Use DAG source code and task logs to trace lineage (no built-in cross-DAG UI).

### Step 3: Trace Data Sources

From the DAG code, identify source tables and systems:

**SQL Sources** (look for FROM clauses):
```sql
-- In the SQL embedded in the DAG code:
SELECT * FROM source_schema.source_table  -- <- This is an upstream source
```
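When a DAG embeds a lot of SQL, a small script can list the tables that follow `FROM` and `JOIN`. The sketch below is a regex heuristic, not a SQL parser, and `source_tables` is a hypothetical helper name. Expect false positives from CTE names, `DELETE FROM`, and expressions such as `EXTRACT(YEAR FROM order_date)`, so review the result by hand.

```python
import re

# A table reference is taken to be an identifier (optionally schema-qualified)
# directly after FROM or JOIN. Subqueries such as "FROM (" are skipped because
# the identifier must start with a letter or underscore.
SOURCE_TABLE = re.compile(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", re.IGNORECASE)


def source_tables(sql: str) -> list[str]:
    """Return lower-cased table names after FROM or JOIN, in first-seen order."""
    seen: dict[str, None] = {}  # dict keeps insertion order and drops duplicates
    for match in SOURCE_TABLE.finditer(sql):
        seen.setdefault(match.group(1).lower(), None)
    return list(seen)


if __name__ == "__main__":
    sql = (
        "SELECT o.id, c.name FROM raw.orders o "
        "JOIN dim.customers c ON o.customer_id = c.id"
    )
    print(source_tables(sql))  # ['raw.orders', 'dim.customers']
```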

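**External Sources** (look for operator classes and connection IDs):
- S3 operators and transfers (e.g. `S3ToRedshiftOperator`) -> S3 bucket source
- `PostgresOperator`, or `SQLExecuteQueryOperator` with a Postgres connection -> Postgres database source
- Salesforce operators (e.g. `SalesforceToS3Operator`) -> Salesforce API source
- `HttpOperator` (`SimpleHttpOperator` in older HTTP provider releases) -> REST API source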

**File Sources**:
- CSV/Parquet files in object storage
- SFTP drops
- Local file paths
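To find operator classes and connection IDs without reading every line, you can parse the DAG file with Python's standard `ast` module. This is a minimal sketch under two assumptions: operators are instantiated with a class name ending in `Operator`, and connections are passed as string literals in keyword arguments ending in `conn_id`. `operator_calls` is a hypothetical helper, and the connection name and endpoint in the example are made up. The code only parses the source, so Airflow does not need to be installed.

```python
import ast


def operator_calls(dag_source: str) -> list[tuple[str, dict[str, str]]]:
    """List (operator class name, {conn kwarg: value}) for each operator call."""
    found = []
    for node in ast.walk(ast.parse(dag_source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handles both `HttpOperator(...)` and `module.HttpOperator(...)`.
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", "")
        if not name.endswith("Operator"):
            continue
        conns = {
            kw.arg: kw.value.value
            for kw in node.keywords
            if kw.arg
            and kw.arg.endswith("conn_id")
            and isinstance(kw.value, ast.Constant)
            and isinstance(kw.value.value, str)
        }
        found.append((name, conns))
    return found


if __name__ == "__main__":
    example = (
        'fetch = HttpOperator(task_id="fetch", http_conn_id="crm_api", '
        'endpoint="/v1/accounts")\n'
    )
    print(operator_calls(example))  # [('HttpOperator', {'http_conn_id': 'crm_api'})]
```

Connections set through variables, defaults, or TaskFlow decorators will not show up here; fall back to reading the source for those.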

### Step 4: Build the Lineage Chain

Recursively trace each source:

```
TARGET: analytics.orders_daily
    ^
    +-- DAG: etl_daily_orders
            ^
            +-- SOURCE: raw.orders (table)
            |       ^
            |       +-- DAG: ingest_orders
            |               ^
            |               +-- SOURCE: Salesforce API (external)
            |
            +-- SOURCE: dim.customers (table)
                    ^
                    +-- DAG: load_customers
                            ^
                            +-- SOURCE: PostgreSQL (external DB)
```
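The recursion can be sketched as follows, assuming you have recorded two mappings during Steps 2 and 3: which DAG produces each table, and which sources each DAG reads. The dictionaries and the `trace` function are illustrative, filled in with the example chain above; nothing here queries Airflow.

```python
# Hypothetical mappings collected by hand in Steps 2 and 3.
PRODUCED_BY = {
    "analytics.orders_daily": "etl_daily_orders",
    "raw.orders": "ingest_orders",
    "dim.customers": "load_customers",
}
READS_FROM = {
    "etl_daily_orders": ["raw.orders", "dim.customers"],
    "ingest_orders": ["Salesforce API"],
    "load_customers": ["PostgreSQL"],
}


def trace(target, produced_by, reads_from, depth=0, path=()):
    """Return indented lines describing everything upstream of `target`."""
    lines = ["  " * depth + target]
    dag = produced_by.get(target)
    # Stop at external systems (no known producer) and at cycles
    # (the target is already one of its own ancestors).
    if dag is None or target in path:
        return lines
    lines.append("  " * (depth + 1) + f"DAG: {dag}")
    for source in reads_from.get(dag, []):
        lines.extend(
            trace(source, produced_by, reads_from, depth + 2, path + (target,))
        )
    return lines


if __name__ == "__main__":
    print("\n".join(trace("analytics.orders_daily", PRODUCED_BY, READS_FROM)))
```

Tracking the current path, rather than a global visited set, lets a table that feeds two branches be expanded in both while still stopping on circular dependencies.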

### Step 5: Check Source Health

For each upstream source:
- **Tables**: Check freshness with the **checking-freshness** skill
- **DAGs**: Check recent run status with `af dags stats`
- **External systems**: Note connection info from DAG code

## Lineage for Columns

When tracing a specific column:

1. Find the column in the target table schema
2. Search DAG source code for references to that column name
3. Trace through transformations:
   - Direct mappings: `source.col AS target_col`
   - Transformations: `COALESCE(a.col, b.col) AS target_col`
   - Aggregations: `SUM(detail.amount) AS total_amount`
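For step 3, a short script can pull out the expression aliased to the column you are tracing. This is a sketch under simplifying assumptions: the column is defined with an explicit `AS` alias in the first `SELECT` list of the statement, and that list contains no `FROM` keyword of its own (so `EXTRACT(... FROM ...)` will confuse it). `split_top_level` and `column_expression` are hypothetical helper names.

```python
import re
from typing import Optional


def split_top_level(select_list: str) -> list[str]:
    """Split a SELECT list on commas that are not inside parentheses."""
    parts, current, depth = [], [], 0
    for ch in select_list:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())
    return parts


def column_expression(sql: str, column: str) -> Optional[str]:
    """Return the expression aliased to `column` in the first SELECT list."""
    select = re.search(r"\bSELECT\b(.*?)\bFROM\b", sql, re.IGNORECASE | re.DOTALL)
    if select is None:
        return None
    alias = re.compile(
        r"^(.*?)\s+AS\s+" + re.escape(column) + r"$", re.IGNORECASE | re.DOTALL
    )
    for item in split_top_level(select.group(1)):
        hit = alias.match(item)
        if hit:
            return hit.group(1).strip()
    return None


if __name__ == "__main__":
    sql = (
        "SELECT o.id AS order_id, COALESCE(a.col, b.col) AS target_col, "
        "SUM(detail.amount) AS total_amount FROM detail"
    )
    print(column_expression(sql, "target_col"))  # COALESCE(a.col, b.col)
```

The returned expression names the upstream columns (`a.col` and `b.col` here); repeat the search in the DAGs that produce those tables to continue the trace.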

## Output: Lineage Report

### Summary
One-line answer: "This table is populated by DAG X from sources Y and Z"

### Lineage Diagram
```
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG: ingest_sfdc         DAG: transform_sales
```

### Source Details

| Source | Type | Connection | Freshness | Owner |
|--------|------|------------|-----------|-------|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |

### Transformation Chain
Describe how data flows and transforms:
1. Raw data lands in `raw.orders` via Salesforce API sync
2. DAG `transform_orders` cleans and dedupes into `stg.orders`
3. DAG `build_order_facts` joins with dimensions into `fct.orders`

### Data Quality Implications
- Single points of failure?
- Stale upstream sources?
- Complex transformation chains that could break?

### Related Skills
- Check source freshness: **checking-freshness** skill
- Debug source DAG: **debugging-dags** skill
- Trace downstream impacts: **tracing-downstream-lineage** skill
- Add manual lineage annotations: **annotating-task-lineage** skill
- Build custom lineage extractors: **creating-openlineage-extractors** skill