creating-openlineage-extractors
This Claude Code skill provides guidance for creating custom OpenLineage extractors to capture data lineage from Airflow operators lacking built-in support. Use it when you need lineage from unsupported or third-party operators, require column-level lineage tracking, or must implement complex extraction logic beyond standard inlet/outlet mechanisms.
git clone --depth 1 https://github.com/astronomer/agents /tmp/creating-openlineage-extractors && cp -r /tmp/creating-openlineage-extractors/skills/creating-openlineage-extractors ~/.claude/skills/creating-openlineage-extractors
# Creating OpenLineage Extractors
This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.
> **Reference:** See the [OpenLineage provider developer guide](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/developer.html) for the latest patterns and list of supported operators/hooks.
## When to Use Each Approach
| Scenario | Approach |
|----------|----------|
| Operator you own/maintain | **OpenLineage Methods** (recommended, simplest) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |
> **Important:** Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.
### On Astro
Astro includes built-in OpenLineage integration, so no additional transport configuration is needed. Lineage events are automatically collected and displayed in the Astro UI's **Lineage tab**. Custom extractors deployed with an Astro project are picked up once they are registered: register them in `airflow.cfg` or via an environment variable, then deploy.
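
Registration can be sketched as follows. The `[openlineage]` section's `extractors` option takes a semicolon-separated list of full import paths, and Airflow's usual naming convention maps it to the `AIRFLOW__OPENLINEAGE__EXTRACTORS` environment variable. The import path `include.extractors.my_extractor` and the helper `build_extractors_setting` are hypothetical and only illustrate the shape of the value; in a real deployment the variable would be set in the environment before Airflow starts, not from Python.

```python
import os


def build_extractors_setting(class_paths: list[str]) -> str:
    """Join full import paths into a semicolon-separated setting value."""
    return ";".join(path.strip() for path in class_paths if path.strip())


# Hypothetical import path: replace with wherever your extractor class lives.
setting = build_extractors_setting(
    ["include.extractors.my_extractor.MyOperatorExtractor"]
)

# Equivalent to `extractors = ...` under `[openlineage]` in airflow.cfg.
# Set here only to show the variable name and value format.
os.environ["AIRFLOW__OPENLINEAGE__EXTRACTORS"] = setting
print(setting)
```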
---
## Two Approaches
### 1. OpenLineage Methods (Recommended)
Use when you can add methods directly to your custom operator. This is the **go-to solution** for operators you own.
### 2. Custom Extractors
Use when you need lineage from third-party or provider operators that you **cannot modify**.
---
## Approach 1: OpenLineage Methods (Recommended)
When you own the operator, add OpenLineage methods directly:
```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None
```
### OpenLineage Methods Reference
| Method | When Called | Required |
|--------|-------------|----------|
| `get_openlineage_facets_on_start()` | Task enters RUNNING | No |
| `get_openlineage_facets_on_complete(ti)` | Task succeeds | No |
| `get_openlineage_facets_on_failure(ti)` | Task fails | No |
> Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.
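
The fall-through can be pictured as trying each lineage source in turn and using the first one that yields a result. The sketch below is a simplified, pure-Python illustration of that idea, not the provider's actual implementation; `resolve_lineage`, the `Lineage` alias, and the source labels are hypothetical names, and the table names are made up.

```python
from typing import Callable, Optional

Lineage = dict  # stand-in for OperatorLineage in this sketch


def resolve_lineage(
    sources: list[tuple[str, Callable[[], Optional[Lineage]]]],
) -> tuple[str, Optional[Lineage]]:
    """Return the label and result of the first source that yields lineage."""
    for label, get_lineage in sources:
        result = get_lineage()
        if result is not None:
            return label, result
    return "none", None


sources = [
    # The operator did not implement the relevant OpenLineage method.
    ("openlineage_methods", lambda: None),
    # No hook reported any datasets.
    ("hook_level_lineage", lambda: None),
    # Inlets/outlets declared on the task act as the last resort.
    ("inlets_outlets", lambda: {"inputs": ["raw.orders"], "outputs": ["clean.orders"]}),
]

label, lineage = resolve_lineage(sources)
print(label, lineage)
```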
---
## Approach 2: Custom Extractors
Use this approach only when you **cannot modify** the operator (e.g., third-party or provider operators).
### Basic Structure
```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None
```
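
The link between an extractor and an operator is the class name returned by `get_operator_classnames()`. A minimal, pure-Python sketch of that lookup follows. `find_extractor`, `ThirdPartyOperator`, and the stand-in classes are hypothetical and only mimic the shape of the provider's classes, so read it as an illustration of the idea and not as the provider's code.

```python
class StandInExtractor:
    """Stand-in for BaseExtractor: keeps a reference to the operator."""

    def __init__(self, operator):
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return []


class ThirdPartyExtractor(StandInExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        # Must match the operator's class name exactly.
        return ["ThirdPartyOperator"]


class ThirdPartyOperator:
    """Hypothetical operator that you cannot modify."""

    source_table = "orders"


class OtherOperator:
    """Hypothetical operator with no registered extractor."""


def find_extractor(operator, registered):
    """Return an extractor instance whose class names include the operator's."""
    name = type(operator).__name__
    for extractor_cls in registered:
        if name in extractor_cls.get_operator_classnames():
            return extractor_cls(operator)
    return None


extractor = find_extractor(ThirdPartyOperator(), [ThirdPartyExtractor])
print(type(extractor).__name__, extractor.operator.source_table)
```

A typo in the returned class name means no extractor is matched, which is worth checking first when a registered extractor appears to be ignored.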
### OperatorLineage Structure
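As a rough orientation, an `OperatorLineage` groups four things: input datasets, output datasets, run facets, and job facets. The sketch below uses stand-in dataclasses (`DatasetSketch` and `OperatorLineageSketch`, both hypothetical) that mirror those fields without importing Airflow. In real code, import `OperatorLineage` from the provider and `Dataset` from `openlineage.client.event_v2`, as in the examples above.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class DatasetSketch:
    """Stand-in for an OpenLineage dataset: namespace, name, optional facets."""

    namespace: str
    name: str
    facets: dict[str, Any] = field(default_factory=dict)


@dataclass
class OperatorLineageSketch:
    """Stand-in mirroring the four fields of OperatorLineage."""

    inputs: list[DatasetSketch] = field(default_factory=list)
    outputs: list[DatasetSketch] = field(default_factory=list)
    run_facets: dict[str, Any] = field(default_factory=dict)
    job_facets: dict[str, Any] = field(default_factory=dict)


lineage = OperatorLineageSketch(
    inputs=[DatasetSketch("postgres://mydb:5432", "public.source_table")],
    outputs=[DatasetSketch("postgres://mydb:5432", "public.target_table")],
)
print(len(lineage.inputs), len(lineage.outputs), lineage.run_facets)
```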
```python
from airflow.provid
```