Smithery Logo
MCPsSkillsDocsPricing
Login
Smithery Logo

Accelerating the Agent Economy

Resources

DocumentationPrivacy PolicySystem Status

Company

PricingAboutBlog

Connect

© 2026 Smithery. All rights reserved.

    doanchienthangdev

    event-driven-architecture

    doanchienthangdev/event-driven-architecture
    DevOps
    2
    1 installs

    About

    SKILL.md

    Install

    Install via Skills CLI

    or add to your agent
    • Claude Code
      Claude Code
    • Codex
      Codex
    • OpenClaw
      OpenClaw
    • Cursor
      Cursor
    • Amp
      Amp
    • GitHub Copilot
      GitHub Copilot
    • Gemini CLI
      Gemini CLI
    • Kilo Code
      Kilo Code
    • Junie
      Junie
    • Replit
      Replit
    • Windsurf
      Windsurf
    • Cline
      Cline
    • Continue
      Continue
    • OpenCode
      OpenCode
    • OpenHands
      OpenHands
    • Roo Code
      Roo Code
    • Augment
      Augment
    • Goose
      Goose
    • Trae
      Trae
    • Zencoder
      Zencoder
    • Antigravity
      Antigravity
    ├─
    ├─
    └─

    About

    Event sourcing, CQRS, and message queue patterns with RabbitMQ and Kafka for distributed systems

    SKILL.md

    Event-Driven Architecture

    Implement event-driven systems with event sourcing, CQRS, and message queues. This skill covers distributed patterns for scalable, resilient applications.

    Purpose

    Build loosely coupled, scalable systems:

    • Implement event sourcing for audit trails
    • Apply CQRS for read/write optimization
    • Use message queues for async processing
    • Handle distributed transactions with sagas
    • Ensure eventual consistency
    • Build replay and recovery capabilities

    Features

    1. Event Sourcing

    // Event definitions
    interface DomainEvent {
      eventId: string;
      eventType: string;
      aggregateId: string;
      aggregateType: string;
      timestamp: Date;
      version: number;
      data: Record<string, any>;
      metadata: {
        userId?: string;
        correlationId?: string;
        causationId?: string;
      };
    }
    
    // Order aggregate events
    type OrderEvent =
      | { type: 'OrderCreated'; data: { customerId: string; items: OrderItem[] } }
      | { type: 'OrderItemAdded'; data: { item: OrderItem } }
      | { type: 'OrderItemRemoved'; data: { itemId: string } }
      | { type: 'OrderSubmitted'; data: { submittedAt: Date } }
      | { type: 'PaymentReceived'; data: { paymentId: string; amount: number } }
      | { type: 'OrderShipped'; data: { trackingNumber: string; carrier: string } }
      | { type: 'OrderDelivered'; data: { deliveredAt: Date } }
      | { type: 'OrderCancelled'; data: { reason: string } };
    
    // Event store
    class EventStore {
      async append(
        aggregateId: string,
        events: DomainEvent[],
        expectedVersion: number
      ): Promise<void> {
        // Optimistic concurrency check
        const currentVersion = await this.getVersion(aggregateId);
    
        if (currentVersion !== expectedVersion) {
          throw new ConcurrencyError(
            `Expected version ${expectedVersion}, but found ${currentVersion}`
          );
        }
    
        // Append events atomically
        await db.$transaction(async (tx) => {
          for (let i = 0; i < events.length; i++) {
            await tx.event.create({
              data: {
                ...events[i],
                version: expectedVersion + i + 1,
              },
            });
          }
        });
    
        // Publish to event bus
        for (const event of events) {
          await eventBus.publish(event);
        }
      }
    
      async getEvents(
        aggregateId: string,
        fromVersion?: number
      ): Promise<DomainEvent[]> {
        return db.event.findMany({
          where: {
            aggregateId,
            version: fromVersion ? { gt: fromVersion } : undefined,
          },
          orderBy: { version: 'asc' },
        });
      }
    
      async getVersion(aggregateId: string): Promise<number> {
        const lastEvent = await db.event.findFirst({
          where: { aggregateId },
          orderBy: { version: 'desc' },
        });
    
        return lastEvent?.version ?? 0;
      }
    }
    
    // Aggregate with event sourcing
    class OrderAggregate {
      private id: string;
      private state: OrderState;
      private version: number = 0;
      private uncommittedEvents: OrderEvent[] = [];
    
      static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
        const aggregate = new OrderAggregate(id);
        const events = await eventStore.getEvents(id);
    
        for (const event of events) {
          aggregate.apply(event, false);
        }
    
        return aggregate;
      }
    
      // Command handlers
      create(customerId: string, items: OrderItem[]): void {
        if (this.state) {
          throw new Error('Order already exists');
        }
    
        this.applyChange({
          type: 'OrderCreated',
          data: { customerId, items },
        });
      }
    
      addItem(item: OrderItem): void {
        this.ensureState(['draft']);
    
        this.applyChange({
          type: 'OrderItemAdded',
          data: { item },
        });
      }
    
      submit(): void {
        this.ensureState(['draft']);
    
        if (this.state.items.length === 0) {
          throw new Error('Cannot submit empty order');
        }
    
        this.applyChange({
          type: 'OrderSubmitted',
          data: { submittedAt: new Date() },
        });
      }
    
      // Event application
      private apply(event: OrderEvent, isNew: boolean): void {
        switch (event.type) {
          case 'OrderCreated':
            this.state = {
              status: 'draft',
              customerId: event.data.customerId,
              items: event.data.items,
              total: this.calculateTotal(event.data.items),
            };
            break;
    
          case 'OrderItemAdded':
            this.state.items.push(event.data.item);
            this.state.total = this.calculateTotal(this.state.items);
            break;
    
          case 'OrderSubmitted':
            this.state.status = 'submitted';
            this.state.submittedAt = event.data.submittedAt;
            break;
    
          // ... other event handlers
        }
    
        this.version++;
    
        if (isNew) {
          this.uncommittedEvents.push(event);
        }
      }
    
      private applyChange(event: OrderEvent): void {
        this.apply(event, true);
      }
    
      async save(eventStore: EventStore): Promise<void> {
        const domainEvents = this.uncommittedEvents.map((e, i) => ({
          eventId: uuid(),
          eventType: e.type,
          aggregateId: this.id,
          aggregateType: 'Order',
          timestamp: new Date(),
          version: this.version - this.uncommittedEvents.length + i + 1,
          data: e.data,
          metadata: {},
        }));
    
        await eventStore.append(
          this.id,
          domainEvents,
          this.version - this.uncommittedEvents.length
        );
    
        this.uncommittedEvents = [];
      }
    }
    

    2. CQRS Pattern

    // Command side (writes)
    interface Command {
      type: string;
      payload: any;
      metadata: {
        userId: string;
        timestamp: Date;
        correlationId: string;
      };
    }
    
    class CommandBus {
      private handlers = new Map<string, CommandHandler>();
    
      register(commandType: string, handler: CommandHandler): void {
        this.handlers.set(commandType, handler);
      }
    
      async dispatch(command: Command): Promise<void> {
        const handler = this.handlers.get(command.type);
    
        if (!handler) {
          throw new Error(`No handler for command: ${command.type}`);
        }
    
        await handler.handle(command);
      }
    }
    
    // Command handler
    class CreateOrderHandler implements CommandHandler {
      constructor(
        private eventStore: EventStore,
        private orderRepository: OrderRepository
      ) {}
    
      async handle(command: CreateOrderCommand): Promise<void> {
        const order = new OrderAggregate(uuid());
        order.create(command.payload.customerId, command.payload.items);
        await order.save(this.eventStore);
      }
    }
    
    // Query side (reads)
    interface Query {
      type: string;
      params: any;
    }
    
    class QueryBus {
      private handlers = new Map<string, QueryHandler>();
    
      register(queryType: string, handler: QueryHandler): void {
        this.handlers.set(queryType, handler);
      }
    
      async execute<T>(query: Query): Promise<T> {
        const handler = this.handlers.get(query.type);
    
        if (!handler) {
          throw new Error(`No handler for query: ${query.type}`);
        }
    
        return handler.handle(query);
      }
    }
    
    // Read model projection
    class OrderReadModel {
      async project(event: DomainEvent): Promise<void> {
        switch (event.eventType) {
          case 'OrderCreated':
            await db.orderView.create({
              data: {
                id: event.aggregateId,
                customerId: event.data.customerId,
                status: 'draft',
                itemCount: event.data.items.length,
                total: event.data.total,
                createdAt: event.timestamp,
              },
            });
            break;
    
          case 'OrderSubmitted':
            await db.orderView.update({
              where: { id: event.aggregateId },
              data: {
                status: 'submitted',
                submittedAt: event.data.submittedAt,
              },
            });
            break;
    
          case 'OrderShipped':
            await db.orderView.update({
              where: { id: event.aggregateId },
              data: {
                status: 'shipped',
                trackingNumber: event.data.trackingNumber,
              },
            });
            break;
        }
      }
    
      // Rebuild projection from events
      async rebuild(): Promise<void> {
        // Clear existing read model
        await db.orderView.deleteMany();
    
        // Replay all events
        const events = await eventStore.getAllEvents();
    
        for (const event of events) {
          await this.project(event);
        }
      }
    }
    

    3. Message Queues with RabbitMQ

    import amqp from 'amqplib';
    
    class RabbitMQBroker {
      private connection: amqp.Connection;
      private channel: amqp.Channel;
    
      async connect(): Promise<void> {
        this.connection = await amqp.connect(process.env.RABBITMQ_URL!);
        this.channel = await this.connection.createChannel();
    
        // Setup exchanges
        await this.channel.assertExchange('events', 'topic', { durable: true });
        await this.channel.assertExchange('commands', 'direct', { durable: true });
        await this.channel.assertExchange('dlx', 'fanout', { durable: true });
      }
    
      async publish(exchange: string, routingKey: string, message: any): Promise<void> {
        const content = Buffer.from(JSON.stringify(message));
    
        this.channel.publish(exchange, routingKey, content, {
          persistent: true,
          contentType: 'application/json',
          messageId: uuid(),
          timestamp: Date.now(),
        });
      }
    
      async subscribe(
        queue: string,
        exchange: string,
        routingKey: string,
        handler: (message: any) => Promise<void>
      ): Promise<void> {
        // Setup queue with dead letter exchange
        await this.channel.assertQueue(queue, {
          durable: true,
          deadLetterExchange: 'dlx',
          deadLetterRoutingKey: `${queue}.dlq`,
        });
    
        await this.channel.bindQueue(queue, exchange, routingKey);
    
        // Consume messages
        await this.channel.consume(queue, async (msg) => {
          if (!msg) return;
    
          try {
            const content = JSON.parse(msg.content.toString());
            await handler(content);
            this.channel.ack(msg);
          } catch (error) {
            console.error('Message processing failed:', error);
    
            // Retry or dead-letter
            const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
    
            if (retryCount < 3) {
              // Retry with exponential backoff
              setTimeout(() => {
                this.channel.publish(exchange, routingKey, msg.content, {
                  ...msg.properties,
                  headers: {
                    ...msg.properties.headers,
                    'x-retry-count': retryCount,
                  },
                });
                this.channel.ack(msg);
              }, Math.pow(2, retryCount) * 1000);
            } else {
              // Send to dead letter queue
              this.channel.reject(msg, false);
            }
          }
        });
      }
    }
    
    // Event publishing
    class EventPublisher {
      constructor(private broker: RabbitMQBroker) {}
    
      async publish(event: DomainEvent): Promise<void> {
        const routingKey = `${event.aggregateType}.${event.eventType}`;
        await this.broker.publish('events', routingKey, event);
      }
    }
    
    // Event consumer
    class OrderEventConsumer {
      constructor(
        private broker: RabbitMQBroker,
        private readModel: OrderReadModel
      ) {}
    
      async start(): Promise<void> {
        await this.broker.subscribe(
          'order-projector',
          'events',
          'Order.*',
          async (event) => {
            await this.readModel.project(event);
          }
        );
      }
    }
    

    4. Saga Pattern for Distributed Transactions

    // Saga orchestrator
    interface SagaStep {
      name: string;
      execute: (context: SagaContext) => Promise<void>;
      compensate: (context: SagaContext) => Promise<void>;
    }
    
    class SagaOrchestrator {
      private steps: SagaStep[] = [];
      private executedSteps: SagaStep[] = [];
    
      addStep(step: SagaStep): this {
        this.steps.push(step);
        return this;
      }
    
      async execute(context: SagaContext): Promise<void> {
        try {
          for (const step of this.steps) {
            console.log(`Executing step: ${step.name}`);
            await step.execute(context);
            this.executedSteps.push(step);
          }
        } catch (error) {
          console.error('Saga failed, compensating...', error);
          await this.compensate(context);
          throw error;
        }
      }
    
      private async compensate(context: SagaContext): Promise<void> {
        // Execute compensation in reverse order
        for (const step of this.executedSteps.reverse()) {
          try {
            console.log(`Compensating step: ${step.name}`);
            await step.compensate(context);
          } catch (error) {
            console.error(`Compensation failed for ${step.name}:`, error);
            // Log for manual intervention
            await this.logCompensationFailure(step, context, error);
          }
        }
      }
    }
    
    // Order saga example
    const createOrderSaga = new SagaOrchestrator()
      .addStep({
        name: 'Reserve Inventory',
        execute: async (ctx) => {
          const reservation = await inventoryService.reserve(ctx.items);
          ctx.reservationId = reservation.id;
        },
        compensate: async (ctx) => {
          if (ctx.reservationId) {
            await inventoryService.releaseReservation(ctx.reservationId);
          }
        },
      })
      .addStep({
        name: 'Process Payment',
        execute: async (ctx) => {
          const payment = await paymentService.charge(ctx.customerId, ctx.total);
          ctx.paymentId = payment.id;
        },
        compensate: async (ctx) => {
          if (ctx.paymentId) {
            await paymentService.refund(ctx.paymentId);
          }
        },
      })
      .addStep({
        name: 'Create Order',
        execute: async (ctx) => {
          const order = await orderService.create({
            customerId: ctx.customerId,
            items: ctx.items,
            paymentId: ctx.paymentId,
            reservationId: ctx.reservationId,
          });
          ctx.orderId = order.id;
        },
        compensate: async (ctx) => {
          if (ctx.orderId) {
            await orderService.cancel(ctx.orderId);
          }
        },
      })
      .addStep({
        name: 'Send Confirmation',
        execute: async (ctx) => {
          await notificationService.sendOrderConfirmation(ctx.orderId);
        },
        compensate: async (ctx) => {
          // No compensation needed for notifications
        },
      });
    
    // Execute saga
    async function handleCreateOrder(command: CreateOrderCommand): Promise<void> {
      const context: SagaContext = {
        customerId: command.customerId,
        items: command.items,
        total: calculateTotal(command.items),
      };
    
      await createOrderSaga.execute(context);
    }
    

    5. Kafka Streaming

    import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
    
    class KafkaService {
      private kafka: Kafka;
      private producer: Producer;
      private consumers: Map<string, Consumer> = new Map();
    
      constructor() {
        this.kafka = new Kafka({
          clientId: process.env.SERVICE_NAME,
          brokers: (process.env.KAFKA_BROKERS || '').split(','),
        });
      }
    
      async connect(): Promise<void> {
        this.producer = this.kafka.producer({
          idempotent: true,
          maxInFlightRequests: 5,
        });
    
        await this.producer.connect();
      }
    
      async publish(topic: string, messages: KafkaMessage[]): Promise<void> {
        await this.producer.send({
          topic,
          messages: messages.map(m => ({
            key: m.key,
            value: JSON.stringify(m.value),
            headers: m.headers,
            partition: m.partition,
          })),
        });
      }
    
      async subscribe(
        groupId: string,
        topics: string[],
        handler: (payload: EachMessagePayload) => Promise<void>
      ): Promise<void> {
        const consumer = this.kafka.consumer({
          groupId,
          sessionTimeout: 30000,
          heartbeatInterval: 3000,
        });
    
        await consumer.connect();
        await consumer.subscribe({ topics, fromBeginning: false });
    
        await consumer.run({
          eachMessage: async (payload) => {
            try {
              await handler(payload);
            } catch (error) {
              console.error('Message processing failed:', error);
              // Implement retry/DLQ logic
            }
          },
        });
    
        this.consumers.set(groupId, consumer);
      }
    
      async disconnect(): Promise<void> {
        await this.producer.disconnect();
        for (const consumer of this.consumers.values()) {
          await consumer.disconnect();
        }
      }
    }
    
    // Stream processing
    class OrderStreamProcessor {
      constructor(private kafka: KafkaService) {}
    
      async start(): Promise<void> {
        await this.kafka.subscribe(
          'order-processor',
          ['order-events'],
          async ({ topic, partition, message }) => {
            const event = JSON.parse(message.value?.toString() || '{}');
    
            switch (event.type) {
              case 'OrderCreated':
                await this.handleOrderCreated(event);
                break;
              case 'OrderCompleted':
                await this.handleOrderCompleted(event);
                break;
            }
          }
        );
      }
    
      private async handleOrderCreated(event: any): Promise<void> {
        // Update analytics
        await analyticsService.recordOrder(event.data);
    
        // Trigger downstream processes
        await this.kafka.publish('inventory-commands', [{
          key: event.aggregateId,
          value: {
            type: 'ReserveInventory',
            orderId: event.aggregateId,
            items: event.data.items,
          },
        }]);
      }
    }
    

    Use Cases

    1. Order Processing System

    // Complete order workflow
    async function processOrder(orderId: string): Promise<void> {
      const saga = new SagaOrchestrator()
        .addStep(reserveInventoryStep)
        .addStep(processPaymentStep)
        .addStep(createShipmentStep)
        .addStep(sendNotificationStep);
    
      await saga.execute({ orderId });
    }
    

    2. Real-time Analytics

    // Stream aggregation
    const orderTotalsStream = kafka.subscribe(
      'analytics-aggregator',
      ['order-events'],
      async (event) => {
        await updateDailySales(event.data.total);
        await updateProductMetrics(event.data.items);
      }
    );
    

    Best Practices

    Do's

    • Design events as facts - Immutable, past-tense naming
    • Implement idempotent handlers - Handle duplicates gracefully
    • Plan for event versioning - Schema evolution
    • Use dead letter queues - Handle failures
    • Monitor queue depths - Alert on backlogs
    • Test with chaos - Simulate failures

    Don'ts

    • Don't couple services through shared databases
    • Don't ignore message ordering requirements
    • Don't skip compensation logic
    • Don't forget about exactly-once semantics
    • Don't over-engineer for simple use cases
    • Don't ignore backpressure

    Related Skills

    • redis - Pub/sub and caching
    • real-time-systems - WebSocket integration
    • backend-development - Service architecture

    Reference Resources

    • Event Sourcing Pattern
    • CQRS Pattern
    • RabbitMQ Documentation
    • Apache Kafka Documentation
    Recommended Servers
    Google Calendar
    Google Calendar
    Outlook
    Outlook
    Vercel Grep
    Vercel Grep
    Repository
    doanchienthangdev/omgkit
    Files