Smithery Logo
MCPsSkillsDocsPricing
Login
Smithery Logo

Accelerating the Agent Economy

Resources

DocumentationPrivacy PolicySystem Status

Company

PricingAboutBlog

Connect

© 2026 Smithery. All rights reserved.

    Sahib-Sawhney-WH

    dapr-code-generator

    Sahib-Sawhney-WH/dapr-code-generator
    Coding
    3

    About

    SKILL.md

    Install

    Install via Skills CLI

    or add to your agent
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    ├─
    ├─
    └─

    About

    Generate Python boilerplate code for DAPR microservices, workflows, and actors. Creates properly structured services with DAPR SDK integration, health endpoints, logging, and best practices...

    SKILL.md

    DAPR Code Generator

    This skill generates production-ready Python code for DAPR applications following best practices.

    When to Use

    Claude automatically uses this skill when:

    • User creates a new DAPR service
    • Adding DAPR features to existing code
    • Scaffolding microservice projects
    • Creating workflows or actors

    Code Templates

    FastAPI Microservice

    """
    {service_name} - DAPR-enabled FastAPI microservice
    """
    import json
    import logging
    from contextlib import asynccontextmanager
    from typing import Any
    
    from fastapi import FastAPI, HTTPException, Request
    from fastapi.responses import JSONResponse
    from pydantic import BaseModel
    from dapr.clients import DaprClient
    from dapr.ext.fastapi import DaprApp
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    logger = logging.getLogger(__name__)
    
    # Constants
    DAPR_STORE_NAME = "statestore"
    DAPR_PUBSUB_NAME = "pubsub"
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Application lifespan handler."""
        logger.info("Starting {service_name}")
        yield
        logger.info("Shutting down {service_name}")
    
    
    app = FastAPI(
        title="{service_name}",
        lifespan=lifespan
    )
    dapr_app = DaprApp(app)
    
    
    # Health endpoints
    @app.get("/health")
    async def health():
        """Health check endpoint."""
        return {"status": "healthy"}
    
    
    @app.get("/ready")
    async def ready():
        """Readiness check endpoint."""
        return {"status": "ready"}
    
    
    # DAPR State Management
    async def save_state(key: str, value: Any) -> None:
        """Save state to DAPR state store."""
        async with DaprClient() as client:
            await client.save_state(
                store_name=DAPR_STORE_NAME,
                key=key,
                value=json.dumps(value)
            )
            logger.info(f"State saved: {key}")
    
    
    async def get_state(key: str) -> Any:
        """Get state from DAPR state store."""
        async with DaprClient() as client:
            state = await client.get_state(
                store_name=DAPR_STORE_NAME,
                key=key
            )
            if state.data:
                return json.loads(state.data)
            return None
    
    
    # DAPR Pub/Sub
    async def publish_event(topic: str, data: Any) -> None:
        """Publish event to DAPR pub/sub."""
        async with DaprClient() as client:
            await client.publish_event(
                pubsub_name=DAPR_PUBSUB_NAME,
                topic_name=topic,
                data=json.dumps(data),
                data_content_type="application/json"
            )
            logger.info(f"Event published to {topic}")
    
    
    # DAPR Service Invocation
    async def invoke_service(app_id: str, method: str, data: Any = None) -> Any:
        """Invoke another DAPR service."""
        async with DaprClient() as client:
            response = await client.invoke_method(
                app_id=app_id,
                method_name=method,
                data=json.dumps(data) if data else None,
                content_type="application/json"
            )
            return response.json() if response.data else None
    
    
    # Example API endpoints - customize as needed
    class ItemModel(BaseModel):
        id: str
        name: str
        data: dict = {}
    
    
    @app.post("/items")
    async def create_item(item: ItemModel):
        """Create a new item."""
        await save_state(f"item-{item.id}", item.model_dump())
        await publish_event("items", {"action": "created", "item": item.model_dump()})
        return {"status": "created", "id": item.id}
    
    
    @app.get("/items/{item_id}")
    async def get_item(item_id: str):
        """Get an item by ID."""
        item = await get_state(f"item-{item_id}")
        if not item:
            raise HTTPException(status_code=404, detail="Item not found")
        return item
    
    
    # DAPR Pub/Sub subscription
    @dapr_app.subscribe(pubsub=DAPR_PUBSUB_NAME, topic="items")
    async def handle_item_event(event: dict):
        """Handle item events from pub/sub."""
        logger.info(f"Received event: {event}")
        return {"status": "SUCCESS"}
    
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)
    

    Workflow Service

    """
    {workflow_name} - DAPR Workflow Service
    """
    import logging
    from datetime import timedelta
    from dapr.ext.workflow import (
        WorkflowRuntime,
        DaprWorkflowClient,
        DaprWorkflowContext,
        WorkflowActivityContext,
        RetryPolicy
    )
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize workflow runtime
    wf_runtime = WorkflowRuntime()
    
    # Retry policy for activities
    default_retry = RetryPolicy(
        max_number_of_attempts=3,
        first_retry_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        max_retry_interval=timedelta(seconds=30)
    )
    
    
    # Workflow definition
    @wf_runtime.workflow(name="{workflow_name}")
    def {workflow_name}_workflow(ctx: DaprWorkflowContext, input_data: dict):
        """
        Main workflow definition.
    
        Args:
            ctx: Workflow context
            input_data: Input data for the workflow
    
        Returns:
            Workflow result
        """
        logger.info(f"Starting workflow with input: {input_data}")
    
        # Step 1: First activity
        step1_result = yield ctx.call_activity(
            activity=step1_activity,
            input=input_data
        )
        logger.info(f"Step 1 completed: {step1_result}")
    
        # Step 2: Second activity
        step2_result = yield ctx.call_activity(
            activity=step2_activity,
            input=step1_result
        )
        logger.info(f"Step 2 completed: {step2_result}")
    
        # Step 3: Final activity
        final_result = yield ctx.call_activity(
            activity=step3_activity,
            input=step2_result
        )
    
        return {"status": "completed", "result": final_result}
    
    
    # Activity definitions
    @wf_runtime.activity(name="step1_activity")
    def step1_activity(ctx: WorkflowActivityContext, data: dict) -> dict:
        """First step of the workflow."""
        logger.info(f"Executing step 1 with: {data}")
        # Your logic here
        return {"step": 1, "data": data}
    
    
    @wf_runtime.activity(name="step2_activity")
    def step2_activity(ctx: WorkflowActivityContext, data: dict) -> dict:
        """Second step of the workflow."""
        logger.info(f"Executing step 2 with: {data}")
        # Your logic here
        return {"step": 2, "data": data}
    
    
    @wf_runtime.activity(name="step3_activity")
    def step3_activity(ctx: WorkflowActivityContext, data: dict) -> dict:
        """Third step of the workflow."""
        logger.info(f"Executing step 3 with: {data}")
        # Your logic here
        return {"step": 3, "data": data}
    
    
    # Start workflow runtime
    if __name__ == "__main__":
        logger.info("Starting workflow runtime...")
        wf_runtime.start()
    
        # Keep the runtime running
        import time
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Shutting down workflow runtime...")
            wf_runtime.shutdown()
    

    Actor Service

    """
    {actor_name} - DAPR Virtual Actor
    """
    import logging
    from abc import abstractmethod
    from dapr.actor import Actor, ActorInterface, ActorProxy, Remindable, actormethod
    from dapr.actor.runtime.config import ActorRuntimeConfig, ActorTypeConfig
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    
    # Actor interface
    class {actor_name}Interface(ActorInterface):
        """Interface for {actor_name} actor."""
    
        @abstractmethod
        async def get_state(self) -> dict:
            """Get the current state of the actor."""
            ...
    
        @abstractmethod
        async def set_state(self, data: dict) -> None:
            """Set the state of the actor."""
            ...
    
        @abstractmethod
        async def process(self, input_data: dict) -> dict:
            """Process input data and return result."""
            ...
    
    
    # Actor implementation
    class {actor_name}(Actor, {actor_name}Interface, Remindable):
        """
        {actor_name} virtual actor implementation.
    
        This actor maintains state and can process requests.
        """
    
        def __init__(self, ctx, actor_id):
            super().__init__(ctx, actor_id)
            self._state = {}
    
        async def _on_activate(self) -> None:
            """Called when actor is activated."""
            logger.info(f"Actor {self.id} activated")
            # Load state if exists
            has_state, state = await self._state_manager.try_get_state("actor_state")
            if has_state:
                self._state = state
    
        async def _on_deactivate(self) -> None:
            """Called when actor is deactivated."""
            logger.info(f"Actor {self.id} deactivated")
    
        @actormethod(name="GetState")
        async def get_state(self) -> dict:
            """Get the current state of the actor."""
            return self._state
    
        @actormethod(name="SetState")
        async def set_state(self, data: dict) -> None:
            """Set the state of the actor."""
            self._state = data
            await self._state_manager.set_state("actor_state", self._state)
            await self._state_manager.save_state()
    
        @actormethod(name="Process")
        async def process(self, input_data: dict) -> dict:
            """Process input data and return result."""
            logger.info(f"Processing: {input_data}")
            # Your processing logic here
            result = {"processed": True, "input": input_data}
            return result
    
        async def receive_reminder(self, name: str, state: bytes, due_time, period) -> None:
            """Handle reminder callback."""
            logger.info(f"Reminder received: {name}")
            # Handle the reminder
    
    
    # Actor runtime configuration
    actor_runtime_config = ActorRuntimeConfig()
    actor_runtime_config.register_actor({actor_name})
    
    
    # FastAPI integration
    from fastapi import FastAPI
    from dapr.ext.fastapi import DaprActor
    
    app = FastAPI(title="{actor_name} Service")
    actor = DaprActor(app)
    
    
    @app.on_event("startup")
    async def startup():
        await actor.register_actor({actor_name})
    
    
    @app.get("/health")
    async def health():
        return {"status": "healthy"}
    
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)
    

    Generated File Structure

    When generating a new service, create:

    {service_name}/
    ├── src/
    │   ├── __init__.py
    │   ├── main.py              # Main application
    │   ├── models.py            # Pydantic models
    │   ├── services.py          # Business logic
    │   └── config.py            # Configuration
    ├── components/
    │   ├── statestore.yaml      # State store component
    │   ├── pubsub.yaml          # Pub/sub component
    │   └── secrets.yaml         # Secrets component
    ├── tests/
    │   ├── __init__.py
    │   └── test_main.py         # Unit tests
    ├── dapr.yaml                # DAPR configuration
    ├── requirements.txt         # Python dependencies
    ├── Dockerfile               # Container image
    └── README.md                # Documentation
    

    Requirements Template

    # DAPR SDK
    dapr>=1.12.0
    dapr-ext-fastapi>=1.12.0
    dapr-ext-workflow>=0.4.0
    
    # Web framework
    fastapi>=0.104.0
    uvicorn>=0.24.0
    
    # Utilities
    pydantic>=2.0.0
    python-dotenv>=1.0.0
    
    # Observability
    opentelemetry-api>=1.20.0
    opentelemetry-sdk>=1.20.0
    opentelemetry-instrumentation-fastapi>=0.41b0
    

    Dockerfile Template

    FROM python:3.11-slim
    
    WORKDIR /app
    
    # Install dependencies
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    # Copy application
    COPY src/ ./src/
    
    # Run application
    EXPOSE 8000
    CMD ["python", "src/main.py"]
    
    Recommended Servers
    OpenZeppelin
    OpenZeppelin
    Apify
    Apify
    Vercel Grep
    Vercel Grep
    Repository
    sahib-sawhney-wh/sahibs-claude-plugin-marketplace
    Files