    Azeem-2/mcp-server
    DevOps


    About

    Generic MCP (Model Context Protocol) server development patterns...

    SKILL.md

    Generic MCP Server Development

    This skill provides patterns and reusable code for building MCP (Model Context Protocol) servers that expose domain operations as tools for AI agents. The patterns target 2025-era practices for performance, security, and maintainability.

    When to Use This Skill

    Use this skill when you need to:

    • Build an MCP server for any domain (not just todos)
    • Expose database operations as MCP tools
    • Create AI-agent accessible APIs
    • Implement async MCP tool handlers
    • Add proper error handling and validation
    • Support rate limiting and caching
    • Build enterprise-grade MCP servers
    • Integrate with multiple storage backends

    1. Generic MCP Server Architecture

    #!/usr/bin/env python3
    # mcp_server/core.py
    """
    Generic MCP Server Base Architecture
    Provides reusable patterns for any MCP server implementation
    """
    
    import asyncio
    import json
    import logging
    from abc import ABC, abstractmethod
    from datetime import datetime, timedelta
    from typing import Any, Dict, List, Optional, Sequence, Union, Callable
    from contextlib import asynccontextmanager
    from dataclasses import dataclass, field
    from enum import Enum
    
    import redis.asyncio as redis
    from mcp.server import Server, NotificationOptions, stdio
    from mcp.server.models import InitializationOptions
    from mcp.server.stdio import stdio_server
    from mcp.types import (
        Resource, Tool, TextContent, ImageContent, EmbeddedResource,
        LoggingLevel, CallToolRequest, EmptyResult,
        ListResourcesRequest, ListToolsRequest, ReadResourceRequest,
        GetPromptRequest, ListPromptsRequest
    )
    from pydantic import BaseModel, Field, validator
    import aiofiles
    import yaml
    from pathlib import Path
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    logger = logging.getLogger("mcp_server")
    
    class ServerConfig(BaseModel):
        """MCP Server configuration"""
        name: str
        version: str = "1.0.0"
        description: str
        debug: bool = False
        redis_url: Optional[str] = None
        rate_limit_requests: int = 100
        rate_limit_window: int = 60
        cache_ttl: int = 300
        max_retries: int = 3
        timeout: int = 30
    
        class Config:
            extra = "allow"
    
    @dataclass
    class RequestContext:
        """Request context for tool calls"""
        user_id: str
        session_id: Optional[str] = None
        metadata: Dict[str, Any] = field(default_factory=dict)
        timestamp: datetime = field(default_factory=datetime.utcnow)
    
    class RateLimiter:
        """Redis-based rate limiter for MCP operations"""
    
        def __init__(self, redis_url: str, requests: int, window: int):
            self.redis_url = redis_url
            self.requests = requests
            self.window = window
            self._redis = None
    
        async def _get_redis(self):
            if not self._redis:
                self._redis = await redis.from_url(self.redis_url)
            return self._redis
    
        async def is_allowed(self, key: str) -> bool:
            """Check if request is allowed"""
            r = await self._get_redis()
            current = await r.incr(f"rate_limit:{key}")
    
            if current == 1:
                await r.expire(f"rate_limit:{key}", self.window)
    
            return current <= self.requests
    
        async def get_remaining(self, key: str) -> int:
            """Get remaining requests"""
            r = await self._get_redis()
            current = await r.get(f"rate_limit:{key}")
            return max(0, self.requests - int(current or 0))
    
    class CacheManager:
        """Redis-based caching for MCP responses"""
    
        def __init__(self, redis_url: str, ttl: int = 300):
            self.redis_url = redis_url
            self.ttl = ttl
            self._redis = None
    
        async def _get_redis(self):
            if not self._redis:
                self._redis = await redis.from_url(self.redis_url)
            return self._redis
    
        def _make_key(self, tool_name: str, args: Dict[str, Any]) -> str:
            """Generate cache key from tool name and arguments"""
            import hashlib
            args_str = json.dumps(args, sort_keys=True)
            return f"cache:{tool_name}:{hashlib.md5(args_str.encode()).hexdigest()}"
    
        async def get(self, tool_name: str, args: Dict[str, Any]) -> Optional[Any]:
            """Get cached result"""
            r = await self._get_redis()
            key = self._make_key(tool_name, args)
            result = await r.get(key)
            return json.loads(result) if result else None
    
        async def set(self, tool_name: str, args: Dict[str, Any], value: Any):
            """Cache result"""
            r = await self._get_redis()
            key = self._make_key(tool_name, args)
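            # default=str keeps datetime and similar values serializable, as in call_tool's output
            await r.setex(key, self.ttl, json.dumps(value, default=str))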
    
    class BaseMCPServer:
        """Base MCP Server with common functionality"""
    
        def __init__(self, config: ServerConfig):
            self.config = config
            self.server = Server(config.name)
            # Each entry holds {"handler": ..., "schema": ...}, see register_tool()
            self.tools: Dict[str, Dict[str, Any]] = {}
            self.rate_limiter: Optional[RateLimiter] = None
            self.cache: Optional[CacheManager] = None
    
            # Setup optional components
            if config.redis_url:
                self.rate_limiter = RateLimiter(
                    config.redis_url,
                    config.rate_limit_requests,
                    config.rate_limit_window
                )
                self.cache = CacheManager(
                    config.redis_url,
                    config.cache_ttl
                )
    
            # Register handlers
            self._register_handlers()
    
            logger.info(f"MCP Server '{config.name}' initialized")
    
        def _register_handlers(self):
            """Register MCP handlers"""
            @self.server.list_tools()
            async def handle_list_tools() -> List[Tool]:
                """Return list of available tools"""
                return await self.list_tools()
    
            @self.server.call_tool()
            async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
                """Handle tool call with rate limiting and caching"""
                return await self.call_tool(name, arguments)
    
        def register_tool(self, name: str, handler: Callable, schema: Dict[str, Any]):
            """Register a new tool"""
            self.tools[name] = {
                "handler": handler,
                "schema": schema
            }
            logger.info(f"Registered tool: {name}")
    
        async def list_tools(self) -> List[Tool]:
            """List all available tools"""
            tools = []
            for name, tool_info in self.tools.items():
                tools.append(Tool(
                    name=name,
                    description=tool_info["schema"].get("description", ""),
                    inputSchema=tool_info["schema"].get("inputSchema", {})
                ))
            return tools
    
        async def call_tool(self, name: str, arguments: Dict[str, Any]) -> List[TextContent]:
            """Execute a tool call with full middleware pipeline"""
            start_time = datetime.utcnow()
    
            try:
                # Extract context from arguments
                context = self._extract_context(arguments)
    
                # Rate limiting check
                if self.rate_limiter:
                    rate_key = f"{context.user_id}:{name}"
                    if not await self.rate_limiter.is_allowed(rate_key):
                        return [TextContent(
                            type="text",
                            text=json.dumps({
                                "status": "error",
                                "error": "Rate limit exceeded",
                                "remaining": await self.rate_limiter.get_remaining(rate_key)
                            })
                        )]
    
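                # Check cache. The key is built from the tool name and the remaining
                # arguments only (_user_id was popped above), so hits are shared across users.
                if self.cache and self._is_cacheable(name):
                    cached_result = await self.cache.get(name, arguments)
                    # Compare against None so empty-but-valid results still count as hits
                    if cached_result is not None:
                        logger.info(f"Cache hit for tool: {name}")
                        return [TextContent(
                            type="text",
                            text=json.dumps(cached_result)
                        )]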
    
                # Validate tool exists
                if name not in self.tools:
                    raise ValueError(f"Unknown tool: {name}")
    
                # Validate arguments
                schema = self.tools[name]["schema"]
                self._validate_arguments(arguments, schema)
    
                # Execute tool
                handler = self.tools[name]["handler"]
                result = await self._execute_tool(handler, arguments, context)
    
                # Cache result if applicable
                if self.cache and self._is_cacheable(name) and result.get("status") != "error":
                    await self.cache.set(name, arguments, result)
    
                # Log execution
                duration = (datetime.utcnow() - start_time).total_seconds()
                logger.info(f"Tool {name} executed in {duration:.2f}s for user {context.user_id}")
    
                return [TextContent(
                    type="text",
                    text=json.dumps(result, default=str)
                )]
    
            except Exception as e:
                logger.error(f"Error executing tool {name}: {str(e)}", exc_info=True)
                duration = (datetime.utcnow() - start_time).total_seconds()
                logger.error(f"Tool {name} failed after {duration:.2f}s")
    
                return [TextContent(
                    type="text",
                    text=json.dumps({
                        "status": "error",
                        "error": str(e),
                        "tool": name,
                        "timestamp": datetime.utcnow().isoformat()
                    })
                )]
    
        def _extract_context(self, arguments: Dict[str, Any]) -> RequestContext:
            """Extract request context from arguments"""
            user_id = arguments.pop("_user_id", "anonymous")
            session_id = arguments.pop("_session_id", None)
            metadata = arguments.pop("_metadata", {})
    
            return RequestContext(
                user_id=user_id,
                session_id=session_id,
                metadata=metadata
            )
    
        def _validate_arguments(self, arguments: Dict[str, Any], schema: Dict[str, Any]):
            """Validate tool arguments against schema"""
            # Basic validation - can be extended with pydantic
            input_schema = schema.get("inputSchema", {})
            required = input_schema.get("required", [])
            properties = input_schema.get("properties", {})
    
            # Check required fields (loop variable renamed so it does not shadow dataclasses.field)
            for field_name in required:
                if field_name not in arguments:
                    raise ValueError(f"Missing required field: {field_name}")
    
            # Validate field types
            for field_name, value in arguments.items():
                if field_name in properties:
                    field_schema = properties[field_name]
                    expected_type = field_schema.get("type")
    
                    # bool is a subclass of int, so it is rejected explicitly for "integer"
                    if expected_type == "string" and not isinstance(value, str):
                        raise ValueError(f"Field {field_name} must be a string")
                    elif expected_type == "integer" and (isinstance(value, bool) or not isinstance(value, int)):
                        raise ValueError(f"Field {field_name} must be an integer")
                    elif expected_type == "array" and not isinstance(value, list):
                        raise ValueError(f"Field {field_name} must be an array")
    
                    # Check enum values
                    if "enum" in field_schema and value not in field_schema["enum"]:
                        raise ValueError(f"Field {field_name} must be one of {field_schema['enum']}")
    
        def _is_cacheable(self, tool_name: str) -> bool:
            """Determine if tool result should be cached"""
            # Only tools whose name starts with a read-style verb are cacheable.
            # A substring test would also match mutating tools such as "create_list".
            non_mutating = ("get", "list", "search", "find", "read")
            return tool_name.lower().startswith(non_mutating)
    
        async def _execute_tool(self, handler: Callable, arguments: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Execute tool handler with error handling"""
            try:
                # Pass context to handler if it accepts it
                import inspect
                sig = inspect.signature(handler)
    
                if 'context' in sig.parameters:
                    result = await handler(arguments, context=context)
                else:
                    result = await handler(arguments)
    
                return result
    
            except Exception as e:
                logger.error(f"Tool handler failed: {str(e)}")
                return {
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.utcnow().isoformat()
                }
    
        async def run(self):
            """Start the MCP server"""
            logger.info(f"Starting MCP server: {self.config.name}")
            async with stdio_server() as (read_stream, write_stream):
                await self.server.run(
                    read_stream,
                    write_stream,
                    InitializationOptions(
                        server_name=self.config.name,
                        server_version=self.config.version,
                        capabilities=self.server.get_capabilities(
                            notification_options=NotificationOptions(),
                            experimental_capabilities={},
                        )
                    )
                )
    
    def tool(
        name: Optional[str] = None,
        description: str = "",
        input_schema: Optional[Dict[str, Any]] = None
    ):
        """Decorator for registering MCP tools"""
        def decorator(func):
            tool_name = name or func.__name__
            schema = {
                "description": description or func.__doc__ or "",
                "inputSchema": input_schema or {}
            }
    
            # Store schema on function for later registration
            func._mcp_tool_schema = schema
            func._mcp_tool_name = tool_name
    
            return func
        return decorator
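
The `tool` decorator above only attaches metadata to a function; nothing in this module passes it on to `register_tool`. A minimal sketch of one way to close that gap is shown here. `collect_tools` and `NoteTools` are hypothetical names introduced for illustration, and the decorator is repeated in shortened form so the block runs without the `mcp` package installed.

```python
import inspect
from typing import Any, Dict, Optional


def tool(name: Optional[str] = None, description: str = "",
         input_schema: Optional[Dict[str, Any]] = None):
    """Metadata-only decorator, repeated from above so this sketch runs on its own."""
    def decorator(func):
        func._mcp_tool_name = name or func.__name__
        func._mcp_tool_schema = {
            "description": description or func.__doc__ or "",
            "inputSchema": input_schema or {},
        }
        return func
    return decorator


def collect_tools(obj: Any) -> Dict[str, Dict[str, Any]]:
    """Hypothetical helper: map tool name -> handler/schema for every tagged method."""
    found: Dict[str, Dict[str, Any]] = {}
    for _, member in inspect.getmembers(obj, predicate=inspect.ismethod):
        # Bound methods expose attributes set on the underlying function
        schema = getattr(member, "_mcp_tool_schema", None)
        if schema is not None:
            found[member._mcp_tool_name] = {"handler": member, "schema": schema}
    return found


class NoteTools:
    """Illustrative tool container; the names here are made up for the example."""

    @tool(name="list_notes", description="List stored notes",
          input_schema={"type": "object", "properties": {}})
    async def list_notes(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        return {"status": "success", "notes": []}

    async def not_a_tool(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        return {}


registry = collect_tools(NoteTools())
```

With a `BaseMCPServer` instance named `server`, each entry could then be registered by looping over `registry.items()` and calling `server.register_tool(name, info["handler"], info["schema"])`.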
    

    2. Database Integration Patterns

    # mcp_server/database.py
    """
    Generic Database Integration for MCP Servers
    Supports multiple ORMs and connection patterns
    """
    
    import asyncio
    from abc import ABC, abstractmethod
    from contextlib import asynccontextmanager
    from typing import Any, Dict, List, Optional, TypeVar, Generic, Union
    from datetime import datetime
    import json
    
    from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime, Text, Boolean, select, update, delete, insert
    from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
    from sqlalchemy.orm import sessionmaker, declarative_base
    from sqlalchemy.pool import NullPool
    import asyncpg
    import motor.motor_asyncio
    from redis.asyncio import Redis
    
    # Type variables
    T = TypeVar('T')
    
    class DatabaseBackend(ABC):
        """Abstract base for database backends"""
    
        @abstractmethod
        async def connect(self):
            """Establish connection"""
            pass
    
        @abstractmethod
        async def disconnect(self):
            """Close connection"""
            pass
    
        @abstractmethod
        async def execute_query(self, query: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
            """Execute a query and return results"""
            pass
    
        @abstractmethod
        async def execute_command(self, command: str, params: Dict[str, Any] = None) -> Any:
            """Execute a command (INSERT, UPDATE, DELETE)"""
            pass
    
    class PostgresBackend(DatabaseBackend):
        """PostgreSQL backend using asyncpg"""
    
        def __init__(self, connection_string: str):
            self.connection_string = connection_string
            self.pool: Optional[asyncpg.Pool] = None
    
        async def connect(self):
            self.pool = await asyncpg.create_pool(
                self.connection_string,
                min_size=5,
                max_size=20,
                command_timeout=60
            )
    
        async def disconnect(self):
            if self.pool:
                await self.pool.close()
    
        async def execute_query(self, query: str, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
            async with self.pool.acquire() as conn:
                rows = await conn.fetch(query, *params.values() if params else [])
                return [dict(row) for row in rows]
    
        async def execute_command(self, command: str, params: Dict[str, Any] = None) -> Any:
            async with self.pool.acquire() as conn:
                return await conn.execute(command, *params.values() if params else [])
    
    class SQLAlchemyBackend(DatabaseBackend):
        """SQLAlchemy backend for multiple databases"""
    
        def __init__(self, database_url: str, async_mode: bool = True):
            self.database_url = database_url
            self.async_mode = async_mode
            self.engine = None
            self.session_factory = None
    
        async def connect(self):
            if self.async_mode:
                self.engine = create_async_engine(
                    self.database_url,
                    pool_pre_ping=True,
                    pool_recycle=300,
                    echo=False
                )
                self.session_factory = async_sessionmaker(
                    self.engine,
                    class_=AsyncSession,
                    expire_on_commit=False
                )
            else:
                self.engine = create_engine(
                    self.database_url,
                    pool_pre_ping=True,
                    pool_recycle=300,
                    echo=False
                )
                self.session_factory = sessionmaker(
                    bind=self.engine,
                    expire_on_commit=False
                )
    
        async def disconnect(self):
            if self.engine:
                # AsyncEngine.dispose() is a coroutine; the sync Engine's dispose() is not
                if self.async_mode:
                    await self.engine.dispose()
                else:
                    self.engine.dispose()
    
        @asynccontextmanager
        async def get_session(self):
            """Get database session"""
            # Create the session directly: a sync Session does not support "async with"
            session = self.session_factory()
            try:
                yield session
                if self.async_mode:
                    await session.commit()
                else:
                    session.commit()
            except Exception:
                if self.async_mode:
                    await session.rollback()
                else:
                    session.rollback()
                raise
            finally:
                if self.async_mode:
                    await session.close()
                else:
                    session.close()
    
        async def execute_query(self, query: Any, params: Dict[str, Any] = None) -> List[Dict[str, Any]]:
            """Execute SQLAlchemy query (async_mode only)"""
            from sqlalchemy import text
    
            async with self.get_session() as session:
                # SQLAlchemy 2.x rejects plain strings; raw SQL must be wrapped in text()
                statement = text(query) if isinstance(query, str) else query
                result = await session.execute(statement, params or {})
                rows = result.fetchall()
                return [dict(row._mapping) for row in rows]
    
        async def execute_command(self, command: Any, params: Dict[str, Any] = None) -> Any:
            """Execute SQLAlchemy command (async_mode only)"""
            from sqlalchemy import text
    
            async with self.get_session() as session:
                statement = text(command) if isinstance(command, str) else command
                # get_session() commits on a clean exit, so no explicit commit is needed here.
                # The result is returned in both cases so callers can read rowcount.
                return await session.execute(statement, params or {})
    
    class MongoBackend(DatabaseBackend):
        """MongoDB backend using motor"""
    
        def __init__(self, connection_string: str, database_name: str):
            self.connection_string = connection_string
            self.database_name = database_name
            self.client = None
            self.db = None
    
        async def connect(self):
            self.client = motor.motor_asyncio.AsyncIOMotorClient(self.connection_string)
            self.db = self.client[self.database_name]
    
        async def disconnect(self):
            if self.client:
                self.client.close()
    
        async def execute_query(self, collection: str, query: Dict[str, Any] = None) -> List[Dict[str, Any]]:
            """Execute MongoDB find query"""
            cursor = self.db[collection].find(query or {})
            results = []
            async for document in cursor:
                # Convert ObjectId to string
                if '_id' in document:
                    document['_id'] = str(document['_id'])
                results.append(document)
            return results
    
        async def execute_command(self, operation: str, collection: str, data: Dict[str, Any]) -> Any:
            """Execute MongoDB command"""
            if operation == "insert":
                result = await self.db[collection].insert_one(data)
                return str(result.inserted_id)
            elif operation == "update":
                filter_ = data.pop("_filter")
                update_data = {"$set": data}
                result = await self.db[collection].update_one(filter_, update_data)
                return result.modified_count
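            elif operation == "delete":
                result = await self.db[collection].delete_one(data)
                return result.deleted_count
            else:
                # Fail loudly instead of silently returning None
                raise ValueError(f"Unsupported MongoDB operation: {operation}")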
    
    class DatabaseManager(Generic[T]):
        """Generic database manager for MCP servers"""
    
        def __init__(self, backend: DatabaseBackend):
            self.backend = backend
            self._connected = False
    
        async def connect(self):
            """Connect to database"""
            if not self._connected:
                await self.backend.connect()
                self._connected = True
    
        async def disconnect(self):
            """Disconnect from database"""
            if self._connected:
                await self.backend.disconnect()
                self._connected = False
    
        @asynccontextmanager
        async def transaction(self):
            """Database transaction context manager"""
            if hasattr(self.backend, 'get_session'):
                async with self.backend.get_session() as session:
                    yield session
            else:
                # For backends that don't support transactions
                yield self.backend
    
        async def find_one(self, table_or_collection: str, query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
            """Find a single record"""
            if isinstance(self.backend, MongoBackend):
                results = await self.backend.execute_query(table_or_collection, query)
                return results[0] if results else None
            else:
                # SQL implementation
                where_clause = " AND ".join([f"{k} = :{k}" for k in query.keys()])
                sql = f"SELECT * FROM {table_or_collection} WHERE {where_clause} LIMIT 1"
                results = await self.backend.execute_query(sql, query)
                return results[0] if results else None
    
        async def find_many(
            self,
            table_or_collection: str,
            query: Dict[str, Any] = None,
            limit: int = None,
            offset: int = None,
            order_by: str = None
        ) -> List[Dict[str, Any]]:
            """Find multiple records"""
            query = query or {}
    
            if isinstance(self.backend, MongoBackend):
                cursor = self.backend.db[table_or_collection].find(query)
                if limit:
                    cursor = cursor.limit(limit)
                if offset:
                    cursor = cursor.skip(offset)
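                if order_by:
                    # MongoDB sort format; direction defaults to ascending when omitted
                    parts = order_by.split()
                    sort_field = parts[0]
                    sort_dir = parts[1].upper() if len(parts) > 1 else "ASC"
                    cursor = cursor.sort([(sort_field, 1 if sort_dir == "ASC" else -1)])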
    
                results = []
                async for document in cursor:
                    if '_id' in document:
                        document['_id'] = str(document['_id'])
                    results.append(document)
                return results
            else:
                # SQL implementation
                where_clause = ""
                if query:
                    where_clause = "WHERE " + " AND ".join([f"{k} = :{k}" for k in query.keys()])
    
                sql = f"SELECT * FROM {table_or_collection} {where_clause}"
    
                if order_by:
                    sql += f" ORDER BY {order_by}"
    
                if limit:
                    sql += f" LIMIT {limit}"
    
                if offset:
                    sql += f" OFFSET {offset}"
    
                return await self.backend.execute_query(sql, query)
    
        async def create(self, table_or_collection: str, data: Dict[str, Any]) -> Any:
            """Create a new record"""
            data = data.copy()
    
            # Add timestamps
            data['created_at'] = datetime.utcnow()
            data['updated_at'] = datetime.utcnow()
    
            if isinstance(self.backend, MongoBackend):
                return await self.backend.execute_command("insert", table_or_collection, data)
            else:
                # SQL implementation
                columns = list(data.keys())
                placeholders = [f":{col}" for col in columns]
                sql = f"INSERT INTO {table_or_collection} ({', '.join(columns)}) VALUES ({', '.join(placeholders)})"
                return await self.backend.execute_command(sql, data)
    
        async def update(self, table_or_collection: str, query: Dict[str, Any], data: Dict[str, Any]) -> int:
            """Update records"""
            data = data.copy()
            data['updated_at'] = datetime.utcnow()
    
            if isinstance(self.backend, MongoBackend):
                data['_filter'] = query
                return await self.backend.execute_command("update", table_or_collection, data)
            else:
                # SQL implementation
                where_clause = " AND ".join([f"{k} = :{k}" for k in query.keys()])
                set_clause = ", ".join([f"{k} = :update_{k}" for k in data.keys()])
    
                # Prefix update params to avoid conflicts
                update_params = {f"update_{k}": v for k, v in data.items()}
                params = {**query, **update_params}
    
                sql = f"UPDATE {table_or_collection} SET {set_clause} WHERE {where_clause}"
                result = await self.backend.execute_command(sql, params)
                return result.rowcount if hasattr(result, 'rowcount') else 0
    
        async def delete(self, table_or_collection: str, query: Dict[str, Any]) -> int:
            """Delete records"""
            if isinstance(self.backend, MongoBackend):
                return await self.backend.execute_command("delete", table_or_collection, query)
            else:
                # SQL implementation
                where_clause = " AND ".join([f"{k} = :{k}" for k in query.keys()])
                sql = f"DELETE FROM {table_or_collection} WHERE {where_clause}"
                result = await self.backend.execute_command(sql, query)
                return result.rowcount if hasattr(result, 'rowcount') else 0
    
        async def count(self, table_or_collection: str, query: Dict[str, Any] = None) -> int:
            """Count records"""
            query = query or {}
    
            if isinstance(self.backend, MongoBackend):
                return await self.backend.db[table_or_collection].count_documents(query)
            else:
                # SQL implementation
                where_clause = ""
                if query:
                    where_clause = "WHERE " + " AND ".join([f"{k} = :{k}" for k in query.keys()])
    
                sql = f"SELECT COUNT(*) as count FROM {table_or_collection} {where_clause}"
                results = await self.backend.execute_query(sql, query)
                return results[0]['count'] if results else 0
    
    # Factory function for creating database managers
    def create_database_manager(database_url: str, backend_type: str = "auto") -> DatabaseManager:
        """Create database manager based on URL or backend type"""
    
        if backend_type == "auto":
            if database_url.startswith("postgresql+asyncpg://"):
                backend = SQLAlchemyBackend(database_url, async_mode=True)
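            elif database_url.startswith(("mongodb://", "mongodb+srv://")):
                import re
                match = re.match(r'mongodb(?:\+srv)?://[^/]+/([^?]*)', database_url)
                # Fall back to "default" when the URL has no database path
                db_name = match.group(1) if match and match.group(1) else "default"
                backend = MongoBackend(database_url, db_name)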
            elif database_url.startswith("postgresql://"):
                backend = PostgresBackend(database_url)
            else:
                backend = SQLAlchemyBackend(database_url, async_mode=True)
        else:
            if backend_type == "postgres":
                backend = PostgresBackend(database_url)
            elif backend_type == "mongodb":
                db_name = database_url.split("/")[-1].split("?")[0]
                backend = MongoBackend(database_url, db_name)
            elif backend_type == "sqlalchemy":
                backend = SQLAlchemyBackend(database_url)
            else:
                raise ValueError(f"Unknown backend type: {backend_type}")
    
        return DatabaseManager(backend)
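
The factory above picks a backend from the URL prefix and, for MongoDB, pulls the database name out of the URL path. A minimal sketch of that resolution step using only the standard library is shown here. `resolve_backend` is a hypothetical helper introduced for illustration (it returns names rather than constructing backends and is not part of the skill's code), and it assumes `mongodb+srv://` URLs should also map to MongoDB.

```python
from typing import Optional, Tuple
from urllib.parse import urlsplit


def resolve_backend(database_url: str) -> Tuple[str, Optional[str]]:
    """Return (backend_kind, mongo_db_name) for a database URL.

    Hypothetical helper for illustration only; backend_kind is one of
    "sqlalchemy", "mongodb", or "postgres".
    """
    parts = urlsplit(database_url)
    scheme = parts.scheme.lower()

    if scheme in ("mongodb", "mongodb+srv"):
        # The database name is the first path segment; fall back to "default"
        # when the URL has no path (e.g. "mongodb://localhost:27017").
        db_name = parts.path.lstrip("/").split("/")[0] or "default"
        return "mongodb", db_name
    if scheme == "postgresql":
        return "postgres", None
    # "postgresql+asyncpg" and anything unrecognized go through SQLAlchemy,
    # mirroring the factory's fallback branch.
    return "sqlalchemy", None


print(resolve_backend("mongodb://localhost:27017/tasks?retryWrites=true"))
print(resolve_backend("mongodb://localhost:27017"))
print(resolve_backend("postgresql+asyncpg://user:pass@localhost/tasks"))
```

Parsing with `urlsplit` keeps the query string out of the path, so the database name does not need to be cut at `?` by hand.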
    

    3. Tool Implementation Patterns

    # mcp_server/tools.py
    """
    Generic MCP Tool Implementation Patterns
    """
    
    from typing import Any, Dict, List, Optional, Union, Callable
    from datetime import datetime, timedelta
    import json
    import uuid
    from dataclasses import dataclass, field
    
    from .core import BaseMCPServer, tool, RequestContext
    from .database import DatabaseManager
    
    class BaseTool:
        """Base class for MCP tools"""
    
        def __init__(self, db_manager: DatabaseManager, cache=None):
            self.db_manager = db_manager
            self.cache = cache
    
        async def execute(self, args: Dict[str, Any], context: RequestContext = None) -> Dict[str, Any]:
            """Execute the tool logic"""
            raise NotImplementedError
    
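        def _validate_permissions(self, context: RequestContext, required_permission: str = None) -> bool:
            """Validate user permissions"""
            # Placeholder: deny elevated permissions (e.g. "read_all") by default so that
            # callers fall back to per-user filtering. Replace with real permission checks.
            if required_permission is None:
                return True
            return False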
    
    class CRUDBaseTool(BaseTool):
        """Base CRUD tool for any entity"""
    
        def __init__(self, table_name: str, db_manager: DatabaseManager, schema: Dict[str, Any]):
            super().__init__(db_manager)
            self.table_name = table_name
            self.schema = schema
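            # Naive singularization: "tasks" -> "task", "entities" -> "entity".
            # rstrip('s') would strip every trailing 's' ("address" -> "addre").
            if table_name.endswith('ies'):
                self.entity_name = table_name[:-3] + 'y'
            elif table_name.endswith('s') and not table_name.endswith('ss'):
                self.entity_name = table_name[:-1]
            else:
                self.entity_name = table_name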
    
        async def create(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Create entity"""
            try:
                # Validate against schema
                validated_data = self._validate_data(args, for_create=True)
    
                # Add user context
                if context:
                    validated_data['user_id'] = context.user_id
                    if context.session_id:
                        validated_data['session_id'] = context.session_id
    
                # Insert into database
                result = await self.db_manager.create(self.table_name, validated_data)
    
                return {
                    "status": "created",
                    "id": result,
                    "entity": self.entity_name,
                    "timestamp": datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "create"
                }
    
        async def get(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Get entity by ID"""
            try:
                entity_id = args.get("id")
                if not entity_id:
                    raise ValueError("Missing required field: id")
    
                # Add user filter for security
                query = {"id": entity_id}
                if context and not self._validate_permissions(context, "read_all"):
                    query["user_id"] = context.user_id
    
                result = await self.db_manager.find_one(self.table_name, query)
    
                if not result:
                    return {
                        "status": "not_found",
                        "entity": self.entity_name,
                        "id": entity_id
                    }
    
                return {
                    "status": "success",
                    "entity": self.entity_name,
                    "data": self._serialize_data(result)
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "get"
                }
    
        async def list(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """List entities with filtering"""
            try:
                # Build query from args
                query = {}
                filters = args.get("filters", {})
                limit = args.get("limit", 20)
                offset = args.get("offset", 0)
                order_by = args.get("order_by", "created_at DESC")
    
                # Add user filter for security
                if context and not self._validate_permissions(context, "read_all"):
                    query["user_id"] = context.user_id
    
                # Apply additional filters
                query.update(filters)
    
                # Fetch from database
                results = await self.db_manager.find_many(
                    self.table_name,
                    query=query,
                    limit=limit,
                    offset=offset,
                    order_by=order_by
                )
    
                # Get total count
                total = await self.db_manager.count(self.table_name, query)
    
                return {
                    "status": "success",
                    "entity": self.entity_name,
                    "data": [self._serialize_data(r) for r in results],
                    "pagination": {
                        "total": total,
                        "limit": limit,
                        "offset": offset,
                        "has_more": offset + limit < total
                    }
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "list"
                }
    
        async def update(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Update entity"""
            try:
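                entity_id = args.get("id")
                if not entity_id:
                    raise ValueError("Missing required field: id")
    
                # Validate update data (everything except the id) without mutating the caller's args
                update_data = self._validate_data(
                    {k: v for k, v in args.items() if k != "id"}, for_create=False
                )
                if not update_data:
                    raise ValueError("No valid fields provided for update")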
    
                # Build query filter
                query = {"id": entity_id}
                if context and not self._validate_permissions(context, "update_all"):
                    query["user_id"] = context.user_id
    
                # Update in database
                affected = await self.db_manager.update(self.table_name, query, update_data)
    
                if affected == 0:
                    return {
                        "status": "not_found",
                        "entity": self.entity_name,
                        "id": entity_id
                    }
    
                return {
                    "status": "updated",
                    "entity": self.entity_name,
                    "id": entity_id,
                    "affected_rows": affected,
                    "timestamp": datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "update"
                }
    
        async def delete(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Delete entity"""
            try:
                entity_id = args.get("id")
                if not entity_id:
                    raise ValueError("Missing required field: id")
    
                # Build query filter
                query = {"id": entity_id}
                if context and not self._validate_permissions(context, "delete_all"):
                    query["user_id"] = context.user_id
    
                # Delete from database
                affected = await self.db_manager.delete(self.table_name, query)
    
                if affected == 0:
                    return {
                        "status": "not_found",
                        "entity": self.entity_name,
                        "id": entity_id
                    }
    
                return {
                    "status": "deleted",
                    "entity": self.entity_name,
                    "id": entity_id,
                    "affected_rows": affected,
                    "timestamp": datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "delete"
                }
    
        def _validate_data(self, data: Dict[str, Any], for_create: bool = False) -> Dict[str, Any]:
            """Validate data against schema"""
            validated = {}
            schema_fields = self.schema.get("properties", {})
            required_fields = self.schema.get("required", [])
    
            # Check required fields for create
            if for_create:
                for field in required_fields:
                    if field not in data:
                        raise ValueError(f"Missing required field: {field}")
    
            # Validate each field
            for field, value in data.items():
                if field not in schema_fields:
                    continue  # Skip unknown fields or raise error based on strictness
    
                field_schema = schema_fields[field]
                field_type = field_schema.get("type")
    
                # Type validation
                if field_type == "string":
                    if not isinstance(value, str):
                        raise ValueError(f"Field {field} must be a string")
                    # Check min/max length
                    if "minLength" in field_schema and len(value) < field_schema["minLength"]:
                        raise ValueError(f"Field {field} is too short")
                    if "maxLength" in field_schema and len(value) > field_schema["maxLength"]:
                        raise ValueError(f"Field {field} is too long")
                elif field_type == "integer":
                    # bool is a subclass of int, so exclude it explicitly
                    if isinstance(value, bool) or not isinstance(value, int):
                        raise ValueError(f"Field {field} must be an integer")
                    # Check min/max value
                    if "minimum" in field_schema and value < field_schema["minimum"]:
                        raise ValueError(f"Field {field} is too small")
                    if "maximum" in field_schema and value > field_schema["maximum"]:
                        raise ValueError(f"Field {field} is too large")
                elif field_type == "array":
                    if not isinstance(value, list):
                        raise ValueError(f"Field {field} must be an array")
    
                # Check enum values
                if "enum" in field_schema and value not in field_schema["enum"]:
                    raise ValueError(f"Field {field} must be one of {field_schema['enum']}")
    
                validated[field] = value
    
            return validated
    
        def _serialize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
            """Serialize data for output"""
            serialized = data.copy()
    
            # Handle datetime serialization
            for key, value in serialized.items():
                if isinstance(value, datetime):
                    serialized[key] = value.isoformat()
                elif isinstance(value, dict):
                    # Keep JSON-serializable dicts as-is; fall back to str() otherwise
                    try:
                        json.dumps(value)
                    except TypeError:
                        serialized[key] = str(value)
    
            return serialized
    
    class BulkOperationTool(BaseTool):
        """Tool for bulk operations on entities"""
    
        def __init__(self, table_name: str, db_manager: DatabaseManager, schema: Dict[str, Any]):
            super().__init__(db_manager)
            self.table_name = table_name
            self.schema = schema
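            # Naive singularization; rstrip('s') would strip every trailing 's'
            if table_name.endswith('ies'):
                self.entity_name = table_name[:-3] + 'y'
            elif table_name.endswith('s') and not table_name.endswith('ss'):
                self.entity_name = table_name[:-1]
            else:
                self.entity_name = table_name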
    
        async def bulk_create(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Bulk create entities"""
            try:
                items = args.get("items", [])
                if not items:
                    raise ValueError("No items provided for bulk create")
    
                # Validate all items
                validated_items = []
                for item in items:
                    validated = self._validate_item(item)
                    if context:
                        validated["user_id"] = context.user_id
                    validated_items.append(validated)
    
                # Insert all items
                results = []
                for item in validated_items:
                    result = await self.db_manager.create(self.table_name, item)
                    results.append(result)
    
                return {
                    "status": "created",
                    "entity": self.entity_name,
                    "count": len(results),
                    "ids": results,
                    "timestamp": datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "bulk_create"
                }
    
        async def bulk_update(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Bulk update entities"""
            try:
                updates = args.get("updates", [])
                if not updates:
                    raise ValueError("No updates provided")
    
                total_affected = 0
                for update in updates:
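                    entity_id = update.get("id")
                    if not entity_id:
                        continue
    
                    # Validate against the schema so bulk updates cannot write arbitrary columns
                    update_data = CRUDBaseTool(
                        self.table_name, self.db_manager, self.schema
                    )._validate_data(update.get("data", {}), for_create=False)
                    if not update_data:
                        continue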
    
                    # Build query
                    query = {"id": entity_id}
                    if context:
                        query["user_id"] = context.user_id
    
                    # Update
                    affected = await self.db_manager.update(self.table_name, query, update_data)
                    total_affected += affected
    
                return {
                    "status": "updated",
                    "entity": self.entity_name,
                    "affected_rows": total_affected,
                    "updates_processed": len(updates),
                    "timestamp": datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "bulk_update"
                }
    
        async def bulk_delete(self, args: Dict[str, Any], context: RequestContext) -> Dict[str, Any]:
            """Bulk delete entities"""
            try:
                ids = args.get("ids", [])
                if not ids:
                    raise ValueError("No IDs provided for bulk delete")
    
                total_affected = 0
                for entity_id in ids:
                    # Build query
                    query = {"id": entity_id}
                    if context:
                        query["user_id"] = context.user_id
    
                    # Delete
                    affected = await self.db_manager.delete(self.table_name, query)
                    total_affected += affected
    
                return {
                    "status": "deleted",
                    "entity": self.entity_name,
                    "affected_rows": total_affected,
                    "ids_processed": len(ids),
                    "timestamp": datetime.utcnow().isoformat()
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "entity": self.entity_name,
                    "operation": "bulk_delete"
                }
    
        def _validate_item(self, item: Dict[str, Any]) -> Dict[str, Any]:
            """Validate a single item"""
            # Use CRUD base tool validation
            crud_tool = CRUDBaseTool(self.table_name, self.db_manager, self.schema)
            return crud_tool._validate_data(item, for_create=True)
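
The validator above notes that unknown fields can either be skipped or rejected "based on strictness". A minimal standalone sketch of that choice follows, assuming a hypothetical `validate_fields` function with a `strict` flag (neither name appears in the skill's code). It covers only type and enum checks, and also rejects booleans for integer fields, since `bool` is a subclass of `int` in Python.

```python
from typing import Any, Dict


def validate_fields(data: Dict[str, Any], schema: Dict[str, Any],
                    for_create: bool = False, strict: bool = False) -> Dict[str, Any]:
    """Validate `data` against a JSON-Schema-like dict (type and enum checks only)."""
    properties = schema.get("properties", {})

    if for_create:
        for name in schema.get("required", []):
            if name not in data:
                raise ValueError(f"Missing required field: {name}")

    type_map = {"string": str, "integer": int, "array": list}
    validated: Dict[str, Any] = {}
    for name, value in data.items():
        if name not in properties:
            if strict:
                raise ValueError(f"Unknown field: {name}")
            continue  # lenient mode drops unknown fields silently

        spec = properties[name]
        expected = type_map.get(spec.get("type"))
        # bool is a subclass of int, so reject it explicitly for integer fields
        if expected is int and isinstance(value, bool):
            raise ValueError(f"Field {name} must be an integer")
        if expected is not None and not isinstance(value, expected):
            raise ValueError(f"Field {name} must be of type {spec['type']}")
        if "enum" in spec and value not in spec["enum"]:
            raise ValueError(f"Field {name} must be one of {spec['enum']}")
        validated[name] = value
    return validated


schema = {
    "properties": {"title": {"type": "string"}, "count": {"type": "integer"}},
    "required": ["title"],
}
print(validate_fields({"title": "Write docs", "extra": 1}, schema, for_create=True))
```

Lenient mode suits tool handlers that receive extra arguments from a client, while strict mode surfaces typos in field names early. Which default is right depends on the server.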
    

    4. Example: Building a Generic Task Management MCP Server

    # examples/task_mcp_server.py
    """
    Example: Task Management MCP Server using generic patterns
    """
    
    import os
    from typing import Dict, Any
    
    from mcp_server.core import BaseMCPServer, ServerConfig, tool
    from mcp_server.database import create_database_manager, MongoBackend
    from mcp_server.tools import CRUDBaseTool, BulkOperationTool
    
    # Server configuration
    config = ServerConfig(
        name="task-manager",
        version="1.0.0",
        description="Generic task management MCP server",
        redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
        database_url=os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost/tasks")
    )
    
    # Task entity schema
    TASK_SCHEMA = {
        "type": "object",
        "properties": {
            "title": {
                "type": "string",
                "minLength": 1,
                "maxLength": 200,
                "description": "Task title"
            },
            "description": {
                "type": "string",
                "maxLength": 1000,
                "description": "Task description"
            },
            "priority": {
                "type": "string",
                "enum": ["low", "medium", "high"],
                "default": "medium",
                "description": "Task priority"
            },
            "status": {
                "type": "string",
                "enum": ["todo", "in_progress", "completed"],
                "default": "todo",
                "description": "Task status"
            },
            "due_date": {
                "type": "string",
                "format": "date-time",
                "description": "Optional due date"
            },
            "tags": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Task tags"
            }
        },
        "required": ["title"]
    }
    
    class TaskMCPServer(BaseMCPServer):
        """Task Management MCP Server"""
    
        def __init__(self, config: ServerConfig):
            super().__init__(config)
    
            # Initialize database
            self.db_manager = create_database_manager(config.database_url)
    
            # Initialize tools
            self.task_tool = CRUDBaseTool("tasks", self.db_manager, TASK_SCHEMA)
            self.bulk_tool = BulkOperationTool("tasks", self.db_manager, TASK_SCHEMA)
    
            # Register tools
            self._register_task_tools()
    
        def _register_task_tools(self):
            """Register all task-related tools"""
    
            # Create task
            self.register_tool(
                "create_task",
                self.task_tool.create,
                {
                    "description": "Create a new task",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "title": {"type": "string", "description": "Task title"},
                            "description": {"type": "string", "description": "Optional description"},
                            "priority": {"type": "string", "enum": ["low", "medium", "high"]},
                            "due_date": {"type": "string", "format": "date-time"},
                            "tags": {"type": "array", "items": {"type": "string"}}
                        },
                        "required": ["title"]
                    }
                }
            )
    
            # Get task
            self.register_tool(
                "get_task",
                self.task_tool.get,
                {
                    "description": "Get a task by ID",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "integer", "description": "Task ID"}
                        },
                        "required": ["id"]
                    }
                }
            )
    
            # List tasks
            self.register_tool(
                "list_tasks",
                self.task_tool.list,
                {
                    "description": "List tasks with optional filtering",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "filters": {"type": "object", "description": "Filter criteria"},
                            "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
                            "offset": {"type": "integer", "minimum": 0, "default": 0},
                            "order_by": {"type": "string", "description": "Order by field (e.g., 'created_at DESC')"}
                        }
                    }
                }
            )
    
            # Update task
            self.register_tool(
                "update_task",
                self.task_tool.update,
                {
                    "description": "Update a task",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "integer", "description": "Task ID"},
                            "title": {"type": "string", "description": "New title"},
                            "description": {"type": "string", "description": "New description"},
                            "priority": {"type": "string", "enum": ["low", "medium", "high"]},
                            "status": {"type": "string", "enum": ["todo", "in_progress", "completed"]},
                            "due_date": {"type": "string", "format": "date-time"},
                            "tags": {"type": "array", "items": {"type": "string"}}
                        },
                        "required": ["id"]
                    }
                }
            )
    
            # Delete task
            self.register_tool(
                "delete_task",
                self.task_tool.delete,
                {
                    "description": "Delete a task",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "integer", "description": "Task ID"}
                        },
                        "required": ["id"]
                    }
                }
            )
    
            # Bulk create
            self.register_tool(
                "bulk_create_tasks",
                self.bulk_tool.bulk_create,
                {
                    "description": "Create multiple tasks at once",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "items": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "title": {"type": "string"},
                                        "description": {"type": "string"},
                                        "priority": {"type": "string", "enum": ["low", "medium", "high"]},
                                        "tags": {"type": "array", "items": {"type": "string"}}
                                    },
                                    "required": ["title"]
                                }
                            }
                        },
                        "required": ["items"]
                    }
                }
            )
    
            # Search tasks
            self.register_tool(
                "search_tasks",
                self._search_tasks,
                {
                    "description": "Search tasks by text query",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"},
                            "limit": {"type": "integer", "minimum": 1, "maximum": 50, "default": 20}
                        },
                        "required": ["query"]
                    }
                }
            )
    
        async def _search_tasks(self, args: Dict[str, Any], context) -> Dict[str, Any]:
            """Search tasks by text"""
            try:
                query = args.get("query", "")
                limit = args.get("limit", 20)
    
                # Build search query
                if isinstance(self.db_manager.backend, MongoBackend):
                    # MongoDB text search
                    search_query = {
                        "$text": {"$search": query}
                    }
                    if context and not self.task_tool._validate_permissions(context, "read_all"):
                        search_query["user_id"] = context.user_id
    
                    results = await self.db_manager.find_many("tasks", search_query, limit=limit)
                else:
                    # PostgreSQL full-text search
                    sql = """
                    SELECT * FROM tasks
                    WHERE to_tsvector('english', title || ' ' || COALESCE(description, '')) @@ plainto_tsquery('english', :query)
                    """
                    params = {"query": query}
    
                    if context and not self.task_tool._validate_permissions(context, "read_all"):
                        sql += " AND user_id = :user_id"
                        params["user_id"] = context.user_id
    
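                    # Bind the limit as a parameter instead of interpolating it into the SQL
                    sql += " LIMIT :limit"
                    params["limit"] = max(1, min(int(limit), 50))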
    
                    results = await self.db_manager.execute_query(sql, params)
    
                return {
                    "status": "success",
                    "entity": "task",
                    "data": [self.task_tool._serialize_data(r) for r in results],
                    "query": query,
                    "count": len(results)
                }
    
            except Exception as e:
                return {
                    "status": "error",
                    "error": str(e),
                    "operation": "search_tasks"
                }
    
    # Main execution
    async def main():
        """Start the Task MCP Server"""
        server = TaskMCPServer(config)
        await server.run()
    
    if __name__ == "__main__":
        import asyncio
        asyncio.run(main())
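
The SQL branch of `_search_tasks` assembles its statement step by step: a full-text predicate, an optional per-user filter, then a row limit. A minimal sketch of that assembly as a pure function is given below, assuming a hypothetical `build_search_sql` helper and `MAX_SEARCH_LIMIT` constant (neither is part of the skill's code) and the same `:name` bound-parameter style used elsewhere in this document. Every user-supplied value, including the limit, is bound rather than formatted into the string.

```python
from typing import Any, Dict, Optional, Tuple

MAX_SEARCH_LIMIT = 50  # matches the "maximum" in the search_tasks input schema


def build_search_sql(query: str, user_id: Optional[str] = None,
                     limit: int = 20) -> Tuple[str, Dict[str, Any]]:
    """Build a parameterized PostgreSQL full-text search statement."""
    sql = (
        "SELECT * FROM tasks "
        "WHERE to_tsvector('english', title || ' ' || COALESCE(description, '')) "
        "@@ plainto_tsquery('english', :query)"
    )
    params: Dict[str, Any] = {"query": query}

    if user_id is not None:
        # Restrict results to the caller's own rows
        sql += " AND user_id = :user_id"
        params["user_id"] = user_id

    # Clamp and bind the limit rather than formatting it into the string
    sql += " LIMIT :limit"
    params["limit"] = max(1, min(int(limit), MAX_SEARCH_LIMIT))
    return sql, params


sql, params = build_search_sql("urgent report", user_id="u1", limit=500)
print(sql)
print(params)
```

Keeping the builder free of I/O makes it easy to unit-test without a database connection.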
    

    5. Testing Patterns

    # tests/test_mcp_server.py
    """
    Generic MCP Server Testing Patterns
    """
    
    import pytest
    import asyncio
    from typing import Dict, Any, List
    from unittest.mock import Mock, AsyncMock
    
    from mcp_server.core import BaseMCPServer, ServerConfig, RequestContext
    from mcp_server.database import DatabaseManager, create_database_manager
    from mcp_server.tools import CRUDBaseTool
    
    class MockDatabaseManager:
        """Mock database manager for testing"""
    
        def __init__(self):
            self.data = {}
            self.next_id = 1
    
        async def create(self, table: str, data: Dict[str, Any]) -> int:
            """Mock create"""
            entity_id = self.next_id
            data['id'] = entity_id
            self.data[f"{table}:{entity_id}"] = data
            self.next_id += 1
            return entity_id
    
        async def find_one(self, table: str, query: Dict[str, Any]) -> Dict[str, Any]:
            """Mock find one"""
            for key, value in self.data.items():
                table_name, entity_id = key.split(":")
                if table_name == table:
                    match = True
                    for k, v in query.items():
                        if value.get(k) != v:
                            match = False
                            break
                    if match:
                        return value
            return None
    
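        async def find_many(self, table: str, query: Dict[str, Any], limit: int = None,
                            offset: int = 0, order_by: str = None) -> List[Dict[str, Any]]:
            """Mock find many (order_by is accepted but ignored)"""
            results = []
            for key, value in self.data.items():
                table_name, entity_id = key.split(":")
                if table_name == table and all(value.get(k) == v for k, v in query.items()):
                    results.append(value)
            end = offset + limit if limit else None
            return results[offset:end]
    
        async def count(self, table: str, query: Dict[str, Any] = None) -> int:
            """Mock count, used by CRUDBaseTool.list for pagination"""
            return len(await self.find_many(table, query or {}))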
    
        async def update(self, table: str, query: Dict[str, Any], data: Dict[str, Any]) -> int:
            """Mock update"""
            count = 0
            for key, value in self.data.items():
                table_name, entity_id = key.split(":")
                if table_name == table:
                    match = True
                    for k, v in query.items():
                        if value.get(k) != v:
                            match = False
                            break
                    if match:
                        value.update(data)
                        count += 1
            return count
    
        async def delete(self, table: str, query: Dict[str, Any]) -> int:
            """Mock delete"""
            to_delete = []
            for key, value in self.data.items():
                table_name, entity_id = key.split(":")
                if table_name == table:
                    match = True
                    for k, v in query.items():
                        if value.get(k) != v:
                            match = False
                            break
                    if match:
                        to_delete.append(key)
    
            for key in to_delete:
                del self.data[key]
    
            return len(to_delete)
    
    @pytest.fixture
    def mock_db():
        """Mock database manager fixture"""
        return MockDatabaseManager()
    
    @pytest.fixture
    def test_schema():
        """Test entity schema"""
        return {
            "type": "object",
            "properties": {
                "name": {"type": "string", "minLength": 1},
                "value": {"type": "integer", "minimum": 0},
                "status": {"type": "string", "enum": ["active", "inactive"]},
                "tags": {"type": "array", "items": {"type": "string"}}
            },
            "required": ["name"]
        }
    
    @pytest.fixture
    def crud_tool(mock_db, test_schema):
        """CRUD tool fixture"""
        return CRUDBaseTool("test_entities", mock_db, test_schema)
    
    @pytest.fixture
    def user_context():
        """User context fixture"""
        return RequestContext(user_id="test_user", session_id="test_session")
    
    class TestCRUDTool:
        """Test CRUD tool operations"""
    
        @pytest.mark.asyncio
        async def test_create_entity(self, crud_tool, user_context):
            """Test entity creation"""
            args = {
                "name": "Test Entity",
                "value": 100,
                "status": "active",
                "tags": ["test", "example"]
            }
    
            result = await crud_tool.create(args, user_context)
    
            assert result["status"] == "created"
            assert "id" in result
            assert result["entity"] == "test_entity"
            assert "timestamp" in result
    
        @pytest.mark.asyncio
        async def test_create_missing_required(self, crud_tool, user_context):
            """Test creation with missing required field"""
            args = {"value": 100}  # Missing 'name'
    
            result = await crud_tool.create(args, user_context)
    
            assert result["status"] == "error"
            assert "Missing required field" in result["error"]
    
        @pytest.mark.asyncio
        async def test_get_entity(self, crud_tool, user_context):
            """Test getting an entity"""
            # First create an entity
            create_args = {"name": "Test Get"}
            create_result = await crud_tool.create(create_args, user_context)
            entity_id = create_result["id"]
    
            # Get the entity
            get_args = {"id": entity_id}
            result = await crud_tool.get(get_args, user_context)
    
            assert result["status"] == "success"
            assert result["data"]["name"] == "Test Get"
            assert result["data"]["id"] == entity_id
    
        @pytest.mark.asyncio
        async def test_get_not_found(self, crud_tool, user_context):
            """Test getting non-existent entity"""
            args = {"id": 99999}
            result = await crud_tool.get(args, user_context)
    
            assert result["status"] == "not_found"
    
        @pytest.mark.asyncio
        async def test_list_entities(self, crud_tool, user_context):
            """Test listing entities"""
            # Create a few entities
            for i in range(3):
                args = {"name": f"Entity {i}"}
                await crud_tool.create(args, user_context)
    
            # List entities
            result = await crud_tool.list({}, user_context)
    
            assert result["status"] == "success"
            assert len(result["data"]) == 3
            assert "pagination" in result
            assert result["pagination"]["total"] == 3
    
        @pytest.mark.asyncio
        async def test_list_with_filters(self, crud_tool, user_context):
            """Test listing with filters"""
            # Create entities with different statuses
            await crud_tool.create({"name": "Active 1", "status": "active"}, user_context)
            await crud_tool.create({"name": "Inactive 1", "status": "inactive"}, user_context)
            await crud_tool.create({"name": "Active 2", "status": "active"}, user_context)
    
            # Filter by status
            result = await crud_tool.list(
                {"filters": {"status": "active"}},
                user_context
            )
    
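            assert result["status"] == "success"
            # Guard against a vacuous pass: all() is True for an empty list
            assert result["data"], "filter should match at least one entity"
            assert all(entity["status"] == "active" for entity in result["data"])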
    
        @pytest.mark.asyncio
        async def test_update_entity(self, crud_tool, user_context):
            """Test updating an entity"""
            # Create entity
            create_result = await crud_tool.create({"name": "Original"}, user_context)
            entity_id = create_result["id"]
    
            # Update entity
            update_args = {
                "id": entity_id,
                "name": "Updated",
                "value": 200
            }
            result = await crud_tool.update(update_args, user_context)
    
            assert result["status"] == "updated"
            assert result["affected_rows"] == 1
    
        @pytest.mark.asyncio
        async def test_delete_entity(self, crud_tool, user_context):
            """Test deleting an entity"""
            # Create entity
            create_result = await crud_tool.create({"name": "To Delete"}, user_context)
            entity_id = create_result["id"]
    
            # Delete entity
            delete_args = {"id": entity_id}
            result = await crud_tool.delete(delete_args, user_context)
    
            assert result["status"] == "deleted"
            assert result["affected_rows"] == 1
    
            # Verify deletion
            get_result = await crud_tool.get({"id": entity_id}, user_context)
            assert get_result["status"] == "not_found"
    
    class TestMCPServer:
        """Test MCP Server functionality"""
    
        @pytest.mark.asyncio
        async def test_server_initialization(self):
            """Test server initialization"""
            config = ServerConfig(
                name="test-server",
                description="Test MCP Server"
            )
    
            server = BaseMCPServer(config)
    
            assert server.config.name == "test-server"
            assert server.server.name == "test-server"
            assert len(server.tools) == 0
    
        @pytest.mark.asyncio
        async def test_tool_registration(self):
            """Test tool registration"""
            config = ServerConfig(name="test-server", description="Test")
            server = BaseMCPServer(config)
    
            # Register a test tool
            async def test_tool(args: Dict[str, Any]) -> Dict[str, Any]:
                return {"status": "success", "data": args}
    
            schema = {
                "description": "Test tool",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "message": {"type": "string"}
                    }
                }
            }
    
            server.register_tool("test_tool", test_tool, schema)
    
            assert "test_tool" in server.tools
            assert server.tools["test_tool"]["schema"] == schema
    
        @pytest.mark.asyncio
        async def test_rate_limiting(self):
            """Test rate limiting functionality"""
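            # Not implemented: requires mocking Redis or using a test instance,
            # and depends on your rate limiting strategy.
            # Skipped explicitly so the stub is not reported as a passing test.
            pytest.skip("rate limiting test not implemented")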
    
        @pytest.mark.asyncio
        async def test_caching(self):
            """Test caching functionality"""
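            # Not implemented: requires mocking Redis or using a test instance,
            # and depends on your caching strategy.
            # Skipped explicitly so the stub is not reported as a passing test.
            pytest.skip("caching test not implemented")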
    
    # Integration test example
    class TestMCPServerIntegration:
        """Integration tests for MCP Server"""
    
        @pytest.mark.asyncio
        async def test_full_crud_workflow(self):
            """Test full CRUD workflow through MCP interface"""
            # Create mock server
            config = ServerConfig(
                name="test-integration",
                description="Integration test server"
            )
    
            # Use in-memory database
            mock_db = MockDatabaseManager()
    
            # Create CRUD tool
            test_schema = {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "value": {"type": "integer"}
                },
                "required": ["name"]
            }
    
            crud_tool = CRUDBaseTool("items", mock_db, test_schema)
    
            # Register tools
            server = BaseMCPServer(config)
            server.register_tool(
                "create_item",
                crud_tool.create,
                {
                    "description": "Create item",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "value": {"type": "integer"}
                        },
                        "required": ["name"]
                    }
                }
            )
    
            # Test create
            result = await server.call_tool(
                "create_item",
                {"name": "Test Item", "value": 123}
            )
    
            response = json.loads(result[0].text)
            assert response["status"] == "created"
    
            # Test get (would need get_item tool)
            # Test list
            # Test update
            # Test delete
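
The rate-limiting test above is left as a stub because it would need Redis or a test instance. One way to exercise the same kind of logic without either is an in-memory limiter with an injectable clock. The sketch below is an assumption, not the skill's actual rate limiter: `InMemoryRateLimiter`, `FakeClock`, and the sliding-window strategy are hypothetical names and choices made for illustration.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class InMemoryRateLimiter:
    """Hypothetical sliding-window limiter; stands in for a Redis-backed one."""

    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock  # injected so tests control time
        self._hits: Dict[str, Deque[float]] = {}

    def allow(self, key: str) -> bool:
        """Record a call for `key` and report whether it is within the limit."""
        now = self.clock()
        hits = self._hits.setdefault(key, deque())
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


class FakeClock:
    """Manually advanced clock so the test never sleeps."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def test_rate_limiter_blocks_then_recovers() -> None:
    clock = FakeClock()
    limiter = InMemoryRateLimiter(limit=2, window_seconds=60, clock=clock)

    assert limiter.allow("test_user")
    assert limiter.allow("test_user")
    assert not limiter.allow("test_user")  # third call inside the window
    assert limiter.allow("other_user")     # limits are tracked per key

    clock.advance(60)                      # the window has fully elapsed
    assert limiter.allow("test_user")
```

Injecting the clock keeps the test deterministic and fast. If the real limiter is Redis-backed, the same test shape should still apply as long as its time source or storage client can be swapped for a fake.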
    

    This generic MCP server skill provides a reusable foundation for building any MCP server, not just for todos. It includes:

    1. Core Server Architecture - Base class with rate limiting, caching, and error handling
    2. Database Integration - Support for PostgreSQL, MongoDB, and SQLAlchemy with async operations
    3. Tool Patterns - Generic CRUD and bulk operation patterns that work with any entity
    4. Example Implementation - Shows how to build a task management server using the generic patterns
    5. Testing Patterns - pytest fixtures, an in-memory mock database, and unit and integration test examples (the rate limiting and caching tests are left as stubs)

    ★ Insight: The key architectural pattern here is the separation of concerns between:

    • The MCP protocol handling (BaseMCPServer)
    • The data access layer (DatabaseManager with multiple backends)
    • The business logic layer (CRUDBaseTool and BulkOperationTool)
    • The specific implementation (TaskMCPServer combining the components)

    This makes the system highly reusable and maintainable. Any developer can quickly build a new MCP server by defining their entity schema and combining the generic tools.
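
The layering described above can be sketched in a few dozen lines. This is a minimal, self-contained illustration under stated assumptions: `InMemoryStore`, `SimpleCRUDTool`, `ToolRegistry`, and `build_note_server` are hypothetical stand-ins for the data access, business logic, protocol, and implementation layers. They are not the skill's `DatabaseManager`, `CRUDBaseTool`, or `BaseMCPServer`, and no MCP library is used.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Handler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


class InMemoryStore:
    """Data access layer (hypothetical stand-in for a database manager)."""

    def __init__(self) -> None:
        self._rows: Dict[str, Dict[str, Any]] = {}
        self._next_id = 1

    async def insert(self, table: str, data: Dict[str, Any]) -> int:
        entity_id = self._next_id
        self._next_id += 1
        self._rows[f"{table}:{entity_id}"] = {**data, "id": entity_id}
        return entity_id

    async def find_one(self, table: str, entity_id: int) -> Optional[Dict[str, Any]]:
        return self._rows.get(f"{table}:{entity_id}")


class SimpleCRUDTool:
    """Business logic layer: checks required fields, then delegates to the store."""

    def __init__(self, table: str, store: InMemoryStore, schema: Dict[str, Any]) -> None:
        self.table = table
        self.store = store
        self.schema = schema

    async def create(self, args: Dict[str, Any]) -> Dict[str, Any]:
        missing = [f for f in self.schema.get("required", []) if f not in args]
        if missing:
            return {"status": "error", "error": f"Missing required field: {missing[0]}"}
        entity_id = await self.store.insert(self.table, args)
        return {"status": "created", "id": entity_id}

    async def get(self, args: Dict[str, Any]) -> Dict[str, Any]:
        row = await self.store.find_one(self.table, args["id"])
        if row is None:
            return {"status": "not_found"}
        return {"status": "success", "data": row}


class ToolRegistry:
    """Protocol-facing layer: maps tool names to handlers and dispatches calls."""

    def __init__(self) -> None:
        self._tools: Dict[str, Handler] = {}

    def register(self, name: str, handler: Handler) -> None:
        self._tools[name] = handler

    async def call(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        handler = self._tools.get(name)
        if handler is None:
            return {"status": "error", "error": f"Unknown tool: {name}"}
        return await handler(args)


def build_note_server() -> ToolRegistry:
    """Specific implementation: an entity schema plus wiring, nothing else."""
    note_schema = {
        "type": "object",
        "properties": {"title": {"type": "string"}},
        "required": ["title"],
    }
    tool = SimpleCRUDTool("notes", InMemoryStore(), note_schema)
    registry = ToolRegistry()
    registry.register("create_note", tool.create)
    registry.register("get_note", tool.get)
    return registry


async def main() -> tuple:
    server = build_note_server()
    created = await server.call("create_note", {"title": "hello"})
    fetched = await server.call("get_note", {"id": created["id"]})
    return created, fetched


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Only `build_note_server` knows about notes. Supporting a different entity would mean writing another small builder with its own schema and tool names, while the store, tool, and registry classes stay unchanged.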
