Smithery Logo
MCPsSkillsDocsPricing
Login
Smithery Logo

Accelerating the Agent Economy

Resources

DocumentationPrivacy PolicySystem Status

Company

PricingAboutBlog

Connect

© 2026 Smithery. All rights reserved.

    wshobson

    airflow-dag-patterns

    wshobson/airflow-dag-patterns
    Data & Analytics
    28,185
    2 installs

    About

    SKILL.md

    Install

    Install via Skills CLI

    or add to your agent
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    ├─
    ├─
    └─

    About

    Build production Apache Airflow DAGs with best practices for operators, sensors, testing, and deployment. Use when creating data pipelines, orchestrating workflows, or scheduling batch jobs.

    SKILL.md

    Apache Airflow DAG Patterns

    Production-ready patterns for Apache Airflow including DAG design, operators, sensors, testing, and deployment strategies.

    When to Use This Skill

    • Creating data pipeline orchestration with Airflow
    • Designing DAG structures and dependencies
    • Implementing custom operators and sensors
    • Testing Airflow DAGs locally
    • Setting up Airflow in production
    • Debugging failed DAG runs

    Core Concepts

    1. DAG Design Principles

    Principle Description
    Idempotent Running twice produces same result
    Atomic Tasks succeed or fail completely
    Incremental Process only new/changed data
    Observable Logs, metrics, alerts at every step

    2. Task Dependencies

    # Linear
    task1 >> task2 >> task3
    
    # Fan-out
    task1 >> [task2, task3, task4]
    
    # Fan-in
    [task1, task2, task3] >> task4
    
    # Complex
    task1 >> task2 >> task4
    task1 >> task3 >> task4
    

    Quick Start

    # dags/example_dag.py
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.empty import EmptyOperator
    
    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': timedelta(hours=1),
    }
    
    with DAG(
        dag_id='example_etl',
        default_args=default_args,
        description='Example ETL pipeline',
        schedule='0 6 * * *',  # Daily at 6 AM
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'example'],
        max_active_runs=1,
    ) as dag:
    
        start = EmptyOperator(task_id='start')
    
        def extract_data(**context):
            execution_date = context['ds']
            # Extract logic here
            return {'records': 1000}
    
        extract = PythonOperator(
            task_id='extract',
            python_callable=extract_data,
        )
    
        end = EmptyOperator(task_id='end')
    
        start >> extract >> end
    

    Patterns

    Pattern 1: TaskFlow API (Airflow 2.0+)

    # dags/taskflow_example.py
    from datetime import datetime
    from airflow.decorators import dag, task
    from airflow.models import Variable
    
    @dag(
        dag_id='taskflow_etl',
        schedule='@daily',
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'taskflow'],
    )
    def taskflow_etl():
        """ETL pipeline using TaskFlow API"""
    
        @task()
        def extract(source: str) -> dict:
            """Extract data from source"""
            import pandas as pd
    
            df = pd.read_csv(f's3://bucket/{source}/{{ ds }}.csv')
            return {'data': df.to_dict(), 'rows': len(df)}
    
        @task()
        def transform(extracted: dict) -> dict:
            """Transform extracted data"""
            import pandas as pd
    
            df = pd.DataFrame(extracted['data'])
            df['processed_at'] = datetime.now()
            df = df.dropna()
            return {'data': df.to_dict(), 'rows': len(df)}
    
        @task()
        def load(transformed: dict, target: str):
            """Load data to target"""
            import pandas as pd
    
            df = pd.DataFrame(transformed['data'])
            df.to_parquet(f's3://bucket/{target}/{{ ds }}.parquet')
            return transformed['rows']
    
        @task()
        def notify(rows_loaded: int):
            """Send notification"""
            print(f'Loaded {rows_loaded} rows')
    
        # Define dependencies with XCom passing
        extracted = extract(source='raw_data')
        transformed = transform(extracted)
        loaded = load(transformed, target='processed_data')
        notify(loaded)
    
    # Instantiate the DAG
    taskflow_etl()
    

    Pattern 2: Dynamic DAG Generation

    # dags/dynamic_dag_factory.py
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.models import Variable
    import json
    
    # Configuration for multiple similar pipelines
    PIPELINE_CONFIGS = [
        {'name': 'customers', 'schedule': '@daily', 'source': 's3://raw/customers'},
        {'name': 'orders', 'schedule': '@hourly', 'source': 's3://raw/orders'},
        {'name': 'products', 'schedule': '@weekly', 'source': 's3://raw/products'},
    ]
    
    def create_dag(config: dict) -> DAG:
        """Factory function to create DAGs from config"""
    
        dag_id = f"etl_{config['name']}"
    
        default_args = {
            'owner': 'data-team',
            'retries': 3,
            'retry_delay': timedelta(minutes=5),
        }
    
        dag = DAG(
            dag_id=dag_id,
            default_args=default_args,
            schedule=config['schedule'],
            start_date=datetime(2024, 1, 1),
            catchup=False,
            tags=['etl', 'dynamic', config['name']],
        )
    
        with dag:
            def extract_fn(source, **context):
                print(f"Extracting from {source} for {context['ds']}")
    
            def transform_fn(**context):
                print(f"Transforming data for {context['ds']}")
    
            def load_fn(table_name, **context):
                print(f"Loading to {table_name} for {context['ds']}")
    
            extract = PythonOperator(
                task_id='extract',
                python_callable=extract_fn,
                op_kwargs={'source': config['source']},
            )
    
            transform = PythonOperator(
                task_id='transform',
                python_callable=transform_fn,
            )
    
            load = PythonOperator(
                task_id='load',
                python_callable=load_fn,
                op_kwargs={'table_name': config['name']},
            )
    
            extract >> transform >> load
    
        return dag
    
    # Generate DAGs
    for config in PIPELINE_CONFIGS:
        globals()[f"dag_{config['name']}"] = create_dag(config)
    

    Pattern 3: Branching and Conditional Logic

    # dags/branching_example.py
    from airflow.decorators import dag, task
    from airflow.operators.python import BranchPythonOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule
    
    @dag(
        dag_id='branching_pipeline',
        schedule='@daily',
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )
    def branching_pipeline():
    
        @task()
        def check_data_quality() -> dict:
            """Check data quality and return metrics"""
            quality_score = 0.95  # Simulated
            return {'score': quality_score, 'rows': 10000}
    
        def choose_branch(**context) -> str:
            """Determine which branch to execute"""
            ti = context['ti']
            metrics = ti.xcom_pull(task_ids='check_data_quality')
    
            if metrics['score'] >= 0.9:
                return 'high_quality_path'
            elif metrics['score'] >= 0.7:
                return 'medium_quality_path'
            else:
                return 'low_quality_path'
    
        quality_check = check_data_quality()
    
        branch = BranchPythonOperator(
            task_id='branch',
            python_callable=choose_branch,
        )
    
        high_quality = EmptyOperator(task_id='high_quality_path')
        medium_quality = EmptyOperator(task_id='medium_quality_path')
        low_quality = EmptyOperator(task_id='low_quality_path')
    
        # Join point - runs after any branch completes
        join = EmptyOperator(
            task_id='join',
            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
        )
    
        quality_check >> branch >> [high_quality, medium_quality, low_quality] >> join
    
    branching_pipeline()
    

    Pattern 4: Sensors and External Dependencies

    # dags/sensor_patterns.py
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.sensors.filesystem import FileSensor
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    from airflow.sensors.external_task import ExternalTaskSensor
    from airflow.operators.python import PythonOperator
    
    with DAG(
        dag_id='sensor_example',
        schedule='@daily',
        start_date=datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
    
        # Wait for file on S3
        wait_for_file = S3KeySensor(
            task_id='wait_for_s3_file',
            bucket_name='data-lake',
            bucket_key='raw/{{ ds }}/data.parquet',
            aws_conn_id='aws_default',
            timeout=60 * 60 * 2,  # 2 hours
            poke_interval=60 * 5,  # Check every 5 minutes
            mode='reschedule',  # Free up worker slot while waiting
        )
    
        # Wait for another DAG to complete
        wait_for_upstream = ExternalTaskSensor(
            task_id='wait_for_upstream_dag',
            external_dag_id='upstream_etl',
            external_task_id='final_task',
            execution_date_fn=lambda dt: dt,  # Same execution date
            timeout=60 * 60 * 3,
            mode='reschedule',
        )
    
        # Custom sensor using @task.sensor decorator
        @task.sensor(poke_interval=60, timeout=3600, mode='reschedule')
        def wait_for_api() -> PokeReturnValue:
            """Custom sensor for API availability"""
            import requests
    
            response = requests.get('https://api.example.com/health')
            is_done = response.status_code == 200
    
            return PokeReturnValue(is_done=is_done, xcom_value=response.json())
    
        api_ready = wait_for_api()
    
        def process_data(**context):
            api_result = context['ti'].xcom_pull(task_ids='wait_for_api')
            print(f"API returned: {api_result}")
    
        process = PythonOperator(
            task_id='process',
            python_callable=process_data,
        )
    
        [wait_for_file, wait_for_upstream, api_ready] >> process
    

    Pattern 5: Error Handling and Alerts

    # dags/error_handling.py
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.models import Variable
    
    def task_failure_callback(context):
        """Callback on task failure"""
        task_instance = context['task_instance']
        exception = context.get('exception')
    
        # Send to Slack/PagerDuty/etc
        message = f"""
        Task Failed!
        DAG: {task_instance.dag_id}
        Task: {task_instance.task_id}
        Execution Date: {context['ds']}
        Error: {exception}
        Log URL: {task_instance.log_url}
        """
        # send_slack_alert(message)
        print(message)
    
    def dag_failure_callback(context):
        """Callback on DAG failure"""
        # Aggregate failures, send summary
        pass
    
    with DAG(
        dag_id='error_handling_example',
        schedule='@daily',
        start_date=datetime(2024, 1, 1),
        catchup=False,
        on_failure_callback=dag_failure_callback,
        default_args={
            'on_failure_callback': task_failure_callback,
            'retries': 3,
            'retry_delay': timedelta(minutes=5),
        },
    ) as dag:
    
        def might_fail(**context):
            import random
            if random.random() < 0.3:
                raise ValueError("Random failure!")
            return "Success"
    
        risky_task = PythonOperator(
            task_id='risky_task',
            python_callable=might_fail,
        )
    
        def cleanup(**context):
            """Cleanup runs regardless of upstream failures"""
            print("Cleaning up...")
    
        cleanup_task = PythonOperator(
            task_id='cleanup',
            python_callable=cleanup,
            trigger_rule=TriggerRule.ALL_DONE,  # Run even if upstream fails
        )
    
        def notify_success(**context):
            """Only runs if all upstream succeeded"""
            print("All tasks succeeded!")
    
        success_notification = PythonOperator(
            task_id='notify_success',
            python_callable=notify_success,
            trigger_rule=TriggerRule.ALL_SUCCESS,
        )
    
        risky_task >> [cleanup_task, success_notification]
    

    Pattern 6: Testing DAGs

    # tests/test_dags.py
    import pytest
    from datetime import datetime
    from airflow.models import DagBag
    
    @pytest.fixture
    def dagbag():
        return DagBag(dag_folder='dags/', include_examples=False)
    
    def test_dag_loaded(dagbag):
        """Test that all DAGs load without errors"""
        assert len(dagbag.import_errors) == 0, f"DAG import errors: {dagbag.import_errors}"
    
    def test_dag_structure(dagbag):
        """Test specific DAG structure"""
        dag = dagbag.get_dag('example_etl')
    
        assert dag is not None
        assert len(dag.tasks) == 3
        assert dag.schedule_interval == '0 6 * * *'
    
    def test_task_dependencies(dagbag):
        """Test task dependencies are correct"""
        dag = dagbag.get_dag('example_etl')
    
        extract_task = dag.get_task('extract')
        assert 'start' in [t.task_id for t in extract_task.upstream_list]
        assert 'end' in [t.task_id for t in extract_task.downstream_list]
    
    def test_dag_integrity(dagbag):
        """Test DAG has no cycles and is valid"""
        for dag_id, dag in dagbag.dags.items():
            assert dag.test_cycle() is None, f"Cycle detected in {dag_id}"
    
    # Test individual task logic
    def test_extract_function():
        """Unit test for extract function"""
        from dags.example_dag import extract_data
    
        result = extract_data(ds='2024-01-01')
        assert 'records' in result
        assert isinstance(result['records'], int)
    

    Project Structure

    airflow/
    ├── dags/
    │   ├── __init__.py
    │   ├── common/
    │   │   ├── __init__.py
    │   │   ├── operators.py    # Custom operators
    │   │   ├── sensors.py      # Custom sensors
    │   │   └── callbacks.py    # Alert callbacks
    │   ├── etl/
    │   │   ├── customers.py
    │   │   └── orders.py
    │   └── ml/
    │       └── training.py
    ├── plugins/
    │   └── custom_plugin.py
    ├── tests/
    │   ├── __init__.py
    │   ├── test_dags.py
    │   └── test_operators.py
    ├── docker-compose.yml
    └── requirements.txt
    

    Best Practices

    Do's

    • Use TaskFlow API - Cleaner code, automatic XCom
    • Set timeouts - Prevent zombie tasks
    • Use mode='reschedule' - For sensors, free up workers
    • Test DAGs - Unit tests and integration tests
    • Idempotent tasks - Safe to retry

    Don'ts

    • Don't use depends_on_past=True - Creates bottlenecks
    • Don't hardcode dates - Use {{ ds }} macros
    • Don't use global state - Tasks should be stateless
    • Don't skip catchup blindly - Understand implications
    • Don't put heavy logic in DAG file - Import from modules
    Recommended Servers
    MantleKit Launch Planner
    MantleKit Launch Planner
    OpenZeppelin
    OpenZeppelin
    Data Compliance Classifier MCP
    Data Compliance Classifier MCP
    Repository
    wshobson/agents
    Files