Smithery Logo
MCPsSkillsDocsPricing
Login
Smithery Logo

Accelerating the Agent Economy

Resources

DocumentationPrivacy PolicySystem Status

Company

PricingAboutBlog

Connect

© 2026 Smithery. All rights reserved.

    sickn33

    saga-orchestration

    sickn33/saga-orchestration
    Coding
    8,021
    2 installs

    About

    SKILL.md

    Install

    Install via Skills CLI

    or add to your agent
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    ├─
    ├─
    └─

    About

    Implement saga patterns for distributed transactions and cross-aggregate workflows...

    SKILL.md

    Saga Orchestration

    Patterns for managing distributed transactions and long-running business processes.

    Do not use this skill when

    • The task is unrelated to saga orchestration
    • You need a different domain or tool outside this scope

    Instructions

    • Clarify goals, constraints, and required inputs.
    • Apply relevant best practices and validate outcomes.
    • Provide actionable steps and verification.
    • If detailed examples are required, open resources/implementation-playbook.md.

    Use this skill when

    • Coordinating multi-service transactions
    • Implementing compensating transactions
    • Managing long-running business workflows
    • Handling failures in distributed systems
    • Building order fulfillment processes
    • Implementing approval workflows

    Core Concepts

    1. Saga Types

    Choreography                    Orchestration
    ┌─────┐  ┌─────┐  ┌─────┐     ┌─────────────┐
    │Svc A│─►│Svc B│─►│Svc C│     │ Orchestrator│
    └─────┘  └─────┘  └─────┘     └──────┬──────┘
       │        │        │               │
       ▼        ▼        ▼         ┌─────┼─────┐
     Event    Event    Event       ▼     ▼     ▼
                                ┌────┐┌────┐┌────┐
                                │Svc1││Svc2││Svc3│
                                └────┘└────┘└────┘
    

    2. Saga Execution States

    State Description
    Started Saga initiated
    Pending Waiting for step completion
    Compensating Rolling back due to failure
    Completed All steps succeeded
    Failed Saga failed after compensation

    Templates

    Template 1: Saga Orchestrator Base

    from abc import ABC, abstractmethod
    from dataclasses import dataclass, field
    from enum import Enum
    from typing import List, Dict, Any, Optional
    from datetime import datetime
    import uuid
    
    class SagaState(Enum):
        STARTED = "started"
        PENDING = "pending"
        COMPENSATING = "compensating"
        COMPLETED = "completed"
        FAILED = "failed"
    
    
    @dataclass
    class SagaStep:
        name: str
        action: str
        compensation: str
        status: str = "pending"
        result: Optional[Dict] = None
        error: Optional[str] = None
        executed_at: Optional[datetime] = None
        compensated_at: Optional[datetime] = None
    
    
    @dataclass
    class Saga:
        saga_id: str
        saga_type: str
        state: SagaState
        data: Dict[str, Any]
        steps: List[SagaStep]
        current_step: int = 0
        created_at: datetime = field(default_factory=datetime.utcnow)
        updated_at: datetime = field(default_factory=datetime.utcnow)
    
    
    class SagaOrchestrator(ABC):
        """Base class for saga orchestrators."""
    
        def __init__(self, saga_store, event_publisher):
            self.saga_store = saga_store
            self.event_publisher = event_publisher
    
        @abstractmethod
        def define_steps(self, data: Dict) -> List[SagaStep]:
            """Define the saga steps."""
            pass
    
        @property
        @abstractmethod
        def saga_type(self) -> str:
            """Unique saga type identifier."""
            pass
    
        async def start(self, data: Dict) -> Saga:
            """Start a new saga."""
            saga = Saga(
                saga_id=str(uuid.uuid4()),
                saga_type=self.saga_type,
                state=SagaState.STARTED,
                data=data,
                steps=self.define_steps(data)
            )
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)
            return saga
    
        async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
            """Handle successful step completion."""
            saga = await self.saga_store.get(saga_id)
    
            # Update step
            for step in saga.steps:
                if step.name == step_name:
                    step.status = "completed"
                    step.result = result
                    step.executed_at = datetime.utcnow()
                    break
    
            saga.current_step += 1
            saga.updated_at = datetime.utcnow()
    
            # Check if saga is complete
            if saga.current_step >= len(saga.steps):
                saga.state = SagaState.COMPLETED
                await self.saga_store.save(saga)
                await self._on_saga_completed(saga)
            else:
                saga.state = SagaState.PENDING
                await self.saga_store.save(saga)
                await self._execute_next_step(saga)
    
        async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
            """Handle step failure - start compensation."""
            saga = await self.saga_store.get(saga_id)
    
            # Mark step as failed
            for step in saga.steps:
                if step.name == step_name:
                    step.status = "failed"
                    step.error = error
                    break
    
            saga.state = SagaState.COMPENSATING
            saga.updated_at = datetime.utcnow()
            await self.saga_store.save(saga)
    
            # Start compensation from current step backwards
            await self._compensate(saga)
    
        async def _execute_next_step(self, saga: Saga):
            """Execute the next step in the saga."""
            if saga.current_step >= len(saga.steps):
                return
    
            step = saga.steps[saga.current_step]
            step.status = "executing"
            await self.saga_store.save(saga)
    
            # Publish command to execute step
            await self.event_publisher.publish(
                step.action,
                {
                    "saga_id": saga.saga_id,
                    "step_name": step.name,
                    **saga.data
                }
            )
    
        async def _compensate(self, saga: Saga):
            """Execute compensation for completed steps."""
            # Compensate in reverse order
            for i in range(saga.current_step - 1, -1, -1):
                step = saga.steps[i]
                if step.status == "completed":
                    step.status = "compensating"
                    await self.saga_store.save(saga)
    
                    await self.event_publisher.publish(
                        step.compensation,
                        {
                            "saga_id": saga.saga_id,
                            "step_name": step.name,
                            "original_result": step.result,
                            **saga.data
                        }
                    )
    
        async def handle_compensation_completed(self, saga_id: str, step_name: str):
            """Handle compensation completion."""
            saga = await self.saga_store.get(saga_id)
    
            for step in saga.steps:
                if step.name == step_name:
                    step.status = "compensated"
                    step.compensated_at = datetime.utcnow()
                    break
    
            # Check if all compensations complete
            all_compensated = all(
                s.status in ("compensated", "pending", "failed")
                for s in saga.steps
            )
    
            if all_compensated:
                saga.state = SagaState.FAILED
                await self._on_saga_failed(saga)
    
            await self.saga_store.save(saga)
    
        async def _on_saga_completed(self, saga: Saga):
            """Called when saga completes successfully."""
            await self.event_publisher.publish(
                f"{self.saga_type}Completed",
                {"saga_id": saga.saga_id, **saga.data}
            )
    
        async def _on_saga_failed(self, saga: Saga):
            """Called when saga fails after compensation."""
            await self.event_publisher.publish(
                f"{self.saga_type}Failed",
                {"saga_id": saga.saga_id, "error": "Saga failed", **saga.data}
            )
    

    Template 2: Order Fulfillment Saga

    class OrderFulfillmentSaga(SagaOrchestrator):
        """Orchestrates order fulfillment across services."""
    
        @property
        def saga_type(self) -> str:
            return "OrderFulfillment"
    
        def define_steps(self, data: Dict) -> List[SagaStep]:
            return [
                SagaStep(
                    name="reserve_inventory",
                    action="InventoryService.ReserveItems",
                    compensation="InventoryService.ReleaseReservation"
                ),
                SagaStep(
                    name="process_payment",
                    action="PaymentService.ProcessPayment",
                    compensation="PaymentService.RefundPayment"
                ),
                SagaStep(
                    name="create_shipment",
                    action="ShippingService.CreateShipment",
                    compensation="ShippingService.CancelShipment"
                ),
                SagaStep(
                    name="send_confirmation",
                    action="NotificationService.SendOrderConfirmation",
                    compensation="NotificationService.SendCancellationNotice"
                )
            ]
    
    
    # Usage
    async def create_order(order_data: Dict):
        saga = OrderFulfillmentSaga(saga_store, event_publisher)
        return await saga.start({
            "order_id": order_data["order_id"],
            "customer_id": order_data["customer_id"],
            "items": order_data["items"],
            "payment_method": order_data["payment_method"],
            "shipping_address": order_data["shipping_address"]
        })
    
    
    # Event handlers in each service
    class InventoryService:
        async def handle_reserve_items(self, command: Dict):
            try:
                # Reserve inventory
                reservation = await self.reserve(
                    command["items"],
                    command["order_id"]
                )
                # Report success
                await self.event_publisher.publish(
                    "SagaStepCompleted",
                    {
                        "saga_id": command["saga_id"],
                        "step_name": "reserve_inventory",
                        "result": {"reservation_id": reservation.id}
                    }
                )
            except InsufficientInventoryError as e:
                await self.event_publisher.publish(
                    "SagaStepFailed",
                    {
                        "saga_id": command["saga_id"],
                        "step_name": "reserve_inventory",
                        "error": str(e)
                    }
                )
    
        async def handle_release_reservation(self, command: Dict):
            # Compensating action
            await self.release_reservation(
                command["original_result"]["reservation_id"]
            )
            await self.event_publisher.publish(
                "SagaCompensationCompleted",
                {
                    "saga_id": command["saga_id"],
                    "step_name": "reserve_inventory"
                }
            )
    

    Template 3: Choreography-Based Saga

    from dataclasses import dataclass
    from typing import Dict, Any
    import asyncio
    
    @dataclass
    class SagaContext:
        """Passed through choreographed saga events."""
        saga_id: str
        step: int
        data: Dict[str, Any]
        completed_steps: list
    
    
    class OrderChoreographySaga:
        """Choreography-based saga using events."""
    
        def __init__(self, event_bus):
            self.event_bus = event_bus
            self._register_handlers()
    
        def _register_handlers(self):
            self.event_bus.subscribe("OrderCreated", self._on_order_created)
            self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
            self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
            self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
    
            # Compensation handlers
            self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
            self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)
    
        async def _on_order_created(self, event: Dict):
            """Step 1: Order created, reserve inventory."""
            await self.event_bus.publish("ReserveInventory", {
                "saga_id": event["order_id"],
                "order_id": event["order_id"],
                "items": event["items"]
            })
    
        async def _on_inventory_reserved(self, event: Dict):
            """Step 2: Inventory reserved, process payment."""
            await self.event_bus.publish("ProcessPayment", {
                "saga_id": event["saga_id"],
                "order_id": event["order_id"],
                "amount": event["total_amount"],
                "reservation_id": event["reservation_id"]
            })
    
        async def _on_payment_processed(self, event: Dict):
            """Step 3: Payment done, create shipment."""
            await self.event_bus.publish("CreateShipment", {
                "saga_id": event["saga_id"],
                "order_id": event["order_id"],
                "payment_id": event["payment_id"]
            })
    
        async def _on_shipment_created(self, event: Dict):
            """Step 4: Complete - send confirmation."""
            await self.event_bus.publish("OrderFulfilled", {
                "saga_id": event["saga_id"],
                "order_id": event["order_id"],
                "tracking_number": event["tracking_number"]
            })
    
        # Compensation handlers
        async def _on_payment_failed(self, event: Dict):
            """Payment failed - release inventory."""
            await self.event_bus.publish("ReleaseInventory", {
                "saga_id": event["saga_id"],
                "reservation_id": event["reservation_id"]
            })
            await self.event_bus.publish("OrderFailed", {
                "order_id": event["order_id"],
                "reason": "Payment failed"
            })
    
        async def _on_shipment_failed(self, event: Dict):
            """Shipment failed - refund payment and release inventory."""
            await self.event_bus.publish("RefundPayment", {
                "saga_id": event["saga_id"],
                "payment_id": event["payment_id"]
            })
            await self.event_bus.publish("ReleaseInventory", {
                "saga_id": event["saga_id"],
                "reservation_id": event["reservation_id"]
            })
    

    Template 4: Saga with Timeouts

    class TimeoutSagaOrchestrator(SagaOrchestrator):
        """Saga orchestrator with step timeouts."""
    
        def __init__(self, saga_store, event_publisher, scheduler):
            super().__init__(saga_store, event_publisher)
            self.scheduler = scheduler
    
        async def _execute_next_step(self, saga: Saga):
            if saga.current_step >= len(saga.steps):
                return
    
            step = saga.steps[saga.current_step]
            step.status = "executing"
            step.timeout_at = datetime.utcnow() + timedelta(minutes=5)
            await self.saga_store.save(saga)
    
            # Schedule timeout check
            await self.scheduler.schedule(
                f"saga_timeout_{saga.saga_id}_{step.name}",
                self._check_timeout,
                {"saga_id": saga.saga_id, "step_name": step.name},
                run_at=step.timeout_at
            )
    
            await self.event_publisher.publish(
                step.action,
                {"saga_id": saga.saga_id, "step_name": step.name, **saga.data}
            )
    
        async def _check_timeout(self, data: Dict):
            """Check if step has timed out."""
            saga = await self.saga_store.get(data["saga_id"])
            step = next(s for s in saga.steps if s.name == data["step_name"])
    
            if step.status == "executing":
                # Step timed out - fail it
                await self.handle_step_failed(
                    data["saga_id"],
                    data["step_name"],
                    "Step timed out"
                )
    

    Durable Execution Alternative

    The templates above build saga infrastructure from scratch — saga stores, event publishers, compensation tracking. Durable execution frameworks (like DBOS) eliminate much of this boilerplate: the workflow runtime automatically persists state to a database, retries failed steps, and resumes from the last checkpoint after crashes. Instead of building a SagaOrchestrator base class, you write a workflow function with steps — the framework handles persistence, crash recovery, and exactly-once execution semantics. Consider durable execution when you want saga-like reliability without managing the coordination infrastructure yourself.

    Best Practices

    Do's

    • Make steps idempotent - Safe to retry
    • Design compensations carefully - They must work
    • Use correlation IDs - For tracing across services
    • Implement timeouts - Don't wait forever
    • Log everything - For debugging failures

    Don'ts

    • Don't assume instant completion - Sagas take time
    • Don't skip compensation testing - Most critical part
    • Don't couple services - Use async messaging
    • Don't ignore partial failures - Handle gracefully

    Related Skills

    Works well with: event-sourcing-architect, workflow-automation, dbos-*

    Resources

    • Saga Pattern
    • Designing Data-Intensive Applications

    Limitations

    • Use this skill only when the task clearly matches the scope described above.
    • Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
    • Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.
    Recommended Servers
    Vercel Grep
    Vercel Grep
    Blockscout MCP Server
    Blockscout MCP Server
    Thoughtbox
    Thoughtbox
    Repository
    sickn33/antigravity-awesome-skills
    Files