    affaan-m/clickhouse-io
    Data & Analytics
    42,727
    4 installs

    About

    ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.

    SKILL.md

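    ClickHouse Analytics Patterns

    ClickHouse-specific patterns for high-performance analytics and data engineering.

    When to Activate

    • Designing ClickHouse table schemas (MergeTree engine selection)
    • Writing analytical queries (aggregations, window functions, joins)
    • Optimizing query performance (partition pruning, projections, materialized views)
    • Ingesting large volumes of data (batch inserts, Kafka integration)
    • Migrating from PostgreSQL/MySQL to ClickHouse for analytics
    • Implementing real-time dashboards or time-series analysis

    Overview

    ClickHouse is a column-oriented database management system (DBMS) for online analytical processing (OLAP). It is optimized for fast analytical queries over large datasets.

    Key features:

    • Columnar storage
    • Data compression
    • Parallel query execution
    • Distributed queries
    • Real-time analytics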

    Table Design Patterns

    MergeTree Engine (most common)

    CREATE TABLE markets_analytics (
        date Date,
        market_id String,
        market_name String,
        volume UInt64,
        trades UInt32,
        unique_traders UInt32,
        avg_trade_size Float64,
        created_at DateTime
    ) ENGINE = MergeTree()
    PARTITION BY toYYYYMM(date)
    ORDER BY (date, market_id)
    SETTINGS index_granularity = 8192;
    

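    ReplacingMergeTree (deduplication)

    -- For data that may have duplicates (e.g., from multiple sources).
    -- Rows with the same sorting key (ORDER BY) are deduplicated during background merges, not at insert time.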
    CREATE TABLE user_events (
        event_id String,
        user_id String,
        event_type String,
        timestamp DateTime,
        properties String
    ) ENGINE = ReplacingMergeTree()
    PARTITION BY toYYYYMM(timestamp)
    ORDER BY (user_id, event_id, timestamp)
    PRIMARY KEY (user_id, event_id);
    

    AggregatingMergeTree (pre-aggregation)

    -- For maintaining aggregated metrics
    CREATE TABLE market_stats_hourly (
        hour DateTime,
        market_id String,
        total_volume AggregateFunction(sum, UInt64),
        total_trades AggregateFunction(count),  -- no argument type, so it matches countState() in the materialized view
        unique_users AggregateFunction(uniq, String)
    ) ENGINE = AggregatingMergeTree()
    PARTITION BY toYYYYMM(hour)
    ORDER BY (hour, market_id);
    
    -- Query aggregated data
    SELECT
        hour,
        market_id,
        sumMerge(total_volume) AS volume,
        countMerge(total_trades) AS trades,
        uniqMerge(unique_users) AS users
    FROM market_stats_hourly
    WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
    GROUP BY hour, market_id
    ORDER BY hour DESC;
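
The reason the table stores aggregate states instead of final numbers can be sketched outside ClickHouse. The TypeScript below is only an illustration: `AvgState`, `avgState`, `mergeAvg`, and `finalizeAvg` are hypothetical names, and ClickHouse's real state formats are different. It uses an average because two averages cannot be combined directly, while two (sum, count) states can.

```typescript
// Illustrative only: a mergeable partial state for an average.
interface AvgState {
  sum: number
  count: number
}

// Build a partial state from one batch of values (loosely analogous to a -State function).
function avgState(values: number[]): AvgState {
  let sum = 0
  for (const v of values) {
    sum += v
  }
  return { sum: sum, count: values.length }
}

// Combine two partial states without revisiting the raw rows.
function mergeAvg(a: AvgState, b: AvgState): AvgState {
  return { sum: a.sum + b.sum, count: a.count + b.count }
}

// Turn a state into the final value (loosely analogous to a -Merge function at query time).
function finalizeAvg(state: AvgState): number {
  return state.count === 0 ? NaN : state.sum / state.count
}

// Two batches inserted at different times, merged later.
const firstBatch = avgState([10, 20, 30])
const secondBatch = avgState([40])
const mergedState = mergeAvg(firstBatch, secondBatch)
console.log(finalizeAvg(mergedState))
```

Averaging the two per-batch averages (20 and 40) would give a different answer than merging the states, which is why the states are kept until query time.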
    

    Query Optimization Patterns

    Efficient Filtering

    -- GOOD: Filter on the sorting-key columns (date, market_id) so ClickHouse can skip partitions and granules
    SELECT *
    FROM markets_analytics
    WHERE date >= '2025-01-01'
      AND market_id = 'market-123'
      AND volume > 1000
    ORDER BY date DESC
    LIMIT 100;
    
    -- BAD: Only the date filter can prune data; volume and the leading-wildcard LIKE on market_name are checked on every remaining row
    SELECT *
    FROM markets_analytics
    WHERE volume > 1000
      AND market_name LIKE '%election%'
      AND date >= '2025-01-01';
    

    Aggregation

    -- GOOD: Use ClickHouse-specific aggregate functions (uniq() is an approximate distinct count)
    SELECT
        toStartOfDay(created_at) AS day,
        market_id,
        sum(volume) AS total_volume,
        count() AS total_trades,
        uniq(trader_id) AS unique_traders,
        avg(trade_size) AS avg_size
    FROM trades
    WHERE created_at >= today() - INTERVAL 7 DAY
    GROUP BY day, market_id
    ORDER BY day DESC, total_volume DESC;
    
    -- GOOD: Use quantile() for percentiles (approximate; use quantileExact() when exact results are required)
    SELECT
        quantile(0.50)(trade_size) AS median,
        quantile(0.95)(trade_size) AS p95,
        quantile(0.99)(trade_size) AS p99
    FROM trades
    WHERE created_at >= now() - INTERVAL 1 HOUR;
    

    Window Functions

    -- Calculate running totals
    SELECT
        date,
        market_id,
        volume,
        sum(volume) OVER (
            PARTITION BY market_id
            ORDER BY date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS cumulative_volume
    FROM markets_analytics
    WHERE date >= today() - INTERVAL 30 DAY
    ORDER BY market_id, date;
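
To show what the window frame above computes, here is a small TypeScript sketch of the same running total done in application code. The `DailyVolume` shape and the `runningTotals` name are assumptions made for illustration; in practice the query itself should do this work.

```typescript
interface DailyVolume {
  date: string      // ISO date string, e.g. '2025-01-01'
  marketId: string
  volume: number
}

interface DailyVolumeWithTotal extends DailyVolume {
  cumulativeVolume: number
}

// Plain string comparison; ISO dates sort correctly as strings.
function compareStrings(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0
}

// Equivalent of: sum(volume) OVER (PARTITION BY market_id ORDER BY date
//                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
function runningTotals(rows: DailyVolume[]): DailyVolumeWithTotal[] {
  const sorted = rows.slice().sort(
    (a, b) => compareStrings(a.marketId, b.marketId) || compareStrings(a.date, b.date)
  )
  const totals: { [marketId: string]: number } = {}
  return sorted.map(row => {
    const total = (totals[row.marketId] || 0) + row.volume
    totals[row.marketId] = total
    return { date: row.date, marketId: row.marketId, volume: row.volume, cumulativeVolume: total }
  })
}

const sampleTotals = runningTotals([
  { date: '2025-01-02', marketId: 'm1', volume: 5 },
  { date: '2025-01-01', marketId: 'm1', volume: 10 },
  { date: '2025-01-01', marketId: 'm2', volume: 7 }
])
console.log(sampleTotals)
```

Each market keeps its own accumulator, which mirrors PARTITION BY; sorting by date within a market mirrors ORDER BY.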
    

    Data Insertion Patterns

    Bulk Insert (recommended)

    import { ClickHouse } from 'clickhouse'
    
    const clickhouse = new ClickHouse({
      url: process.env.CLICKHOUSE_URL,
      port: 8123,
      basicAuth: {
        username: process.env.CLICKHOUSE_USER,
        password: process.env.CLICKHOUSE_PASSWORD
      }
    })
    
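    interface Trade {
      id: string
      market_id: string
      user_id: string
      amount: number
      timestamp: Date
    }

    // Escape backslashes and single quotes so a value cannot break out of its SQL literal
    const quote = (value: string) =>
      `'${value.replace(/\\/g, '\\\\').replace(/'/g, "\\'")}'`

    // Format as 'YYYY-MM-DD hh:mm:ss' (UTC), which ClickHouse DateTime accepts
    const toDateTime = (date: Date) =>
      quote(date.toISOString().slice(0, 19).replace('T', ' '))

    // GOOD: Batch insert (one INSERT statement for many rows)
    async function bulkInsertTrades(trades: Trade[]) {
      if (trades.length === 0) return

      const values = trades.map(trade => `(
        ${quote(trade.id)},
        ${quote(trade.market_id)},
        ${quote(trade.user_id)},
        ${Number(trade.amount)},
        ${toDateTime(trade.timestamp)}
      )`).join(',')

      await clickhouse.query(`
        INSERT INTO trades (id, market_id, user_id, amount, timestamp)
        VALUES ${values}
      `).toPromise()
    }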
    
    // BAD: Individual inserts (slow)
    async function insertTrade(trade: Trade) {
      // Don't do this in a loop!
      await clickhouse.query(`
        INSERT INTO trades VALUES ('${trade.id}', ...)
      `).toPromise()
    }
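
When the input is very large, a single INSERT can itself become unwieldy, so rows are often split into fixed-size batches first. The helper below is a minimal sketch of that step; `chunkRows` is a hypothetical name, and the right batch size depends on the workload and should be measured.

```typescript
// Split rows into fixed-size batches so each INSERT carries many rows
// without building one enormous statement.
function chunkRows<T>(rows: T[], batchSize: number): T[][] {
  if (batchSize <= 0 || Math.floor(batchSize) !== batchSize) {
    throw new RangeError('batchSize must be a positive integer')
  }
  const batches: T[][] = []
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize))
  }
  return batches
}

// Example: five rows in batches of two.
const exampleBatches = chunkRows([1, 2, 3, 4, 5], 2)
console.log(exampleBatches)
```

Each batch can then be passed to a bulk insert function one at a time, which keeps the number of INSERT statements low while bounding the size of each one.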
    

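    Streaming Insert

    // For continuous data ingestion.
    // Assumes the `clickhouse` client created above and an async source that yields batches of rows.
    // writeRow() and exec() follow the `clickhouse` package README; verify them against your installed version.
    async function streamInserts(dataSource: AsyncIterable<unknown[][]>) {
      const stream = clickhouse.insert('INSERT INTO trades').stream()

      for await (const batch of dataSource) {
        for (const row of batch) {
          await stream.writeRow(row)
        }
      }

      // Wait for the insert to finish
      await stream.exec()
    }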
    

    Materialized Views

    Real-Time Aggregation

    -- Create materialized view for hourly stats
    CREATE MATERIALIZED VIEW market_stats_hourly_mv
    TO market_stats_hourly
    AS SELECT
        toStartOfHour(timestamp) AS hour,
        market_id,
        sumState(amount) AS total_volume,
        countState() AS total_trades,
        uniqState(user_id) AS unique_users
    FROM trades
    GROUP BY hour, market_id;
    
    -- Query the materialized view
    SELECT
        hour,
        market_id,
        sumMerge(total_volume) AS volume,
        countMerge(total_trades) AS trades,
        uniqMerge(unique_users) AS users
    FROM market_stats_hourly
    WHERE hour >= now() - INTERVAL 24 HOUR
    GROUP BY hour, market_id;
    

    Performance Monitoring

    Query Performance

    -- Check slow queries
    SELECT
        query_id,
        user,
        query,
        query_duration_ms,
        read_rows,
        read_bytes,
        memory_usage
    FROM system.query_log
    WHERE type = 'QueryFinish'
      AND query_duration_ms > 1000
      AND event_time >= now() - INTERVAL 1 HOUR
    ORDER BY query_duration_ms DESC
    LIMIT 10;
    

    Table Statistics

    -- Check table sizes
    SELECT
        database,
        table,
        formatReadableSize(sum(bytes)) AS size,
        sum(rows) AS rows,
        max(modification_time) AS latest_modification
    FROM system.parts
    WHERE active
    GROUP BY database, table
    ORDER BY sum(bytes) DESC;
    

    Common Analytics Queries

    Time-Series Analysis

    -- Daily active users
    SELECT
        toDate(timestamp) AS date,
        uniq(user_id) AS daily_active_users
    FROM events
    WHERE timestamp >= today() - INTERVAL 30 DAY
    GROUP BY date
    ORDER BY date;
    
    -- Retention analysis: users active N days after their first recorded event
    SELECT
        signup_date,
        uniqExactIf(user_id, days_since_signup = 0) AS day_0,
        uniqExactIf(user_id, days_since_signup = 1) AS day_1,
        uniqExactIf(user_id, days_since_signup = 7) AS day_7,
        uniqExactIf(user_id, days_since_signup = 30) AS day_30
    FROM (
        SELECT
            user_id,
            signup_date,
            dateDiff('day', signup_date, activity_date) AS days_since_signup
        FROM (
            SELECT
                user_id,
                -- First activity date per user, computed across all of that user's rows
                min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
                toDate(timestamp) AS activity_date
            FROM events
        )
    )
    GROUP BY signup_date
    ORDER BY signup_date DESC;
    

    Funnel Analysis

    -- Conversion funnel: sessions that reached each step today
    SELECT
        uniqExactIf(session_id, step = 'viewed_market') AS viewed,
        uniqExactIf(session_id, step = 'clicked_trade') AS clicked,
        uniqExactIf(session_id, step = 'completed_trade') AS completed,
        round(clicked / viewed * 100, 2) AS view_to_click_rate,
        round(completed / clicked * 100, 2) AS click_to_completion_rate
    FROM (
        SELECT
            user_id,
            session_id,
            event_type AS step
        FROM events
        WHERE event_date = today()
    );
    

    Cohort Analysis

    -- User cohorts by signup month
    SELECT
        toStartOfMonth(signup_date) AS cohort,
        toStartOfMonth(activity_date) AS month,
        dateDiff('month', cohort, month) AS months_since_signup,
        count(DISTINCT user_id) AS active_users
    FROM (
        SELECT
            user_id,
            min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
            toDate(timestamp) AS activity_date
        FROM events
    )
    GROUP BY cohort, month, months_since_signup
    ORDER BY cohort, months_since_signup;
    

    Data Pipeline Patterns

    ETL Pattern

    // Extract, Transform, Load
    async function etlPipeline() {
      // 1. Extract from source
      const rawData = await extractFromPostgres()
    
      // 2. Transform
      const transformed = rawData.map(row => ({
        date: new Date(row.created_at).toISOString().split('T')[0],
        market_id: row.market_slug,
        volume: parseFloat(row.total_volume),
        trades: parseInt(row.trade_count, 10)
      }))
    
      // 3. Load to ClickHouse
      await bulkInsertToClickHouse(transformed)
    }
    
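    // Run periodically; catch errors so a failed run does not become an unhandled rejection
    setInterval(() => { etlPipeline().catch(console.error) }, 60 * 60 * 1000)  // Every hour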
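
The transform step is where malformed source rows usually surface, so it can help to validate each row before loading. The sketch below is one way to do that; the `SourceRow` and `AnalyticsRow` shapes are assumptions inferred from the field names used in the pipeline, and `transformRow` and `transformAll` are hypothetical helpers.

```typescript
// Assumed shape of a row read from the source database (values arrive as strings).
interface SourceRow {
  created_at: string
  market_slug: string
  total_volume: string
  trade_count: string
}

// Assumed shape of a row ready to load into ClickHouse.
interface AnalyticsRow {
  date: string
  market_id: string
  volume: number
  trades: number
}

// Returns null for rows that cannot be parsed instead of loading NaN or throwing.
function transformRow(row: SourceRow): AnalyticsRow | null {
  const created = new Date(row.created_at)
  const volume = parseFloat(row.total_volume)
  const trades = parseInt(row.trade_count, 10)
  if (isNaN(created.getTime()) || isNaN(volume) || isNaN(trades)) {
    return null
  }
  return {
    date: created.toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: volume,
    trades: trades
  }
}

// Transform a batch, keeping valid rows and counting the rejected ones.
function transformAll(rows: SourceRow[]): { valid: AnalyticsRow[]; rejected: number } {
  const valid: AnalyticsRow[] = []
  let rejected = 0
  for (const row of rows) {
    const out = transformRow(row)
    if (out === null) {
      rejected += 1
    } else {
      valid.push(out)
    }
  }
  return { valid: valid, rejected: rejected }
}
```

Checking the date before calling `toISOString()` matters because that method throws on an invalid `Date`, which would otherwise abort the whole batch.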
    

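    Change Data Capture (CDC)

    // Listen to PostgreSQL changes and sync to ClickHouse.
    // Note: LISTEN/NOTIFY is not durable; notifications sent while the client is disconnected are lost.
    import { Client } from 'pg'

    const pgClient = new Client({ connectionString: process.env.DATABASE_URL })

    async function startCdc() {
      // The client must be connected before it can receive notifications
      await pgClient.connect()

      pgClient.on('notification', async (msg) => {
        if (!msg.payload) return
        try {
          const update = JSON.parse(msg.payload)

          // Assumes the `clickhouse` client from the bulk insert example
          await clickhouse.insert(
            'INSERT INTO market_updates (market_id, event_type, timestamp, data)',
            [
              {
                market_id: update.id,
                event_type: update.operation,  // INSERT, UPDATE, DELETE
                timestamp: new Date().toISOString().slice(0, 19).replace('T', ' '),
                data: JSON.stringify(update.new_data)
              }
            ]
          ).toPromise()
        } catch (err) {
          console.error('Failed to sync market update', err)
        }
      })

      await pgClient.query('LISTEN market_updates')
    }

    startCdc().catch(console.error)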
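
Notification payloads are free-form text, so it is safer to validate them before building a ClickHouse row. The sketch below assumes the payload is JSON with `id`, `operation`, and `new_data` fields, as in the listener above; `parseMarketUpdate` and `MarketUpdateRow` are hypothetical names.

```typescript
type CdcOperation = 'INSERT' | 'UPDATE' | 'DELETE'

interface MarketUpdateRow {
  market_id: string
  event_type: CdcOperation
  data: string
}

// Parse and validate a notification payload; returns null when it is unusable.
function parseMarketUpdate(payload: string | undefined): MarketUpdateRow | null {
  if (!payload) {
    return null
  }
  let update: any
  try {
    update = JSON.parse(payload)
  } catch (err) {
    return null  // not valid JSON
  }
  if (update === null || typeof update !== 'object') {
    return null
  }
  const id = update.id
  if (typeof id !== 'string' && typeof id !== 'number') {
    return null
  }
  const op = update.operation
  if (op !== 'INSERT' && op !== 'UPDATE' && op !== 'DELETE') {
    return null
  }
  return {
    market_id: String(id),
    event_type: op,
    // DELETE notifications may carry no new_data; store JSON null in that case
    data: JSON.stringify(update.new_data === undefined ? null : update.new_data)
  }
}
```

Returning null instead of throwing lets the listener skip a bad message and keep processing later ones.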
    

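    Best Practices

    1. Partitioning Strategy

    • Partition by time (usually by month or by day)
    • Avoid too many partitions (they hurt performance)
    • Use the Date type for the partition key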
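    2. Sorting Key

    • Put the most frequently filtered columns first
    • Consider cardinality (among filtered columns, lower-cardinality columns generally go first)
    • Sort order affects compression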

    3. Data Types

    • Use the smallest type that fits the data (UInt32 vs. UInt64)
    • Use LowCardinality for repeated strings
    • Use Enum for categorical data
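
Choosing the smallest integer type is simple range arithmetic. The sketch below picks an unsigned ClickHouse integer type from the largest value a column is expected to hold; `smallestUIntType` is a hypothetical helper, and the expected maximum is something you have to estimate yourself, with headroom for growth.

```typescript
type UIntType = 'UInt8' | 'UInt16' | 'UInt32' | 'UInt64'

// Pick the narrowest unsigned integer type that can hold maxValue.
// JavaScript numbers are exact only up to 2^53 - 1, so this sketch is not
// meant for values near the top of the UInt64 range.
function smallestUIntType(maxValue: number): UIntType {
  if (!isFinite(maxValue) || maxValue < 0 || Math.floor(maxValue) !== maxValue) {
    throw new RangeError('maxValue must be a non-negative integer')
  }
  if (maxValue <= 255) return 'UInt8'            // 2^8 - 1
  if (maxValue <= 65535) return 'UInt16'         // 2^16 - 1
  if (maxValue <= 4294967295) return 'UInt32'    // 2^32 - 1
  return 'UInt64'
}

console.log(smallestUIntType(70000))
```

A column that tops out below 4,294,967,296 fits in UInt32, which takes half the bytes per value of UInt64 before compression.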

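    4. Avoid

    • SELECT * (list the columns you need)
    • FINAL (merge the data ahead of query time instead)
    • Too many JOINs (denormalize for analytical workloads)
    • Frequent small inserts (batch them instead)

    5. Monitoring

    • Track query performance
    • Monitor disk usage
    • Check merge operations
    • Review the slow query log

    Remember: ClickHouse excels at analytical workloads. Design tables around your query patterns, insert in batches, and use materialized views for real-time aggregation.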

    Repository
    affaan-m/everything-claude-code