Skill · 374 repo stars · updated 6mo ago

ingesting-data

This skill provides reusable patterns and code examples for loading data from external sources including cloud storage buckets, files, REST APIs, and streaming platforms into databases and data warehouses. Use it when building ETL pipelines, importing CSV or JSON files, consuming API feeds, migrating legacy databases, or setting up streaming data ingestion with tools like Kafka or Kinesis.

Install in Claude Code
git clone --depth 1 https://github.com/ancoleman/ai-design-components /tmp/ingesting-data && cp -r /tmp/ingesting-data/skills/ingesting-data ~/.claude/skills/ingesting-data
Then open a new Claude Code session; the skill loads automatically.

SKILL.md

# Data Ingestion Patterns

This skill provides patterns for getting data INTO systems from external sources.

## When to Use This Skill

- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis

## Ingestion Pattern Decision Tree

```
What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md
```

## Quick Start by Language

### Python (Recommended for ETL)

**dlt (data load tool) - Modern Python ETL:**
```python
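import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        response = requests.get(f"https://api.github.com/repos/{repo}/issues", timeout=30)
        response.raise_for_status()  # fail loudly instead of loading an error payload
        yield response.json()  # a list of issue dicts; first page only
    return issues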

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data"
)

load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
```

**Polars for file processing (often faster than pandas):**
```python
import polars as pl

# Read CSV with schema inference
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines
df = pl.read_ndjson("events.jsonl")

# Write to database
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append"
)
```

### TypeScript/Node.js

**S3 ingestion:**
```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

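async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();
  if (body === undefined) {
    throw new Error(`Empty response body for s3://${bucket}/${key}`);
  }

  // Parse CSV (the whole object is held in memory)
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database (`db` and `eventsTable` are assumed to be defined elsewhere)
  await db.insert(eventsTable).values(records);
}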
```

**API feed ingestion via webhook:**
```typescript
import { Hono } from "hono";

// Webhook receiver for real-time ingestion
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  const event = await c.req.json();

  // Validate webhook signature
  const signature = c.req.header("stripe-signature");
  // ... validation logic

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date()
  });

  return c.json({ received: true });
});
```

### Rust

**High-performance file ingestion:**
```rust
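use std::io::Cursor;

use anyhow::Result; // assumed error type: must wrap both AWS SDK and Polars errors
use aws_sdk_s3::Client;
use polars::prelude::*;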

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> Result<DataFrame> {
    // Download from S3
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes))
        .finish()?;

    Ok(df)
}
```

### Go

**CSV ingestion from S3:**
```go
package main

import (
    "context"
    "encoding/csv"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
    resp, err := client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    reader := csv.NewReader(resp.Body)
    records, err := reader.ReadAll()
    if err != nil {
        return err
    }

    // Batch insert to database (batchInsert is assumed to be defined elsewhere)
    return batchInsert(ctx, records)
}
```

## Ingestion Patterns

### 1. Batch Ingestion (Files/Storage)

For periodic bulk loads:

```
Source → Extract → Transform → Load → Validate
  ↓         ↓          ↓         ↓        ↓
 S3      Download   Clean/Map  Insert   Count check
```

**Key considerations:**
- Use chunked reading for large files (>100MB)
- Implement idempotency with checksums
- Track file processing state
- Handle partial failures
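
The considerations above can be sketched with the standard library alone. This is a minimal illustration, not a definitive implementation: the `events` and `processed_files` tables, the two-column `id,name` CSV layout, and the function names are all assumptions made for the example, and SQLite stands in for the real target database.

```python
import csv
import hashlib
import sqlite3
from itertools import islice
from pathlib import Path


def file_checksum(path: Path, block_size: int = 1 << 20) -> str:
    """SHA-256 of the file, read in blocks so large files never sit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def ingest_csv(conn: sqlite3.Connection, path: Path, chunk_rows: int = 10_000) -> int:
    """Load an id,name CSV once; return the number of rows inserted."""
    conn.execute("CREATE TABLE IF NOT EXISTS events (id TEXT, name TEXT)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_files (checksum TEXT PRIMARY KEY, path TEXT)"
    )

    # Idempotency: skip files whose content was already loaded.
    checksum = file_checksum(path)
    seen = conn.execute(
        "SELECT 1 FROM processed_files WHERE checksum = ?", (checksum,)
    ).fetchone()
    if seen:
        return 0

    inserted = 0
    # One transaction: a failure rolls back the rows and the state record together,
    # so a partially loaded file is never marked as processed.
    with conn:
        with path.open(newline="") as fh:
            reader = csv.DictReader(fh)
            # Chunked reading: at most chunk_rows rows are held in memory at a time.
            while chunk := list(islice(reader, chunk_rows)):
                conn.executemany(
                    "INSERT INTO events (id, name) VALUES (:id, :name)", chunk
                )
                inserted += len(chunk)
        conn.execute(
            "INSERT INTO processed_files (checksum, path) VALUES (?, ?)",
            (checksum, str(path)),
        )
    return inserted
```

Calling `ingest_csv` twice on the same file inserts rows only the first time; the second call finds the checksum in `processed_files` and returns 0.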

### 2. Streaming Ingestion (Real-time)

For continuous data flow:

```
Source → Buffer → Process → Load → Ack
  ↓        ↓         ↓        ↓      ↓
Kafka   In-memory  Transform  DB   Commit offset
```

**Key considerations:**
- At-least-once vs exactly-once semantics
- Backpressure handling
- Dead letter queues for failures
- Checkpoint management
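
The buffer, load, and acknowledge steps can be illustrated without a broker. The sketch below is a simulation under stated assumptions: `Message` and `StreamIngestor` are hypothetical names, a dict stands in for the database, and `committed_offset` stands in for the offset commit a real Kafka or Kinesis client would perform. It shows at-least-once handling, where the offset advances only after the load and an upsert by `id` makes redelivered messages harmless.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Message:
    offset: int
    value: bytes


@dataclass
class StreamIngestor:
    """Buffer -> process -> load -> ack, committing only after a successful load."""

    batch_size: int = 3
    committed_offset: int = -1  # last offset acknowledged to the (simulated) broker
    sink: dict = field(default_factory=dict)  # stand-in for the database, keyed by id
    dead_letters: list = field(default_factory=list)  # (offset, error) pairs
    buffer: list = field(default_factory=list)

    def handle(self, msg: Message) -> None:
        """Buffer a message and flush once the batch is full."""
        self.buffer.append(msg)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        rows = []
        for msg in self.buffer:
            try:
                event = json.loads(msg.value)
                rows.append((event["id"], event))
            except (ValueError, KeyError, TypeError) as exc:
                # Unparseable messages go to the dead letter queue
                # instead of blocking the rest of the batch.
                self.dead_letters.append((msg.offset, repr(exc)))
        for event_id, event in rows:
            self.sink[event_id] = event  # upsert: replays overwrite, never duplicate
        # Ack last: a crash before this line means redelivery, not data loss.
        self.committed_offset = max(self.committed_offset, self.buffer[-1].offset)
        self.buffer.clear()
```

Committing after the load gives at-least-once delivery; the idempotent upsert is what keeps duplicates out of the target when a batch is replayed.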

### 3. API Polling (Feeds)

For external API data:

```
Schedule → Fetch → Dedupe → Load → Update cursor
   ↓         ↓        ↓       ↓         ↓
 Cron     API call  By ID   Insert   Last timestamp
```

**Key considerations:**
- Rate limiting and backoff
- Incremental loading (cursors, timestamps)
- API pagination handling
- Retry with exponential backoff
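
A polling loop that combines these points might look like the following sketch. Everything named here is an assumption for illustration: `TransientError` is a hypothetical marker for retryable failures such as HTTP 429 or 5xx responses, and `fetch_page` is a caller-supplied function that takes a cursor and returns `(records, next_cursor)`, with `next_cursor` set to `None` on the last page.

```python
import random
import time
from typing import Callable, Optional


class TransientError(Exception):
    """Hypothetical marker for retryable failures (rate limits, timeouts, 5xx)."""


def with_backoff(call, max_attempts=5, base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Run call(), retrying on TransientError with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return call()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the scheduler
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, delay))  # full jitter avoids synchronized retries


def poll(
    fetch_page: Callable[[Optional[str]], tuple],
    cursor: Optional[str],
    seen_ids: set,
    sleep=time.sleep,
) -> tuple:
    """Fetch all pages from cursor onward; return (new_records, cursor_to_persist)."""
    new_records = []
    while True:
        records, next_cursor = with_backoff(lambda: fetch_page(cursor), sleep=sleep)
        for record in records:
            if record["id"] not in seen_ids:  # dedupe by ID
                seen_ids.add(record["id"])
                new_records.append(record)
        if next_cursor is None:
            # Persist the cursor of the last page so the next run re-reads it;
            # the ID check above discards anything already loaded.
            return new_records, cursor
        cursor = next_cursor
```

Passing `sleep` as a parameter keeps the retry logic testable without real waiting; in production the default `time.sleep` applies.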

### 4. Change Data Capture (CDC)

For database replication:

```
Source DB → Capture changes → Transform → Target DB
    ↓             ↓               ↓            ↓
 Postgres    Debezium/WAL      Map schema   Insert/Update
```

**Key considerations:**
- Initial snapshot + streaming changes
- Schema evolution handling
- Ordering guarantees
- Conflict resolution
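
The apply side of CDC can be sketched as below. This is an illustration under stated assumptions, not how Debezium or any specific tool works: the change-event shape (`lsn`, `op`, `id`, `email`), the `users` and `cdc_state` tables, and the function name are hypothetical, and SQLite stands in for the target database. Changes are applied in log order, and the last applied position is stored so that replaying a batch is a no-op.

```python
import sqlite3


def apply_changes(conn: sqlite3.Connection, changes: list) -> int:
    """Apply change events in LSN order; return how many were applied."""
    applied = 0
    with conn:  # data changes and the stored position commit together
        conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, email TEXT)")
        conn.execute(
            "CREATE TABLE IF NOT EXISTS cdc_state "
            "(id INTEGER PRIMARY KEY CHECK (id = 1), last_lsn INTEGER)"
        )
        conn.execute("INSERT OR IGNORE INTO cdc_state (id, last_lsn) VALUES (1, -1)")
        last_lsn = conn.execute("SELECT last_lsn FROM cdc_state WHERE id = 1").fetchone()[0]

        # Ordering guarantee: sort by log position before applying.
        for change in sorted(changes, key=lambda c: c["lsn"]):
            if change["lsn"] <= last_lsn:
                continue  # already applied in an earlier run
            if change["op"] == "delete":
                conn.execute("DELETE FROM users WHERE id = ?", (change["id"],))
            else:
                # Inserts and updates are both upserts; the latest change wins.
                conn.execute(
                    "INSERT OR REPLACE INTO users (id, email) VALUES (?, ?)",
                    (change["id"], change["email"]),
                )
            last_lsn = change["lsn"]
            applied += 1

        conn.execute("UPDATE cdc_state SET last_lsn = ? WHERE id = 1", (last_lsn,))
    return applied
```

Treating inserts and updates as the same upsert is a "last writer wins" conflict policy; other policies would need extra columns (for example a version or source timestamp) that this sketch does not model.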

## Library Recommendations

| Use Case | Python | TypeScript | Rust | Go |
|----------|--------|------------|------|-----|
| **ETL Framework** | dlt, Meltano, Dagster | - | - | - |
| **Cloud Storage** | boto3, gcsfs, adlfs | @aws-sdk/*, @google-cloud/* | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| **File Processing** | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs |