
    wshobson

    rust-async-patterns

    wshobson/rust-async-patterns
    Coding
    28,185
    3 installs

    About

    Master Rust async programming with Tokio, async traits, error handling, and concurrent patterns. Use when building async Rust applications, implementing concurrent systems, or debugging async code.

    SKILL.md

    Rust Async Patterns

    Production patterns for async Rust programming with the Tokio runtime, including tasks, channels, streams, and error handling.

    When to Use This Skill

    • Building async Rust applications
    • Implementing concurrent network services
    • Using Tokio for async I/O
    • Handling async errors properly
    • Debugging async code issues
    • Optimizing async performance

    Core Concepts

    1. Async Execution Model

    Future (lazy) → poll() → Ready(value) | Pending
                    ↑           ↓
                  Waker ← Runtime schedules
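The poll/wake cycle in the diagram can be sketched with the standard library alone. This is a minimal illustration, not how Tokio is implemented: `block_on`, `ThreadWaker`, `YieldOnce`, and `run_yield_once` are hypothetical names introduced here. The toy runtime polls a future, parks the thread on `Pending`, and is woken through the `Waker` the future was given.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical toy runtime: polls one future to completion on this thread.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker signals that progress is possible.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical future: `Pending` on the first poll, `Ready` on the second.
pub struct YieldOnce {
    yielded: bool,
    polls: Arc<AtomicUsize>,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        if self.yielded {
            Poll::Ready("done")
        } else {
            self.yielded = true;
            // Ask to be polled again before reporting Pending.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Runs `YieldOnce` and returns its output plus the number of polls it took.
pub fn run_yield_once() -> (&'static str, usize) {
    let polls = Arc::new(AtomicUsize::new(0));
    let output = block_on(YieldOnce { yielded: false, polls: polls.clone() });
    (output, polls.load(Ordering::SeqCst))
}
```

Calling `run_yield_once()` drives the future through one `Pending` and one `Ready` poll. A real runtime adds a task queue, I/O and timer drivers, and usually several worker threads on top of this loop.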
    

    2. Key Abstractions

    Concept     Purpose
    Future      Lazy computation that may complete later
    async fn    Function returning impl Future
    await       Suspend until future completes
    Task        Spawned future running concurrently
    Runtime     Executor that polls futures
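Two rows of the table are easy to check by hand: a future is lazy, and an `async fn` is a function returning `impl Future`. The sketch below uses only the standard library. `poll_once`, `NoopWaker`, `mark_started`, `mark_started_desugared`, and `laziness_demo` are hypothetical names, and the desugared form is only roughly equivalent to what the compiler generates.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: polls a future exactly once.
pub fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    future.as_mut().poll(&mut cx)
}

/// `async fn`: calling it only builds a future; the body has not run yet.
pub async fn mark_started(started: Arc<AtomicBool>) -> u32 {
    started.store(true, Ordering::SeqCst);
    42
}

/// Roughly the same function written without the `async fn` sugar.
pub fn mark_started_desugared(started: Arc<AtomicBool>) -> impl Future<Output = u32> {
    async move {
        started.store(true, Ordering::SeqCst);
        42
    }
}

/// Returns (flag before polling, flag after polling, poll result).
pub fn laziness_demo() -> (bool, bool, Poll<u32>) {
    let started = Arc::new(AtomicBool::new(false));
    let future = mark_started(started.clone());
    // Nothing has executed yet: the future was created but never polled.
    let before = started.load(Ordering::SeqCst);
    let output = poll_once(future);
    let after = started.load(Ordering::SeqCst);
    (before, after, output)
}
```

The flag flips only after the first poll, which is why a future that is created but never awaited or spawned does no work.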

    Quick Start

    # Cargo.toml
    [dependencies]
    tokio = { version = "1", features = ["full"] }
    futures = "0.3"
    async-trait = "0.1"
    anyhow = "1.0"
    tracing = "0.1"
    tracing-subscriber = "0.3"
    # Used by later patterns (error types, cancellation, stream! macro)
    thiserror = "1.0"
    tokio-util = "0.7"
    async-stream = "0.3"
    
    use tokio::time::{sleep, Duration};
    use anyhow::Result;
    
    #[tokio::main]
    async fn main() -> Result<()> {
        // Initialize tracing
        tracing_subscriber::fmt::init();
    
        // Async operations
        let result = fetch_data("https://api.example.com").await?;
        println!("Got: {}", result);
    
        Ok(())
    }
    
    async fn fetch_data(url: &str) -> Result<String> {
        // Simulated async operation
        sleep(Duration::from_millis(100)).await;
        Ok(format!("Data from {}", url))
    }
    

    Patterns

    Pattern 1: Concurrent Task Execution

    use tokio::task::JoinSet;
    use anyhow::Result;
    
    // Spawn multiple concurrent tasks
    async fn fetch_all_concurrent(urls: Vec<String>) -> Result<Vec<String>> {
        let mut set = JoinSet::new();
    
        for url in urls {
            set.spawn(async move {
                fetch_data(&url).await
            });
        }
    
        let mut results = Vec::new();
        while let Some(res) = set.join_next().await {
            match res {
                Ok(Ok(data)) => results.push(data),
                Ok(Err(e)) => tracing::error!("Task failed: {}", e),
                Err(e) => tracing::error!("Join error: {}", e),
            }
        }
    
        Ok(results)
    }
    
    // With concurrency limit
    use futures::stream::{self, StreamExt};
    
    async fn fetch_with_limit(urls: Vec<String>, limit: usize) -> Vec<Result<String>> {
        stream::iter(urls)
            .map(|url| async move { fetch_data(&url).await })
            .buffer_unordered(limit) // At most `limit` futures in flight at once
            .collect()
            .await
    }
    
    // Select first to complete
    use tokio::select;
    
    async fn race_requests(url1: &str, url2: &str) -> Result<String> {
        select! {
            result = fetch_data(url1) => result,
            result = fetch_data(url2) => result,
        }
    }
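What racing two futures means can be sketched without a runtime. This is a conceptual illustration only, assuming a fixed polling order; `tokio::select!` picks the branch to poll first at random unless `biased;` is specified. `race`, `run`, `NoopWaker`, and `race_demo` are hypothetical names.

```rust
use std::future::{pending, poll_fn, ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical two-way race: resolves with whichever future is ready first.
/// The losing future is dropped when this function returns.
pub async fn race<A, B, T>(a: A, b: B) -> T
where
    A: Future<Output = T>,
    B: Future<Output = T>,
{
    let mut a = pin!(a);
    let mut b = pin!(b);
    poll_fn(move |cx| {
        // Poll `a` first; fall through to `b` only if `a` is not ready.
        if let Poll::Ready(value) = a.as_mut().poll(cx) {
            return Poll::Ready(value);
        }
        b.as_mut().poll(cx)
    })
    .await
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: busy-polls a future that needs no external wake-ups.
pub fn run<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

pub fn race_demo() -> (&'static str, &'static str) {
    // Both ready: the first branch is polled first, so it wins.
    let both_ready = run(race(ready("first"), ready("second")));
    // `pending()` never completes, so the second branch wins.
    let one_stalled = run(race(pending::<&'static str>(), ready("second")));
    (both_ready, one_stalled)
}
```

Because the loser is dropped mid-flight, each branch of a race should be safe to cancel at any `.await` point.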
    

    Pattern 2: Channels for Communication

    use tokio::sync::{mpsc, broadcast, oneshot, watch};
    
    // Multi-producer, single-consumer
    async fn mpsc_example() {
        let (tx, mut rx) = mpsc::channel::<String>(100);
    
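        // Spawn a producer with its own sender clone
        let tx2 = tx.clone();
        tokio::spawn(async move {
            tx2.send("Hello".to_string()).await.unwrap();
        });
    
        // Drop the original sender; while any sender is alive,
        // recv() never returns None and the loop below waits forever
        drop(tx);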
    
        // Consume
        while let Some(msg) = rx.recv().await {
            println!("Got: {}", msg);
        }
    }
    
    // Broadcast: multi-producer, multi-consumer
    async fn broadcast_example() {
        let (tx, _) = broadcast::channel::<String>(100);
    
        let mut rx1 = tx.subscribe();
        let mut rx2 = tx.subscribe();
    
        tx.send("Event".to_string()).unwrap();
    
        // Both receivers get the message
        let _ = rx1.recv().await;
        let _ = rx2.recv().await;
    }
    
    // Oneshot: single value, single use
    async fn oneshot_example() -> String {
        let (tx, rx) = oneshot::channel::<String>();
    
        tokio::spawn(async move {
            tx.send("Result".to_string()).unwrap();
        });
    
        rx.await.unwrap()
    }
    
    // Watch: single producer, multi-consumer, latest value
    async fn watch_example() {
        let (tx, mut rx) = watch::channel("initial".to_string());
    
        tokio::spawn(async move {
            // changed() returns Err once the sender is dropped, which ends the loop
            while rx.changed().await.is_ok() {
                println!("New value: {}", *rx.borrow());
            }
        });
    
        tx.send("updated".to_string()).unwrap();
    }
    

    Pattern 3: Async Error Handling

    use anyhow::{Context, Result, bail};
    use thiserror::Error;
    
    #[derive(Error, Debug)]
    pub enum ServiceError {
        #[error("Network error: {0}")]
        Network(#[from] reqwest::Error),
    
        #[error("Database error: {0}")]
        Database(#[from] sqlx::Error),
    
        #[error("Not found: {0}")]
        NotFound(String),
    
        #[error("Timeout after {0:?}")]
        Timeout(std::time::Duration),
    }
    
    // Using anyhow for application errors
    async fn process_request(id: &str) -> Result<Response> {
        let data = fetch_data(id)
            .await
            .context("Failed to fetch data")?;
    
        let parsed = parse_response(&data)
            .context("Failed to parse response")?;
    
        Ok(parsed)
    }
    
    // Using custom errors for library code
    async fn get_user(id: &str) -> Result<User, ServiceError> {
        let result = db.query(id).await?;
    
        match result {
            Some(user) => Ok(user),
            None => Err(ServiceError::NotFound(id.to_string())),
        }
    }
    
    // Timeout wrapper
    use tokio::time::{timeout, Duration};
    
    async fn with_timeout<T, F>(duration: Duration, future: F) -> Result<T, ServiceError>
    where
        F: std::future::Future<Output = Result<T, ServiceError>>,
    {
        timeout(duration, future)
            .await
            .map_err(|_| ServiceError::Timeout(duration))?
    }
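For readers unfamiliar with `thiserror`, the derive above can be approximated by hand. The sketch below uses only the standard library and shows roughly what `#[derive(Error)]`, `#[error(...)]`, and `#[from]` provide; it is not the macro's exact expansion. `ConfigError`, `read_port`, `run`, and `NoopWaker` are hypothetical names.

```rust
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::num::ParseIntError;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical library error, written out by hand.
#[derive(Debug)]
pub enum ConfigError {
    InvalidPort(ParseIntError),
    NotFound(String),
}

// Counterpart of the `#[error("...")]` attributes.
impl fmt::Display for ConfigError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConfigError::InvalidPort(e) => write!(f, "Invalid port: {e}"),
            ConfigError::NotFound(key) => write!(f, "Not found: {key}"),
        }
    }
}

impl Error for ConfigError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ConfigError::InvalidPort(e) => Some(e),
            ConfigError::NotFound(_) => None,
        }
    }
}

// Counterpart of `#[from]`: lets `?` convert the underlying error.
impl From<ParseIntError> for ConfigError {
    fn from(e: ParseIntError) -> Self {
        ConfigError::InvalidPort(e)
    }
}

/// `?` works the same way inside an `async fn` as in a synchronous one.
pub async fn read_port(raw: Option<&str>) -> Result<u16, ConfigError> {
    let raw = raw.ok_or_else(|| ConfigError::NotFound("port".to_string()))?;
    Ok(raw.trim().parse::<u16>()?) // ParseIntError -> ConfigError via From
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: busy-polls a future that needs no external wake-ups.
pub fn run<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Callers can match on the variant, for example `matches!(run(read_port(Some("http"))), Err(ConfigError::InvalidPort(_)))`, which is the main reason to prefer a typed error over `anyhow::Error` in library code.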
    

    Pattern 4: Graceful Shutdown

    use anyhow::Result;
    use std::time::Duration;
    use tokio::signal;
    use tokio::sync::broadcast;
    use tokio_util::sync::CancellationToken;
    
    async fn run_server() -> Result<()> {
        // Method 1: CancellationToken
        let token = CancellationToken::new();
        let token_clone = token.clone();
    
        // Spawn task that respects cancellation
        let worker = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = token_clone.cancelled() => {
                        tracing::info!("Task shutting down");
                        break;
                    }
                    _ = do_work() => {}
                }
            }
        });
    
        // Wait for shutdown signal
        signal::ctrl_c().await?;
        tracing::info!("Shutdown signal received");
    
        // Cancel all tasks
        token.cancel();
    
        // Wait for the task to finish cleanup, but no longer than 5 seconds
        if tokio::time::timeout(Duration::from_secs(5), worker).await.is_err() {
            tracing::warn!("Task did not shut down in time");
        }
    
        Ok(())
    }
    
    // Method 2: Broadcast channel for shutdown
    async fn run_with_broadcast() -> Result<()> {
        let (shutdown_tx, _) = broadcast::channel::<()>(1);
    
        let mut rx = shutdown_tx.subscribe();
        tokio::spawn(async move {
            tokio::select! {
                _ = rx.recv() => {
                    tracing::info!("Received shutdown");
                }
                _ = async { loop { do_work().await } } => {}
            }
        });
    
        signal::ctrl_c().await?;
        let _ = shutdown_tx.send(());
    
        Ok(())
    }
    

    Pattern 5: Async Traits

    use anyhow::Result;
    use async_trait::async_trait;
    
    #[async_trait]
    pub trait Repository {
        async fn get(&self, id: &str) -> Result<Entity>;
        async fn save(&self, entity: &Entity) -> Result<()>;
        async fn delete(&self, id: &str) -> Result<()>;
    }
    
    pub struct PostgresRepository {
        pool: sqlx::PgPool,
    }
    
    #[async_trait]
    impl Repository for PostgresRepository {
        async fn get(&self, id: &str) -> Result<Entity> {
            sqlx::query_as!(Entity, "SELECT * FROM entities WHERE id = $1", id)
                .fetch_one(&self.pool)
                .await
                .map_err(Into::into)
        }
    
        async fn save(&self, entity: &Entity) -> Result<()> {
            sqlx::query!(
                "INSERT INTO entities (id, data) VALUES ($1, $2)
                 ON CONFLICT (id) DO UPDATE SET data = $2",
                entity.id,
                entity.data
            )
            .execute(&self.pool)
            .await?;
            Ok(())
        }
    
        async fn delete(&self, id: &str) -> Result<()> {
            sqlx::query!("DELETE FROM entities WHERE id = $1", id)
                .execute(&self.pool)
                .await?;
            Ok(())
        }
    }
    
    // Trait object usage
    async fn process(repo: &dyn Repository, id: &str) -> Result<()> {
        let entity = repo.get(id).await?;
        // Process...
        repo.save(&entity).await
    }
    

    Pattern 6: Streams and Async Iteration

    use std::time::Duration;
    
    use async_stream::stream;
    use futures::stream::{self, Stream, StreamExt};
    
    // Create a stream with the stream! generator macro
    fn numbers_stream() -> impl Stream<Item = i32> {
        stream! {
            for i in 0..10 {
                tokio::time::sleep(Duration::from_millis(100)).await;
                yield i;
            }
        }
    }
    
    // Process stream
    async fn process_stream() {
        let stream = numbers_stream();
    
        // Map and filter
        let processed: Vec<_> = stream
            .filter(|n| futures::future::ready(*n % 2 == 0))
            .map(|n| n * 2)
            .collect()
            .await;
    
        println!("{:?}", processed);
    }
    
    // Chunked processing
    async fn process_in_chunks() {
        let stream = numbers_stream();
    
        // next() requires an Unpin stream, so pin the chunked stream first
        let mut chunks = std::pin::pin!(stream.chunks(3));
    
        while let Some(chunk) = chunks.next().await {
            println!("Processing chunk: {:?}", chunk);
        }
    }
    
    // Merge multiple streams
    async fn merge_streams() {
        let stream1 = numbers_stream();
        let stream2 = numbers_stream();
    
        let merged = stream::select(stream1, stream2);
    
        merged
            .for_each(|n| async move {
                println!("Got: {}", n);
            })
            .await;
    }
    

    Pattern 7: Resource Management

    use std::collections::HashMap;
    use std::sync::Mutex; // std mutex: guards a short, non-async critical section
    use tokio::sync::{RwLock, Semaphore, SemaphorePermit};
    
    // Shared state with RwLock (prefer for read-heavy)
    struct Cache {
        data: RwLock<HashMap<String, String>>,
    }
    
    impl Cache {
        async fn get(&self, key: &str) -> Option<String> {
            self.data.read().await.get(key).cloned()
        }
    
        async fn set(&self, key: String, value: String) {
            self.data.write().await.insert(key, value);
        }
    }
    
    // Connection pool with semaphore
    struct Pool {
        semaphore: Semaphore,
        connections: Mutex<Vec<Connection>>,
    }
    
    impl Pool {
        fn new(size: usize) -> Self {
            Self {
                semaphore: Semaphore::new(size),
                connections: Mutex::new((0..size).map(|_| Connection::new()).collect()),
            }
        }
    
        async fn acquire(&self) -> PooledConnection<'_> {
            // Holding a permit guarantees that an idle connection exists
            let permit = self.semaphore.acquire().await.expect("semaphore closed");
            let conn = self
                .connections
                .lock()
                .unwrap()
                .pop()
                .expect("permit implies an idle connection");
            PooledConnection { pool: self, conn: Some(conn), _permit: permit }
        }
    }
    
    struct PooledConnection<'a> {
        pool: &'a Pool,
        conn: Option<Connection>,
        _permit: SemaphorePermit<'a>,
    }
    
    impl Drop for PooledConnection<'_> {
        fn drop(&mut self) {
            // Return the connection synchronously: a borrowed pool cannot be
            // moved into tokio::spawn, and fields (including the permit) are
            // dropped only after this body runs, so the connection is back
            // in the pool before the permit is released
            if let Some(conn) = self.conn.take() {
                self.pool.connections.lock().unwrap().push(conn);
            }
        }
    }
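The guard-returns-on-drop idea behind the pool can be exercised without a runtime. The sketch below is a simplified std-only analogue under stated assumptions: `IdPool` and `Lease` are hypothetical names, plain integers stand in for connections, and `try_acquire` returns `None` where the Tokio version would wait on the semaphore.

```rust
use std::sync::Mutex;

/// Hypothetical pool of connection ids; the ids stand in for real connections.
pub struct IdPool {
    idle: Mutex<Vec<u32>>,
}

impl IdPool {
    pub fn new(size: u32) -> Self {
        Self { idle: Mutex::new((0..size).collect()) }
    }

    /// Non-blocking acquire: `None` when every id is checked out.
    pub fn try_acquire(&self) -> Option<Lease<'_>> {
        // The lock guard is a temporary and is released at the end of this statement.
        let id = self.idle.lock().unwrap().pop()?;
        Some(Lease { pool: self, id })
    }

    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

/// RAII guard: the id goes back to the pool when the lease is dropped.
pub struct Lease<'a> {
    pool: &'a IdPool,
    id: u32,
}

impl Lease<'_> {
    pub fn id(&self) -> u32 {
        self.id
    }
}

impl Drop for Lease<'_> {
    fn drop(&mut self) {
        // Synchronous return: no task is spawned, so nothing can observe
        // a released slot before the id is back in the pool.
        self.pool.idle.lock().unwrap().push(self.id);
    }
}
```

With `IdPool::new(2)`, two leases can be taken, a third `try_acquire()` yields `None`, and dropping a lease raises `idle_count()` by one. Tying the return to `Drop` means early returns and `?` cannot leak a slot.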
    

    Debugging Tips

    // Enable tokio-console for runtime debugging
    // Cargo.toml: tokio = { features = ["tracing"] }, plus the console-subscriber crate
    // In main: console_subscriber::init();
    // Run: RUSTFLAGS="--cfg tokio_unstable" cargo run
    // Then: tokio-console
    
    // Instrument async functions
    use sqlx::PgPool;
    use tracing::{instrument, Instrument}; // Instrument provides .instrument(span)
    
    #[instrument(skip(pool))]
    async fn fetch_user(pool: &PgPool, id: &str) -> Result<User> {
        tracing::debug!("Fetching user");
        // ...
    }
    
    // Track task spawning
    let span = tracing::info_span!("worker", id = %worker_id);
    tokio::spawn(async move {
        // Enters span when polled
    }.instrument(span));
    

    Best Practices

    Do's

    • Use tokio::select! - For racing futures
    • Prefer channels - Over shared state when possible
    • Use JoinSet - For managing multiple tasks
    • Instrument with tracing - For debugging async code
    • Handle cancellation - Check CancellationToken

    Don'ts

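    • Don't block - Never use std::thread::sleep in async code; use tokio::time::sleep, or spawn_blocking for blocking work
    • Don't hold std::sync locks across awaits - Can deadlock and makes the future !Send; release the guard first or use tokio::sync::Mutex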
    • Don't spawn unboundedly - Use semaphores for limits
    • Don't ignore errors - Propagate with ? or log
    • Don't forget Send bounds - For spawned futures
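Two of the points above, keeping lock guards away from `.await` and satisfying `Send` bounds, can be checked at compile time. The sketch below uses only the standard library. `assert_send`, `increment`, `spawnable_increment`, `run`, and `NoopWaker` are hypothetical names, and `ready(()).await` stands in for real async work.

```rust
use std::future::{ready, Future};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Compile-time check: accepts only values that are `Send`.
fn assert_send<T: Send>(value: T) -> T {
    value
}

/// Increments a shared counter, releasing the lock before the `.await`.
pub async fn increment(counter: Arc<Mutex<u64>>) -> u64 {
    let updated = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    }; // guard dropped here; holding it across the await would make the future !Send

    // Stand-in for real async work such as I/O.
    ready(()).await;
    updated
}

/// Builds a future that satisfies the `Send` bound a multi-threaded spawn requires.
pub fn spawnable_increment(counter: Arc<Mutex<u64>>) -> impl Future<Output = u64> + Send {
    assert_send(increment(counter))
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: busy-polls a future that needs no external wake-ups.
pub fn run<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Moving the `.await` inside the inner block would keep the `MutexGuard` alive across the suspension point, and `assert_send` would then fail to compile. That is the same error `tokio::spawn` reports on a multi-threaded runtime.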
    Repository
    wshobson/agents