Smithery Logo
MCPsSkillsDocsPricing
Login
NewFlame, an assistant that learns and improves. Available onTelegramSlack
    manutej

    vector-database-management

    manutej/vector-database-management
    Productivity
    38

    About

    SKILL.md

    Install

    • Telegram
      Telegram
    • Slack
      Slack
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    • Download skill
    ├─
    ├─
    └─
    Smithery Logo

    Give agents more agency

    Resources

    DocumentationPrivacy PolicySystem Status

    Company

    PricingAboutBlog

    Connect

    © 2026 Smithery. All rights reserved.

    About

    Comprehensive guide for managing vector databases including Pinecone, Weaviate, and Chroma for semantic search, RAG systems, and similarity-based applications

    SKILL.md

    Vector Database Management

    Table of Contents

    1. Introduction
    2. Vector Embeddings Fundamentals
    3. Database Setup & Configuration
    4. Index Operations
    5. Vector Operations
    6. Similarity Search
    7. Metadata Filtering
    8. Hybrid Search
    9. Namespace & Collection Management
    10. Performance & Scaling
    11. Production Best Practices
    12. Cost Optimization

    Introduction

    Vector databases are specialized systems designed to store, index, and query high-dimensional vector embeddings efficiently. They power modern AI applications including semantic search, recommendation systems, RAG (Retrieval Augmented Generation), and similarity-based matching.

    Key Concepts

    • Vector Embeddings: Numerical representations of data (text, images, audio) in high-dimensional space
    • Similarity Search: Finding vectors that are "close" to a query vector using distance metrics
    • Metadata Filtering: Combining vector similarity with structured data filtering
    • Indexing: Optimization structures (HNSW, IVF, etc.) for fast approximate nearest neighbor search

    Database Comparison

    Feature Pinecone Weaviate Chroma
    Deployment Fully managed Managed or self-hosted Self-hosted or cloud
    Index Types Serverless, Pods HNSW HNSW
    Metadata Filtering Advanced GraphQL-based Simple
    Hybrid Search Sparse-Dense Built-in Limited
    Scale Massive Large Small-Medium
    Best For Production RAG Knowledge graphs Local development

    Vector Embeddings Fundamentals

    Understanding Vector Representations

    Vector embeddings transform unstructured data into numerical arrays that capture semantic meaning:

    # Text to embeddings using OpenAI
    from openai import OpenAI
    
    client = OpenAI(api_key="YOUR_API_KEY")
    
    def generate_embedding(text: str, model: str = "text-embedding-3-small") -> list[float]:
        """Generate embeddings from text using OpenAI."""
        response = client.embeddings.create(
            input=text,
            model=model
        )
        return response.data[0].embedding
    
    # Example usage
    text = "Vector databases enable semantic search capabilities"
    embedding = generate_embedding(text)
    print(f"Embedding dimension: {len(embedding)}")  # 1536 dimensions
    print(f"First 5 values: {embedding[:5]}")
    

    Popular Embedding Models

    # 1. OpenAI Embeddings (Production-grade)
    from openai import OpenAI
    
    def openai_embeddings(texts: list[str]) -> list[list[float]]:
        """Batch generate OpenAI embeddings."""
        client = OpenAI(api_key="YOUR_API_KEY")
        response = client.embeddings.create(
            input=texts,
            model="text-embedding-3-large"  # 3072 dimensions
        )
        return [item.embedding for item in response.data]
    
    # 2. Sentence Transformers (Open-source)
    from sentence_transformers import SentenceTransformer
    
    def sentence_transformer_embeddings(texts: list[str]) -> list[list[float]]:
        """Generate embeddings using Sentence Transformers."""
        model = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions
        embeddings = model.encode(texts)
        return embeddings.tolist()
    
    # 3. Cohere Embeddings
    import cohere
    
    def cohere_embeddings(texts: list[str]) -> list[list[float]]:
        """Generate embeddings using Cohere."""
        co = cohere.Client("YOUR_API_KEY")
        response = co.embed(
            texts=texts,
            model="embed-english-v3.0",
            input_type="search_document"
        )
        return response.embeddings
    

    Embedding Dimensions & Trade-offs

    # Different embedding models for different use cases
    EMBEDDING_CONFIGS = {
        "openai-small": {
            "model": "text-embedding-3-small",
            "dimensions": 1536,
            "cost_per_1m": 0.02,
            "use_case": "General purpose, cost-effective"
        },
        "openai-large": {
            "model": "text-embedding-3-large",
            "dimensions": 3072,
            "cost_per_1m": 0.13,
            "use_case": "High accuracy requirements"
        },
        "sentence-transformers": {
            "model": "all-MiniLM-L6-v2",
            "dimensions": 384,
            "cost_per_1m": 0.00,  # Open-source
            "use_case": "Local development, privacy-sensitive"
        },
        "cohere-multilingual": {
            "model": "embed-multilingual-v3.0",
            "dimensions": 1024,
            "cost_per_1m": 0.10,
            "use_case": "Multi-language applications"
        }
    }
    

    Database Setup & Configuration

    Pinecone Setup

    # Install Pinecone SDK
    # pip install pinecone-client
    
    from pinecone import Pinecone, ServerlessSpec
    
    # Initialize Pinecone client
    pc = Pinecone(api_key="YOUR_API_KEY")
    
    # List existing indexes
    indexes = pc.list_indexes()
    print(f"Existing indexes: {[idx.name for idx in indexes]}")
    
    # Create serverless index (recommended for production)
    index_name = "production-search"
    
    if index_name not in [idx.name for idx in pc.list_indexes()]:
        pc.create_index(
            name=index_name,
            dimension=1536,  # Match your embedding model
            metric="cosine",  # cosine, dotproduct, or euclidean
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            ),
            deletion_protection="enabled",  # Prevent accidental deletion
            tags={
                "environment": "production",
                "team": "ml",
                "project": "semantic-search"
            }
        )
        print(f"Created index: {index_name}")
    
    # Connect to index
    index = pc.Index(index_name)
    
    # Get index stats
    stats = index.describe_index_stats()
    print(f"Index stats: {stats}")
    

    Selective Metadata Indexing (Pinecone)

    # Configure which metadata fields to index for filtering
    # This optimizes memory usage and query performance
    
    from pinecone import Pinecone, ServerlessSpec
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    
    # Create index with metadata configuration
    pc.create_index(
        name="optimized-index",
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
            schema={
                "fields": {
                    # Index these fields for filtering
                    "document_id": {"filterable": True},
                    "category": {"filterable": True},
                    "created_at": {"filterable": True},
                    "tags": {"filterable": True},
                    # Store but don't index (saves memory)
                    "document_title": {"filterable": False},
                    "document_url": {"filterable": False},
                    "full_content": {"filterable": False}
                }
            }
        )
    )
    
    # This configuration allows you to:
    # 1. Filter by document_id, category, created_at, tags
    # 2. Retrieve document_title, document_url, full_content in results
    # 3. Save memory by not indexing non-filterable fields
    

    Weaviate Setup

    # Install Weaviate client
    # pip install weaviate-client
    
    import weaviate
    from weaviate.classes.config import Configure, Property, DataType
    
    # Connect to Weaviate
    client = weaviate.connect_to_local()
    
    # Or connect to Weaviate Cloud
    # client = weaviate.connect_to_wcs(
    #     cluster_url="YOUR_WCS_URL",
    #     auth_credentials=weaviate.auth.AuthApiKey("YOUR_API_KEY")
    # )
    
    # Create collection (schema)
    try:
        collection = client.collections.create(
            name="Documents",
            vectorizer_config=Configure.Vectorizer.text2vec_openai(
                model="text-embedding-3-small"
            ),
            properties=[
                Property(name="title", data_type=DataType.TEXT),
                Property(name="content", data_type=DataType.TEXT),
                Property(name="category", data_type=DataType.TEXT),
                Property(name="created_at", data_type=DataType.DATE),
                Property(name="tags", data_type=DataType.TEXT_ARRAY)
            ]
        )
        print(f"Created collection: Documents")
    except Exception as e:
        print(f"Collection exists or error: {e}")
    
    # Get collection
    documents = client.collections.get("Documents")
    
    # Check collection info
    print(documents.config.get())
    

    Chroma Setup

    # Install Chroma
    # pip install chromadb
    
    import chromadb
    from chromadb.config import Settings
    
    # Initialize Chroma client (persistent)
    client = chromadb.PersistentClient(path="./chroma_db")
    
    # Or use ephemeral (in-memory)
    # client = chromadb.EphemeralClient()
    
    # Create or get collection
    collection = client.get_or_create_collection(
        name="documents",
        metadata={
            "description": "Document collection for semantic search",
            "hnsw:space": "cosine"  # cosine, l2, or ip (inner product)
        }
    )
    
    # List all collections
    collections = client.list_collections()
    print(f"Available collections: {[c.name for c in collections]}")
    
    # Get collection info
    print(f"Collection count: {collection.count()}")
    

    Index Operations

    Creating Indexes with Different Configurations

    from pinecone import Pinecone, ServerlessSpec, PodSpec
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    
    # 1. Serverless index (auto-scaling, pay-per-use)
    pc.create_index(
        name="serverless-index",
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    
    # 2. Pod-based index (dedicated resources)
    pc.create_index(
        name="pod-index",
        dimension=1536,
        metric="dotproduct",
        spec=PodSpec(
            environment="us-east-1-aws",
            pod_type="p1.x1",  # Performance tier
            pods=2,  # Number of pods
            replicas=2,  # Replicas for high availability
            shards=1
        )
    )
    
    # 3. Sparse index (for BM25-like search)
    pc.create_index(
        name="sparse-index",
        dimension=None,  # Sparse vectors don't have fixed dimension
        metric="dotproduct",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    

    Index Management Operations

    from pinecone import Pinecone
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    
    # List all indexes
    indexes = pc.list_indexes()
    for idx in indexes:
        print(f"Name: {idx.name}, Status: {idx.status.state}, Host: {idx.host}")
    
    # Describe specific index
    index_info = pc.describe_index("production-search")
    print(f"Dimension: {index_info.dimension}")
    print(f"Metric: {index_info.metric}")
    print(f"Status: {index_info.status}")
    
    # Connect to index
    index = pc.Index("production-search")
    
    # Get index statistics
    stats = index.describe_index_stats()
    print(f"Total vectors: {stats.total_vector_count}")
    print(f"Namespaces: {stats.namespaces}")
    print(f"Index fullness: {stats.index_fullness}")
    
    # Delete index (be careful!)
    # pc.delete_index("test-index")
    

    Configuring Index for Optimal Performance

    # Configuration for different use cases
    
    # 1. High-throughput search (many queries/second)
    pc.create_index(
        name="high-throughput",
        dimension=1536,
        metric="cosine",
        spec=PodSpec(
            environment="us-east-1-aws",
            pod_type="p2.x1",  # Higher performance tier
            pods=4,
            replicas=3  # More replicas = higher query throughput
        )
    )
    
    # 2. Large-scale storage (billions of vectors)
    pc.create_index(
        name="large-scale",
        dimension=1536,
        metric="cosine",
        spec=PodSpec(
            environment="us-east-1-aws",
            pod_type="s1.x1",  # Storage-optimized
            pods=8,
            shards=4  # More shards = more storage capacity
        )
    )
    
    # 3. Cost-optimized development
    pc.create_index(
        name="dev-environment",
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )  # Serverless = pay only for what you use
    )
    

    Vector Operations

    Upserting Vectors (Pinecone)

    from pinecone import Pinecone
    import uuid
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    index = pc.Index("production-search")
    
    # 1. Single vector upsert
    vector_id = str(uuid.uuid4())
    index.upsert(
        vectors=[
            {
                "id": vector_id,
                "values": [0.1, 0.2, 0.3, ...],  # 1536 dimensions
                "metadata": {
                    "title": "Introduction to Vector Databases",
                    "category": "education",
                    "author": "John Doe",
                    "created_at": "2024-01-15",
                    "tags": ["ml", "ai", "databases"]
                }
            }
        ],
        namespace="documents"
    )
    
    # 2. Batch upsert (efficient for large datasets)
    batch_size = 100
    vectors = []
    
    for i, (doc_id, embedding, metadata) in enumerate(documents):
        vectors.append({
            "id": doc_id,
            "values": embedding,
            "metadata": metadata
        })
    
        # Upsert in batches
        if len(vectors) >= batch_size or i == len(documents) - 1:
            index.upsert(vectors=vectors, namespace="documents")
            print(f"Upserted batch of {len(vectors)} vectors")
            vectors = []
    
    # 3. Upsert with async for better performance
    from pinecone import Pinecone
    import asyncio
    
    async def upsert_vectors_async(vectors_batch):
        """Async upsert for parallel processing."""
        index.upsert(vectors=vectors_batch, namespace="documents", async_req=True)
    
    # Parallel upsert
    tasks = []
    for batch in batches:
        tasks.append(upsert_vectors_async(batch))
    await asyncio.gather(*tasks)
    

    Sparse Vector Operations (Pinecone)

    # Sparse vectors are useful for keyword-based search (like BM25)
    # Combined with dense vectors for hybrid search
    
    from pinecone import Pinecone
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    index = pc.Index("hybrid-search-index")
    
    # Upsert vector with both dense and sparse components
    index.upsert(
        vectors=[
            {
                "id": "doc1",
                "values": [0.1, 0.2, ..., 0.5],  # Dense vector
                "sparse_values": {
                    "indices": [10, 45, 123, 234, 678],  # Token IDs
                    "values": [0.8, 0.6, 0.9, 0.7, 0.5]  # TF-IDF weights
                },
                "metadata": {"title": "Hybrid Search Document"}
            }
        ],
        namespace="hybrid"
    )
    
    # Query with hybrid search
    results = index.query(
        vector=[0.1, 0.2, ..., 0.5],  # Dense query vector
        sparse_vector={
            "indices": [10, 45, 123],
            "values": [0.8, 0.7, 0.9]
        },
        top_k=10,
        namespace="hybrid",
        include_metadata=True
    )
    

    Vector Operations (Weaviate)

    import weaviate
    from weaviate.classes.query import MetadataQuery
    
    client = weaviate.connect_to_local()
    documents = client.collections.get("Documents")
    
    # 1. Insert single object
    doc_uuid = documents.data.insert(
        properties={
            "title": "Vector Database Guide",
            "content": "A comprehensive guide to vector databases...",
            "category": "tutorial",
            "created_at": "2024-01-15T10:00:00Z",
            "tags": ["database", "ml", "ai"]
        }
    )
    print(f"Inserted: {doc_uuid}")
    
    # 2. Batch insert
    with documents.batch.dynamic() as batch:
        for doc in document_list:
            batch.add_object(
                properties={
                    "title": doc["title"],
                    "content": doc["content"],
                    "category": doc["category"],
                    "created_at": doc["created_at"],
                    "tags": doc["tags"]
                }
            )
    
    # 3. Insert with custom vector
    documents.data.insert(
        properties={"title": "Custom Vector Doc", "content": "..."},
        vector=[0.1, 0.2, 0.3, ...]  # Your pre-computed vector
    )
    
    # 4. Update object
    documents.data.update(
        uuid=doc_uuid,
        properties={"title": "Updated Title"}
    )
    
    # 5. Delete object
    documents.data.delete_by_id(uuid=doc_uuid)
    

    Vector Operations (Chroma)

    import chromadb
    
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("documents")
    
    # 1. Add documents with auto-embedding
    collection.add(
        documents=[
            "This is document 1",
            "This is document 2",
            "This is document 3"
        ],
        metadatas=[
            {"category": "tech", "author": "Alice"},
            {"category": "science", "author": "Bob"},
            {"category": "tech", "author": "Charlie"}
        ],
        ids=["doc1", "doc2", "doc3"]
    )
    
    # 2. Add with custom embeddings
    collection.add(
        embeddings=[
            [0.1, 0.2, 0.3, ...],
            [0.4, 0.5, 0.6, ...]
        ],
        metadatas=[
            {"title": "Doc 1"},
            {"title": "Doc 2"}
        ],
        ids=["custom1", "custom2"]
    )
    
    # 3. Update documents
    collection.update(
        ids=["doc1"],
        documents=["Updated document content"],
        metadatas=[{"category": "tech", "updated": True}]
    )
    
    # 4. Delete documents
    collection.delete(ids=["doc1", "doc2"])
    
    # 5. Get documents by IDs
    results = collection.get(
        ids=["doc1", "doc2"],
        include=["documents", "metadatas", "embeddings"]
    )
    

    Similarity Search

    Basic Similarity Search (Pinecone)

    from pinecone import Pinecone
    from openai import OpenAI
    
    # Initialize clients
    pc = Pinecone(api_key="PINECONE_API_KEY")
    openai_client = OpenAI(api_key="OPENAI_API_KEY")
    
    index = pc.Index("production-search")
    
    # 1. Generate query embedding
    query_text = "What are the benefits of vector databases?"
    response = openai_client.embeddings.create(
        input=query_text,
        model="text-embedding-3-small"
    )
    query_embedding = response.data[0].embedding
    
    # 2. Search for similar vectors
    results = index.query(
        vector=query_embedding,
        top_k=10,
        namespace="documents",
        include_values=False,
        include_metadata=True
    )
    
    # 3. Process results
    print(f"Found {len(results.matches)} results")
    for match in results.matches:
        print(f"ID: {match.id}")
        print(f"Score: {match.score:.4f}")
        print(f"Title: {match.metadata.get('title')}")
        print(f"Category: {match.metadata.get('category')}")
        print("---")
    

    Search by ID (Query by Example)

    # Search using an existing vector as query
    results = index.query(
        id="existing-doc-id",  # Use this document as the query
        top_k=10,
        namespace="documents",
        include_metadata=True
    )
    
    # Useful for "find similar items" features
    print(f"Documents similar to {results.matches[0].metadata.get('title')}:")
    for match in results.matches[1:]:  # Skip first (self)
        print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
    

    Multi-vector Search (Pinecone)

    # Search multiple query vectors in one request
    query_embeddings = [
        [0.1, 0.2, ...],  # Query 1
        [0.3, 0.4, ...],  # Query 2
        [0.5, 0.6, ...]   # Query 3
    ]
    
    results = index.query(
        queries=query_embeddings,
        top_k=5,
        namespace="documents",
        include_metadata=True
    )
    
    # Process results for each query
    for i, query_results in enumerate(results):
        print(f"\nResults for query {i+1}:")
        for match in query_results.matches:
            print(f"- {match.metadata.get('title')} (score: {match.score:.4f})")
    

    Similarity Search (Weaviate)

    import weaviate
    from weaviate.classes.query import MetadataQuery
    
    client = weaviate.connect_to_local()
    documents = client.collections.get("Documents")
    
    # 1. Near text search (semantic)
    response = documents.query.near_text(
        query="vector database performance optimization",
        limit=10,
        return_metadata=MetadataQuery(distance=True, certainty=True)
    )
    
    for obj in response.objects:
        print(f"Title: {obj.properties['title']}")
        print(f"Distance: {obj.metadata.distance:.4f}")
        print(f"Certainty: {obj.metadata.certainty:.4f}")
        print("---")
    
    # 2. Near vector search (with custom embedding)
    response = documents.query.near_vector(
        near_vector=[0.1, 0.2, 0.3, ...],
        limit=10
    )
    
    # 3. Near object search (find similar to existing object)
    response = documents.query.near_object(
        near_object="uuid-of-reference-object",
        limit=10
    )
    

    Similarity Search (Chroma)

    import chromadb
    
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("documents")
    
    # 1. Query with text (auto-embedding)
    results = collection.query(
        query_texts=["What is machine learning?"],
        n_results=10,
        include=["documents", "metadatas", "distances"]
    )
    
    print(f"Found {len(results['ids'][0])} results")
    for i, doc_id in enumerate(results['ids'][0]):
        print(f"ID: {doc_id}")
        print(f"Distance: {results['distances'][0][i]:.4f}")
        print(f"Document: {results['documents'][0][i][:100]}...")
        print(f"Metadata: {results['metadatas'][0][i]}")
        print("---")
    
    # 2. Query with custom embedding
    results = collection.query(
        query_embeddings=[[0.1, 0.2, 0.3, ...]],
        n_results=10
    )
    

    Metadata Filtering

    Pinecone Metadata Filters

    from pinecone import Pinecone
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    index = pc.Index("production-search")
    
    # 1. Equality filter
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={"category": {"$eq": "education"}},
        include_metadata=True
    )
    
    # 2. Inequality filter
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={"year": {"$ne": 2023}},
        include_metadata=True
    )
    
    # 3. Range filters
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={
            "$and": [
                {"year": {"$gte": 2020}},
                {"year": {"$lte": 2024}}
            ]
        },
        include_metadata=True
    )
    
    # 4. In/Not-in filters
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={
            "category": {"$in": ["education", "tutorial", "guide"]}
        },
        include_metadata=True
    )
    
    # 5. Existence check
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={"author": {"$exists": True}},
        include_metadata=True
    )
    
    # 6. Complex AND/OR queries
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={
            "$and": [
                {"category": {"$eq": "education"}},
                {
                    "$or": [
                        {"year": {"$eq": 2024}},
                        {"featured": {"$eq": True}}
                    ]
                },
                {"tags": {"$in": ["ml", "ai"]}}
            ]
        },
        include_metadata=True
    )
    
    # 7. Greater than/Less than
    results = index.query(
        vector=query_embedding,
        top_k=10,
        filter={
            "view_count": {"$gt": 1000},
            "rating": {"$gte": 4.5}
        },
        include_metadata=True
    )
    

    Production Metadata Filter Patterns

    # Pattern 1: Time-based filtering (recent content)
    from datetime import datetime, timedelta
    
    def search_recent_documents(query_text: str, days: int = 30):
        """Search only documents from last N days."""
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
    
        results = index.query(
            vector=generate_embedding(query_text),
            top_k=10,
            filter={
                "created_at": {"$gte": cutoff_date}
            },
            include_metadata=True
        )
        return results
    
    # Pattern 2: User permission filtering
    def search_with_permissions(query_text: str, user_id: str, user_roles: list):
        """Search only documents user has access to."""
        results = index.query(
            vector=generate_embedding(query_text),
            top_k=10,
            filter={
                "$or": [
                    {"owner_id": {"$eq": user_id}},
                    {"shared_with": {"$in": [user_id]}},
                    {"public": {"$eq": True}},
                    {"required_roles": {"$in": user_roles}}
                ]
            },
            include_metadata=True
        )
        return results
    
    # Pattern 3: Multi-tenant filtering
    def search_tenant_documents(query_text: str, tenant_id: str, category: str = None):
        """Search within a specific tenant's data."""
        filter_dict = {"tenant_id": {"$eq": tenant_id}}
    
        if category:
            filter_dict["category"] = {"$eq": category}
    
        results = index.query(
            vector=generate_embedding(query_text),
            top_k=10,
            filter=filter_dict,
            include_metadata=True
        )
        return results
    
    # Pattern 4: Faceted search
    def faceted_search(query_text: str, facets: dict):
        """Search with multiple facet filters."""
        filter_conditions = []
    
        for field, values in facets.items():
            if isinstance(values, list):
                filter_conditions.append({field: {"$in": values}})
            else:
                filter_conditions.append({field: {"$eq": values}})
    
        results = index.query(
            vector=generate_embedding(query_text),
            top_k=10,
            filter={"$and": filter_conditions} if filter_conditions else {},
            include_metadata=True
        )
        return results
    
    # Usage
    results = faceted_search(
        "machine learning tutorials",
        facets={
            "category": ["education", "tutorial"],
            "difficulty": "beginner",
            "language": ["english", "spanish"]
        }
    )
    

    Weaviate Metadata Filtering

    import weaviate
    from weaviate.classes.query import Filter
    
    client = weaviate.connect_to_local()
    documents = client.collections.get("Documents")
    
    # 1. Simple equality filter
    response = documents.query.near_text(
        query="vector databases",
        limit=10,
        filters=Filter.by_property("category").equal("education")
    )
    
    # 2. Greater than filter
    response = documents.query.near_text(
        query="machine learning",
        limit=10,
        filters=Filter.by_property("year").greater_than(2020)
    )
    
    # 3. Contains any filter
    response = documents.query.near_text(
        query="AI tutorials",
        limit=10,
        filters=Filter.by_property("tags").contains_any(["ml", "ai", "deep-learning"])
    )
    
    # 4. Complex AND/OR filters
    response = documents.query.near_text(
        query="database optimization",
        limit=10,
        filters=(
            Filter.by_property("category").equal("tutorial") &
            (Filter.by_property("difficulty").equal("beginner") |
             Filter.by_property("featured").equal(True))
        )
    )
    

    Chroma Metadata Filtering

    import chromadb
    
    client = chromadb.PersistentClient(path="./chroma_db")
    collection = client.get_collection("documents")
    
    # 1. Simple equality filter
    results = collection.query(
        query_texts=["vector databases"],
        n_results=10,
        where={"category": "education"}
    )
    
    # 2. AND conditions
    results = collection.query(
        query_texts=["machine learning"],
        n_results=10,
        where={
            "$and": [
                {"category": "education"},
                {"difficulty": "beginner"}
            ]
        }
    )
    
    # 3. OR conditions
    results = collection.query(
        query_texts=["AI tutorials"],
        n_results=10,
        where={
            "$or": [
                {"category": "education"},
                {"category": "tutorial"}
            ]
        }
    )
    
    # 4. Greater than/Less than
    results = collection.query(
        query_texts=["recent content"],
        n_results=10,
        where={"year": {"$gte": 2023}}
    )
    
    # 5. In operator
    results = collection.query(
        query_texts=["programming guides"],
        n_results=10,
        where={"language": {"$in": ["python", "javascript", "go"]}}
    )
    

    Hybrid Search

    Pinecone Hybrid Search (Dense + Sparse)

    from pinecone import Pinecone
    from typing import Dict, List
    import re
    from collections import Counter
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    index = pc.Index("hybrid-search-index")
    
    def create_sparse_vector(text: str, top_k: int = 100) -> Dict:
        """Create sparse vector using simple TF approach."""
        # Tokenize
        tokens = re.findall(r'\w+', text.lower())
    
        # Calculate term frequencies
        tf = Counter(tokens)
    
        # Create vocabulary mapping
        vocab = {word: hash(word) % 10000 for word in set(tokens)}
    
        # Get top-k terms
        top_terms = tf.most_common(top_k)
    
        # Create sparse vector
        indices = [vocab[term] for term, _ in top_terms]
        values = [float(freq) / len(tokens) for _, freq in top_terms]
    
        return {
            "indices": indices,
            "values": values
        }
    
    def hybrid_search(query_text: str, top_k: int = 10, alpha: float = 0.5):
        """
        Perform hybrid search combining dense and sparse vectors.
        alpha: weight for dense search (0.0 = sparse only, 1.0 = dense only)
        """
        # Generate dense vector
        dense_vector = generate_embedding(query_text)
    
        # Generate sparse vector
        sparse_vector = create_sparse_vector(query_text)
    
        # Hybrid query
        results = index.query(
            vector=dense_vector,
            sparse_vector=sparse_vector,
            top_k=top_k,
            include_metadata=True
        )
    
        return results
    
    # Example usage
    results = hybrid_search("machine learning vector databases", top_k=10)
    for match in results.matches:
        print(f"{match.metadata['title']}: {match.score:.4f}")
    

    Weaviate Hybrid Search

    import weaviate
    
    client = weaviate.connect_to_local()
    documents = client.collections.get("Documents")
    
    # Hybrid search (combines dense vector + BM25 keyword search)
    response = documents.query.hybrid(
        query="vector database performance",
        limit=10,
        alpha=0.5,  # 0 = pure keyword, 1 = pure vector, 0.5 = balanced
        fusion_type="rankedFusion"  # or "relativeScore"
    )
    
    for obj in response.objects:
        print(f"Title: {obj.properties['title']}")
        print(f"Score: {obj.metadata.score}")
        print("---")
    
    # Hybrid search with filters
    response = documents.query.hybrid(
        query="machine learning tutorials",
        limit=10,
        alpha=0.7,  # Favor semantic search
        filters=Filter.by_property("category").equal("education")
    )
    
    # Hybrid search with custom vector
    response = documents.query.hybrid(
        query="custom query",
        vector=[0.1, 0.2, 0.3, ...],  # Your pre-computed vector
        limit=10,
        alpha=0.5
    )
    

    BM25 + Vector Hybrid (Custom Implementation)

    from rank_bm25 import BM25Okapi
    import numpy as np
    
    class HybridSearchEngine:
        """Custom hybrid search combining BM25 and vector search."""
    
        def __init__(self, index, documents: List[Dict]):
            self.index = index
            self.documents = documents
    
            # Build BM25 index
            tokenized_docs = [doc['content'].lower().split() for doc in documents]
            self.bm25 = BM25Okapi(tokenized_docs)
            self.doc_ids = [doc['id'] for doc in documents]
    
        def search(self, query: str, top_k: int = 10, alpha: float = 0.5):
            """
            Hybrid search with custom score fusion.
            alpha: weight for vector search (1-alpha for BM25)
            """
            # 1. Vector search
            query_embedding = generate_embedding(query)
            vector_results = self.index.query(
                vector=query_embedding,
                top_k=top_k * 2,  # Get more candidates
                include_metadata=True
            )
    
            # 2. BM25 search
            tokenized_query = query.lower().split()
            bm25_scores = self.bm25.get_scores(tokenized_query)
    
            # 3. Normalize scores
            vector_scores = {
                m.id: m.score for m in vector_results.matches
            }
            max_vec_score = max(vector_scores.values()) if vector_scores else 1.0
    
            max_bm25_score = max(bm25_scores) if max(bm25_scores) > 0 else 1.0
    
            # 4. Combine scores
            hybrid_scores = {}
            all_ids = set(vector_scores.keys()) | set(self.doc_ids)
    
            for doc_id in all_ids:
                vec_score = vector_scores.get(doc_id, 0) / max_vec_score
                idx = self.doc_ids.index(doc_id) if doc_id in self.doc_ids else -1
                bm25_score = bm25_scores[idx] / max_bm25_score if idx >= 0 else 0
    
                hybrid_scores[doc_id] = (alpha * vec_score) + ((1 - alpha) * bm25_score)
    
            # 5. Rank and return top-k
            ranked = sorted(hybrid_scores.items(), key=lambda x: x[1], reverse=True)
            return ranked[:top_k]
    
    # Usage
    engine = HybridSearchEngine(index, documents)
    results = engine.search("machine learning databases", top_k=10, alpha=0.7)
    

    Namespace & Collection Management

    Pinecone Namespaces

    from pinecone import Pinecone
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    index = pc.Index("production-search")
    
    # 1. Upsert to specific namespace
    index.upsert(
        vectors=[
            {"id": "doc1", "values": [...], "metadata": {...}}
        ],
        namespace="production"
    )
    
    # 2. Query specific namespace
    results = index.query(
        vector=[...],
        top_k=10,
        namespace="production",
        include_metadata=True
    )
    
    # 3. Get namespace statistics
    stats = index.describe_index_stats()
    for namespace, info in stats.namespaces.items():
        print(f"Namespace: {namespace}")
        print(f"  Vector count: {info.vector_count}")
    
    # 4. Delete all vectors in namespace
    index.delete(delete_all=True, namespace="test")
    
    # 5. Multi-namespace architecture
    NAMESPACES = {
        "production": "Live user-facing data",
        "staging": "Testing before production",
        "development": "Development and experiments",
        "archive": "Historical data"
    }
    
    def upsert_with_environment(vectors, environment="production"):
        """Upsert to appropriate namespace."""
        namespace = environment if environment in NAMESPACES else "development"
        index.upsert(vectors=vectors, namespace=namespace)
    
    def search_across_namespaces(query_vector, namespaces=["production", "archive"]):
        """Search multiple namespaces and combine results."""
        all_results = []
    
        for ns in namespaces:
            results = index.query(
                vector=query_vector,
                top_k=10,
                namespace=ns,
                include_metadata=True
            )
            for match in results.matches:
                match.metadata["source_namespace"] = ns
                all_results.append(match)
    
        # Sort by score
        all_results.sort(key=lambda x: x.score, reverse=True)
        return all_results[:10]
    

    Weaviate Collections

    import weaviate
    from weaviate.classes.config import Configure
    
    client = weaviate.connect_to_local()
    
    # 1. Create multiple collections
    collections_config = [
        {
            "name": "Products",
            "properties": ["name", "description", "category", "price"]
        },
        {
            "name": "Users",
            "properties": ["username", "bio", "interests"]
        },
        {
            "name": "Reviews",
            "properties": ["content", "rating", "product_id", "user_id"]
        }
    ]
    
    for config in collections_config:
        try:
            client.collections.create(
                name=config["name"],
                vectorizer_config=Configure.Vectorizer.text2vec_openai()
            )
        except Exception as e:
            print(f"Collection {config['name']} exists: {e}")
    
    # 2. Cross-collection references
    client.collections.create(
        name="Orders",
        references=[
            weaviate.classes.config.ReferenceProperty(
                name="hasProduct",
                target_collection="Products"
            ),
            weaviate.classes.config.ReferenceProperty(
                name="byUser",
                target_collection="Users"
            )
        ]
    )
    
    # 3. Multi-collection search
    def search_all_collections(query: str):
        """Search across multiple collections."""
        results = {}
    
        for collection_name in ["Products", "Users", "Reviews"]:
            collection = client.collections.get(collection_name)
            response = collection.query.near_text(
                query=query,
                limit=5
            )
            results[collection_name] = response.objects
    
        return results
    
    # 4. Delete collection
    client.collections.delete("TestCollection")
    

    Chroma Collections

    import chromadb
    
    client = chromadb.PersistentClient(path="./chroma_db")
    
    # 1. Create multiple collections
    collections = {
        "documents": {
            "metadata": {"description": "Document embeddings"},
            "embedding_function": None  # Use default
        },
        "images": {
            "metadata": {"description": "Image embeddings"},
            "embedding_function": None
        },
        "code": {
            "metadata": {"description": "Code snippets"},
            "embedding_function": None
        }
    }
    
    for name, config in collections.items():
        collection = client.get_or_create_collection(
            name=name,
            metadata=config["metadata"]
        )
    
    # 2. List all collections
    all_collections = client.list_collections()
    for coll in all_collections:
        print(f"Collection: {coll.name}")
        print(f"  Count: {coll.count()}")
        print(f"  Metadata: {coll.metadata}")
    
    # 3. Collection-specific operations
    docs_collection = client.get_collection("documents")
    docs_collection.add(
        documents=["Document text..."],
        metadatas=[{"type": "article"}],
        ids=["doc1"]
    )
    
    # 4. Delete collection
    client.delete_collection("test_collection")
    
    # 5. Multi-collection search
    def search_all_collections(query: str, n_results: int = 5):
        """Search across all collections."""
        results = {}
    
        for collection in client.list_collections():
            try:
                collection_results = collection.query(
                    query_texts=[query],
                    n_results=n_results
                )
                results[collection.name] = collection_results
            except Exception as e:
                print(f"Error searching {collection.name}: {e}")
    
        return results
    

    Performance & Scaling

    Batch Operations Best Practices

    from pinecone import Pinecone
    from typing import List, Dict
    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    pc = Pinecone(api_key="YOUR_API_KEY")
    index = pc.Index("production-search")
    
    # 1. Optimal batch size
    OPTIMAL_BATCH_SIZE = 100  # Pinecone recommendation
    
    def batch_upsert(vectors: List[Dict], batch_size: int = OPTIMAL_BATCH_SIZE):
        """Efficiently upsert vectors in batches."""
        total_batches = (len(vectors) + batch_size - 1) // batch_size
    
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            index.upsert(vectors=batch, namespace="documents")
    
            if (i // batch_size + 1) % 10 == 0:
                print(f"Processed {i // batch_size + 1}/{total_batches} batches")
    
    # 2. Parallel batch upsert
    def parallel_batch_upsert(vectors: List[Dict], num_workers: int = 4):
        """Parallel upsert using thread pool."""
        batch_size = 100
        batches = [
            vectors[i:i + batch_size]
            for i in range(0, len(vectors), batch_size)
        ]
    
        def upsert_batch(batch):
            try:
                index.upsert(vectors=batch, namespace="documents")
                return len(batch)
            except Exception as e:
                print(f"Error upserting batch: {e}")
                return 0
    
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            results = list(executor.map(upsert_batch, batches))
    
        print(f"Successfully upserted {sum(results)} vectors")
    
    # 3. Rate limiting for API calls
    class RateLimiter:
        """Simple rate limiter for API calls."""
    
        def __init__(self, max_calls: int, time_window: float):
            self.max_calls = max_calls
            self.time_window = time_window
            self.calls = []
    
        def wait_if_needed(self):
            """Wait if rate limit would be exceeded."""
            now = time.time()
            # Remove old calls outside time window
            self.calls = [call_time for call_time in self.calls
                          if now - call_time < self.time_window]
    
            if len(self.calls) >= self.max_calls:
                sleep_time = self.time_window - (now - self.calls[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
                    self.calls = []
    
            self.calls.append(now)
    
    # Usage
    rate_limiter = RateLimiter(max_calls=100, time_window=60)  # 100 calls/minute
    
    for batch in batches:
        rate_limiter.wait_if_needed()
        index.upsert(vectors=batch)
    
    # 4. Bulk delete optimization
    def bulk_delete_by_filter(filter_dict: Dict, namespace: str = "documents"):
        """Delete vectors matching filter (more efficient than individual deletes)."""
        # First, get IDs matching filter
        results = index.query(
            vector=[0] * 1536,  # Dummy vector
            top_k=10000,  # Max allowed
            filter=filter_dict,
            namespace=namespace,
            include_values=False
        )
    
        ids_to_delete = [match.id for match in results.matches]
    
        # Delete in batches
        batch_size = 1000
        for i in range(0, len(ids_to_delete), batch_size):
            batch = ids_to_delete[i:i + batch_size]
            index.delete(ids=batch, namespace=namespace)
            print(f"Deleted {len(batch)} vectors")
    

    Query Optimization

    # 1. Minimize data transfer
    results = index.query(
        vector=query_vector,
        top_k=10,
        include_values=False,  # Don't return vectors if not needed
        include_metadata=False,  # Don't return metadata if not needed
        namespace="documents"
    )
    
    # 2. Use appropriate top_k
    # Smaller top_k = faster queries
    results_small = index.query(vector=query_vector, top_k=10)  # Fast
    results_large = index.query(vector=query_vector, top_k=1000)  # Slower
    
    # 3. Filter before vector search when possible
    # Good: Reduces search space
    results = index.query(
        vector=query_vector,
        top_k=10,
        filter={"category": "education"},  # Reduces candidates
        namespace="documents"
    )
    
    # 4. Batch queries when possible
    # More efficient than individual queries
    queries = [embedding1, embedding2, embedding3]
    results = index.query(
        queries=queries,
        top_k=10,
        namespace="documents"
    )
    
    # 5. Cache frequent queries
    from functools import lru_cache
    import hashlib
    import json
    
    def vector_hash(vector: List[float]) -> str:
        """Create hash of vector for caching."""
        return hashlib.md5(json.dumps(vector).encode()).hexdigest()
    
    class CachedIndex:
        """Wrapper with query caching."""
    
        def __init__(self, index, cache_size: int = 1000):
            self.index = index
            self.cache = {}
            self.cache_size = cache_size
    
        def query(self, vector: List[float], top_k: int = 10, **kwargs):
            """Query with caching."""
            cache_key = f"{vector_hash(vector)}_{top_k}_{json.dumps(kwargs)}"
    
            if cache_key in self.cache:
                return self.cache[cache_key]
    
            results = self.index.query(vector=vector, top_k=top_k, **kwargs)
    
            if len(self.cache) >= self.cache_size:
                # Remove oldest entry
                self.cache.pop(next(iter(self.cache)))
    
            self.cache[cache_key] = results
            return results
    
    # Usage
    cached_index = CachedIndex(index)
    results = cached_index.query(query_vector, top_k=10)  # Cached on subsequent calls
    

    Scaling Strategies

    # 1. Index sizing for scale
    def calculate_index_requirements(
        num_vectors: int,
        dimension: int,
        metadata_size_per_vector: int = 1024  # bytes
    ) -> Dict:
        """Calculate storage and cost for index."""
        # Approximate calculations
        vector_size = dimension * 4  # 4 bytes per float32
        total_vector_storage = num_vectors * vector_size
        total_metadata_storage = num_vectors * metadata_size_per_vector
        total_storage = total_vector_storage + total_metadata_storage
    
        # Pinecone pricing (approximate)
        storage_cost_per_gb_month = 0.095  # Serverless pricing
        total_gb = total_storage / (1024 ** 3)
        monthly_storage_cost = total_gb * storage_cost_per_gb_month
    
        return {
            "num_vectors": num_vectors,
            "total_storage_gb": round(total_gb, 2),
            "monthly_storage_cost_usd": round(monthly_storage_cost, 2),
            "recommended_pod_type": "s1.x1" if num_vectors > 10_000_000 else "p1.x1"
        }
    
    # Example
    reqs = calculate_index_requirements(
        num_vectors=10_000_000,
        dimension=1536
    )
    print(f"10M vectors storage: {reqs['total_storage_gb']} GB")
    print(f"Monthly cost: ${reqs['monthly_storage_cost_usd']}")
    
    # 2. Sharding strategy for massive scale
    def create_sharded_indexes(
        base_name: str,
        num_shards: int,
        dimension: int,
        metric: str = "cosine"
    ):
        """Create multiple indexes for horizontal scaling."""
        indexes = []
    
        for shard_id in range(num_shards):
            index_name = f"{base_name}-shard-{shard_id}"
    
            pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
    
            indexes.append(index_name)
    
        return indexes
    
    def route_to_shard(vector_id: str, num_shards: int) -> int:
        """Determine which shard a vector belongs to."""
        return hash(vector_id) % num_shards
    
    def query_sharded_indexes(query_vector: List[float], indexes: List, top_k: int = 10):
        """Query all shards and merge results."""
        all_results = []
    
        for index_name in indexes:
            idx = pc.Index(index_name)
            results = idx.query(
                vector=query_vector,
                top_k=top_k,
                include_metadata=True
            )
            all_results.extend(results.matches)
    
        # Sort by score and return top_k
        all_results.sort(key=lambda x: x.score, reverse=True)
        return all_results[:top_k]
    

    Production Best Practices

    Error Handling & Retries

    import time
    from typing import Optional, Callable
    import logging
    
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    class PineconeRetryHandler:
        """Robust error handling for Pinecone operations."""
    
        def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
            self.max_retries = max_retries
            self.base_delay = base_delay
    
        def retry_with_backoff(
            self,
            operation: Callable,
            *args,
            **kwargs
        ) -> Optional[any]:
            """Retry operation with exponential backoff."""
            for attempt in range(self.max_retries):
                try:
                    return operation(*args, **kwargs)
                except Exception as e:
                    if attempt == self.max_retries - 1:
                        logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
                        raise
    
                    delay = self.base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                    time.sleep(delay)
    
            return None
    
    # Usage
    retry_handler = PineconeRetryHandler(max_retries=3)
    
    # Upsert with retry
    def safe_upsert(vectors, namespace="documents"):
        return retry_handler.retry_with_backoff(
            index.upsert,
            vectors=vectors,
            namespace=namespace
        )
    
    # Query with retry
    def safe_query(vector, top_k=10, **kwargs):
        return retry_handler.retry_with_backoff(
            index.query,
            vector=vector,
            top_k=top_k,
            **kwargs
        )
    
    # Example
    try:
        results = safe_query(query_vector, top_k=10, include_metadata=True)
    except Exception as e:
        logger.error(f"Query failed permanently: {e}")
    

    Monitoring & Observability

    import time
    from dataclasses import dataclass
    from typing import Dict, List
    from datetime import datetime
    
    @dataclass
    class QueryMetrics:
        """Track query performance metrics."""
        query_time: float
        result_count: int
        top_score: float
        timestamp: datetime
        namespace: str
        filter_used: bool
    
    class VectorDBMonitor:
        """Monitor vector database operations."""
    
        def __init__(self):
            self.metrics: List[QueryMetrics] = []
    
        def track_query(
            self,
            query_func: Callable,
            *args,
            **kwargs
        ):
            """Track query execution and metrics."""
            start_time = time.time()
            results = query_func(*args, **kwargs)
            elapsed = time.time() - start_time
    
            metrics = QueryMetrics(
                query_time=elapsed,
                result_count=len(results.matches),
                top_score=results.matches[0].score if results.matches else 0.0,
                timestamp=datetime.now(),
                namespace=kwargs.get('namespace', 'default'),
                filter_used='filter' in kwargs
            )
    
            self.metrics.append(metrics)
    
            # Alert on slow queries
            if elapsed > 1.0:  # 1 second threshold
                logger.warning(f"Slow query detected: {elapsed:.2f}s")
    
            return results
    
        def get_stats(self) -> Dict:
            """Get aggregate statistics."""
            if not self.metrics:
                return {}
    
            query_times = [m.query_time for m in self.metrics]
    
            return {
                "total_queries": len(self.metrics),
                "avg_query_time": sum(query_times) / len(query_times),
                "p95_query_time": sorted(query_times)[int(len(query_times) * 0.95)],
                "p99_query_time": sorted(query_times)[int(len(query_times) * 0.99)],
                "avg_results": sum(m.result_count for m in self.metrics) / len(self.metrics),
                "filtered_queries_pct": sum(1 for m in self.metrics if m.filter_used) / len(self.metrics) * 100
            }
    
    # Usage
    monitor = VectorDBMonitor()
    
    # Wrap queries
    results = monitor.track_query(
        index.query,
        vector=query_vector,
        top_k=10,
        namespace="documents",
        filter={"category": "education"}
    )
    
    # Get statistics
    stats = monitor.get_stats()
    print(f"Average query time: {stats['avg_query_time']:.3f}s")
    print(f"P95 query time: {stats['p95_query_time']:.3f}s")
    

    Data Validation

    from typing import List, Dict
    import numpy as np
    
    class VectorValidator:
        """Validate vectors and metadata before operations."""
    
        def __init__(self, expected_dimension: int):
            self.expected_dimension = expected_dimension
    
        def validate_vector(self, vector: List[float]) -> tuple[bool, str]:
            """Validate vector format and content."""
            # Check type
            if not isinstance(vector, (list, np.ndarray)):
                return False, "Vector must be list or numpy array"
    
            # Check dimension
            if len(vector) != self.expected_dimension:
                return False, f"Expected {self.expected_dimension} dimensions, got {len(vector)}"
    
            # Check for NaN or Inf
            if any(not np.isfinite(v) for v in vector):
                return False, "Vector contains NaN or Inf values"
    
            # Check for zero vector
            if all(v == 0 for v in vector):
                return False, "Zero vector not allowed"
    
            return True, "Valid"
    
        def validate_metadata(self, metadata: Dict) -> tuple[bool, str]:
            """Validate metadata format."""
            # Check type
            if not isinstance(metadata, dict):
                return False, "Metadata must be dictionary"
    
            # Check metadata size (Pinecone limit: 40KB)
            metadata_str = str(metadata)
            if len(metadata_str.encode('utf-8')) > 40_000:
                return False, "Metadata exceeds 40KB limit"
    
            # Check for required fields (customize as needed)
            required_fields = ["title", "category"]
            for field in required_fields:
                if field not in metadata:
                    return False, f"Missing required field: {field}"
    
            return True, "Valid"
    
        def validate_batch(
            self,
            vectors: List[Dict]
        ) -> tuple[List[Dict], List[str]]:
            """Validate batch of vectors, return valid ones and errors."""
            valid_vectors = []
            errors = []
    
            for i, item in enumerate(vectors):
                # Validate vector
                is_valid, error = self.validate_vector(item.get('values', []))
                if not is_valid:
                    errors.append(f"Vector {i} ({item.get('id', 'unknown')}): {error}")
                    continue
    
                # Validate metadata
                if 'metadata' in item:
                    is_valid, error = self.validate_metadata(item['metadata'])
                    if not is_valid:
                        errors.append(f"Metadata {i} ({item.get('id', 'unknown')}): {error}")
                        continue
    
                valid_vectors.append(item)
    
            return valid_vectors, errors
    
    # Usage
    validator = VectorValidator(expected_dimension=1536)
    
    # Validate single vector
    is_valid, error = validator.validate_vector(embedding)
    if not is_valid:
        print(f"Invalid vector: {error}")
    
    # Validate batch
    valid_vectors, errors = validator.validate_batch(vectors_to_upsert)
    if errors:
        for error in errors:
            logger.error(error)
    
    # Upsert only valid vectors
    if valid_vectors:
        index.upsert(vectors=valid_vectors)
    

    Backup & Disaster Recovery

    import json
    import gzip
    from datetime import datetime
    from pathlib import Path
    
    class VectorDBBackup:
        """Backup and restore vector database data."""
    
        def __init__(self, index, backup_dir: str = "./backups"):
            self.index = index
            self.backup_dir = Path(backup_dir)
            self.backup_dir.mkdir(exist_ok=True)
    
        def backup_namespace(
            self,
            namespace: str = "documents",
            compress: bool = True
        ) -> str:
            """Backup all vectors in a namespace."""
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"backup_{namespace}_{timestamp}.json"
    
            if compress:
                filename += ".gz"
    
            filepath = self.backup_dir / filename
    
            # Fetch all vectors (in batches)
            all_vectors = []
            batch_size = 100
    
            # Get all IDs first (would need to be tracked separately)
            # This is a simplified example
            stats = self.index.describe_index_stats()
    
            # For actual implementation, you'd need to track IDs
            # or use fetch with known IDs
    
            # Save to file
            data = {
                "namespace": namespace,
                "timestamp": timestamp,
                "vector_count": len(all_vectors),
                "vectors": all_vectors
            }
    
            if compress:
                with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                    json.dump(data, f)
            else:
                with open(filepath, 'w') as f:
                    json.dump(data, f, indent=2)
    
            logger.info(f"Backed up {len(all_vectors)} vectors to {filepath}")
            return str(filepath)
    
        def restore_from_backup(
            self,
            backup_file: str,
            target_namespace: str = None
        ):
            """Restore vectors from backup file."""
            filepath = Path(backup_file)
    
            # Load backup
            if filepath.suffix == '.gz':
                with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                    data = json.load(f)
            else:
                with open(filepath, 'r') as f:
                    data = json.load(f)
    
            namespace = target_namespace or data['namespace']
            vectors = data['vectors']
    
            # Restore in batches
            batch_size = 100
            for i in range(0, len(vectors), batch_size):
                batch = vectors[i:i + batch_size]
                self.index.upsert(vectors=batch, namespace=namespace)
                logger.info(f"Restored {len(batch)} vectors")
    
            logger.info(f"Restored {len(vectors)} vectors to namespace '{namespace}'")
    
    # Usage
    backup_manager = VectorDBBackup(index)
    
    # Backup
    backup_file = backup_manager.backup_namespace("production")
    
    # Restore
    backup_manager.restore_from_backup(backup_file, target_namespace="production-restored")
    

    Cost Optimization

    Storage Optimization

    # 1. Reduce metadata size
    # Bad: Storing full content in metadata
    bad_metadata = {
        "title": "Long document title",
        "full_content": "...<entire document>...",  # Wastes space
        "description": "...<long description>...",
        "extra_field_1": "...",
        "extra_field_2": "..."
    }
    
    # Good: Store only necessary metadata
    good_metadata = {
        "title": "Long document title",
        "doc_id": "doc-123",  # Reference to external store
        "category": "education",
        "created_at": "2024-01-15"
    }
    
    # 2. Use selective metadata indexing
    # Only index fields you'll filter on
    pc.create_index(
        name="optimized-index",
        dimension=1536,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
            schema={
                "fields": {
                    "category": {"filterable": True},  # Need to filter
                    "created_at": {"filterable": True},  # Need to filter
                    "title": {"filterable": False},  # Just for display
                    "description": {"filterable": False}  # Just for display
                }
            }
        )
    )
    
    # 3. Regular cleanup of unused vectors
    def cleanup_old_vectors(days_old: int = 90):
        """Delete vectors older than specified days."""
        from datetime import datetime, timedelta
    
        cutoff_date = (datetime.now() - timedelta(days=days_old)).isoformat()
    
        # Delete by filter
        index.delete(
            filter={"created_at": {"$lt": cutoff_date}},
            namespace="documents"
        )
    
    # 4. Compress dimensions for smaller models
    # text-embedding-3-small: 1536 dimensions
    # all-MiniLM-L6-v2: 384 dimensions (75% storage reduction)
    
    # Trade-off: slightly lower accuracy for significant cost savings
    

    Query Cost Optimization

    # 1. Batch queries instead of individual
    # Bad: Multiple individual queries
    for query in queries:
        results = index.query(vector=query, top_k=10)  # N API calls
    
    # Good: Single batch query
    results = index.query(
        queries=query_vectors,  # 1 API call
        top_k=10
    )
    
    # 2. Use appropriate top_k
    # Larger top_k = more expensive
    results = index.query(
        vector=query_vector,
        top_k=10,  # Usually sufficient
        # top_k=1000  # Much more expensive
    )
    
    # 3. Minimize data transfer
    results = index.query(
        vector=query_vector,
        top_k=10,
        include_values=False,  # Save bandwidth
        include_metadata=False  # Save bandwidth if not needed
    )
    
    # 4. Use caching for repeated queries
    from functools import lru_cache
    
    @lru_cache(maxsize=1000)
    def cached_search(query_text: str, top_k: int = 10):
        """Cache search results for identical queries."""
        embedding = generate_embedding(query_text)
        results = index.query(
            vector=embedding,
            top_k=top_k,
            include_metadata=True
        )
        return results
    
    # 5. Choose serverless vs pods appropriately
    # Serverless: Low/variable traffic (pay per query)
    # Pods: High consistent traffic (fixed cost)
    
    def choose_deployment_type(
        queries_per_month: int,
        avg_response_time_requirement: float = 100  # ms
    ) -> str:
        """Recommend deployment type based on usage."""
        # Rough cost calculations (update with current pricing)
        serverless_cost_per_query = 0.0001  # Example
        pod_cost_per_month = 70  # p1.x1 pod
    
        serverless_monthly_cost = queries_per_month * serverless_cost_per_query
    
        if serverless_monthly_cost < pod_cost_per_month:
            return "serverless"
        else:
            return "pods"
    

    Cost Monitoring

    import json
    from datetime import datetime, timedelta
    from collections import defaultdict
    
    class CostMonitor:
        """Monitor and estimate vector database costs."""
    
        def __init__(self):
            self.operations = defaultdict(int)
            self.pricing = {
                "serverless_write_units": 0.0000025,  # per write unit
                "serverless_read_units": 0.00000625,  # per read unit
                "serverless_storage_gb": 0.095,  # per GB per month
                "p1_x1_pod": 0.096,  # per hour
                "p2_x1_pod": 0.240,  # per hour
            }
    
        def track_operation(self, operation_type: str, units: int = 1):
            """Track database operations."""
            self.operations[operation_type] += units
    
        def estimate_monthly_cost(
            self,
            deployment_type: str,
            storage_gb: float = 0,
            pod_type: str = None
        ) -> Dict:
            """Estimate monthly costs."""
            costs = {}
    
            if deployment_type == "serverless":
                # Storage cost
                storage_cost = storage_gb * self.pricing["serverless_storage_gb"]
    
                # Operation costs
                write_cost = (
                    self.operations["upsert"] *
                    self.pricing["serverless_write_units"]
                )
                read_cost = (
                    self.operations["query"] *
                    self.pricing["serverless_read_units"]
                )
    
                costs = {
                    "storage": storage_cost,
                    "writes": write_cost,
                    "reads": read_cost,
                    "total": storage_cost + write_cost + read_cost
                }
    
            elif deployment_type == "pods":
                # Fixed pod cost
                hours_per_month = 730
                pod_cost = self.pricing.get(f"{pod_type}_pod", 0) * hours_per_month
    
                costs = {
                    "pod": pod_cost,
                    "total": pod_cost
                }
    
            return costs
    
        def get_cost_report(self) -> str:
            """Generate cost report."""
            report = f"\n{'=' * 50}\n"
            report += "VECTOR DATABASE COST REPORT\n"
            report += f"{'=' * 50}\n\n"
            report += "Operations Summary:\n"
    
            for operation, count in self.operations.items():
                report += f"  {operation}: {count:,}\n"
    
            report += f"\n{'=' * 50}\n"
            return report
    
    # Usage
    cost_monitor = CostMonitor()
    
    # Track operations
    def monitored_upsert(vectors, **kwargs):
        cost_monitor.track_operation("upsert", len(vectors))
        return index.upsert(vectors=vectors, **kwargs)
    
    def monitored_query(vector, **kwargs):
        cost_monitor.track_operation("query", 1)
        return index.query(vector=vector, **kwargs)
    
    # Get cost estimate
    monthly_cost = cost_monitor.estimate_monthly_cost(
        deployment_type="serverless",
        storage_gb=10.5
    )
    
    print(f"Estimated monthly cost: ${monthly_cost['total']:.2f}")
    print(cost_monitor.get_cost_report())
    

    Summary

    This comprehensive guide covers all aspects of vector database management across Pinecone, Weaviate, and Chroma. Key takeaways:

    1. Choose the right database: Pinecone for production scale, Weaviate for knowledge graphs, Chroma for local development
    2. Optimize embeddings: Balance dimension size with accuracy and cost
    3. Use metadata filtering: Combine vector similarity with structured filtering for powerful search
    4. Implement hybrid search: Combine dense and sparse vectors for best results
    5. Scale efficiently: Use batching, caching, and appropriate index configurations
    6. Monitor and optimize costs: Track usage and choose the right deployment type

    For more information:

    • Pinecone Documentation: https://docs.pinecone.io
    • Weaviate Documentation: https://weaviate.io/developers/weaviate
    • Chroma Documentation: https://docs.trychroma.com
    Recommended Servers
    GroundRoute: Web Search for AI Agents across 6 Engines ($10 free credit)
    GroundRoute: Web Search for AI Agents across 6 Engines ($10 free credit)
    ThinAir Data
    ThinAir Data
    Memory Tool
    Memory Tool
    Repository
    manutej/luxor-claude-marketplace
    Files