    majesticlabs-dev/etl-incremental-patterns
    Data & Analytics

    About

    Incremental data loading patterns including backfill strategies, CDC, timestamp-based loads, and pipeline orchestration.

    SKILL.md

    ETL Incremental Patterns

    Patterns for incremental data loading and backfill operations.

    Backfill Strategy

    from collections.abc import Callable
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from datetime import date, timedelta
    
    def backfill_date_range(
        start: date,
        end: date,
        process_fn: Callable[[date], None],
        parallel: int = 4
    ) -> list[date]:
        """Backfill every day from start to end (inclusive).
    
        Returns the dates whose processing raised, so callers can retry them.
        """
        # Build the inclusive list of days to process
        dates = [start + timedelta(days=i) for i in range((end - start).days + 1)]
        failed: list[date] = []
    
        # Process in parallel with controlled concurrency
        with ThreadPoolExecutor(max_workers=parallel) as executor:
            futures = {executor.submit(process_fn, d): d for d in dates}
            for future in as_completed(futures):
                d = futures[future]
                try:
                    future.result()
                    print(f"Completed: {d}")
                except Exception as e:
                    # Record the failure instead of aborting the whole backfill
                    failed.append(d)
                    print(f"Failed: {d} - {e}")
        return sorted(failed)
    
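    # Usage (process_daily_data is your own per-day load function; it runs on
    # worker threads, so it must be safe to call concurrently)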
    backfill_date_range(
        start=date(2024, 1, 1),
        end=date(2024, 3, 31),
        process_fn=process_daily_data,
        parallel=4
    )
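
    Backfills are often re-run after partial failures, so it helps if the per-day function is idempotent. The following is a minimal sketch of one way to do that, not part of the original skill: it assumes a hypothetical `daily_sales` table in SQLite and a hypothetical `fetch_rows_for` extract function, and replaces one day's rows with a delete-then-insert inside a single transaction.

```python
import sqlite3
from datetime import date


def fetch_rows_for(day: date) -> list[tuple[str, float]]:
    """Hypothetical source extract: returns (sku, amount) rows for one day."""
    return [("sku-1", 10.0), ("sku-2", 5.5)]


def make_daily_loader(db_path: str):
    """Build a process_fn that replaces one day's partition atomically."""

    def process_daily_data(day: date) -> None:
        rows = fetch_rows_for(day)
        # One connection per call, so each worker thread has its own handle.
        conn = sqlite3.connect(db_path)
        try:
            with conn:  # commits on success, rolls back on exception
                conn.execute(
                    "CREATE TABLE IF NOT EXISTS daily_sales "
                    "(day TEXT, sku TEXT, amount REAL)"
                )
                # Remove anything a previous run wrote for this day...
                conn.execute(
                    "DELETE FROM daily_sales WHERE day = ?", (day.isoformat(),)
                )
                # ...then write the fresh rows, so re-runs never duplicate data.
                conn.executemany(
                    "INSERT INTO daily_sales (day, sku, amount) VALUES (?, ?, ?)",
                    [(day.isoformat(), sku, amount) for sku, amount in rows],
                )
        finally:
            conn.close()

    return process_daily_data
```

    A loader built this way can be passed as `process_fn`, for example `process_fn=make_daily_loader("warehouse.db")`, where the file name is again only a placeholder.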
    

    Incremental Load Patterns

    Timestamp-Based Incremental

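    import pandas as pd
    from sqlalchemy import text
    
    def incremental_by_timestamp(table: str, timestamp_col: str) -> pd.DataFrame:
        """Load rows changed since the last stored watermark for this table."""
        last_run = get_last_run_timestamp(table)
        # table and timestamp_col are interpolated into the SQL, so they must come
        # from trusted configuration, never from user input. The value is bound.
        query = text(f"""
            SELECT * FROM {table}
            WHERE {timestamp_col} > :last_run
            ORDER BY {timestamp_col}
        """)
        df = pd.read_sql(query, engine, params={'last_run': last_run})
        if not df.empty:
            # Advance the watermark only after a successful read
            set_last_run_timestamp(table, df[timestamp_col].max())
        return df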
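
    The helpers `get_last_run_timestamp` and `set_last_run_timestamp` are not defined in the snippet above. One possible shape is sketched below, assuming a hypothetical `etl_watermarks` table in SQLite that stores one ISO-8601 timestamp per source table, and an assumed default of the Unix epoch for tables that have never been loaded.

```python
import sqlite3
from datetime import datetime

# Assumed default watermark for a table that has never been loaded.
EPOCH = datetime(1970, 1, 1)


class WatermarkStore:
    """Persists the last loaded timestamp per source table (illustrative only)."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        with self.conn:
            self.conn.execute(
                "CREATE TABLE IF NOT EXISTS etl_watermarks "
                "(table_name TEXT PRIMARY KEY, last_run TEXT NOT NULL)"
            )

    def get_last_run_timestamp(self, table: str) -> datetime:
        row = self.conn.execute(
            "SELECT last_run FROM etl_watermarks WHERE table_name = ?", (table,)
        ).fetchone()
        return datetime.fromisoformat(row[0]) if row else EPOCH

    def set_last_run_timestamp(self, table: str, ts: datetime) -> None:
        with self.conn:  # commit the new watermark atomically
            self.conn.execute(
                "INSERT OR REPLACE INTO etl_watermarks (table_name, last_run) "
                "VALUES (?, ?)",
                (table, ts.isoformat()),
            )
```

    Keeping the watermark in the same database as the loaded data makes it possible to commit both in one transaction, which avoids re-reading or skipping rows if a run dies between the load and the watermark update.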
    

    Change Data Capture (CDC)

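    def process_cdc_events(events: list[dict]) -> None:
        """Apply CDC events one by one; events must already be in commit order."""
        for event in events:
            op = event['operation']  # INSERT, UPDATE, DELETE
            data = event['data']
    
            if op == 'DELETE':
                # Mark the row as deleted rather than removing it
                soft_delete(data['id'])
            elif op in ('INSERT', 'UPDATE'):
                upsert(data)
            else:
                # Fail loudly instead of upserting an unrecognized event type
                raise ValueError(f"Unknown CDC operation: {op!r}")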
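
    CDC feeds often deliver several events for the same key within one batch. A minimal sketch of compacting a batch to the newest event per key is shown below. It assumes each event carries a hypothetical `sequence` field (for example a log position) alongside the `operation` and `data` fields used above; that field is not part of the original example.

```python
def compact_cdc_events(events: list[dict]) -> list[dict]:
    """Keep only the newest event per primary key, ordered by sequence."""
    latest: dict[object, dict] = {}
    for event in sorted(events, key=lambda e: e["sequence"]):
        # Later events for the same id overwrite earlier ones.
        latest[event["data"]["id"]] = event
    # Re-sort so the surviving events are still applied in commit order.
    return sorted(latest.values(), key=lambda e: e["sequence"])
```

    One caveat with this approach: if a row is inserted and deleted within the same batch, only the DELETE survives compaction, so `soft_delete` would need to tolerate an id that was never written to the target.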
    

    Full Refresh with Swap

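    import pandas as pd
    from sqlalchemy import text
    
    def full_refresh_with_swap(df: pd.DataFrame, table: str) -> None:
        """Load df into a temp table, then swap it in with renames."""
        temp_table = f"{table}_temp"
        df.to_sql(temp_table, engine, if_exists='replace', index=False)
    
        # The swap is only atomic on databases with transactional DDL
        # (e.g. PostgreSQL). It also assumes {table} already exists.
        with engine.begin() as conn:
            conn.execute(text(f"DROP TABLE IF EXISTS {table}_old"))
            conn.execute(text(f"ALTER TABLE {table} RENAME TO {table}_old"))
            conn.execute(text(f"ALTER TABLE {temp_table} RENAME TO {table}"))
            conn.execute(text(f"DROP TABLE {table}_old"))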
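
    The rename sequence can be tried end to end against an in-memory SQLite database, which supports transactional DDL. The sketch below uses the standard-library `sqlite3` module rather than the SQLAlchemy engine from the example above, and the `prices` table is a hypothetical stand-in.

```python
import sqlite3


def swap_tables(conn: sqlite3.Connection, table: str) -> None:
    """Promote {table}_temp to {table} inside one explicit transaction."""
    # The connection is opened with isolation_level=None, so the transaction
    # boundaries are controlled here with explicit BEGIN / COMMIT / ROLLBACK.
    conn.execute("BEGIN")
    try:
        conn.execute(f"DROP TABLE IF EXISTS {table}_old")
        conn.execute(f"ALTER TABLE {table} RENAME TO {table}_old")
        conn.execute(f"ALTER TABLE {table}_temp RENAME TO {table}")
        conn.execute(f"DROP TABLE {table}_old")
        conn.execute("COMMIT")
    except Exception:
        # Any failure leaves the original table in place.
        conn.execute("ROLLBACK")
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE prices (sku TEXT, price REAL)")
conn.execute("INSERT INTO prices VALUES ('sku-1', 1.0)")
conn.execute("CREATE TABLE prices_temp (sku TEXT, price REAL)")
conn.execute("INSERT INTO prices_temp VALUES ('sku-1', 2.0)")
swap_tables(conn, "prices")
```

    After the call, `prices` holds the rows that were staged in `prices_temp`, and neither the temp nor the `_old` table remains.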
    

    Pipeline Orchestration

    from collections.abc import Callable
    from dataclasses import dataclass, field
    from enum import Enum
    
    class StepStatus(Enum):
        PENDING = "pending"
        RUNNING = "running"
        SUCCESS = "success"
        FAILED = "failed"
        SKIPPED = "skipped"
    
    @dataclass
    class PipelineStep:
        name: str
        func: Callable[[], None]
        dependencies: list[str] = field(default_factory=list)
        status: StepStatus = StepStatus.PENDING
        error: str | None = None
    
    class Pipeline:
        def __init__(self, name: str):
            self.name = name
            self.steps: dict[str, PipelineStep] = {}
    
        def add_step(self, name: str, func: Callable[[], None], depends_on: list[str] | None = None) -> None:
            # Register a step; depends_on lists the names of steps that must succeed first
            self.steps[name] = PipelineStep(name, func, depends_on or [])
    
        def run(self) -> bool:
            for step in self._topological_sort():
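                # Skip if any dependency failed or was itself skipped, so a
                # failure propagates to every downstream step
                blocked = (StepStatus.FAILED, StepStatus.SKIPPED)
                if any(self.steps[d].status in blocked for d in step.dependencies):
                    step.status = StepStatus.SKIPPED
                    continue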
    
                step.status = StepStatus.RUNNING
                try:
                    step.func()
                    step.status = StepStatus.SUCCESS
                except Exception as e:
                    step.status = StepStatus.FAILED
                    step.error = str(e)
    
            return all(s.status == StepStatus.SUCCESS for s in self.steps.values())
    
        def _topological_sort(self) -> list[PipelineStep]:
            # Implementation of topological sort for dependency ordering
            ...
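
    The `_topological_sort` method is left unimplemented above. One way to fill it in is sketched here as a standalone function built on the standard-library `graphlib` module (Python 3.9+). The function name and the plain name-to-dependencies mapping are illustrative choices, not the author's implementation.

```python
from graphlib import CycleError, TopologicalSorter


def topological_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return step names so that every step comes after its dependencies."""
    # Reject references to steps that were never registered.
    for name, deps in dependencies.items():
        missing = [d for d in deps if d not in dependencies]
        if missing:
            raise ValueError(f"Step {name!r} depends on unknown steps: {missing}")
    try:
        # TopologicalSorter expects a mapping of node -> predecessors.
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # The detected cycle is the second element of the exception args.
        raise ValueError(f"Dependency cycle detected: {exc.args[1]}") from exc
```

    Inside `Pipeline`, the method could then build the mapping with `{name: step.dependencies for name, step in self.steps.items()}` and return `[self.steps[name] for name in topological_order(mapping)]`.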
    

    Load Strategy Decision Matrix

    | Scenario | Pattern | When to Use |
    |---|---|---|
    | Small tables (<100K rows) | Full refresh | Daily/hourly loads |
    | Large tables with timestamps | Timestamp incremental | Continuous sync |
    | Source supports CDC | CDC events | Real-time updates |
    | One-time historical load | Parallel backfill | Initial migration |
    | Critical tables | Swap pattern | Zero-downtime refresh |
    Repository
    majesticlabs-dev/majestic-marketplace