Message Queues & Event Systems - 1/2

The 3 AM Phone Call That Changes Everything

You’re sleeping peacefully when your phone buzzes with a critical alert: “Order processing down, revenue dropping by the minute.” You stumble to your laptop to find:

  • 50,000 orders stuck in pending state
  • Payment confirmations missing while charges went through
  • Inventory reservations expired, creating overselling
  • Customer service drowning in “Where’s my order?” tickets
  • Your message queue is “healthy” but no messages are processing

As you dig deeper, you discover the horror: your carefully designed async system has a few tiny flaws that just created a $2 million problem. The payment processor went down for 30 seconds three hours ago. Your retry logic gave up. Your dead letter queue is full. Your monitoring missed the cascading failures.

Welcome to the harsh reality of production message systems.

The Uncomfortable Truth About Async Systems

Here’s what the tutorials don’t tell you: Building async systems is the easy part. Making them bulletproof in production is where most developers fail spectacularly.

Your beautiful event-driven architecture becomes a nightmare when:

  • Messages get lost in network failures
  • Services crash mid-processing
  • External APIs have intermittent failures
  • Database deadlocks occur during high load
  • Memory leaks cause gradual slowdowns

The difference between systems that survive chaos and systems that crumble lies in one critical principle: Expecting failure and designing recovery strategies from day one.

Ready to build async systems that actually work when things go wrong? Let’s dive into the operational reality of production message systems.


Asynchronous Processing Patterns: Beyond Fire and Forget

The Worker Pool Pattern

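Instead of processing messages sequentially, distribute work across multiple workers that process messages concurrently. In the Node.js-style code below, each worker is an async loop on the same event loop, so the gain comes from overlapping I/O waits (payment calls, database queries) rather than from CPU parallelism.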

// Naive single-threaded processing - bottleneck waiting to happen
class SingleThreadOrderProcessor {
  private isProcessing = false;

  async processOrders(): Promise<void> {
    if (this.isProcessing) return; // Skip if already processing

    this.isProcessing = true;

    while (true) {
      const message = await this.orderQueue.receive();
      if (!message) break;

      // Process one order at a time (slow!)
      await this.processOrder(message.data);
      await this.orderQueue.acknowledge(message);
    }

    this.isProcessing = false;
  }
}

import * as os from "os";

// Scalable worker pool pattern
class WorkerPoolOrderProcessor {
  private workers: Worker[] = [];
  private workerCount: number;
  private isRunning = false;

  constructor(workerCount = os.cpus().length) {
    this.workerCount = workerCount;
  }

  async start(): Promise<void> {
    this.isRunning = true;

    // Start multiple workers
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(i);
      this.workers.push(worker);
      this.startWorker(worker);
    }
  }

  private async startWorker(worker: Worker): Promise<void> {
    while (this.isRunning) {
      try {
        const message = await this.orderQueue.receive({
          timeout: 5000, // Don't block forever
          workerId: worker.id,
        });

        if (!message) continue;

        // Process with timeout and error handling
        await this.processWithTimeout(worker, message);
      } catch (error) {
        console.error(`Worker ${worker.id} error:`, error);
        await this.handleWorkerError(worker, error);
      }
    }
  }

  private async processWithTimeout(
    worker: Worker,
    message: QueueMessage
  ): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error("Processing timeout")), 30000);
    });

    try {
      // Note: losing the race does not cancel processOrder; it keeps running in the background
      await Promise.race([
        this.processOrder(worker, message.data),
        timeoutPromise,
      ]);

      // Success - acknowledge the message
      await this.orderQueue.acknowledge(message);

      this.updateWorkerMetrics(worker, "success");
    } catch (error) {
      // Failure - handle retry or dead letter
      await this.handleProcessingFailure(message, error);

      this.updateWorkerMetrics(worker, "failure");
    } finally {
      // Clear the timer so it does not stay pending after the race settles
      clearTimeout(timer);
    }
  }

  private async processOrder(worker: Worker, orderData: any): Promise<void> {
    worker.currentOrder = orderData.orderId;
    worker.startTime = Date.now();

    try {
      // Your actual order processing logic here
      await this.validateOrder(orderData);
      await this.chargePayment(orderData);
      await this.reserveInventory(orderData);
      await this.createShipment(orderData);

      worker.completedOrders++;
    } finally {
      // Always clear the marker, even on failure, so shutdown() does not wait on a dead order
      worker.currentOrder = null;
    }
  }

  // Graceful shutdown with in-flight message completion
  async shutdown(): Promise<void> {
    console.log("Initiating graceful shutdown...");
    this.isRunning = false;

    // Wait for workers to complete current messages
    const shutdownPromises = this.workers.map(async (worker) => {
      if (worker.currentOrder) {
        console.log(
          `Waiting for worker ${worker.id} to finish order ${worker.currentOrder}`
        );

        // Wait up to 60 seconds for completion
        const startTime = Date.now();
        while (worker.currentOrder && Date.now() - startTime < 60000) {
          await new Promise((resolve) => setTimeout(resolve, 1000));
        }
      }
    });

    await Promise.all(shutdownPromises);
    console.log("All workers shut down gracefully");
  }
}
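
The core of the pattern can be tried in isolation. The following is a minimal sketch, assuming an in-memory array stands in for the real queue; `runWorkerPool` and `Handler` are hypothetical names that do not appear in the listing above.

```typescript
// Minimal in-memory sketch of the worker pool idea (all names are hypothetical).
type Handler<T> = (item: T, workerId: number) => Promise<void>;

async function runWorkerPool<T>(
  items: T[],
  workerCount: number,
  handler: Handler<T>
): Promise<{ processed: number; failed: number }> {
  const pending = [...items]; // stands in for a real message queue
  const stats = { processed: 0, failed: 0 };

  // Each worker is an async loop that pulls the next item until the queue is empty.
  const worker = async (workerId: number): Promise<void> => {
    while (pending.length > 0) {
      const item = pending.shift() as T;
      try {
        await handler(item, workerId);
        stats.processed++;
      } catch {
        stats.failed++; // a real system would retry or dead-letter here
      }
    }
  };

  // Start all workers at once and wait until every loop has drained the queue.
  await Promise.all(
    Array.from({ length: workerCount }, (_, id) => worker(id))
  );
  return stats;
}
```

Because a failing item is caught inside the loop, one bad message does not stop the worker that picked it up, which is the same property the `startWorker` loop above relies on.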

The Circuit Breaker Pattern for External Dependencies

Prevent cascading failures when external services become unavailable.

// Circuit breaker for payment processing
class PaymentCircuitBreaker {
  private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";
  private failureCount = 0;
  private lastFailureTime: number = 0;
  private readonly failureThreshold: number;
  private readonly recoveryTimeout: number;
  private readonly monitoringWindow: number;

  constructor(
    failureThreshold = 5, // Open after 5 failures
    recoveryTimeout = 60000, // Try again after 1 minute
    monitoringWindow = 300000 // Reset count every 5 minutes
  ) {
    this.failureThreshold = failureThreshold;
    this.recoveryTimeout = recoveryTimeout;
    this.monitoringWindow = monitoringWindow;
  }

  async executePayment<T>(paymentOperation: () => Promise<T>): Promise<T> {
    if (this.state === "OPEN") {
      if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
        this.state = "HALF_OPEN";
      } else {
        throw new Error(
          "Circuit breaker is OPEN - payment service unavailable"
        );
      }
    }

    try {
      const result = await paymentOperation();

      // Success - reset if we were in HALF_OPEN
      if (this.state === "HALF_OPEN") {
        this.state = "CLOSED";
        this.failureCount = 0;
      }

      return result;
    } catch (error) {
      this.recordFailure();
      throw error;
    }
  }

  private recordFailure(): void {
    const now = Date.now();

    // Reset failure count if the previous failure is older than the monitoring window
    // (must be checked before lastFailureTime is overwritten)
    if (now - this.lastFailureTime > this.monitoringWindow) {
      this.failureCount = 0;
    }

    this.failureCount++;
    this.lastFailureTime = now;

    // Open circuit if threshold reached, or if the HALF_OPEN trial call failed
    if (
      this.state === "HALF_OPEN" ||
      this.failureCount >= this.failureThreshold
    ) {
      this.state = "OPEN";
    }
  }

  getState(): { state: string; failureCount: number; lastFailureTime: number } {
    return {
      state: this.state,
      failureCount: this.failureCount,
      lastFailureTime: this.lastFailureTime,
    };
  }
}

// Usage in order processing
class ResilientOrderProcessor {
  private paymentBreaker = new PaymentCircuitBreaker();
  private inventoryBreaker = new PaymentCircuitBreaker();
  private shippingBreaker = new PaymentCircuitBreaker();

  async processOrder(orderData: OrderData): Promise<void> {
    const orderId = orderData.orderId;

    try {
      // Payment with circuit breaker
      const payment = await this.paymentBreaker.executePayment(async () => {
        return await this.paymentService.charge(orderData.payment);
      });

      // Inventory with circuit breaker and fallback
      let inventoryReserved = false;
      try {
        await this.inventoryBreaker.executePayment(async () => {
          await this.inventoryService.reserve(orderData.items);
          inventoryReserved = true;
        });
      } catch (error) {
        // Fallback: Allow overselling with manual review
        await this.scheduleInventoryReview(orderId, orderData.items);
        inventoryReserved = true; // Continue processing
      }

      // Shipping with circuit breaker
      await this.shippingBreaker.executePayment(async () => {
        return await this.shippingService.createShipment(orderData);
      });

      // Success - publish completion event
      await this.eventBus.publish("order.completed", {
        orderId,
        paymentId: payment.id,
        inventoryReserved,
        timestamp: new Date().toISOString(),
      });
    } catch (error) {
      // Handle partial failures
      await this.handlePartialFailure(orderId, error);
    }
  }

  private async handlePartialFailure(
    orderId: string,
    error: Error
  ): Promise<void> {
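    // Implement compensation patterns.
    // Caution: matching on error.message is fragile. Every breaker above throws
    // "Circuit breaker is OPEN - payment service unavailable", so an open shipping
    // breaker would land in the "payment" branch even though the charge succeeded.
    // Production code should record which step failed instead of parsing messages.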
    if (error.message.includes("payment")) {
      // Payment failed - no compensation needed
      await this.eventBus.publish("order.payment_failed", {
        orderId,
        error: error.message,
      });
    } else if (error.message.includes("inventory")) {
      // Inventory failed after payment - refund
      await this.eventBus.publish("order.refund_required", {
        orderId,
        reason: "inventory_unavailable",
      });
    } else if (error.message.includes("shipping")) {
      // Shipping failed - retry later or manual intervention
      await this.eventBus.publish("order.shipping_failed", {
        orderId,
        requiresManualShipping: true,
      });
    }
  }
}
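
The state transitions are easier to verify with a controllable clock. Here is a minimal sketch, assuming a synchronous operation and an injectable `now` function; `SketchBreaker` is a hypothetical class, and unlike the listing above it counts consecutive failures (any success resets the count).

```typescript
type BreakerState = "CLOSED" | "OPEN" | "HALF_OPEN";

// Hypothetical, simplified breaker used only to illustrate the transitions.
class SketchBreaker {
  private state: BreakerState = "CLOSED";
  private failures = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold: number,
    private readonly recoveryTimeoutMs: number,
    private readonly now: () => number = Date.now // injectable clock (assumption, for testing)
  ) {}

  call<T>(operation: () => T): T {
    if (this.state === "OPEN") {
      if (this.now() - this.openedAt < this.recoveryTimeoutMs) {
        throw new Error("circuit open"); // fail fast without touching the dependency
      }
      this.state = "HALF_OPEN"; // recovery timeout elapsed: allow one trial call
    }

    try {
      const result = operation();
      this.state = "CLOSED";
      this.failures = 0;
      return result;
    } catch (error) {
      this.failures++;
      // A failed trial call, or too many failures, (re)opens the circuit.
      if (
        this.state === "HALF_OPEN" ||
        this.failures >= this.failureThreshold
      ) {
        this.state = "OPEN";
        this.openedAt = this.now();
      }
      throw error;
    }
  }

  getState(): BreakerState {
    return this.state;
  }
}
```

With a threshold of 2 and a fake clock, two failing calls open the circuit, calls are rejected until the clock passes the recovery timeout, and the next successful call closes it again.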

Job Queues and Background Tasks: The Engine Behind User Experience

Priority Queue Implementation

Not all tasks are created equal. Critical operations should jump the line.

import { v4 as uuidv4 } from "uuid";

// Priority-based job processing
enum JobPriority {
  CRITICAL = 0, // Payment failures, security alerts
  HIGH = 1, // Order processing, customer requests
  MEDIUM = 2, // Email notifications, report generation
  LOW = 3, // Data cleanup, analytics processing
}

interface Job {
  id: string;
  type: string;
  priority: JobPriority;
  data: any;
  attempts: number;
  maxAttempts: number;
  createdAt: Date;
  scheduledFor?: Date;
  timeout?: number;
}

class PriorityJobQueue {
  private queues: Map<JobPriority, Job[]> = new Map();
  private processing: Map<string, Job> = new Map();
  private failed: Map<string, Job> = new Map();

  constructor() {
    // Initialize priority queues
    Object.values(JobPriority)
      .filter((p) => typeof p === "number")
      .forEach((priority) => {
        this.queues.set(priority as JobPriority, []);
      });
  }

  // Add job with priority
  async addJob(
    job: Omit<Job, "id" | "attempts" | "createdAt">
  ): Promise<string> {
    const fullJob: Job = {
      ...job,
      id: uuidv4(),
      attempts: 0,
      createdAt: new Date(),
    };

    const queue = this.queues.get(job.priority);
    if (!queue) {
      throw new Error(`Invalid priority: ${job.priority}`);
    }

    // Insert maintaining time-based order within priority level
    const insertIndex = queue.findIndex(
      (existingJob) => existingJob.createdAt > fullJob.createdAt
    );

    if (insertIndex === -1) {
      queue.push(fullJob);
    } else {
      queue.splice(insertIndex, 0, fullJob);
    }

    return fullJob.id;
  }

  // Get next job respecting priority
  async getNextJob(): Promise<Job | null> {
    // Check each priority level from highest to lowest
    for (const priority of [
      JobPriority.CRITICAL,
      JobPriority.HIGH,
      JobPriority.MEDIUM,
      JobPriority.LOW,
    ]) {
      const queue = this.queues.get(priority);
      if (!queue || queue.length === 0) continue;

      // Find first job that's ready to run
      const jobIndex = queue.findIndex(
        (job) => !job.scheduledFor || job.scheduledFor <= new Date()
      );

      if (jobIndex !== -1) {
        const job = queue.splice(jobIndex, 1)[0];
        job.attempts++;
        this.processing.set(job.id, job);
        return job;
      }
    }

    return null;
  }

  // Mark job as completed
  async completeJob(jobId: string): Promise<void> {
    this.processing.delete(jobId);
  }

  // Handle job failure with retry logic
  async failJob(jobId: string, error: Error): Promise<void> {
    const job = this.processing.get(jobId);
    if (!job) return;

    this.processing.delete(jobId);

    if (job.attempts < job.maxAttempts) {
      // Exponential backoff retry
      const backoffMs = Math.min(1000 * Math.pow(2, job.attempts), 300000); // Max 5 minutes
      job.scheduledFor = new Date(Date.now() + backoffMs);

      // Re-queue for retry
      const queue = this.queues.get(job.priority);
      queue?.push(job);
    } else {
      // Max attempts reached - send to dead letter queue
      this.failed.set(jobId, job);
      await this.handleDeadLetter(job, error);
    }
  }

  private async handleDeadLetter(job: Job, error: Error): Promise<void> {
    console.error(`Job ${job.id} failed permanently:`, error);

    // Send to monitoring/alerting system
    await this.alertingService.sendAlert({
      type: "job_failed_permanently",
      jobId: job.id,
      jobType: job.type,
      error: error.message,
      attempts: job.attempts,
    });

    // Store for manual review
    await this.deadLetterStore.save({
      ...job,
      failedAt: new Date(),
      error: error.message,
    });
  }
}

// Job processors with different strategies
class JobProcessorRegistry {
  private processors: Map<string, JobProcessor> = new Map();

  register(jobType: string, processor: JobProcessor): void {
    this.processors.set(jobType, processor);
  }

  async processJob(job: Job): Promise<void> {
    const processor = this.processors.get(job.type);
    if (!processor) {
      throw new Error(`No processor registered for job type: ${job.type}`);
    }

    if (!job.timeout) {
      // No timeout configured - just run the processor
      await processor.process(job.data);
      return;
    }

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error("Job timeout")), job.timeout);
    });

    try {
      await Promise.race([processor.process(job.data), timeoutPromise]);
    } finally {
      clearTimeout(timer); // Don't leave the timer pending after the job settles
    }
  }
}

// Example job processors
class EmailJobProcessor implements JobProcessor {
  async process(data: any): Promise<void> {
    const { recipient, subject, template, templateData } = data;

    const emailContent = await this.templateEngine.render(
      template,
      templateData
    );

    await this.emailService.send({
      to: recipient,
      subject,
      html: emailContent,
    });
  }
}

class PaymentRetryProcessor implements JobProcessor {
  async process(data: any): Promise<void> {
    const { orderId, paymentMethodId, amount } = data;

    // Retry failed payment
    const payment = await this.paymentService.retry(paymentMethodId, amount);

    if (payment.status === "succeeded") {
      await this.eventBus.publish("payment.retry.succeeded", {
        orderId,
        paymentId: payment.id,
      });
    } else {
      throw new Error(`Payment retry failed: ${payment.failureReason}`);
    }
  }
}
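
To see what the retry schedule in `failJob` looks like in practice, the backoff formula can be pulled out into a small function. This is a sketch; `backoffDelayMs` and `retrySchedule` are hypothetical names, while the base, multiplier, and cap are the ones used in the listing above.

```typescript
// Same formula as failJob above: 1s base, doubling per attempt, capped at 5 minutes.
function backoffDelayMs(
  attempts: number,
  baseMs = 1000,
  capMs = 300_000
): number {
  return Math.min(baseMs * Math.pow(2, attempts), capMs);
}

// getNextJob increments `attempts` before the job runs, so the first
// failure is scheduled with attempts = 1.
const retrySchedule = [1, 2, 3, 4, 5, 6, 7, 8, 9].map((attempts) =>
  backoffDelayMs(attempts)
);

console.log(retrySchedule);
// [2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 300000]
```

The cap only takes effect from the ninth attempt onward, so with the default `maxAttempts` of 3 used by the scheduler a job waits 2 and then 4 seconds before it is dead-lettered.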

Scheduled and Recurring Jobs

Handle cron-like functionality with database-backed scheduling.

// Scheduled job system
class JobScheduler {
  private scheduledJobs: Map<string, ScheduledJob> = new Map();
  private isRunning = false;

  async start(): Promise<void> {
    this.isRunning = true;
    await this.loadScheduledJobs();
    this.startSchedulerLoop();
  }

  // Add recurring job (cron-like)
  async scheduleRecurring(
    name: string,
    cronExpression: string,
    jobType: string,
    jobData: any,
    options: ScheduleOptions = {}
  ): Promise<void> {
    const scheduledJob: ScheduledJob = {
      id: uuidv4(),
      name,
      cronExpression,
      jobType,
      jobData,
      nextRun: this.calculateNextRun(cronExpression),
      isActive: true,
      ...options,
    };

    this.scheduledJobs.set(name, scheduledJob);
    await this.persistScheduledJob(scheduledJob);
  }

  // Add one-time delayed job
  async scheduleDelayed(
    jobType: string,
    jobData: any,
    delayMs: number,
    options: ScheduleOptions = {}
  ): Promise<string> {
    const runAt = new Date(Date.now() + delayMs);
    const jobId = uuidv4();

    const scheduledJob: ScheduledJob = {
      id: jobId,
      name: `delayed-${jobId}`,
      jobType,
      jobData,
      nextRun: runAt,
      isActive: true,
      isOneTime: true,
      ...options,
    };

    this.scheduledJobs.set(jobId, scheduledJob);
    await this.persistScheduledJob(scheduledJob);

    return jobId;
  }

  private async startSchedulerLoop(): Promise<void> {
    while (this.isRunning) {
      try {
        await this.processScheduledJobs();
      } catch (error) {
        console.error("Scheduler loop error:", error);
      }

      // Sleep outside the try block so a failing pass cannot turn this into a busy loop
      await new Promise((resolve) => setTimeout(resolve, 60000)); // Check every minute
    }
  }

  private async processScheduledJobs(): Promise<void> {
    const now = new Date();

    for (const [name, scheduledJob] of this.scheduledJobs) {
      if (!scheduledJob.isActive || scheduledJob.nextRun > now) {
        continue;
      }

      try {
        // Queue the job for execution
        await this.jobQueue.addJob({
          type: scheduledJob.jobType,
          priority: scheduledJob.priority ?? JobPriority.MEDIUM, // ?? keeps CRITICAL (0), which || would discard
          data: scheduledJob.jobData,
          maxAttempts: scheduledJob.maxAttempts || 3,
          timeout: scheduledJob.timeout,
        });

        // Update next run time
        if (scheduledJob.isOneTime) {
          // Remove one-time jobs after execution
          this.scheduledJobs.delete(name);
          await this.removeScheduledJob(scheduledJob.id);
        } else {
          // Calculate next run for recurring jobs
          scheduledJob.nextRun = this.calculateNextRun(
            scheduledJob.cronExpression
          );
          await this.persistScheduledJob(scheduledJob);
        }
      } catch (error) {
        console.error(`Failed to queue scheduled job ${name}:`, error);
      }
    }
  }

  // Example scheduled job types
  async setupCommonJobs(): Promise<void> {
    // Daily cleanup of old logs
    await this.scheduleRecurring(
      "daily-log-cleanup",
      "0 2 * * *", // 2 AM daily
      "cleanup-logs",
      { olderThanDays: 30 }
    );

    // Weekly user engagement emails
    await this.scheduleRecurring(
      "weekly-engagement-emails",
      "0 9 * * MON", // 9 AM every Monday
      "send-engagement-emails",
      { segmentType: "inactive_users" }
    );

    // Monthly billing cycle
    await this.scheduleRecurring(
      "monthly-billing",
      "0 0 1 * *", // First day of every month at midnight
      "process-monthly-billing",
      {}
    );

    // Hourly health checks
    await this.scheduleRecurring(
      "health-check-alerts",
      "0 * * * *", // Every hour
      "system-health-check",
      { alertThreshold: 95 }
    );
  }
}
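
The listing leaves `calculateNextRun` undefined. As an illustration only, here is a sketch that handles the simplest case, a daily expression of the form `<minute> <hour> * * *`, evaluated in UTC. `nextDailyRunUtc` is a hypothetical helper; a production scheduler would normally delegate full cron parsing (ranges, lists, day names, time zones) to a dedicated library.

```typescript
// Hypothetical helper: supports only "<minute> <hour> * * *" (daily) expressions, in UTC.
function nextDailyRunUtc(cronExpression: string, from: Date): Date {
  const [minuteField, hourField, ...rest] = cronExpression.trim().split(/\s+/);
  const minute = Number(minuteField);
  const hour = Number(hourField);
  const isDaily = rest.length === 3 && rest.every((field) => field === "*");

  if (
    !isDaily ||
    !Number.isInteger(minute) ||
    !Number.isInteger(hour) ||
    minute < 0 ||
    minute > 59 ||
    hour < 0 ||
    hour > 23
  ) {
    throw new Error(`Unsupported cron expression: ${cronExpression}`);
  }

  // Candidate run time on the same UTC day as `from`.
  const next = new Date(
    Date.UTC(
      from.getUTCFullYear(),
      from.getUTCMonth(),
      from.getUTCDate(),
      hour,
      minute
    )
  );

  // If that moment is not strictly in the future, move to the next day.
  if (next.getTime() <= from.getTime()) {
    next.setUTCDate(next.getUTCDate() + 1);
  }
  return next;
}
```

For example, `nextDailyRunUtc("0 2 * * *", new Date("2024-01-01T01:00:00Z"))` yields 02:00 UTC the same day, while calling it at exactly 02:00 yields 02:00 UTC on the following day, so a job that just ran is not immediately queued again.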

Message Delivery Guarantees: The Promise You Must Keep

At-Most-Once vs. At-Least-Once vs. Exactly-Once

Understanding delivery semantics is crucial for building reliable systems.

// At-most-once delivery - fire and forget
class AtMostOnceProcessor {
  async processMessage(message: Message): Promise<void> {
    // Acknowledge first, then process
    await this.messageQueue.acknowledge(message);

    try {
      await this.processPayment(message.data);
    } catch (error) {
      // Message is lost if processing fails!
      console.error(
        "Processing failed, but message already acknowledged:",
        error
      );
    }
  }
}

// At-least-once delivery - guarantee processing but risk duplicates
class AtLeastOnceProcessor {
  async processMessage(message: Message): Promise<void> {
    try {
      // Process first
      await this.processPayment(message.data);

      // Acknowledge only after successful processing
      await this.messageQueue.acknowledge(message);
    } catch (error) {
      // Message will be redelivered (potential duplicate processing)
      console.error("Processing failed, message will retry:", error);
      throw error;
    }
  }
}

// Exactly-once processing (effectively-once): at-least-once delivery plus consumer-side deduplication
class ExactlyOnceProcessor {
  private processedMessages: Set<string> = new Set();
  private processingLock: Map<string, Promise<void>> = new Map();

  async processMessage(message: Message): Promise<void> {
    const messageId = message.id;

    // Check if already processed (idempotency)
    if (await this.isAlreadyProcessed(messageId)) {
      await this.messageQueue.acknowledge(message);
      return;
    }

    // Check if this process is already handling the message
    // (in-memory lock only; other processes are covered by the database check below)
    const existingProcess = this.processingLock.get(messageId);
    if (existingProcess) {
      await existingProcess;
      return;
    }

    // Create processing lock
    const processingPromise = this.doExactlyOnceProcessing(message);
    this.processingLock.set(messageId, processingPromise);

    try {
      await processingPromise;
    } finally {
      this.processingLock.delete(messageId);
    }
  }

  private async doExactlyOnceProcessing(message: Message): Promise<void> {
    const messageId = message.id;

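    // Use database transaction for atomicity (assumes a unique constraint on
    // processed_messages.message_id, so a concurrent duplicate insert fails and rolls back)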
    await this.database.transaction(async (tx) => {
      // Double-check within transaction
      const alreadyProcessed = await tx.query(
        "SELECT 1 FROM processed_messages WHERE message_id = $1",
        [messageId]
      );

      if (alreadyProcessed.length > 0) {
        return; // Already processed
      }

      // Process the message
      await this.processPayment(message.data, tx);

      // Mark as processed atomically
      await tx.query(
        "INSERT INTO processed_messages (message_id, processed_at) VALUES ($1, $2)",
        [messageId, new Date()]
      );
    });

    // Acknowledge only after successful transaction
    await this.messageQueue.acknowledge(message);
  }

  private async isAlreadyProcessed(messageId: string): Promise<boolean> {
    if (this.processedMessages.has(messageId)) {
      return true;
    }

    const result = await this.database.query(
      "SELECT 1 FROM processed_messages WHERE message_id = $1",
      [messageId]
    );

    const processed = result.length > 0;
    if (processed) {
      this.processedMessages.add(messageId);
    }

    return processed;
  }
}
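
The difference between the three strategies comes down to where the acknowledgement sits relative to the side effect. The following is a small synchronous simulation, assuming a toy queue that redelivers any unacknowledged message; `deliver`, `run`, and the three consumers are hypothetical names, and each consumer "crashes" on its first delivery.

```typescript
type Msg = { id: string; acked: boolean };
type Consumer = (msg: Msg, delivery: number) => void;

// Toy queue: redelivers the message until it is acknowledged (at most 5 times).
function deliver(msg: Msg, consumer: Consumer): number {
  let deliveries = 0;
  while (!msg.acked && deliveries < 5) {
    deliveries++;
    try {
      consumer(msg, deliveries);
    } catch {
      // consumer crashed; an unacknowledged message will be redelivered
    }
  }
  return deliveries;
}

let sideEffects = 0; // counts how often the "payment" was actually applied

// At-most-once: acknowledge first; a crash during processing loses the message.
const atMostOnce: Consumer = (msg, delivery) => {
  msg.acked = true;
  if (delivery === 1) throw new Error("crash before processing");
  sideEffects++;
};

// At-least-once: process first; a crash before the ack causes a duplicate.
const atLeastOnce: Consumer = (msg, delivery) => {
  sideEffects++;
  if (delivery === 1) throw new Error("crash before ack");
  msg.acked = true;
};

// At-least-once plus deduplication: redelivery happens, the side effect does not repeat.
const seen = new Set<string>();
const deduplicated: Consumer = (msg, delivery) => {
  if (!seen.has(msg.id)) {
    sideEffects++;
    seen.add(msg.id);
  }
  if (delivery === 1) throw new Error("crash before ack");
  msg.acked = true;
};

function run(consumer: Consumer): number {
  sideEffects = 0;
  deliver({ id: "m1", acked: false }, consumer);
  return sideEffects;
}

console.log(run(atMostOnce), run(atLeastOnce), run(deduplicated)); // 0 2 1
```

In this sketch the side effect and the `seen` entry are updated together in memory. In a real system those two writes must be atomic, which is what the database transaction in `doExactlyOnceProcessing` provides.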

Implementing Idempotent Operations

Make operations safe to retry: running the same operation twice must leave the system in the same state as running it once.

// Non-idempotent operation (dangerous)
class BadPaymentProcessor {
  async processRefund(orderId: string, amount: number): Promise<void> {
    // This creates problems if called multiple times!
    const currentBalance = await this.getCustomerBalance(orderId);
    const newBalance = currentBalance + amount;

    await this.updateCustomerBalance(orderId, newBalance);
    await this.createRefundTransaction(orderId, amount);
  }
}

// Idempotent operation (safe)
class SafePaymentProcessor {
  async processRefund(
    orderId: string,
    amount: number,
    idempotencyKey: string
  ): Promise<RefundResult> {
    // Check if refund already processed
    const existingRefund = await this.getRefundByKey(idempotencyKey);
    if (existingRefund) {
      return existingRefund; // Return same result, no side effects
    }

    // Process refund atomically
    return await this.database.transaction(async (tx) => {
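      // Lock the order to prevent concurrent processing
      const orders = await tx.query(
        "SELECT * FROM orders WHERE id = $1 FOR UPDATE",
        [orderId]
      );
      const order = orders[0]; // query results are row arrays, as elsewhere in this example

      if (!order) {
        throw new Error(`Order ${orderId} not found`);
      }

      if (order.refund_status === "refunded") {
        throw new Error("Order already refunded");
      }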

      // Create refund record with idempotency key
      const refund = await tx.query(
        `
        INSERT INTO refunds (order_id, amount, idempotency_key, status, created_at)
        VALUES ($1, $2, $3, 'completed', NOW())
        RETURNING *
      `,
        [orderId, amount, idempotencyKey]
      );

      // Update order status
      await tx.query("UPDATE orders SET refund_status = $1 WHERE id = $2", [
        "refunded",
        orderId,
      ]);

      // Update customer balance
      await tx.query(
        `
        UPDATE customer_balances 
        SET balance = balance + $1 
        WHERE customer_id = (SELECT customer_id FROM orders WHERE id = $2)
      `,
        [amount, orderId]
      );

      return refund[0];
    });
  }

  private async getRefundByKey(
    idempotencyKey: string
  ): Promise<RefundResult | null> {
    const result = await this.database.query(
      "SELECT * FROM refunds WHERE idempotency_key = $1",
      [idempotencyKey]
    );

    return result.length > 0 ? result[0] : null;
  }
}

// Usage with message processing
class IdempotentMessageProcessor {
  async processMessage(message: Message): Promise<void> {
    // Use message ID as idempotency key
    const idempotencyKey = `msg-${message.id}`;

    switch (message.type) {
      case "PROCESS_REFUND":
        await this.paymentProcessor.processRefund(
          message.data.orderId,
          message.data.amount,
          idempotencyKey
        );
        break;

      case "SEND_EMAIL":
        await this.emailService.sendEmail(
          message.data.recipient,
          message.data.template,
          message.data.data,
          idempotencyKey
        );
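        break;

      default:
        // Fail loudly so unknown message types are retried or dead-lettered, not silently dropped
        throw new Error(`Unknown message type: ${message.type}`);
    }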
  }
}
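
The idempotency-key idea does not depend on a database. Below is a minimal in-memory sketch, assuming a single process; `IdempotentExecutor` is a hypothetical class, and storing the promise rather than the result is a design choice that also covers two calls arriving at the same time.

```typescript
// Hypothetical in-memory executor: runs an operation at most once per idempotency key.
class IdempotentExecutor<R> {
  private readonly results = new Map<string, Promise<R>>();

  execute(key: string, operation: () => Promise<R>): Promise<R> {
    const existing = this.results.get(key);
    if (existing) {
      return existing; // repeat or concurrent call: reuse the first result
    }

    const result = operation().catch((error) => {
      // Forget failed attempts so the caller can retry with the same key.
      this.results.delete(key);
      throw error;
    });

    this.results.set(key, result);
    return result;
  }
}

// Usage: the refund is applied once even though the message is handled three times.
async function demo(): Promise<number> {
  let balance = 100;
  const executor = new IdempotentExecutor<number>();
  const handleRefundMessage = () =>
    executor.execute("msg-42", async () => {
      balance += 25;
      return balance;
    });

  await Promise.all([handleRefundMessage(), handleRefundMessage()]);
  await handleRefundMessage();
  return balance; // 125, not 175
}
```

A map in process memory is lost on restart and is not shared between instances, which is why `SafePaymentProcessor` above keeps the key in the `refunds` table instead.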

Handling Failures and Dead Letters: When Things Go Wrong

Comprehensive Error Handling Strategy

// Dead letter queue implementation
class DeadLetterQueueManager {
  private dlqStorage: DeadLetterStorage;
  private alerting: AlertingService;
  private metrics: MetricsService;

  async handleFailedMessage(
    originalMessage: Message,
    error: Error,
    attempts: number,
    context: ProcessingContext
  ): Promise<void> {
    const deadLetter: DeadLetter = {
      id: uuidv4(),
      originalMessageId: originalMessage.id,
      originalQueue: context.queueName,
      message: originalMessage,
      error: {
        message: error.message,
        stack: error.stack,
        code: (error as any).code,
      },
      attempts,
      firstFailedAt: context.firstFailedAt,
      lastFailedAt: new Date(),
      processingContext: {
        workerId: context.workerId,
        hostname: os.hostname(),
        version: process.env.APP_VERSION,
      },
    };

    // Store in dead letter queue
    await this.dlqStorage.store(deadLetter);

    // Categorize error for appropriate handling
    await this.categorizeAndHandle(deadLetter);

    // Update metrics
    this.metrics.increment("dlq.messages.total");
    this.metrics.increment(`dlq.messages.${this.getErrorCategory(error)}`);
  }

  private async categorizeAndHandle(deadLetter: DeadLetter): Promise<void> {
    const errorCategory = this.getErrorCategory(deadLetter.error);

    switch (errorCategory) {
      case "transient":
        // Temporary issues - retry with exponential backoff
        await this.scheduleRetry(deadLetter);
        break;

      case "configuration":
        // Config issues - alert ops team immediately
        await this.alerting.sendCriticalAlert({
          type: "configuration_error",
          message: "Message failed due to configuration issue",
          deadLetter,
          requiresImmediate: true,
        });
        break;

      case "data_corruption":
        // Bad data - needs manual investigation
        await this.flagForManualReview(deadLetter);
        break;

      case "business_logic":
        // Business rule violations - might need code changes
        await this.alerting.sendAlert({
          type: "business_logic_error",
          deadLetter,
          suggestedAction: "Review business rules and message format",
        });
        break;

      case "external_service":
        // External service issues - monitor and retry
        await this.handleExternalServiceError(deadLetter);
        break;

      default:
        await this.flagForManualReview(deadLetter);
    }
  }

  private getErrorCategory(error: any): string {
    // Guard against errors without a message (e.g. a thrown string or plain object)
    const message = String(error?.message ?? "").toLowerCase();

    if (message.includes("timeout") || message.includes("network")) {
      return "transient";
    } else if (message.includes("config") || message.includes("environment")) {
      return "configuration";
    } else if (message.includes("json") || message.includes("parse")) {
      return "data_corruption";
    } else if (message.includes("validation") || message.includes("business")) {
      return "business_logic";
    } else if (message.includes("external") || message.includes("api")) {
      return "external_service";
    }

    return "unknown";
  }

  // Retry logic with intelligent backoff
  private async scheduleRetry(deadLetter: DeadLetter): Promise<void> {
    const retryAttempt = await this.getRetryAttempt(deadLetter.id);

    if (retryAttempt > 5) {
      // Too many retries - give up and alert
      await this.flagForManualReview(deadLetter);
      return;
    }

    // Exponential backoff with jitter
    const baseDelayMs = 1000 * Math.pow(2, retryAttempt);
    const jitterMs = Math.random() * baseDelayMs * 0.1;
    const delayMs = baseDelayMs + jitterMs;

    await this.scheduler.scheduleDelayed(
      "retry-dead-letter",
      { deadLetterId: deadLetter.id },
      delayMs
    );

    await this.updateRetryAttempt(deadLetter.id, retryAttempt + 1);
  }

  // Manual review queue for complex failures
  private async flagForManualReview(deadLetter: DeadLetter): Promise<void> {
    await this.manualReviewQueue.add({
      id: deadLetter.id,
      priority: this.getReviewPriority(deadLetter),
      reviewReason: "Failed multiple automatic retry attempts",
      suggestedActions: this.getSuggestedActions(deadLetter),
    });

    // Alert engineering team for critical messages
    if (this.isCriticalMessage(deadLetter)) {
      await this.alerting.sendCriticalAlert({
        type: "critical_message_failed",
        deadLetter,
        message: "Critical message requires immediate manual review",
      });
    }
  }

  // Recovery operations
  async reprocessDeadLetter(deadLetterId: string): Promise<void> {
    const deadLetter = await this.dlqStorage.get(deadLetterId);
    if (!deadLetter) {
      throw new Error(`Dead letter ${deadLetterId} not found`);
    }

    try {
      // Attempt reprocessing
      await this.originalQueue.requeue(deadLetter.message);

      // Mark as recovered
      await this.dlqStorage.markRecovered(deadLetterId);

      this.metrics.increment("dlq.recovered.success");
    } catch (error) {
      this.metrics.increment("dlq.recovered.failed");
      throw error;
    }
  }

  // Bulk operations for ops teams
  async reprocessCategory(
    category: string,
    limit: number = 100
  ): Promise<void> {
    const deadLetters = await this.dlqStorage.getByCategory(category, limit);

    for (const deadLetter of deadLetters) {
      try {
        await this.reprocessDeadLetter(deadLetter.id);
      } catch (error) {
        console.error(`Failed to reprocess ${deadLetter.id}:`, error);
      }
    }
  }
}
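
Substring matching in `getErrorCategory` is easy to fool: a message such as "capital city lookup failed" contains "api" and would be filed as an external service error. One alternative, sketched here under the assumption that the code throwing the error knows its category, is to carry the category on the error itself. `CategorizedError` and `categorize` are hypothetical names.

```typescript
type ErrorCategory =
  | "transient"
  | "configuration"
  | "data_corruption"
  | "business_logic"
  | "external_service"
  | "unknown";

// Hypothetical error type that carries its category explicitly.
class CategorizedError extends Error {
  constructor(message: string, readonly category: ErrorCategory) {
    super(message);
    this.name = "CategorizedError";
    // Keeps instanceof working when compiling to older JavaScript targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

function categorize(error: unknown): ErrorCategory {
  if (error instanceof CategorizedError) {
    return error.category;
  }
  if (error instanceof SyntaxError) {
    return "data_corruption"; // e.g. thrown by JSON.parse on a malformed payload
  }
  return "unknown"; // untagged errors go to manual review instead of being guessed
}

// Usage: the payment client tags its own failures at the point where they occur.
const tagged = new CategorizedError("Payment API timed out", "external_service");
console.log(categorize(tagged)); // "external_service"
console.log(categorize(new Error("capital city lookup failed"))); // "unknown"
```

Untagged errors fall through to "unknown", which matches the `default` branch of `categorizeAndHandle`: anything the system cannot classify with confidence is flagged for manual review.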

Monitoring and Alerting for Message Systems

// Comprehensive monitoring for async systems
class MessageSystemMonitor {
  private metrics: MetricsCollector;
  private alerting: AlertingService;
  private healthThresholds: HealthThresholds;

  constructor() {
    this.healthThresholds = {
      queueDepth: { warning: 1000, critical: 5000 },
      processingTime: { warning: 30000, critical: 120000 },
      errorRate: { warning: 0.05, critical: 0.15 },
      dlqGrowthRate: { warning: 10, critical: 50 },
    };
  }

  async startMonitoring(): Promise<void> {
    // Run a check on an interval; log failures instead of leaving unhandled rejections
    const every = (ms: number, check: () => Promise<void>) =>
      setInterval(() => {
        check().catch((error) =>
          console.error("Monitoring check failed:", error)
        );
      }, ms);

    // Monitor queue depths
    every(30000, () => this.checkQueueDepths()); // Every 30 seconds

    // Monitor processing times
    every(60000, () => this.checkProcessingTimes()); // Every minute

    // Monitor error rates
    every(120000, () => this.checkErrorRates()); // Every 2 minutes

    // Monitor dead letter queue growth
    every(300000, () => this.checkDLQGrowth()); // Every 5 minutes

    // System health checks
    every(60000, () => this.performHealthCheck()); // Every minute
  }

  private async checkQueueDepths(): Promise<void> {
    const queues = [
      "order-processing",
      "payment-processing",
      "email-notifications",
    ];

    for (const queueName of queues) {
      const depth = await this.messageQueue.getQueueDepth(queueName);

      this.metrics.gauge(`queue.depth.${queueName}`, depth);

      if (depth >= this.healthThresholds.queueDepth.critical) {
        await this.alerting.sendCriticalAlert({
          type: "queue_depth_critical",
          queue: queueName,
          depth,
          message: `Queue ${queueName} has ${depth} messages (critical threshold: ${this.healthThresholds.queueDepth.critical})`,
        });
      } else if (depth >= this.healthThresholds.queueDepth.warning) {
        await this.alerting.sendWarning({
          type: "queue_depth_warning",
          queue: queueName,
          depth,
        });
      }
    }
  }

  private async checkProcessingTimes(): Promise<void> {
    const processingTimes = await this.metrics.getAverageProcessingTime();

    for (const [jobType, avgTime] of processingTimes) {
      this.metrics.gauge(`processing.time.avg.${jobType}`, avgTime);

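      if (avgTime >= this.healthThresholds.processingTime.critical) {
        await this.alerting.sendAlert({
          type: "slow_processing",
          jobType,
          averageTime: avgTime,
          threshold: this.healthThresholds.processingTime.critical,
        });
      } else if (avgTime >= this.healthThresholds.processingTime.warning) {
        // Use the warning threshold too, mirroring checkQueueDepths
        await this.alerting.sendWarning({
          type: "slow_processing_warning",
          jobType,
          averageTime: avgTime,
          threshold: this.healthThresholds.processingTime.warning,
        });
      }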
    }
  }

  async generateHealthReport(): Promise<HealthReport> {
    return {
      timestamp: new Date(),
      queueDepths: await this.getAllQueueDepths(),
      processingRates: await this.getProcessingRates(),
      errorRates: await this.getErrorRates(),
      deadLetterCounts: await this.getDLQCounts(),
      workerStatuses: await this.getWorkerStatuses(),
      systemHealth: await this.calculateOverallHealth(),
    };
  }

  private async calculateOverallHealth(): Promise<HealthStatus> {
    // The checks are independent, so run them concurrently
    const checks = await Promise.all([
      this.checkQueuesHealthy(),
      this.checkProcessingHealthy(),
      this.checkErrorRatesHealthy(),
      this.checkWorkersHealthy(),
    ]);

    const healthyChecks = checks.filter(
      (check) => check.status === "healthy"
    ).length;
    const healthPercentage = (healthyChecks / checks.length) * 100;

    if (healthPercentage >= 90) {
      return { status: "healthy", percentage: healthPercentage };
    } else if (healthPercentage >= 70) {
      return { status: "degraded", percentage: healthPercentage };
    } else {
      return { status: "unhealthy", percentage: healthPercentage };
    }
  }
}
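
The monitor schedules `checkErrorRates` and `checkDLQGrowth` but their bodies are not shown. Both reduce to the same step: compute a number, then compare it against a warning/critical pair. Here is a minimal sketch of that step under stated assumptions: `classify`, `errorRate`, and `dlqGrowth` are hypothetical helpers, the error rate is taken to be a fraction of processed messages, and DLQ growth is taken to be messages added per check interval (the thresholds above do not state their units).

```typescript
type Severity = "ok" | "warning" | "critical";

interface Threshold {
  warning: number;
  critical: number;
}

// Map a measured value to a severity. Critical is checked first so the
// most serious level wins when both thresholds are exceeded.
function classify(value: number, threshold: Threshold): Severity {
  if (value >= threshold.critical) return "critical";
  if (value >= threshold.warning) return "warning";
  return "ok";
}

// Error rate as a fraction of processed messages. Returns 0 when nothing
// was processed, which avoids dividing by zero on an idle queue.
function errorRate(failed: number, processed: number): number {
  return processed === 0 ? 0 : failed / processed;
}

// DLQ growth between two samples, in messages added during the interval.
// A drained DLQ (count went down) is reported as 0 growth.
function dlqGrowth(previousCount: number, currentCount: number): number {
  return Math.max(0, currentCount - previousCount);
}

// Example with the thresholds from the monitor above
const errorSeverity = classify(errorRate(8, 100), { warning: 0.05, critical: 0.15 });
const dlqSeverity = classify(dlqGrowth(40, 95), { warning: 10, critical: 50 });
```

With these inputs, 8 failures out of 100 messages exceeds the 0.05 warning threshold but not the 0.15 critical one, and a DLQ that grew by 55 messages exceeds the critical threshold of 50. Keeping the comparison in one pure function means every check escalates the same way and can be unit tested without a live queue.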

Key Takeaways

Building production-ready async systems requires more than just queueing messages. You need bulletproof error handling, intelligent retry strategies, comprehensive monitoring, and graceful failure recovery.

Essential patterns for success:

  • Worker pools for parallel processing and scalability
  • Circuit breakers to prevent cascading failures
  • Priority queues to handle critical operations first
  • Idempotent operations to make retries safe
  • Dead letter queues with intelligent categorization
  • Comprehensive monitoring to catch issues before they become disasters

The operational reality checklist:

  • ✅ Messages can be safely retried without side effects
  • ✅ Failed messages are categorized and handled appropriately
  • ✅ System degrades gracefully when dependencies fail
  • ✅ Monitoring alerts you before users are affected
  • ✅ Recovery procedures are documented and tested
  • ✅ Performance scales with load patterns

What’s Next?

You’ve now mastered the complete message queue and event system lifecycle. Next, we’ll explore Caching & Performance Optimization, where we’ll learn how to make your async systems blazingly fast while handling millions of operations.

But first, implement these patterns in your systems. The real test isn’t building async processing—it’s building async processing that works reliably at 3 AM when everything else is going wrong.

Your future self (and your on-call rotation) will thank you.