Smithery Logo
MCPsSkillsDocsPricing
Login
Smithery Logo

Accelerating the Agent Economy

Resources

DocumentationPrivacy PolicySystem Status

Company

PricingAboutBlog

Connect

© 2026 Smithery. All rights reserved.

    kriegcloud

    effect-concurrency-testing

    kriegcloud/effect-concurrency-testing
    Coding
    35

    About

    SKILL.md

    Install

    Install via Skills CLI

    or add to your agent
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    ├─
    ├─
    └─

    About

    Test Effect concurrency primitives including PubSub, Deferred, Latch, Fiber coordination, SubscriptionRef, and Stream...

    SKILL.md

    Effect Concurrency Testing Skill

    This skill provides patterns for testing Effect's concurrency primitives: fibers, latches, deferreds, PubSub, SubscriptionRef, and streams.

    Core Principles

    CRITICAL: Choose the correct coordination primitive based on what you need to synchronize.

    Need Use
    Simple fiber yield Effect.yieldNow
    Forked PubSub subscriber ready yieldNow after fork, yieldNow after each publish
    Wait for subscriber ready Deferred.make() + Deferred.await
    Wait for stream element Effect.makeLatch() + Stream.tap(() => latch.open)
    Time-dependent behavior TestClock.adjust
    Verify events published PubSub.subscribe + PubSub.takeAll
    Check fiber status fiber.unsafePoll()

    Fiber Coordination Patterns

    Effect.yieldNow - Simple Fiber Scheduling

    Use Effect.yieldNow when you need to allow other fibers to execute. This is preferred over TestClock.adjust for non-time-dependent code.

    import { it } from "@effect/vitest"
    import { Effect, Exit, Fiber } from "effect"
    
    it.effect("fiber polling with yieldNow", () =>
      Effect.gen(function* () {
        const latch = yield* Effect.makeLatch()
    
        const fiber = yield* latch.await.pipe(Effect.fork)
    
        yield* Effect.yieldNow()
    
        expect(fiber.unsafePoll()).toBeNull()
    
        yield* latch.open
    
        expect(yield* fiber.await).toEqual(Exit.void)
      })
    )
    

    Latch - Explicit Coordination

    Effect.makeLatch() creates a gate that blocks fibers until opened:

    import { it } from "@effect/vitest"
    import { Effect, Fiber } from "effect"
    
    it.effect("latch coordination", () =>
      Effect.gen(function* () {
        const latch = yield* Effect.makeLatch()
    
        const fiber = yield* Effect.gen(function* () {
          yield* latch.await
          return "completed"
        }).pipe(Effect.fork)
    
        yield* Effect.yieldNow()
        expect(fiber.unsafePoll()).toBeNull()
    
        yield* latch.open
    
        const result = yield* Fiber.join(fiber)
        expect(result).toBe("completed")
      })
    )
    

    Latch Operations

    import { Effect } from "effect"
    
    declare const latch: Effect.Effect.Success<ReturnType<typeof Effect.makeLatch>>
    
    latch.await       // Wait until latch is open
    latch.open        // Open the latch (allows waiters through)
    latch.close       // Close the latch (blocks future waiters)
    latch.release     // Open once, then close
    latch.whenOpen    // Run effect only when latch is open
    

    Deferred - Signal Readiness Between Fibers

    Use Deferred when one fiber needs to signal another with a value:

    import { it } from "@effect/vitest"
    import { Effect, Deferred, Fiber } from "effect"
    
    it.effect("deferred signaling", () =>
      Effect.gen(function* () {
        const signal = yield* Deferred.make<number>()
    
        const consumer = yield* Effect.gen(function* () {
          const value = yield* Deferred.await(signal)
          return value * 2
        }).pipe(Effect.fork)
    
        yield* Deferred.succeed(signal, 21)
    
        const result = yield* Fiber.join(consumer)
        expect(result).toBe(42)
      })
    )
    

    fiber.unsafePoll() - Check Completion Without Blocking

    import { Effect, Exit, Fiber } from "effect"
    
    declare const fiber: Fiber.RuntimeFiber<string>
    
    fiber.unsafePoll()
    // Returns null if running
    // Returns Exit<A, E> if completed (success, failure, or interrupted)
    
    // Check if still running
    expect(fiber.unsafePoll()).toBeNull()
    
    // Check if completed
    expect(fiber.unsafePoll()).toBeDefined()
    
    // Check specific completion
    expect(fiber.unsafePoll()).toEqual(Exit.succeed("result"))
    

    PubSub Event Testing

    Direct Event Verification

    Use Effect.scoped to manage PubSub subscription lifecycle:

    import { it } from "@effect/vitest"
    import { Effect, PubSub } from "effect"
    
    it.effect("verify published events", () =>
      Effect.gen(function* () {
        const pubsub = yield* PubSub.unbounded<string>()
    
        yield* Effect.scoped(
          Effect.gen(function* () {
            const sub = yield* PubSub.subscribe(pubsub)
    
            yield* PubSub.publish(pubsub, "event-1")
            yield* PubSub.publish(pubsub, "event-2")
    
            const events = yield* PubSub.takeAll(sub)
    
            expect(events).toEqual(["event-1", "event-2"])
          })
        )
      })
    )
    

    Testing Event Publishers

    When testing a service that publishes events:

    import { it } from "@effect/vitest"
    import { Effect, PubSub, Context, Layer } from "effect"
    
    interface UserEvent {
      readonly type: "created" | "deleted"
      readonly userId: string
    }
    
    class EventBus extends Context.Service<
      EventBus,
      PubSub.PubSub<UserEvent>
    >()("EventBus") {}
    
    class UserService extends Context.Service<
      UserService,
      { readonly createUser: (id: string) => Effect.Effect<void> }
    >()("UserService") {}
    
    declare const UserServiceLive: Layer.Layer<UserService, never, EventBus>
    
    it.effect("should publish user created event", () =>
      Effect.gen(function* () {
        const pubsub = yield* PubSub.unbounded<UserEvent>()
    
        yield* Effect.scoped(
          Effect.gen(function* () {
            const sub = yield* PubSub.subscribe(pubsub)
    
            const service = yield* UserService
            yield* service.createUser("user-123")
    
            const events = yield* PubSub.takeAll(sub)
    
            expect(events).toHaveLength(1)
            expect(events[0]).toEqual({
              type: "created",
              userId: "user-123"
            })
          })
        )
      }).pipe(
        Effect.provide(UserServiceLive),
        Effect.provide(Layer.succeed(EventBus, pubsub))
      )
    )
    

    Concurrent Publisher/Subscriber Testing

    import { it } from "@effect/vitest"
    import { Effect, PubSub, Fiber } from "effect"
    import * as A from "effect/Array"
    
    it.effect("concurrent publishers and subscribers", () =>
      Effect.gen(function* () {
        const values = A.range(0, 9)
        const latch = yield* Effect.makeLatch()
        const pubsub = yield* PubSub.bounded<number>(10)
    
        const subscriber = yield* PubSub.subscribe(pubsub).pipe(
          Effect.flatMap((sub) =>
            latch.await.pipe(
              Effect.andThen(
                Effect.forEach(values, () => PubSub.take(sub))
              )
            )
          ),
          Effect.scoped,
          Effect.forkScoped
        )
    
        yield* PubSub.publishAll(pubsub, values)
        yield* latch.open
    
        const result = yield* Fiber.join(subscriber)
        expect(result).toEqual(values)
      })
    )
    

    Forked Fiber PubSub Subscriptions

    When testing forked fibers that subscribe to a PubSub, proper yield ordering is critical to avoid losing events.

    Correct Order: yieldNow After Subscribe, Then After Each Publish

    import { it, expect } from "@effect/vitest"
    import { Effect, PubSub, Ref } from "effect"
    import * as A from "effect/Array"
    
    it.effect("forked subscriber receives all events", () =>
      Effect.gen(function* () {
        const pubsub = yield* PubSub.unbounded<string>()
        const received = yield* Ref.make<string[]>([])
    
        yield* Effect.scoped(
          Effect.gen(function* () {
            const sub = yield* PubSub.subscribe(pubsub)
    
            yield* Effect.fork(
              Effect.forever(
                Effect.gen(function* () {
                  const msg = yield* PubSub.take(sub)
                  yield* Ref.update(received, A.append(msg))
                })
              )
            )
    
            yield* Effect.yieldNow()  // Let forked fiber start and become ready
    
            yield* PubSub.publish(pubsub, "event-1")
            yield* Effect.yieldNow()  // Let fiber process event-1
    
            yield* PubSub.publish(pubsub, "event-2")
            yield* Effect.yieldNow()  // Let fiber process event-2
    
            const events = yield* Ref.get(received)
            expect(events).toEqual(["event-1", "event-2"])
          })
        )
      })
    )
    

    Why This Order Matters

    The fiber scheduling model requires explicit yields at specific points:

    1. yieldNow after subscribe/fork: The forked fiber needs a chance to execute its first instruction (the PubSub.take) before any events are published. Without this yield, the fiber hasn't started yet.

    2. yieldNow after each publish: After publishing, the subscriber fiber needs a turn to process the event. Without yielding, you may publish multiple events before the fiber processes any.

    Common Mistake: Events Lost

    import { Effect, PubSub, Ref } from "effect"
    
    // BAD - Events are lost because fiber hasn't started
    Effect.gen(function* () {
      const pubsub = yield* PubSub.unbounded<string>()
      const received = yield* Ref.make<string[]>([])
    
      yield* Effect.scoped(
        Effect.gen(function* () {
          const sub = yield* PubSub.subscribe(pubsub)
    
          yield* Effect.fork(/* subscriber logic */)
    
          // WRONG: Publishing immediately - fiber not ready yet!
          yield* PubSub.publish(pubsub, "event-1")
          yield* PubSub.publish(pubsub, "event-2")
    
          yield* Effect.yieldNow()  // Too late - events already missed
    
          const events = yield* Ref.get(received)
          // events may be [] or incomplete!
        })
      )
    })
    

    Single yieldNow Is Sufficient

    Unlike sleep(0) patterns in other runtimes, Effect's yieldNow is deterministic within the fiber scheduler. A single yieldNow is sufficient at each synchronization point - no need for multiple yields or retry loops.

    import { Effect, PubSub } from "effect"
    
    // GOOD - Single yield at each point
    yield* Effect.fork(subscriber)
    yield* Effect.yieldNow()  // One yield is enough
    
    yield* PubSub.publish(pubsub, "event")
    yield* Effect.yieldNow()  // One yield is enough
    
    // BAD - Unnecessary multiple yields
    yield* Effect.fork(subscriber)
    yield* Effect.yieldNow()
    yield* Effect.yieldNow()  // Redundant
    yield* Effect.yieldNow()  // Redundant
    

    Testing Observer Pattern with Session

    This pattern applies to any forked subscriber, including observer patterns:

    import { it, expect } from "@effect/vitest"
    import { Effect } from "effect"
    
    declare const Observer: {
      attach: (
        session: unknown,
        observer: unknown,
        args: unknown
      ) => Effect.Effect<void>
    }
    
    declare const Session: {
      publish: (session: unknown, event: unknown) => Effect.Effect<void>
    }
    
    declare const session: unknown
    declare const observer: unknown
    declare const args: unknown
    declare const event1: unknown
    declare const event2: unknown
    declare const getResults: () => Effect.Effect<unknown[]>
    
    it.effect("observer receives session events", () =>
      Effect.gen(function* () {
        yield* Observer.attach(session, observer, args)  // Forks subscriber
        yield* Effect.yieldNow()                         // Let fiber start
    
        yield* Session.publish(session, event1)
        yield* Effect.yieldNow()                         // Let event process
    
        yield* Session.publish(session, event2)
        yield* Effect.yieldNow()                         // Let event process
    
        const results = yield* getResults()
        expect(results).toHaveLength(2)
      })
    )
    

    SubscriptionRef Testing

    Testing Stream Changes with Latches

    The latch pattern ensures the stream subscription is ready before mutations:

    import { it } from "@effect/vitest"
    import { Effect, Fiber, Number } from "effect"
    import { Stream, SubscriptionRef } from "effect/stream"
    
    it.effect("multiple subscribers can receive changes", () =>
      Effect.gen(function* () {
        const ref = yield* SubscriptionRef.make(0)
        const latch1 = yield* Effect.makeLatch()
        const latch2 = yield* Effect.makeLatch()
    
        const fiber1 = yield* SubscriptionRef.changes(ref).pipe(
          Stream.tap(() => latch1.open),
          Stream.take(3),
          Stream.runCollect,
          Effect.forkScoped
        )
    
        yield* latch1.await
        yield* SubscriptionRef.update(ref, Number.increment)
    
        const fiber2 = yield* SubscriptionRef.changes(ref).pipe(
          Stream.tap(() => latch2.open),
          Stream.take(2),
          Stream.runCollect,
          Effect.forkScoped
        )
    
        yield* latch2.await
        yield* SubscriptionRef.update(ref, Number.increment)
    
        const result1 = yield* Fiber.join(fiber1)
        const result2 = yield* Fiber.join(fiber2)
    
        expect(result1).toEqual([0, 1, 2])
        expect(result2).toEqual([1, 2])
      })
    )
    

    Testing Subscription Interruption

    import { it } from "@effect/vitest"
    import { Effect, Exit, Fiber, Number, Cause } from "effect"
    import { Pull, Stream, SubscriptionRef } from "effect/stream"
    
    it.effect("subscriptions are interruptible", () =>
      Effect.gen(function* () {
        const ref = yield* SubscriptionRef.make(0)
        const latch = yield* Effect.makeLatch()
    
        const fiber = yield* SubscriptionRef.changes(ref).pipe(
          Stream.tap(() => latch.open),
          Stream.take(10),
          Stream.runCollect,
          Effect.forkScoped
        )
    
        yield* latch.await
        yield* SubscriptionRef.update(ref, Number.increment)
        yield* Fiber.interrupt(fiber)
    
        const result = yield* Fiber.await(fiber)
    
        expect(
          Exit.isFailure(result) && Pull.isHaltCause(result.cause)
        ).toBe(true)
      })
    )
    

    Stream Testing

    Collecting Stream Results

    import { it } from "@effect/vitest"
    import { Effect } from "effect"
    import { Stream } from "effect/stream"
    
    it.effect("should collect stream elements", () =>
      Effect.gen(function* () {
        const result = yield* Stream.make(1, 2, 3, 4, 5).pipe(
          Stream.filter((n) => n % 2 === 0),
          Stream.runCollect
        )
    
        expect(result).toEqual([2, 4])
      })
    )
    

    Testing Stream Side Effects

    import { it } from "@effect/vitest"
    import { Effect, Ref } from "effect"
    import { Stream } from "effect/stream"
    
    it.effect("should track side effects", () =>
      Effect.gen(function* () {
        const log = yield* Ref.make<string[]>([])
    
        yield* Stream.make("a", "b", "c").pipe(
          Stream.tap((item) => Ref.update(log, (items) => [...items, item])),
          Stream.runDrain
        )
    
        const logged = yield* Ref.get(log)
        expect(logged).toEqual(["a", "b", "c"])
      })
    )
    

    Testing Stream Errors

    import { it } from "@effect/vitest"
    import { Effect, Exit, Data } from "effect"
    import { Stream } from "effect/stream"
    
    class StreamError extends Data.TaggedError("StreamError")<{
      readonly message: string
    }> {}
    
    it.effect("should handle stream errors", () =>
      Effect.gen(function* () {
        const result = yield* Stream.make(1, 2, 3).pipe(
          Stream.mapEffect((n) =>
            n === 2
              ? Effect.fail(new StreamError({ message: "boom" }))
              : Effect.succeed(n)
          ),
          Stream.runCollect,
          Effect.exit
        )
    
        expect(Exit.isFailure(result)).toBe(true)
      })
    )
    

    Testing Stream Finalization

    import { it } from "@effect/vitest"
    import { Effect, Ref } from "effect"
    import { Stream } from "effect/stream"
    
    it.effect("should run finalizers", () =>
      Effect.gen(function* () {
        const finalized = yield* Ref.make(false)
    
        yield* Stream.make(1, 2, 3).pipe(
          Stream.ensuring(Ref.set(finalized, true)),
          Stream.take(1),
          Stream.runDrain
        )
    
        expect(yield* Ref.get(finalized)).toBe(true)
      })
    )
    

    Interruption Testing

    Testing Fiber Interruption

    import { it } from "@effect/vitest"
    import { Effect, Exit, Fiber, Cause } from "effect"
    
    it.effect("should handle interruption", () =>
      Effect.gen(function* () {
        const fiber = yield* Effect.never.pipe(Effect.fork)
    
        yield* Fiber.interrupt(fiber)
    
        const result = yield* Fiber.await(fiber)
    
        expect(Exit.isInterrupted(result)).toBe(true)
      })
    )
    

    Testing Interrupted-Only Cause

    import { it } from "@effect/vitest"
    import { Effect, Exit, Fiber, Cause } from "effect"
    
    it.effect("should have interrupted-only cause", () =>
      Effect.gen(function* () {
        const fiber = yield* Effect.never.pipe(Effect.fork)
    
        yield* Fiber.interrupt(fiber)
    
        const result = yield* Fiber.await(fiber)
    
        expect(
          Exit.isFailure(result) && Cause.isInterruptedOnly(result.cause)
        ).toBe(true)
      })
    )
    

    Time-Dependent Concurrency Testing

    Use TestClock only when testing time-dependent behavior like delays, timeouts, or schedules.

    import { it } from "@effect/vitest"
    import { Effect, Fiber, TestClock, Duration } from "effect"
    
    it.effect("should handle delayed concurrent operations", () =>
      Effect.gen(function* () {
        const fiber = yield* Effect.gen(function* () {
          yield* Effect.sleep(Duration.seconds(5))
          return "done"
        }).pipe(Effect.fork)
    
        yield* TestClock.adjust(Duration.seconds(5))
    
        const result = yield* Fiber.join(fiber)
        expect(result).toBe("done")
      })
    )
    

    Anti-Patterns

    DON'T use TestClock for non-time-dependent code

    import { Effect, TestClock, Duration } from "effect"
    
    // BAD - Using TestClock when not needed
    Effect.gen(function* () {
      const fiber = yield* someEffect.pipe(Effect.fork)
      yield* TestClock.adjust(Duration.millis(100))
      yield* Fiber.join(fiber)
    })
    
    // GOOD - Use yieldNow for simple yielding
    Effect.gen(function* () {
      const fiber = yield* someEffect.pipe(Effect.fork)
      yield* Effect.yieldNow()
      yield* Fiber.join(fiber)
    })
    

    DON'T poll in a loop without yieldNow

    import { Effect, Fiber } from "effect"
    
    declare const fiber: Fiber.RuntimeFiber<void>
    
    // BAD - Busy loop
    while (fiber.unsafePoll() === null) {
      // Spins forever!
    }
    
    // GOOD - Yield between polls or use Fiber.await
    Effect.gen(function* () {
      while (fiber.unsafePoll() === null) {
        yield* Effect.yieldNow()
      }
    })
    
    // BETTER - Just await the fiber
    Effect.gen(function* () {
      yield* Fiber.await(fiber)
    })
    

    DON'T forget Effect.scoped for PubSub subscriptions

    import { Effect, PubSub } from "effect"
    
    declare const pubsub: PubSub.PubSub<string>
    
    // BAD - Subscription leaks
    Effect.gen(function* () {
      const sub = yield* PubSub.subscribe(pubsub)
      // Sub is never cleaned up!
    })
    
    // GOOD - Scoped subscription
    Effect.gen(function* () {
      yield* Effect.scoped(
        Effect.gen(function* () {
          const sub = yield* PubSub.subscribe(pubsub)
          const events = yield* PubSub.takeAll(sub)
          // Sub cleaned up when scope closes
        })
      )
    })
    

    DON'T start subscriptions after mutations

    import { Effect } from "effect"
    import { Stream, SubscriptionRef } from "effect/stream"
    
    declare const ref: SubscriptionRef.SubscriptionRef<number>
    
    // BAD - May miss events
    Effect.gen(function* () {
      yield* SubscriptionRef.update(ref, (n) => n + 1)
      const fiber = yield* SubscriptionRef.changes(ref).pipe(
        Stream.take(1),
        Stream.runCollect,
        Effect.fork
      )
      // Subscription started after mutation - may miss it!
    })
    
    // GOOD - Use latch to ensure subscription is ready
    Effect.gen(function* () {
      const latch = yield* Effect.makeLatch()
    
      const fiber = yield* SubscriptionRef.changes(ref).pipe(
        Stream.tap(() => latch.open),
        Stream.take(2),
        Stream.runCollect,
        Effect.forkScoped
      )
    
      yield* latch.await
      yield* SubscriptionRef.update(ref, (n) => n + 1)
    
      const result = yield* Fiber.join(fiber)
    })
    

    Quality Checklist

    • Using correct coordination primitive for the use case
    • Effect.scoped wraps PubSub subscriptions
    • Latches ensure stream subscriptions are ready before mutations
    • Effect.yieldNow after fork to let subscriber fiber start
    • Effect.yieldNow after each publish to let fiber process event
    • Effect.yieldNow used instead of TestClock for non-time-dependent code
    • Fiber interruption tested with Exit.isInterrupted or Cause.isInterruptedOnly
    • Stream finalizers verified with Stream.ensuring
    • No busy polling without yields
    • Test is deterministic (no race conditions)
    Recommended Servers
    Postman
    Postman
    OpenZeppelin
    OpenZeppelin
    Repository
    kriegcloud/beep-effect
    Files