Message Queues & Event Systems - 1/2
The 3 AM Phone Call That Changes Everything
You’re sleeping peacefully when your phone buzzes with a critical alert: “Order processing down, revenue dropping by the minute.” You stumble to your laptop to find:
- 50,000 orders stuck in pending state
- Payment confirmations missing while charges went through
- Inventory reservations expired, creating overselling
- Customer service drowning in “Where’s my order?” tickets
- Your message queue is “healthy” but no messages are processing
As you dig deeper, you discover the horror: your carefully designed async system has a few tiny flaws that just created a $2 million problem. The payment processor went down for 30 seconds three hours ago. Your retry logic gave up. Your dead letter queue is full. Your monitoring missed the cascading failures.
Welcome to the harsh reality of production message systems.
The Uncomfortable Truth About Async Systems
Here’s what the tutorials don’t tell you: Building async systems is the easy part. Making them bulletproof in production is where most developers fail spectacularly.
Your beautiful event-driven architecture becomes a nightmare when:
- Messages get lost in network failures
- Services crash mid-processing
- External APIs have intermittent failures
- Database deadlocks occur during high load
- Memory leaks cause gradual slowdowns
The difference between systems that survive chaos and systems that crumble comes down to one principle: expect failure and design recovery strategies from day one.
Ready to build async systems that actually work when things go wrong? Let’s dive into the operational reality of production message systems.
Asynchronous Processing Patterns: Beyond Fire and Forget
The Worker Pool Pattern
Instead of processing messages sequentially, distribute work across multiple workers for parallel processing.
// Naive single-threaded processing - bottleneck waiting to happen
class SingleThreadOrderProcessor {
private isProcessing = false;
async processOrders(): Promise<void> {
if (this.isProcessing) return; // Skip if already processing
this.isProcessing = true;
try {
while (true) {
const message = await this.orderQueue.receive();
if (!message) break;
// Process one order at a time (slow!)
await this.processOrder(message.data);
await this.orderQueue.acknowledge(message);
}
} finally {
// Reset even if processing throws, otherwise the processor never runs again
this.isProcessing = false;
}
}
}
// Scalable worker pool pattern
import * as os from "os"; // Needed for os.cpus() below
class WorkerPoolOrderProcessor {
private workers: Worker[] = [];
private workerCount: number;
private isRunning = false;
constructor(workerCount = os.cpus().length) {
this.workerCount = workerCount;
}
async start(): Promise<void> {
this.isRunning = true;
// Start multiple workers
for (let i = 0; i < this.workerCount; i++) {
const worker = new Worker(i);
this.workers.push(worker);
void this.startWorker(worker); // Not awaited on purpose: the loop runs until shutdown
}
}
private async startWorker(worker: Worker): Promise<void> {
while (this.isRunning) {
try {
const message = await this.orderQueue.receive({
timeout: 5000, // Don't block forever
workerId: worker.id,
});
if (!message) continue;
// Process with timeout and error handling
await this.processWithTimeout(worker, message);
} catch (error) {
console.error(`Worker ${worker.id} error:`, error);
await this.handleWorkerError(worker, error);
}
}
}
private async processWithTimeout(
worker: Worker,
message: QueueMessage
): Promise<void> {
let timer: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error("Processing timeout")), 30000);
});
try {
// Note: the race only stops waiting; it does not cancel processOrder itself
await Promise.race([
this.processOrder(worker, message.data),
timeoutPromise,
]);
// Success - acknowledge the message
await this.orderQueue.acknowledge(message);
this.updateWorkerMetrics(worker, "success");
} catch (error) {
// Failure - handle retry or dead letter
await this.handleProcessingFailure(message, error);
this.updateWorkerMetrics(worker, "failure");
} finally {
clearTimeout(timer); // Avoid leaving one pending timer per processed message
}
}
private async processOrder(worker: Worker, orderData: any): Promise<void> {
worker.currentOrder = orderData.orderId;
worker.startTime = Date.now();
try {
// Your actual order processing logic here
await this.validateOrder(orderData);
await this.chargePayment(orderData);
await this.reserveInventory(orderData);
await this.createShipment(orderData);
worker.completedOrders++;
} finally {
// Always clear, or shutdown() waits the full 60 seconds after a failed order
worker.currentOrder = null;
}
}
// Graceful shutdown with in-flight message completion
async shutdown(): Promise<void> {
console.log("Initiating graceful shutdown...");
this.isRunning = false;
// Wait for workers to complete current messages
const shutdownPromises = this.workers.map(async (worker) => {
if (worker.currentOrder) {
console.log(
`Waiting for worker ${worker.id} to finish order ${worker.currentOrder}`
);
// Wait up to 60 seconds for completion
const startTime = Date.now();
while (worker.currentOrder && Date.now() - startTime < 60000) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
});
await Promise.all(shutdownPromises);
console.log("All workers shut down gracefully");
}
}
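The core idea of the worker pool is that several loops pull from one shared source until it is drained. The following is a minimal sketch of that idea only, using an in-memory array in place of a real queue; `runPool` and its parameters are hypothetical names and not part of the class above.

```typescript
// Hypothetical helper: process `items` with at most `workerCount` handlers in flight.
async function runPool<T>(
  items: T[],
  workerCount: number,
  handler: (item: T, workerId: number) => Promise<void>
): Promise<{ succeeded: number; failed: number }> {
  const pending = [...items]; // stand-in for a real message queue
  let succeeded = 0;
  let failed = 0;

  const worker = async (workerId: number): Promise<void> => {
    // shift() is synchronous, so two workers can never take the same item
    while (pending.length > 0) {
      const item = pending.shift() as T;
      try {
        await handler(item, workerId);
        succeeded++;
      } catch {
        failed++; // a real system would retry or dead-letter here
      }
    }
  };

  // Start all workers and wait until the shared source is empty
  await Promise.all(Array.from({ length: workerCount }, (_, id) => worker(id)));
  return { succeeded, failed };
}
```

Calling `await runPool(orders, 4, processOne)` would process the orders four at a time. One failing item is counted and skipped rather than stopping the other workers, which is the same isolation the class above aims for.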
The Circuit Breaker Pattern for External Dependencies
Prevent cascading failures when external services become unavailable.
// Circuit breaker for payment processing
class PaymentCircuitBreaker {
private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";
private failureCount = 0;
private lastFailureTime: number = 0;
private readonly failureThreshold: number;
private readonly recoveryTimeout: number;
private readonly monitoringWindow: number;
constructor(
failureThreshold = 5, // Open after 5 failures
recoveryTimeout = 60000, // Try again after 1 minute
monitoringWindow = 300000 // Reset count every 5 minutes
) {
this.failureThreshold = failureThreshold;
this.recoveryTimeout = recoveryTimeout;
this.monitoringWindow = monitoringWindow;
}
async executePayment<T>(paymentOperation: () => Promise<T>): Promise<T> {
if (this.state === "OPEN") {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = "HALF_OPEN";
} else {
throw new Error(
"Circuit breaker is OPEN - payment service unavailable"
);
}
}
try {
const result = await paymentOperation();
// Success - reset if we were in HALF_OPEN
if (this.state === "HALF_OPEN") {
this.state = "CLOSED";
this.failureCount = 0;
}
return result;
} catch (error) {
this.recordFailure();
throw error;
}
}
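private recordFailure(): void {
const now = Date.now();
// Start a fresh count if the previous failure fell outside the monitoring window.
// This check must run before lastFailureTime is overwritten, or it can never be true.
if (now - this.lastFailureTime > this.monitoringWindow) {
this.failureCount = 0;
}
this.failureCount++;
this.lastFailureTime = now;
// Open circuit if threshold reached, or immediately if the HALF_OPEN trial call failed
if (this.state === "HALF_OPEN" || this.failureCount >= this.failureThreshold) {
this.state = "OPEN";
}
}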
getState(): { state: string; failureCount: number; lastFailureTime: number } {
return {
state: this.state,
failureCount: this.failureCount,
lastFailureTime: this.lastFailureTime,
};
}
}
// Usage in order processing
class ResilientOrderProcessor {
private paymentBreaker = new PaymentCircuitBreaker();
private inventoryBreaker = new PaymentCircuitBreaker();
private shippingBreaker = new PaymentCircuitBreaker();
async processOrder(orderData: OrderData): Promise<void> {
const orderId = orderData.orderId;
// Track the current step so failures are classified by where they happened,
// not by searching the error text (every breaker above throws a message containing "payment")
let stage: "payment" | "inventory" | "shipping" | "publish" = "payment";
try {
// Payment with circuit breaker
const payment = await this.paymentBreaker.executePayment(async () => {
return await this.paymentService.charge(orderData.payment);
});
// Inventory with circuit breaker and fallback
stage = "inventory";
let inventoryReserved = false;
try {
await this.inventoryBreaker.executePayment(async () => {
await this.inventoryService.reserve(orderData.items);
inventoryReserved = true;
});
} catch (error) {
// Fallback: Allow overselling with manual review
await this.scheduleInventoryReview(orderId, orderData.items);
inventoryReserved = true; // Continue processing
}
// Shipping with circuit breaker
stage = "shipping";
await this.shippingBreaker.executePayment(async () => {
return await this.shippingService.createShipment(orderData);
});
// Success - publish completion event
stage = "publish";
await this.eventBus.publish("order.completed", {
orderId,
paymentId: payment.id,
inventoryReserved,
timestamp: new Date().toISOString(),
});
} catch (error) {
// Handle partial failures
await this.handlePartialFailure(orderId, stage, error as Error);
}
}
private async handlePartialFailure(
orderId: string,
stage: "payment" | "inventory" | "shipping" | "publish",
error: Error
): Promise<void> {
// Implement compensation patterns
if (stage === "payment") {
// Payment failed - no compensation needed
await this.eventBus.publish("order.payment_failed", {
orderId,
error: error.message,
});
} else if (stage === "inventory") {
// Inventory step (including its fallback) failed after payment - refund
await this.eventBus.publish("order.refund_required", {
orderId,
reason: "inventory_unavailable",
});
} else if (stage === "shipping") {
// Shipping failed - retry later or manual intervention
await this.eventBus.publish("order.shipping_failed", {
orderId,
requiresManualShipping: true,
});
} else {
// All steps succeeded but the completion event was not published - surface it, do not compensate
console.error(`Order ${orderId} completed but order.completed was not published:`, error);
}
}
}
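The three breaker states are easier to follow when the transitions can be stepped through without waiting for real time to pass. Below is a minimal sketch under that assumption: `TinyBreaker` is a hypothetical, simplified breaker with an injectable clock, not a drop-in replacement for the class above.

```typescript
type BreakerState = "CLOSED" | "OPEN" | "HALF_OPEN";

// Simplified breaker; `now` is injectable so the example does not depend on wall-clock time.
class TinyBreaker {
  private state: BreakerState = "CLOSED";
  private failures = 0;
  private openedAt = 0;
  private readonly threshold: number;
  private readonly recoveryMs: number;
  private readonly now: () => number;

  constructor(threshold: number, recoveryMs: number, now: () => number = () => Date.now()) {
    this.threshold = threshold;
    this.recoveryMs = recoveryMs;
    this.now = now;
  }

  // Returns true if a call may go through right now
  allow(): boolean {
    if (this.state === "OPEN" && this.now() - this.openedAt >= this.recoveryMs) {
      this.state = "HALF_OPEN"; // let a trial call through
    }
    return this.state !== "OPEN";
  }

  onSuccess(): void {
    this.state = "CLOSED";
    this.failures = 0;
  }

  onFailure(): void {
    this.failures++;
    // A failed trial call re-opens immediately; otherwise wait for the threshold
    if (this.state === "HALF_OPEN" || this.failures >= this.threshold) {
      this.state = "OPEN";
      this.openedAt = this.now();
    }
  }

  current(): BreakerState {
    return this.state;
  }
}

let clock = 0;
const breaker = new TinyBreaker(2, 1000, () => clock);
breaker.onFailure();
breaker.onFailure();              // threshold reached: OPEN
const blocked = !breaker.allow(); // true, still inside the recovery window
clock = 1000;
const trial = breaker.allow();    // true, the breaker is now HALF_OPEN
breaker.onSuccess();              // trial succeeded: back to CLOSED
```

Injecting the clock is a design choice made for the example; it also makes a production breaker much easier to unit test.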
Job Queues and Background Tasks: The Engine Behind User Experience
Priority Queue Implementation
Not all tasks are created equal. Critical operations should jump the line.
// Priority-based job processing
import { v4 as uuidv4 } from "uuid"; // Assumes the uuid package is installed
enum JobPriority {
CRITICAL = 0, // Payment failures, security alerts
HIGH = 1, // Order processing, customer requests
MEDIUM = 2, // Email notifications, report generation
LOW = 3, // Data cleanup, analytics processing
}
interface Job {
id: string;
type: string;
priority: JobPriority;
data: any;
attempts: number;
maxAttempts: number;
createdAt: Date;
scheduledFor?: Date;
timeout?: number;
}
class PriorityJobQueue {
private queues: Map<JobPriority, Job[]> = new Map();
private processing: Map<string, Job> = new Map();
private failed: Map<string, Job> = new Map();
constructor() {
// Initialize priority queues
Object.values(JobPriority)
.filter((p) => typeof p === "number")
.forEach((priority) => {
this.queues.set(priority as JobPriority, []);
});
}
// Add job with priority
async addJob(
job: Omit<Job, "id" | "attempts" | "createdAt">
): Promise<string> {
const fullJob: Job = {
...job,
id: uuidv4(),
attempts: 0,
createdAt: new Date(),
};
const queue = this.queues.get(job.priority);
if (!queue) {
throw new Error(`Invalid priority: ${job.priority}`);
}
// Insert maintaining time-based order within priority level
const insertIndex = queue.findIndex(
(existingJob) => existingJob.createdAt > fullJob.createdAt
);
if (insertIndex === -1) {
queue.push(fullJob);
} else {
queue.splice(insertIndex, 0, fullJob);
}
return fullJob.id;
}
// Get next job respecting priority
async getNextJob(): Promise<Job | null> {
// Check each priority level from highest to lowest
for (const priority of [
JobPriority.CRITICAL,
JobPriority.HIGH,
JobPriority.MEDIUM,
JobPriority.LOW,
]) {
const queue = this.queues.get(priority);
if (!queue || queue.length === 0) continue;
// Find first job that's ready to run
const jobIndex = queue.findIndex(
(job) => !job.scheduledFor || job.scheduledFor <= new Date()
);
if (jobIndex !== -1) {
const job = queue.splice(jobIndex, 1)[0];
job.attempts++;
this.processing.set(job.id, job);
return job;
}
}
return null;
}
// Mark job as completed
async completeJob(jobId: string): Promise<void> {
this.processing.delete(jobId);
}
// Handle job failure with retry logic
async failJob(jobId: string, error: Error): Promise<void> {
const job = this.processing.get(jobId);
if (!job) return;
this.processing.delete(jobId);
if (job.attempts < job.maxAttempts) {
// Exponential backoff retry
const backoffMs = Math.min(1000 * Math.pow(2, job.attempts), 300000); // Max 5 minutes
job.scheduledFor = new Date(Date.now() + backoffMs);
// Re-queue for retry
const queue = this.queues.get(job.priority);
queue?.push(job);
} else {
// Max attempts reached - send to dead letter queue
this.failed.set(jobId, job);
await this.handleDeadLetter(job, error);
}
}
private async handleDeadLetter(job: Job, error: Error): Promise<void> {
console.error(`Job ${job.id} failed permanently:`, error);
// Send to monitoring/alerting system
await this.alertingService.sendAlert({
type: "job_failed_permanently",
jobId: job.id,
jobType: job.type,
error: error.message,
attempts: job.attempts,
});
// Store for manual review
await this.deadLetterStore.save({
...job,
failedAt: new Date(),
error: error.message,
});
}
}
// Job processors with different strategies
class JobProcessorRegistry {
private processors: Map<string, JobProcessor> = new Map();
register(jobType: string, processor: JobProcessor): void {
this.processors.set(jobType, processor);
}
async processJob(job: Job): Promise<void> {
const processor = this.processors.get(job.type);
if (!processor) {
throw new Error(`No processor registered for job type: ${job.type}`);
}
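if (!job.timeout) {
await processor.process(job.data); // No timeout configured
return;
}
let timer: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error("Job timeout")), job.timeout);
});
try {
await Promise.race([processor.process(job.data), timeoutPromise]);
} finally {
clearTimeout(timer); // Do not leave the timer pending after the job settles
}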
}
}
// Example job processors
class EmailJobProcessor implements JobProcessor {
async process(data: any): Promise<void> {
const { recipient, subject, template, templateData } = data;
const emailContent = await this.templateEngine.render(
template,
templateData
);
await this.emailService.send({
to: recipient,
subject,
html: emailContent,
});
}
}
class PaymentRetryProcessor implements JobProcessor {
async process(data: any): Promise<void> {
const { orderId, paymentMethodId, amount } = data;
// Retry failed payment
const payment = await this.paymentService.retry(paymentMethodId, amount);
if (payment.status === "succeeded") {
await this.eventBus.publish("payment.retry.succeeded", {
orderId,
paymentId: payment.id,
});
} else {
throw new Error(`Payment retry failed: ${payment.failureReason}`);
}
}
}
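The ordering rule in `getNextJob` (highest priority first, then the oldest job that is ready to run) can be checked in isolation. This is a small sketch with hypothetical names (`SimplePriorityQueue`, `SimpleJob`, `PRIORITY`) and an explicit `now` argument so the result is deterministic.

```typescript
// Lower number = more urgent, mirroring the JobPriority values above
const PRIORITY = { CRITICAL: 0, HIGH: 1, MEDIUM: 2, LOW: 3 } as const;
type Priority = (typeof PRIORITY)[keyof typeof PRIORITY];

interface SimpleJob {
  name: string;
  priority: Priority;
  readyAt: number; // epoch ms; the job is skipped until this time
}

// One FIFO bucket per priority level
class SimplePriorityQueue {
  private buckets: SimpleJob[][] = [[], [], [], []];

  add(job: SimpleJob): void {
    this.buckets[job.priority].push(job);
  }

  // Highest priority first; within a level, the oldest job that is ready
  next(now: number): SimpleJob | null {
    for (const bucket of this.buckets) {
      const index = bucket.findIndex((job) => job.readyAt <= now);
      if (index !== -1) return bucket.splice(index, 1)[0];
    }
    return null;
  }
}

const queue = new SimplePriorityQueue();
queue.add({ name: "cleanup", priority: PRIORITY.LOW, readyAt: 0 });
queue.add({ name: "order-1", priority: PRIORITY.HIGH, readyAt: 0 });
queue.add({ name: "retry-payment", priority: PRIORITY.CRITICAL, readyAt: 5000 }); // backing off
queue.add({ name: "order-2", priority: PRIORITY.HIGH, readyAt: 0 });

const order = [queue.next(1000), queue.next(1000), queue.next(1000), queue.next(1000)].map(
  (job) => job?.name ?? null
);
// order-1, order-2, cleanup, then null: the critical job is still backing off
```

Strict priority ordering has a known cost: under sustained high-priority load, low-priority jobs may never run. If that matters, consider aging jobs upward or reserving some capacity for the lower levels.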
Scheduled and Recurring Jobs
Handle cron-like functionality with database-backed scheduling.
// Scheduled job system
class JobScheduler {
private scheduledJobs: Map<string, ScheduledJob> = new Map();
private isRunning = false;
async start(): Promise<void> {
this.isRunning = true;
await this.loadScheduledJobs();
this.startSchedulerLoop();
}
// Add recurring job (cron-like)
async scheduleRecurring(
name: string,
cronExpression: string,
jobType: string,
jobData: any,
options: ScheduleOptions = {}
): Promise<void> {
const scheduledJob: ScheduledJob = {
id: uuidv4(),
name,
cronExpression,
jobType,
jobData,
nextRun: this.calculateNextRun(cronExpression),
isActive: true,
...options,
};
this.scheduledJobs.set(name, scheduledJob);
await this.persistScheduledJob(scheduledJob);
}
// Add one-time delayed job
async scheduleDelayed(
jobType: string,
jobData: any,
delayMs: number,
options: ScheduleOptions = {}
): Promise<string> {
const runAt = new Date(Date.now() + delayMs);
const jobId = uuidv4();
const scheduledJob: ScheduledJob = {
id: jobId,
name: `delayed-${jobId}`,
jobType,
jobData,
nextRun: runAt,
isActive: true,
isOneTime: true,
...options,
};
this.scheduledJobs.set(jobId, scheduledJob);
await this.persistScheduledJob(scheduledJob);
return jobId;
}
private async startSchedulerLoop(): Promise<void> {
while (this.isRunning) {
try {
await this.processScheduledJobs();
} catch (error) {
console.error("Scheduler loop error:", error);
}
// Sleep outside the try block so a failing pass cannot become a tight retry loop
await new Promise((resolve) => setTimeout(resolve, 60000)); // Check every minute
}
}
private async processScheduledJobs(): Promise<void> {
const now = new Date();
for (const [name, scheduledJob] of this.scheduledJobs) {
if (!scheduledJob.isActive || scheduledJob.nextRun > now) {
continue;
}
try {
// Queue the job for execution
await this.jobQueue.addJob({
type: scheduledJob.jobType,
priority: scheduledJob.priority ?? JobPriority.MEDIUM, // ?? keeps CRITICAL (0), which || would discard
data: scheduledJob.jobData,
maxAttempts: scheduledJob.maxAttempts ?? 3,
timeout: scheduledJob.timeout,
});
// Update next run time
if (scheduledJob.isOneTime) {
// Remove one-time jobs after execution
this.scheduledJobs.delete(name);
await this.removeScheduledJob(scheduledJob.id);
} else {
// Calculate next run for recurring jobs
scheduledJob.nextRun = this.calculateNextRun(
scheduledJob.cronExpression
);
await this.persistScheduledJob(scheduledJob);
}
} catch (error) {
console.error(`Failed to queue scheduled job ${name}:`, error);
}
}
}
// Example scheduled job types
async setupCommonJobs(): Promise<void> {
// Daily cleanup of old logs
await this.scheduleRecurring(
"daily-log-cleanup",
"0 2 * * *", // 2 AM daily
"cleanup-logs",
{ olderThanDays: 30 }
);
// Weekly user engagement emails
await this.scheduleRecurring(
"weekly-engagement-emails",
"0 9 * * MON", // 9 AM every Monday
"send-engagement-emails",
{ segmentType: "inactive_users" }
);
// Monthly billing cycle
await this.scheduleRecurring(
"monthly-billing",
"0 0 1 * *", // First day of every month at midnight
"process-monthly-billing",
{}
);
// Hourly health checks
await this.scheduleRecurring(
"health-check-alerts",
"0 * * * *", // Every hour
"system-health-check",
{ alertThreshold: 95 }
);
}
}
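The scheduler relies on `calculateNextRun`, which the listing does not show. As an illustration only, here is a deliberately limited, hypothetical stand-in (`nextDailyRun`) that handles just the `M H * * *` form used by the daily cleanup job and evaluates it in UTC. A real system would more likely use an established cron-parsing library.

```typescript
// Hypothetical and intentionally narrow: supports only "M H * * *"
// (fixed minute and hour, every day), evaluated in UTC.
function nextDailyRun(cronExpression: string, from: Date): Date {
  const [minute, hour, ...rest] = cronExpression.trim().split(/\s+/);
  if (rest.length !== 3 || rest.some((field) => field !== "*")) {
    throw new Error(`Unsupported cron expression: ${cronExpression}`);
  }
  const m = Number(minute);
  const h = Number(hour);
  if (!Number.isInteger(m) || m < 0 || m > 59 || !Number.isInteger(h) || h < 0 || h > 23) {
    throw new Error(`Invalid minute/hour in: ${cronExpression}`);
  }
  const next = new Date(
    Date.UTC(from.getUTCFullYear(), from.getUTCMonth(), from.getUTCDate(), h, m)
  );
  // If today's slot is not strictly in the future, move to tomorrow
  if (next.getTime() <= from.getTime()) {
    next.setUTCDate(next.getUTCDate() + 1);
  }
  return next;
}

const beforeSlot = nextDailyRun("0 2 * * *", new Date("2024-01-15T01:00:00Z"));
// 2024-01-15T02:00:00.000Z (later the same day)
const atSlot = nextDailyRun("0 2 * * *", new Date("2024-01-15T02:00:00Z"));
// 2024-01-16T02:00:00.000Z (the slot itself is not in the future, so tomorrow)
```

Computing the next run from the current time, as here, means a scheduler that was down during a slot skips it silently. Whether missed runs should be caught up is a decision to make per job.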
Message Delivery Guarantees: The Promise You Must Keep
At-Most-Once vs. At-Least-Once vs. Exactly-Once
Understanding delivery semantics is crucial for building reliable systems.
// At-most-once delivery - fire and forget
class AtMostOnceProcessor {
async processMessage(message: Message): Promise<void> {
// Acknowledge first, then process
await this.messageQueue.acknowledge(message);
try {
await this.processPayment(message.data);
} catch (error) {
// Message is lost if processing fails!
console.error(
"Processing failed, but message already acknowledged:",
error
);
}
}
}
// At-least-once delivery - guarantee processing but risk duplicates
class AtLeastOnceProcessor {
async processMessage(message: Message): Promise<void> {
try {
// Process first
await this.processPayment(message.data);
// Acknowledge only after successful processing
await this.messageQueue.acknowledge(message);
} catch (error) {
// Message will be redelivered (potential duplicate processing)
console.error("Processing failed, message will retry:", error);
throw error;
}
}
}
// Exactly-once processing - in practice, at-least-once delivery plus deduplication, so each message takes effect once
class ExactlyOnceProcessor {
private processedMessages: Set<string> = new Set();
private processingLock: Map<string, Promise<void>> = new Map();
async processMessage(message: Message): Promise<void> {
const messageId = message.id;
// Check if already processed (idempotency)
if (await this.isAlreadyProcessed(messageId)) {
await this.messageQueue.acknowledge(message);
return;
}
// Check if currently being processed by another worker in this process
// (this in-memory lock is not shared across processes or hosts)
const existingProcess = this.processingLock.get(messageId);
if (existingProcess) {
await existingProcess;
return;
}
// Create processing lock
const processingPromise = this.doExactlyOnceProcessing(message);
this.processingLock.set(messageId, processingPromise);
try {
await processingPromise;
} finally {
this.processingLock.delete(messageId);
}
}
private async doExactlyOnceProcessing(message: Message): Promise<void> {
const messageId = message.id;
// Use database transaction for atomicity
await this.database.transaction(async (tx) => {
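// Double-check within transaction (assumes a unique constraint on message_id,
// which is what stops two processes from both recording the same message)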
const alreadyProcessed = await tx.query(
"SELECT 1 FROM processed_messages WHERE message_id = $1",
[messageId]
);
if (alreadyProcessed.length > 0) {
return; // Already processed
}
// Process the message
await this.processPayment(message.data, tx);
// Mark as processed atomically
await tx.query(
"INSERT INTO processed_messages (message_id, processed_at) VALUES ($1, $2)",
[messageId, new Date()]
);
});
// Acknowledge only after successful transaction
await this.messageQueue.acknowledge(message);
}
private async isAlreadyProcessed(messageId: string): Promise<boolean> {
if (this.processedMessages.has(messageId)) {
return true;
}
const result = await this.database.query(
"SELECT 1 FROM processed_messages WHERE message_id = $1",
[messageId]
);
const processed = result.length > 0;
if (processed) {
this.processedMessages.add(messageId);
}
return processed;
}
}
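The difference between the semantics shows up when a consumer crashes between applying a side effect and acknowledging the message. The sketch below simulates exactly one such crash in memory; `deliverWithOneCrash` is a hypothetical stand-in for a broker that redelivers unacknowledged messages.

```typescript
// Simulated at-least-once delivery: the first attempt applies the side effect and then
// "crashes" before the ack, so the same message is delivered a second time.
function deliverWithOneCrash(apply: (messageId: string) => void): void {
  const messageId = "msg-42";
  let acknowledged = false;
  let crashesLeft = 1;
  while (!acknowledged) {
    apply(messageId); // the side effect happens
    if (crashesLeft > 0) {
      crashesLeft--; // crash before acknowledging: the message stays on the queue
      continue;
    }
    acknowledged = true; // ack only after processing
  }
}

// Naive handler: every delivery charges again
let naiveCharges = 0;
deliverWithOneCrash(() => {
  naiveCharges++;
});

// Idempotent handler: remembers processed IDs and skips duplicates
let dedupedCharges = 0;
const seen = new Set<string>();
deliverWithOneCrash((id) => {
  if (seen.has(id)) return;
  seen.add(id);
  dedupedCharges++;
});

console.log({ naiveCharges, dedupedCharges }); // { naiveCharges: 2, dedupedCharges: 1 }
```

The `Set` here plays the role of the `processed_messages` table. In a real system the deduplication record and the side effect must be committed together, which is why the class above writes both inside one transaction.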
Implementing Idempotent Operations
Make operations safe to retry: running them twice must leave the system in the same state as running them once.
// Non-idempotent operation (dangerous)
class BadPaymentProcessor {
async processRefund(orderId: string, amount: number): Promise<void> {
// This creates problems if called multiple times!
const currentBalance = await this.getCustomerBalance(orderId);
const newBalance = currentBalance + amount;
await this.updateCustomerBalance(orderId, newBalance);
await this.createRefundTransaction(orderId, amount);
}
}
// Idempotent operation (safe)
class SafePaymentProcessor {
async processRefund(
orderId: string,
amount: number,
idempotencyKey: string
): Promise<RefundResult> {
// Check if refund already processed
const existingRefund = await this.getRefundByKey(idempotencyKey);
if (existingRefund) {
return existingRefund; // Return same result, no side effects
}
// Process refund atomically
return await this.database.transaction(async (tx) => {
// Lock the order to prevent concurrent processing
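const orders = await tx.query(
"SELECT * FROM orders WHERE id = $1 FOR UPDATE",
[orderId]
);
if (orders.length === 0) {
throw new Error(`Order ${orderId} not found`);
}
// query() returns an array of rows, so read the status from the first row
if (orders[0].refund_status === "refunded") {
// A concurrent duplicate may have finished while we waited for the row lock
const prior = await tx.query(
"SELECT * FROM refunds WHERE idempotency_key = $1",
[idempotencyKey]
);
if (prior.length > 0) return prior[0];
throw new Error("Order already refunded");
}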
// Create refund record with idempotency key
const refund = await tx.query(
`
INSERT INTO refunds (order_id, amount, idempotency_key, status, created_at)
VALUES ($1, $2, $3, 'completed', NOW())
RETURNING *
`,
[orderId, amount, idempotencyKey]
);
// Update order status
await tx.query("UPDATE orders SET refund_status = $1 WHERE id = $2", [
"refunded",
orderId,
]);
// Update customer balance
await tx.query(
`
UPDATE customer_balances
SET balance = balance + $1
WHERE customer_id = (SELECT customer_id FROM orders WHERE id = $2)
`,
[amount, orderId]
);
return refund[0];
});
}
private async getRefundByKey(
idempotencyKey: string
): Promise<RefundResult | null> {
const result = await this.database.query(
"SELECT * FROM refunds WHERE idempotency_key = $1",
[idempotencyKey]
);
return result.length > 0 ? result[0] : null;
}
}
// Usage with message processing
class IdempotentMessageProcessor {
async processMessage(message: Message): Promise<void> {
// Use message ID as idempotency key
const idempotencyKey = `msg-${message.id}`;
switch (message.type) {
case "PROCESS_REFUND":
await this.paymentProcessor.processRefund(
message.data.orderId,
message.data.amount,
idempotencyKey
);
break;
case "SEND_EMAIL":
await this.emailService.sendEmail(
message.data.recipient,
message.data.template,
message.data.data,
idempotencyKey
);
break;
}
}
}
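The contract of an idempotency key is that replaying the same key returns the stored result and applies nothing new. A minimal in-memory sketch of that contract follows; `RefundLedger` is a hypothetical stand-in for the `refunds` table and customer balance, with amounts in cents as an assumption.

```typescript
interface Refund {
  orderId: string;
  amountCents: number;
  idempotencyKey: string;
}

// In-memory stand-in for the refunds table and the customer balance
class RefundLedger {
  private byKey = new Map<string, Refund>();
  private refundedOrders = new Set<string>();
  balanceCents = 0;

  processRefund(orderId: string, amountCents: number, idempotencyKey: string): Refund {
    const existing = this.byKey.get(idempotencyKey);
    if (existing) return existing; // replay: same result, no new side effect

    if (this.refundedOrders.has(orderId)) {
      throw new Error(`Order ${orderId} already refunded under a different key`);
    }
    const refund: Refund = { orderId, amountCents, idempotencyKey };
    this.byKey.set(idempotencyKey, refund);
    this.refundedOrders.add(orderId);
    this.balanceCents += amountCents;
    return refund;
  }
}

const ledger = new RefundLedger();
const first = ledger.processRefund("order-1", 2500, "msg-abc");
const replay = ledger.processRefund("order-1", 2500, "msg-abc"); // redelivered message
// first === replay, and the balance was credited only once (2500)
```

Note that a key derived from the message ID only protects against redelivery of the same message. If a producer publishes the same refund twice as two messages, the order-level check is what catches it.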
Handling Failures and Dead Letters: When Things Go Wrong
Comprehensive Error Handling Strategy
// Dead letter queue implementation
class DeadLetterQueueManager {
private dlqStorage: DeadLetterStorage;
private alerting: AlertingService;
private metrics: MetricsService;
async handleFailedMessage(
originalMessage: Message,
error: Error,
attempts: number,
context: ProcessingContext
): Promise<void> {
const deadLetter: DeadLetter = {
id: uuidv4(),
originalMessageId: originalMessage.id,
originalQueue: context.queueName,
message: originalMessage,
error: {
message: error.message,
stack: error.stack,
code: (error as any).code,
},
attempts,
firstFailedAt: context.firstFailedAt,
lastFailedAt: new Date(),
processingContext: {
workerId: context.workerId,
hostname: os.hostname(),
version: process.env.APP_VERSION,
},
};
// Store in dead letter queue
await this.dlqStorage.store(deadLetter);
// Categorize error for appropriate handling
await this.categorizeAndHandle(deadLetter);
// Update metrics
this.metrics.increment("dlq.messages.total");
this.metrics.increment(`dlq.messages.${this.getErrorCategory(error)}`);
}
private async categorizeAndHandle(deadLetter: DeadLetter): Promise<void> {
const errorCategory = this.getErrorCategory(deadLetter.error);
switch (errorCategory) {
case "transient":
// Temporary issues - retry with exponential backoff
await this.scheduleRetry(deadLetter);
break;
case "configuration":
// Config issues - alert ops team immediately
await this.alerting.sendCriticalAlert({
type: "configuration_error",
message: "Message failed due to configuration issue",
deadLetter,
requiresImmediate: true,
});
break;
case "data_corruption":
// Bad data - needs manual investigation
await this.flagForManualReview(deadLetter);
break;
case "business_logic":
// Business rule violations - might need code changes
await this.alerting.sendAlert({
type: "business_logic_error",
deadLetter,
suggestedAction: "Review business rules and message format",
});
break;
case "external_service":
// External service issues - monitor and retry
await this.handleExternalServiceError(deadLetter);
break;
default:
await this.flagForManualReview(deadLetter);
}
}
private getErrorCategory(error: any): string {
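// Heuristic only: substring matching is fragile, so prefer error codes or classes where available
const message = String(error?.message ?? "").toLowerCase();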
if (message.includes("timeout") || message.includes("network")) {
return "transient";
} else if (message.includes("config") || message.includes("environment")) {
return "configuration";
} else if (message.includes("json") || message.includes("parse")) {
return "data_corruption";
} else if (message.includes("validation") || message.includes("business")) {
return "business_logic";
} else if (message.includes("external") || message.includes("api")) {
return "external_service";
}
return "unknown";
}
// Retry logic with intelligent backoff
private async scheduleRetry(deadLetter: DeadLetter): Promise<void> {
const retryAttempt = await this.getRetryAttempt(deadLetter.id);
if (retryAttempt > 5) {
// Too many retries - give up and alert
await this.flagForManualReview(deadLetter);
return;
}
// Exponential backoff with jitter
const baseDelayMs = 1000 * Math.pow(2, retryAttempt);
const jitterMs = Math.random() * baseDelayMs * 0.1;
const delayMs = baseDelayMs + jitterMs;
await this.scheduler.scheduleDelayed(
"retry-dead-letter",
{ deadLetterId: deadLetter.id },
delayMs
);
await this.updateRetryAttempt(deadLetter.id, retryAttempt + 1);
}
// Manual review queue for complex failures
private async flagForManualReview(deadLetter: DeadLetter): Promise<void> {
await this.manualReviewQueue.add({
id: deadLetter.id,
priority: this.getReviewPriority(deadLetter),
reviewReason: "Failed multiple automatic retry attempts",
suggestedActions: this.getSuggestedActions(deadLetter),
});
// Alert engineering team for critical messages
if (this.isCriticalMessage(deadLetter)) {
await this.alerting.sendCriticalAlert({
type: "critical_message_failed",
deadLetter,
message: "Critical message requires immediate manual review",
});
}
}
// Recovery operations
async reprocessDeadLetter(deadLetterId: string): Promise<void> {
const deadLetter = await this.dlqStorage.get(deadLetterId);
if (!deadLetter) {
throw new Error(`Dead letter ${deadLetterId} not found`);
}
try {
// Attempt reprocessing
await this.originalQueue.requeue(deadLetter.message);
// Mark as recovered
await this.dlqStorage.markRecovered(deadLetterId);
this.metrics.increment("dlq.recovered.success");
} catch (error) {
this.metrics.increment("dlq.recovered.failed");
throw error;
}
}
// Bulk operations for ops teams
async reprocessCategory(
category: string,
limit: number = 100
): Promise<void> {
const deadLetters = await this.dlqStorage.getByCategory(category, limit);
for (const deadLetter of deadLetters) {
try {
await this.reprocessDeadLetter(deadLetter.id);
} catch (error) {
console.error(`Failed to reprocess ${deadLetter.id}:`, error);
}
}
}
}
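The retry delay used in `scheduleRetry` is easier to reason about as a pure function. The sketch below adds two assumptions that are not in the listing above: a cap on the delay (`maxMs`) and an injectable random source so the schedule can be checked deterministically. `backoffDelayMs` is a hypothetical name.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped, plus up to 10% jitter.
function backoffDelayMs(
  attempt: number,
  baseMs = 1000,
  maxMs = 300_000,
  random: () => number = Math.random
): number {
  const exponential = Math.min(baseMs * 2 ** attempt, maxMs);
  const jitter = random() * exponential * 0.1; // spreads retries so they do not arrive together
  return Math.round(exponential + jitter);
}

// With jitter disabled the schedule is easy to read: 1s, 2s, 4s, 8s, 16s, 32s
const schedule = [0, 1, 2, 3, 4, 5].map((attempt) =>
  backoffDelayMs(attempt, 1000, 300_000, () => 0)
);
```

Jitter matters most after an outage: without it, every message that failed at the same moment retries at the same moment, and the recovering service is hit by synchronized waves.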
Monitoring and Alerting for Message Systems
// Comprehensive monitoring for async systems
class MessageSystemMonitor {
private metrics: MetricsCollector;
private alerting: AlertingService;
private healthThresholds: HealthThresholds;
constructor() {
this.healthThresholds = {
queueDepth: { warning: 1000, critical: 5000 },
processingTime: { warning: 30000, critical: 120000 },
errorRate: { warning: 0.05, critical: 0.15 },
dlqGrowthRate: { warning: 10, critical: 50 },
};
}
async startMonitoring(): Promise<void> {
// Run each check on an interval and log rejections, so a failing check
// cannot surface as an unhandled promise rejection
const every = (ms: number, check: () => Promise<unknown>) =>
setInterval(
() => check().catch((error) => console.error("Monitor check failed:", error)),
ms
);
// Monitor queue depths
every(30000, () => this.checkQueueDepths()); // Every 30 seconds
// Monitor processing times
every(60000, () => this.checkProcessingTimes()); // Every minute
// Monitor error rates
every(120000, () => this.checkErrorRates()); // Every 2 minutes
// Monitor dead letter queue growth
every(300000, () => this.checkDLQGrowth()); // Every 5 minutes
// System health checks
every(60000, () => this.performHealthCheck()); // Every minute
}
private async checkQueueDepths(): Promise<void> {
const queues = [
"order-processing",
"payment-processing",
"email-notifications",
];
for (const queueName of queues) {
const depth = await this.messageQueue.getQueueDepth(queueName);
this.metrics.gauge(`queue.depth.${queueName}`, depth);
if (depth >= this.healthThresholds.queueDepth.critical) {
await this.alerting.sendCriticalAlert({
type: "queue_depth_critical",
queue: queueName,
depth,
message: `Queue ${queueName} has ${depth} messages (critical threshold: ${this.healthThresholds.queueDepth.critical})`,
});
} else if (depth >= this.healthThresholds.queueDepth.warning) {
await this.alerting.sendWarning({
type: "queue_depth_warning",
queue: queueName,
depth,
});
}
}
}
private async checkProcessingTimes(): Promise<void> {
const processingTimes = await this.metrics.getAverageProcessingTime();
for (const [jobType, avgTime] of processingTimes) {
this.metrics.gauge(`processing.time.avg.${jobType}`, avgTime);
if (avgTime >= this.healthThresholds.processingTime.critical) {
await this.alerting.sendAlert({
type: "slow_processing",
jobType,
averageTime: avgTime,
threshold: this.healthThresholds.processingTime.critical,
});
}
}
}
async generateHealthReport(): Promise<HealthReport> {
return {
timestamp: new Date(),
queueDepths: await this.getAllQueueDepths(),
processingRates: await this.getProcessingRates(),
errorRates: await this.getErrorRates(),
deadLetterCounts: await this.getDLQCounts(),
workerStatuses: await this.getWorkerStatuses(),
systemHealth: await this.calculateOverallHealth(),
};
}
private async calculateOverallHealth(): Promise<HealthStatus> {
const checks = [
await this.checkQueuesHealthy(),
await this.checkProcessingHealthy(),
await this.checkErrorRatesHealthy(),
await this.checkWorkersHealthy(),
];
const healthyChecks = checks.filter(
(check) => check.status === "healthy"
).length;
const healthPercentage = (healthyChecks / checks.length) * 100;
if (healthPercentage >= 90) {
return { status: "healthy", percentage: healthPercentage };
} else if (healthPercentage >= 70) {
return { status: "degraded", percentage: healthPercentage };
} else {
return { status: "unhealthy", percentage: healthPercentage };
}
}
}
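The threshold checks above all follow the same shape, and the opening scenario (a queue that looks healthy while nothing is processed) suggests one more signal worth watching. Here is a small sketch with hypothetical helper names (`evaluate`, `errorRate`, `isStalled`); the 60-second idle limit is an assumed default, not a recommendation.

```typescript
type Level = "ok" | "warning" | "critical";

interface Threshold {
  warning: number;
  critical: number;
}

// Map a measured value onto ok / warning / critical (higher is worse)
function evaluate(value: number, threshold: Threshold): Level {
  if (value >= threshold.critical) return "critical";
  if (value >= threshold.warning) return "warning";
  return "ok";
}

// Error rate over a window; an empty window counts as 0 rather than NaN
function errorRate(failed: number, total: number): number {
  return total === 0 ? 0 : failed / total;
}

// Stall detection: work is waiting but nothing has been acknowledged recently
function isStalled(depth: number, msSinceLastAck: number, maxIdleMs = 60_000): boolean {
  return depth > 0 && msSinceLastAck > maxIdleMs;
}

const thresholds = {
  queueDepth: { warning: 1000, critical: 5000 },
  errorRate: { warning: 0.05, critical: 0.15 },
};

const depthLevel = evaluate(1200, thresholds.queueDepth);             // "warning"
const rateLevel = evaluate(errorRate(30, 150), thresholds.errorRate); // 0.2 -> "critical"
const stalled = isStalled(50_000, 3 * 60 * 60 * 1000);                // true: backlog, no acks for 3 hours
```

Queue depth alone would have reported the 3 AM incident late. Combining depth with time since the last acknowledgement catches a consumer that is connected but not making progress.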
Key Takeaways
Building production-ready async systems requires more than just queueing messages. You need bulletproof error handling, intelligent retry strategies, comprehensive monitoring, and graceful failure recovery.
Essential patterns for success:
- Worker pools for parallel processing and scalability
- Circuit breakers to prevent cascading failures
- Priority queues to handle critical operations first
- Idempotent operations to make retries safe
- Dead letter queues with intelligent categorization
- Comprehensive monitoring to catch issues before they become disasters
The operational reality checklist:
- ✅ Messages can be safely retried without duplicating side effects
- ✅ Failed messages are categorized and handled appropriately
- ✅ System degrades gracefully when dependencies fail
- ✅ Monitoring alerts you before users are affected
- ✅ Recovery procedures are documented and tested
- ✅ Performance scales with load patterns
What’s Next?
You’ve now mastered the complete message queue and event system lifecycle. Next, we’ll explore Caching & Performance Optimization, where we’ll learn how to make your async systems blazingly fast while handling millions of operations.
But first, implement these patterns in your systems. The real test isn’t building async processing—it’s building async processing that works reliably at 3 AM when everything else is going wrong.
Your future self (and your on-call rotation) will thank you.