Skip to main content
ClaudeWave
Skill1.2k repo starsupdated 3mo ago

clickhouse-io

This Claude Code skill provides ClickHouse database patterns, query optimization techniques, and data engineering best practices for high-performance analytical workloads. It covers table design patterns using MergeTree engine variants, filtering and aggregation strategies, and window function implementations specifically optimized for ClickHouse's columnar architecture and OLAP use cases. Use this when building fast analytical queries on large datasets or designing efficient data pipelines in ClickHouse.

Install in Claude Code
Copy
git clone --depth 1 https://github.com/xu-xiang/everything-claude-code-zh /tmp/clickhouse-io && cp -r /tmp/clickhouse-io/docs/ja-JP/skills/clickhouse-io ~/.claude/skills/clickhouse-io
Then start a new Claude Code session; the skill loads automatically.

SKILL.md

# ClickHouse 分析模式

针对高性能分析和数据工程的 ClickHouse 特定模式。

## 概览

ClickHouse 是一个用于联机分析处理(OLAP)的列式数据库管理系统(DBMS)。它针对大规模数据集上的快速分析查询进行了优化。

**主要功能:**
- 列式存储
- 数据压缩
- 并行查询执行
- 分布式查询
- 实时分析

## 表设计模式

### MergeTree 引擎(最常用)

```sql
CREATE TABLE markets_analytics (
    date Date,
    market_id String,
    market_name String,
    volume UInt64,
    trades UInt32,
    unique_traders UInt32,
    avg_trade_size Float64,
    created_at DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, market_id)
SETTINGS index_granularity = 8192;
```

### ReplacingMergeTree (重复数据删除)

```sql
-- 用于可能存在重复的数据(如来自多个源的数据)
CREATE TABLE user_events (
    event_id String,
    user_id String,
    event_type String,
    timestamp DateTime,
    properties String
) ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, event_id, timestamp)
PRIMARY KEY (user_id, event_id);
```

### AggregatingMergeTree (预聚合)

```sql
-- 用于维护聚合指标
CREATE TABLE market_stats_hourly (
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, UInt64),
    total_trades AggregateFunction(count, UInt32),
    unique_users AggregateFunction(uniq, String)
) ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);

-- 查询聚合数据
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
```

## 查询优化模式

### 高效过滤

```sql
-- ✅ 推荐:优先使用索引列
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123'
  AND volume > 1000
ORDER BY date DESC
LIMIT 100;

-- ❌ 不推荐:先过滤非索引列
SELECT *
FROM markets_analytics
WHERE volume > 1000
  AND market_name LIKE '%election%'
  AND date >= '2025-01-01';
```

### 聚合

```sql
-- ✅ 推荐:使用 ClickHouse 特有的聚合函数
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= today() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;

-- ✅ 计算分位数请使用 quantile(比 percentile 更高效)
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```

### 窗口函数 (Window Functions)

```sql
-- 累计计算
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```

## 数据插入模式

### 批量插入 (推荐)

```typescript
import { ClickHouse } from 'clickhouse'

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})

// ✅ 批量插入(高效)
async function bulkInsertTrades(trades: Trade[]) {
  const values = trades.map(trade => `(
    '${trade.id}',
    '${trade.market_id}',
    '${trade.user_id}',
    ${trade.amount},
    '${trade.timestamp.toISOString()}'
  )`).join(',')

  await clickhouse.query(`
    INSERT INTO trades (id, market_id, user_id, amount, timestamp)
    VALUES ${values}
  `).toPromise()
}

// ❌ 逐条插入(低效)
async function insertTrade(trade: Trade) {
  // 请勿在循环中执行此操作!
  await clickhouse.query(`
    INSERT INTO trades VALUES ('${trade.id}', ...)
  `).toPromise()
}
```

### 流式插入

```typescript
// 用于持续的数据摄取
import { createWriteStream } from 'fs'
import { pipeline } from 'stream/promises'

async function streamInserts() {
  const stream = clickhouse.insert('trades').stream()

  for await (const batch of dataSource) {
    stream.write(batch)
  }

  await stream.end()
}
```

## 物化视图 (Materialized Views)

### 实时聚合

```sql
-- 创建按小时统计的物化视图
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;

-- 查询物化视图
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id;
```

## 性能监控

### 查询性能

```sql
-- 检查慢查询
SELECT
    query_id,
    user,
    query,
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_duration_ms > 1000
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
```

### 表统计信息

```sql
-- 检查表大小
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```

## 常见分析查询

### 时序分析

```sql
-- 日活跃用户 (DAU)
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;

-- 留存分析
SELECT
    signup_date,
    countIf(days_since_signup = 0) AS day_0,
    countIf(days_since_signup = 1) AS day_1,
    countIf(days_since_signup = 7) AS day_7,
    countIf(days_since_signup = 30) AS day_30
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) AS signup_date,
        toDate(timestamp) AS activity_date,
        dateDiff('day', signup_date, activity_date) AS days_since_signup
    FROM events
    GROUP BY user_id, activity_date
)
GROUP BY signup_date
ORDER BY signup_date DESC;
```

### 漏斗分析 (Funnel Analysis)

```sql
-- 转化漏斗
SELECT
    countIf(step = 'viewed_market') AS viewed,
    countIf(step = 'clicked_trade') AS clicked,
api-designSkill

生产级 API 的 REST API 设计模式,包括资源命名、状态码、分页、过滤、错误响应、版本控制和速率限制。

article-writingSkill

编写文章、指南、博客、教程、时事通讯(Newsletter)等长内容,支持从示例或品牌指南中提取独特的语感语调。适用于需要撰写超过一个段落的精炼文本,尤其是对语气一致性、结构和可信度有较高要求时。

backend-patternsSkill

后端架构模式、API 设计、数据库优化以及 Node.js、Express 和 Next.js API 路由的服务端最佳实践。

coding-standardsSkill

TypeScript、JavaScript、React、Node.js 开发的通用编码标准、最佳实践和模式。

content-engineSkill

为 X、LinkedIn、TikTok、YouTube、时事通讯(Newsletters)以及跨平台内容重加工营销活动(Repurposed multi-platform campaigns)创建平台原生的内容系统。当用户需要社交媒体帖子、推文串(Threads)、脚本、内容日历,或将单一源素材清晰地适配到多个平台时使用。

e2e-testingSkill

Playwright E2E 测试模式、页面对象模型(POM)、配置、CI/CD 集成、产物管理以及不稳定测试(flaky test)策略。

eval-harnessSkill

适用于 Claude Code 会话的正规评测框架(Evaluation Framework),实现了评测驱动开发(Eval-Driven Development, EDD)原则

frontend-patternsSkill

React、Next.js、状态管理(State Management)、性能优化(Performance Optimization)及 UI 最佳实践的前端开发模式。