Smithery Logo
MCPsSkillsDocsPricing
Login
Smithery Logo

Accelerating the Agent Economy

Resources

DocumentationPrivacy PolicySystem Status

Company

PricingAboutBlog

Connect

© 2026 Smithery. All rights reserved.

    neversight

    batch-processing

    neversight/batch-processing
    Data & Analytics
    2

    About

    SKILL.md

    Install

    Install via Skills CLI

    or add to your agent
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    ├─
    ├─
    └─

    About

    Message Batches API for Claude with 50% cost savings on bulk processing. Activate for batch jobs, JSONL processing, bulk analysis, and cost optimization.

    SKILL.md

    Batch Processing Skill

    Leverage Anthropic's Message Batches API for 50% cost savings on bulk processing workloads.

    When to Use This Skill

    • Processing 10K+ documents
    • Model evaluation and benchmarks
    • ETL and data enrichment
    • Training data generation
    • Bulk content analysis
    • Any non-time-sensitive workload

    Cost Savings

    Processing Type Cost
    Standard API 100%
    Batch API 50%

    Example: 1M tokens @ $3/1M = $3 (standard) vs $1.50 (batch)

    Batch Lifecycle

    Created → Processing → Ended (completed/failed/expired/canceled)
    
    • Processing time: Up to 24 hours
    • Results retention: 29 days
    • Full feature support: Tools, vision, system prompts

    Core Implementation

    Step 1: Create JSONL File

    import json
    
    # Each line is a complete request
    requests = [
        {
            "custom_id": "request-1",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: Document 1..."}]
            }
        },
        {
            "custom_id": "request-2",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": "Summarize: Document 2..."}]
            }
        }
    ]
    
    # Write JSONL file
    with open("batch_requests.jsonl", "w") as f:
        for request in requests:
            f.write(json.dumps(request) + "\n")
    

    Step 2: Submit Batch

    import anthropic
    
    client = anthropic.Anthropic()
    
    # Create batch from file
    with open("batch_requests.jsonl", "rb") as f:
        batch = client.beta.messages.batches.create(
            requests=f
        )
    
    print(f"Batch ID: {batch.id}")
    print(f"Status: {batch.processing_status}")
    

    Step 3: Poll for Completion

    import time
    
    def wait_for_batch(client, batch_id, poll_interval=60):
        """Poll until batch completes"""
        while True:
            batch = client.beta.messages.batches.retrieve(batch_id)
    
            if batch.processing_status == "ended":
                print(f"Batch completed!")
                print(f"  Succeeded: {batch.request_counts.succeeded}")
                print(f"  Errored: {batch.request_counts.errored}")
                print(f"  Expired: {batch.request_counts.expired}")
                return batch
    
            print(f"Status: {batch.processing_status} - waiting...")
            time.sleep(poll_interval)
    

    Step 4: Stream Results

    def process_results(client, batch_id):
        """Stream and process batch results"""
        results = {}
    
        for result in client.beta.messages.batches.results(batch_id):
            custom_id = result.custom_id
    
            if result.result.type == "succeeded":
                message = result.result.message
                results[custom_id] = {
                    "status": "success",
                    "content": message.content[0].text,
                    "usage": message.usage
                }
            elif result.result.type == "errored":
                results[custom_id] = {
                    "status": "error",
                    "error": result.result.error
                }
            elif result.result.type == "expired":
                results[custom_id] = {
                    "status": "expired"
                }
    
        return results
    

    Complete Workflow

    import anthropic
    import json
    import time
    
    def run_batch_job(documents):
        """Complete batch processing workflow"""
        client = anthropic.Anthropic()
    
        # 1. Create requests
        requests = []
        for i, doc in enumerate(documents):
            requests.append({
                "custom_id": f"doc-{i}",
                "params": {
                    "model": "claude-sonnet-4-20250514",
                    "max_tokens": 1024,
                    "messages": [{"role": "user", "content": f"Summarize: {doc}"}]
                }
            })
    
        # 2. Write JSONL
        with open("batch.jsonl", "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")
    
        # 3. Submit batch
        with open("batch.jsonl", "rb") as f:
            batch = client.beta.messages.batches.create(requests=f)
    
        print(f"Batch submitted: {batch.id}")
    
        # 4. Wait for completion
        while True:
            batch = client.beta.messages.batches.retrieve(batch.id)
            if batch.processing_status == "ended":
                break
            time.sleep(60)
    
        # 5. Collect results
        results = {}
        for result in client.beta.messages.batches.results(batch.id):
            if result.result.type == "succeeded":
                results[result.custom_id] = result.result.message.content[0].text
    
        return results
    

    Error Handling

    Retry Failed Requests

    def retry_failed(client, original_batch_id):
        """Retry failed and expired requests"""
        retry_requests = []
    
        for result in client.beta.messages.batches.results(original_batch_id):
            if result.result.type in ["errored", "expired"]:
                # Re-create the request (you'll need to store original params)
                retry_requests.append({
                    "custom_id": result.custom_id,
                    "params": get_original_params(result.custom_id)  # Your implementation
                })
    
        if retry_requests:
            # Write and submit retry batch
            with open("retry.jsonl", "w") as f:
                for req in retry_requests:
                    f.write(json.dumps(req) + "\n")
    
            with open("retry.jsonl", "rb") as f:
                return client.beta.messages.batches.create(requests=f)
    
        return None
    

    Error Types

    Result Type Action
    succeeded Process normally
    errored Check error, may retry
    expired Request took >24h, retry
    canceled Batch was canceled

    API Reference

    Endpoints

    Method Endpoint Purpose
    POST /v1/messages/batches Create batch
    GET /v1/messages/batches/{id} Get batch status
    GET /v1/messages/batches List batches
    POST /v1/messages/batches/{id}/cancel Cancel batch
    GET /v1/messages/batches/{id}/results Stream results

    Request Format

    {
        "custom_id": "unique-identifier",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "system": "Optional system prompt",
            "messages": [{"role": "user", "content": "..."}],
            "tools": [],
            "temperature": 0.7
        }
    }
    

    Best Practices

    DO:

    • Use for workloads >100 requests
    • Include unique custom_id for tracking
    • Store original params for retry logic
    • Monitor batch status via polling
    • Process results as streaming JSONL

    DON'T:

    • Use for time-sensitive requests
    • Expect immediate results
    • Forget to handle expired requests
    • Ignore error results

    Cost Optimization Tips

    1. Batch similar requests - Same model, similar prompts
    2. Combine with caching - Cache static context before batch
    3. Use appropriate model - Haiku for simple tasks, Sonnet for complex
    4. Optimize prompts - Shorter prompts = lower cost

    Use Case: Document Processing

    # Process 10,000 documents at 50% cost
    documents = load_documents("data/*.txt")  # 10K files
    
    # Create batch with consistent format
    requests = [{
        "custom_id": doc.id,
        "params": {
            "model": "claude-haiku-4-20250514",  # Cheapest for bulk
            "max_tokens": 512,
            "system": "Extract key information as JSON.",
            "messages": [{"role": "user", "content": doc.content}]
        }
    } for doc in documents]
    
    # Run batch job
    results = run_batch_job(requests)
    # Cost: ~50% of standard API!
    

    See Also

    • llm-integration - API basics
    • prompt-caching - Cache context
    • streaming - Real-time output
    Recommended Servers
    Parallel Tasks
    Parallel Tasks
    supermemory
    supermemory
    Repository
    neversight/skills_feed