Background Processing & Jobs - 1/2

The $3 Million E-commerce Meltdown That Exposed Background Job Hell

Picture this catastrophe: Black Friday morning, 6 AM. One of the largest e-commerce platforms in the world is about to face their biggest traffic day. They’ve stress-tested their frontend, optimized their databases, and scaled their servers. Everything looks perfect.

Then the traffic hits.

Within the first hour, users start reporting issues:

  • Order confirmations taking 10+ minutes to arrive
  • Payment processing hanging indefinitely
  • Inventory updates delayed by hours, causing overselling
  • Email notifications completely stopped working
  • Product recommendation engine showing stale data
  • Analytics dashboard frozen with yesterday’s numbers

The frontend was handling millions of requests beautifully. The database was purring along at 30% capacity. But their background job system—the invisible backbone processing orders, sending emails, updating inventory, and generating analytics—had completely collapsed.

Here’s what went wrong:

  • All background jobs were running on a single Redis instance that hit memory limits
  • Email sending was blocking payment processing in the same queue
  • Failed jobs kept retrying without backoff, creating infinite loops
  • No monitoring or alerting on job queue health
  • Critical order processing jobs were stuck behind thousands of promotional emails

By noon, they had:

  • $3 million in abandoned carts due to payment failures
  • 50,000 angry customers who never received order confirmations
  • 200,000 promotional emails that never sent
  • A complete breakdown of their recommendation system
  • Emergency all-hands meeting with C-level executives

The brutal truth? Their real-time user-facing features were rock solid, but they had treated background processing like an afterthought. A single point of failure in their job queue brought down features that directly impacted revenue.

The Uncomfortable Truth About Background Processing

Here’s what separates applications that scale gracefully from those that crumble under pressure: Background processing isn’t just about moving slow tasks out of the request cycle—it’s about building resilient, distributed systems that can handle failures, prioritize work intelligently, and scale independently of your main application.

Most developers approach background jobs like this:

  1. Move slow operations to a basic job queue
  2. Add more workers when things get slow
  3. Hope retry mechanisms will handle failures
  4. Realize too late that job processing is a single point of failure
  5. Panic when the queue becomes a bottleneck during peak traffic

But systems that handle real-world load work differently:

  1. Design job processing as a distributed system from day one
  2. Implement intelligent job prioritization and resource allocation
  3. Build comprehensive monitoring and failure recovery mechanisms
  4. Plan for job isolation and graceful degradation
  5. Treat background processing as mission-critical infrastructure

The difference isn’t just performance—it’s the difference between systems that enhance user experience invisibly and systems that become the reason your users leave.

Ready to build background processing that works like Shopify during Black Friday instead of that side project that crashes when three people use it simultaneously? Let’s dive into the architecture patterns that power reliable job processing at scale.


Job Queue Systems: The Invisible Backbone of Modern Applications

Understanding Job Queue Architecture

// Basic job queue implementation that will fail in production
interface Job {
  handler: (data: any) => Promise<void>;
  data: any;
}

class BadJobQueue {
  private jobs: Job[] = [];
  private isProcessing = false;

  async addJob(job: Job): Promise<void> {
    // This approach has so many problems it's painful to look at
    this.jobs.push(job);

    if (!this.isProcessing) {
      this.processJobs(); // No error handling, no concurrency control
    }
  }

  private async processJobs(): Promise<void> {
    this.isProcessing = true;

    while (this.jobs.length > 0) {
      const job = this.jobs.shift()!;

      try {
        await this.executeJob(job); // Blocking execution, no timeout
      } catch (error) {
        console.log("Job failed:", error); // Jobs just disappear on failure
      }
    }

    this.isProcessing = false;
  }

  private async executeJob(job: Job): Promise<void> {
    // What if this takes 10 minutes? What if it crashes?
    // What if the server restarts? All jobs lost!
    await job.handler(job.data);
  }
}

// Production-ready job queue with Redis and Bull
import Queue from "bull";
import Redis from "ioredis";

class ProductionJobQueue {
  private queues: Map<string, Queue.Queue> = new Map();
  private redis: Redis;
  private jobProcessors: Map<string, JobProcessor> = new Map();

  constructor() {
    this.redis = new Redis({
      host: process.env.REDIS_HOST || "localhost",
      port: parseInt(process.env.REDIS_PORT || "6379"),
      maxRetriesPerRequest: 3,
      retryStrategy: (times) => Math.min(times * 100, 2000), // back off reconnect attempts
      lazyConnect: true,
    });

    this.setupDefaultQueues();
    this.setupMonitoring();
  }

  private setupDefaultQueues(): void {
    // Different queues for different priorities and types
    const queueConfigs = [
      {
        name: "critical",
        concurrency: 10,
        settings: {
          stalledInterval: 30000, // 30 seconds
          maxStalledCount: 1, // Fail fast for critical jobs
        },
      },
      {
        name: "high",
        concurrency: 8,
        settings: {
          stalledInterval: 60000, // 1 minute
          maxStalledCount: 3,
        },
      },
      {
        name: "default",
        concurrency: 5,
        settings: {
          stalledInterval: 120000, // 2 minutes
          maxStalledCount: 3,
        },
      },
      {
        name: "low",
        concurrency: 2,
        settings: {
          stalledInterval: 300000, // 5 minutes
          maxStalledCount: 5,
        },
      },
      {
        name: "bulk",
        concurrency: 1, // Bulk operations run one at a time
        settings: {
          stalledInterval: 600000, // 10 minutes
          maxStalledCount: 2,
        },
      },
    ];

    queueConfigs.forEach((config) => {
      const queue = new Queue(config.name, {
        redis: {
          host: process.env.REDIS_HOST || "localhost",
          port: parseInt(process.env.REDIS_PORT || "6379"),
        },
        defaultJobOptions: {
          removeOnComplete: 50, // Keep last 50 completed jobs
          removeOnFail: 100, // Keep last 100 failed jobs
          attempts: this.getAttemptsForQueue(config.name),
          backoff: {
            type: "exponential",
            delay: 2000,
          },
        },
        settings: config.settings,
      });

      this.queues.set(config.name, queue);
      this.setupQueueProcessing(queue, config.concurrency);
    });
  }

  private getAttemptsForQueue(queueName: string): number {
    const attemptMap: Record<string, number> = {
      critical: 3, // Critical jobs retry less to fail fast
      high: 5,
      default: 5,
      low: 10, // Low priority jobs can retry more
      bulk: 3, // Bulk jobs usually shouldn't retry much
    };

    return attemptMap[queueName] || 5;
  }

  private setupQueueProcessing(queue: Queue.Queue, concurrency: number): void {
    // Process jobs with proper error handling and monitoring
    queue.process("*", concurrency, async (job: Queue.Job) => {
      const startTime = Date.now();
      const processor = this.jobProcessors.get(job.name);

      if (!processor) {
        throw new Error(`No processor found for job type: ${job.name}`);
      }

      try {
        // Set up job timeout
        const timeoutMs = processor.timeout || 300000; // Default 5 minutes
        const timeoutPromise = new Promise((_, reject) => {
          setTimeout(() => reject(new Error("Job timeout")), timeoutMs);
        });

        // Race between job execution and timeout
        const result = await Promise.race([
          processor.handler(job.data, job),
          timeoutPromise,
        ]);

        // Log successful completion
        this.logJobCompletion(job, Date.now() - startTime, true);

        return result;
      } catch (error) {
        // Enhanced error logging
        this.logJobCompletion(job, Date.now() - startTime, false, error);

        throw error; // Re-throw to let Bull handle retries
      }
    });

    // Set up event listeners for monitoring
    this.setupQueueEventListeners(queue);
  }

  private setupQueueEventListeners(queue: Queue.Queue): void {
    queue.on("completed", (job: Queue.Job, result: any) => {
      console.log(`Job ${job.id} completed in queue ${queue.name}`);
      this.updateMetrics("completed", queue.name);
    });

    queue.on("failed", (job: Queue.Job, error: Error) => {
      console.error(`Job ${job.id} failed in queue ${queue.name}:`, error);
      this.updateMetrics("failed", queue.name);

      // Alert on critical job failures
      if (queue.name === "critical") {
        this.alertOnCriticalJobFailure(job, error);
      }
    });

    queue.on("stalled", (job: Queue.Job) => {
      console.warn(`Job ${job.id} stalled in queue ${queue.name}`);
      this.updateMetrics("stalled", queue.name);
    });

    queue.on("progress", (job: Queue.Job, progress: number) => {
      // Update job progress for long-running tasks
      this.updateJobProgress(job.id, progress);
    });
  }

  // Register job processors
  registerProcessor(jobType: string, processor: JobProcessor): void {
    this.jobProcessors.set(jobType, processor);
  }

  // Add jobs with intelligent queue selection
  async addJob(
    jobType: string,
    data: any,
    options: JobOptions = {}
  ): Promise<Queue.Job> {
    const queueName = this.selectQueue(jobType, options.priority);
    const queue = this.queues.get(queueName);

    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    const jobOptions: Queue.JobOptions = {
      priority: this.getPriorityScore(options.priority),
      delay: options.delay || 0,
      attempts: options.attempts || this.getAttemptsForQueue(queueName),
      backoff: options.backoff || {
        type: "exponential",
        delay: 2000,
      },
      removeOnComplete: options.removeOnComplete ?? true,
      removeOnFail: options.removeOnFail ?? false,
    };

    if (options.jobId) {
      jobOptions.jobId = options.jobId;
    }

    if (options.repeat) {
      jobOptions.repeat = options.repeat;
    }

    const job = await queue.add(jobType, data, jobOptions);

    this.logJobCreation(job, queueName);

    return job;
  }

  private selectQueue(jobType: string, priority?: JobPriority): string {
    if (priority === "critical") return "critical";
    if (priority === "high") return "high";
    if (priority === "low") return "low";

    // Job type based routing
    const jobTypeQueues: Record<string, string> = {
      "send-email": "default",
      "process-payment": "critical",
      "generate-report": "bulk",
      "resize-image": "default",
      "send-notification": "high",
      "cleanup-data": "low",
      "backup-database": "bulk",
    };

    return jobTypeQueues[jobType] || "default";
  }

  private getPriorityScore(priority?: JobPriority): number {
    const priorityScores: Record<JobPriority, number> = {
      critical: 100,
      high: 75,
      normal: 50,
      low: 25,
      bulk: 1,
    };

    return priorityScores[priority || "normal"];
  }

  // Job status and monitoring
  async getJobStatus(jobId: string): Promise<JobStatus | null> {
    for (const [queueName, queue] of this.queues) {
      try {
        const job = await queue.getJob(jobId);
        if (job) {
          return {
            id: job.id,
            name: job.name,
            data: job.data,
            returnValue: job.returnvalue,
            progress: job.progress(),
            state: await job.getState(),
            queueName,
            attempts: job.attemptsMade,
            maxAttempts: job.opts.attempts || 0,
            createdAt: new Date(job.timestamp),
            processedAt: job.processedOn ? new Date(job.processedOn) : null,
            finishedAt: job.finishedOn ? new Date(job.finishedOn) : null,
            failedReason: job.failedReason,
          };
        }
      } catch (error) {
        // Continue searching other queues
        continue;
      }
    }

    return null;
  }

  // Queue statistics and health monitoring
  async getQueueStats(): Promise<QueueStats[]> {
    const stats: QueueStats[] = [];

    for (const [queueName, queue] of this.queues) {
      // getJobCounts() returns counts directly, without loading every job into memory
      const counts = await queue.getJobCounts();

      stats.push({
        queueName,
        waiting: counts.waiting,
        active: counts.active,
        completed: counts.completed,
        failed: counts.failed,
        delayed: counts.delayed,
        totalJobs:
          counts.waiting +
          counts.active +
          counts.completed +
          counts.failed +
          counts.delayed,
      });
    }

    return stats;
  }

  // Clean up completed and failed jobs
  async cleanupJobs(olderThanMs: number = 24 * 60 * 60 * 1000): Promise<void> {
    for (const [queueName, queue] of this.queues) {
      try {
        await queue.clean(olderThanMs, "completed");
        await queue.clean(olderThanMs, "failed");
        console.log(`Cleaned up old jobs in queue: ${queueName}`);
      } catch (error) {
        console.error(`Failed to clean queue ${queueName}:`, error);
      }
    }
  }

  // Graceful shutdown
  async shutdown(): Promise<void> {
    console.log("Shutting down job queues...");

    const shutdownPromises = Array.from(this.queues.values()).map((queue) =>
      queue.close()
    );

    await Promise.all(shutdownPromises);
    await this.redis.quit();

    console.log("Job queues shut down successfully");
  }

  private logJobCreation(job: Queue.Job, queueName: string): void {
    console.log(`Job ${job.id} (${job.name}) added to queue ${queueName}`);
  }

  private logJobCompletion(
    job: Queue.Job,
    duration: number,
    success: boolean,
    error?: Error
  ): void {
    const message = `Job ${job.id} (${job.name}) ${
      success ? "completed" : "failed"
    } in ${duration}ms`;

    if (success) {
      console.log(message);
    } else {
      console.error(message, error);
    }
  }

  private updateMetrics(event: string, queueName: string): void {
    // Update your metrics collection system
    // This could be Prometheus, StatsD, etc.
  }

  private updateJobProgress(jobId: string, progress: number): void {
    // Update progress in external system for UI display
  }

  private async alertOnCriticalJobFailure(
    job: Queue.Job,
    error: Error
  ): Promise<void> {
    // Send alert to your alerting system (PagerDuty, Slack, etc.)
    console.error(`CRITICAL: Job ${job.id} failed:`, error);
  }
}

// Job processor interface
interface JobProcessor {
  handler: (data: any, job?: Queue.Job) => Promise<any>;
  timeout?: number; // Timeout in milliseconds
}

// Job options interface
interface JobOptions {
  priority?: JobPriority;
  delay?: number;
  attempts?: number;
  backoff?: {
    type: string;
    delay: number;
  };
  removeOnComplete?: boolean;
  removeOnFail?: boolean;
  jobId?: string;
  repeat?: {
    cron?: string;
    every?: number;
  };
}

type JobPriority = "critical" | "high" | "normal" | "low" | "bulk";

interface JobStatus {
  id: string;
  name: string;
  data: any;
  returnValue?: any;
  progress: number;
  state: string;
  queueName: string;
  attempts: number;
  maxAttempts: number;
  createdAt: Date;
  processedAt: Date | null;
  finishedAt: Date | null;
  failedReason?: string;
}

interface QueueStats {
  queueName: string;
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  totalJobs: number;
}
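
Before moving on to scheduling, here is a minimal, hypothetical usage sketch of the queue above. The email fields and handler body are placeholders for your own integration; the point is the flow of register a processor, enqueue work, then poll queue health.

// Hypothetical usage sketch: register a processor, enqueue a job, and inspect queue health
async function queueDemo(): Promise<void> {
  const jobQueue = new ProductionJobQueue();

  // Handler body is a placeholder for a real email integration
  jobQueue.registerProcessor("send-email", {
    handler: async (data) => {
      console.log(`Sending "${data.template}" email to ${data.to}`);
    },
    timeout: 30000, // fail the job if it runs longer than 30 seconds
  });

  // selectQueue() routes "send-email" jobs to the "default" queue
  const job = await jobQueue.addJob("send-email", {
    to: "user@example.com",
    template: "welcome",
    variables: { name: "Ada" },
  });
  console.log(`Queued job ${job.id}`);

  // Poll queue depth periodically for dashboards or alerting
  console.table(await jobQueue.getQueueStats());
}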

Cron Jobs and Scheduling: Time-Based Task Automation

Building Robust Scheduling Systems

// Advanced cron job management with clustering support
import { CronJob } from "cron";
import Redis from "ioredis";

class AdvancedScheduler {
  private jobs: Map<string, CronJob> = new Map();
  private redis: Redis;
  private nodeId: string;
  private isLeader: boolean = false;
  private leaderElectionInterval: NodeJS.Timeout | null = null;

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL);
    this.nodeId = `scheduler-${Date.now()}-${Math.random()
      .toString(36)
      .substr(2, 9)}`;
    this.startLeaderElection();
    this.setupGracefulShutdown();
  }

  // Leader election for distributed scheduling
  private startLeaderElection(): void {
    this.leaderElectionInterval = setInterval(async () => {
      try {
        const result = await this.redis.set(
          "scheduler:leader",
          this.nodeId,
          "PX",
          30000, // 30 second TTL
          "NX" // Only set if key doesn't exist
        );

        if (result === "OK") {
          if (!this.isLeader) {
            console.log(`Node ${this.nodeId} became leader`);
            this.isLeader = true;
            this.startScheduledJobs();
          }
        } else {
          // Check if we're still the leader
          const currentLeader = await this.redis.get("scheduler:leader");
          if (currentLeader !== this.nodeId && this.isLeader) {
            console.log(`Node ${this.nodeId} lost leadership`);
            this.isLeader = false;
            this.stopScheduledJobs();
          }
        }

        // Refresh leadership if we're still leader
        if (this.isLeader) {
          await this.redis.pexpire("scheduler:leader", 30000);
        }
      } catch (error) {
        console.error("Leader election error:", error);
        if (this.isLeader) {
          this.isLeader = false;
          this.stopScheduledJobs();
        }
      }
    }, 10000); // Check every 10 seconds
  }

  // Schedule a new cron job
  schedule(
    jobId: string,
    cronPattern: string,
    handler: () => Promise<void>,
    options: ScheduleOptions = {}
  ): void {
    const job = new CronJob({
      cronTime: cronPattern,
      onTick: async () => {
        if (!this.isLeader) {
          return; // Only leader executes scheduled jobs
        }

        await this.executeScheduledJob(jobId, handler, options);
      },
      start: false,
      timeZone: options.timeZone || "UTC",
    });

    this.jobs.set(jobId, job);

    // Start immediately if we're already leader
    if (this.isLeader) {
      job.start();
    }

    console.log(`Scheduled job ${jobId} with pattern ${cronPattern}`);
  }

  private async executeScheduledJob(
    jobId: string,
    handler: () => Promise<void>,
    options: ScheduleOptions
  ): Promise<void> {
    const startTime = Date.now();
    const lockKey = `scheduler:lock:${jobId}`;
    const lockTTL = options.timeout || 300000; // Default 5 minutes

    try {
      // Acquire distributed lock to prevent concurrent execution
      const lockAcquired = await this.redis.set(
        lockKey,
        this.nodeId,
        "PX",
        lockTTL,
        "NX"
      );

      if (lockAcquired !== "OK") {
        console.log(`Job ${jobId} skipped - already running`);
        return;
      }

      console.log(`Executing scheduled job: ${jobId}`);

      // Set up timeout
      const timeoutPromise = new Promise<void>((_, reject) => {
        setTimeout(() => reject(new Error("Job timeout")), lockTTL);
      });

      // Execute job with timeout
      await Promise.race([handler(), timeoutPromise]);

      const duration = Date.now() - startTime;
      console.log(`Scheduled job ${jobId} completed in ${duration}ms`);

      // Track execution metrics
      await this.recordJobExecution(jobId, duration, true);
    } catch (error) {
      const duration = Date.now() - startTime;
      console.error(
        `Scheduled job ${jobId} failed after ${duration}ms:`,
        error
      );

      await this.recordJobExecution(jobId, duration, false, error);

      // Handle job failure based on options
      if (options.onError) {
        try {
          await options.onError(error);
        } catch (handlerError) {
          console.error(`Error handler for job ${jobId} failed:`, handlerError);
        }
      }
    } finally {
      // Release lock
      await this.redis.del(lockKey);
    }
  }

  // Dynamic scheduling with database persistence
  async scheduleFromDatabase(): Promise<void> {
    try {
      const scheduledJobs = await this.getScheduledJobsFromDB();

      for (const jobConfig of scheduledJobs) {
        this.schedule(
          jobConfig.id,
          jobConfig.cronPattern,
          () => this.executeDatabaseJob(jobConfig),
          {
            timeZone: jobConfig.timeZone,
            timeout: jobConfig.timeout,
            onError: async (error) => {
              await this.logJobError(jobConfig.id, error);
            },
          }
        );
      }

      console.log(`Loaded ${scheduledJobs.length} jobs from database`);
    } catch (error) {
      console.error("Failed to load scheduled jobs from database:", error);
    }
  }

  private async executeDatabaseJob(
    jobConfig: DatabaseJobConfig
  ): Promise<void> {
    // Execute job based on configuration
    switch (jobConfig.type) {
      case "cleanup":
        await this.executeCleanupJob(jobConfig);
        break;
      case "report":
        await this.executeReportJob(jobConfig);
        break;
      case "backup":
        await this.executeBackupJob(jobConfig);
        break;
      case "maintenance":
        await this.executeMaintenanceJob(jobConfig);
        break;
      default:
        throw new Error(`Unknown job type: ${jobConfig.type}`);
    }
  }

  // Specific job implementations
  private async executeCleanupJob(config: DatabaseJobConfig): Promise<void> {
    console.log(`Running cleanup job: ${config.id}`);

    // Example: Clean up old logs
    if (config.parameters?.cleanupType === "logs") {
      const olderThanDays = config.parameters.olderThanDays || 30;
      const cutoffDate = new Date();
      cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);

      // Your cleanup logic here
      const deletedCount = await this.cleanupOldLogs(cutoffDate);
      console.log(`Cleaned up ${deletedCount} old log entries`);
    }
  }

  private async executeReportJob(config: DatabaseJobConfig): Promise<void> {
    console.log(`Running report job: ${config.id}`);

    // Generate and send reports
    const reportData = await this.generateReport(config.parameters?.reportType);
    await this.sendReport(reportData, config.parameters?.recipients);
  }

  private async executeBackupJob(config: DatabaseJobConfig): Promise<void> {
    console.log(`Running backup job: ${config.id}`);

    // Perform backup
    await this.performBackup(config.parameters?.backupType);
  }

  // Advanced scheduling patterns
  setupCommonSchedules(): void {
    // Every minute health check
    this.schedule(
      "health-check",
      "* * * * *",
      async () => {
        await this.performHealthCheck();
      },
      { timeout: 30000 } // 30 second timeout
    );

    // Every 5 minutes queue monitoring
    this.schedule("queue-monitor", "*/5 * * * *", async () => {
      await this.monitorQueueHealth();
    });

    // Hourly cleanup of temporary files
    this.schedule("temp-cleanup", "0 * * * *", async () => {
      await this.cleanupTempFiles();
    });

    // Daily database maintenance at 2 AM
    this.schedule(
      "db-maintenance",
      "0 2 * * *",
      async () => {
        await this.performDatabaseMaintenance();
      },
      { timeout: 1800000 } // 30 minute timeout
    );

    // Weekly report generation on Sundays at 6 AM
    this.schedule(
      "weekly-reports",
      "0 6 * * 0",
      async () => {
        await this.generateWeeklyReports();
      },
      { timeout: 3600000 } // 1 hour timeout
    );

    // Monthly billing processing on 1st at midnight
    this.schedule(
      "monthly-billing",
      "0 0 1 * *",
      async () => {
        await this.processMonthlyBilling();
      },
      { timeout: 7200000 } // 2 hour timeout
    );
  }

  // Job management methods
  unschedule(jobId: string): boolean {
    const job = this.jobs.get(jobId);
    if (job) {
      job.stop();
      this.jobs.delete(jobId);
      console.log(`Unscheduled job: ${jobId}`);
      return true;
    }
    return false;
  }

  pauseJob(jobId: string): boolean {
    const job = this.jobs.get(jobId);
    if (job) {
      job.stop();
      console.log(`Paused job: ${jobId}`);
      return true;
    }
    return false;
  }

  resumeJob(jobId: string): boolean {
    const job = this.jobs.get(jobId);
    if (job && this.isLeader) {
      job.start();
      console.log(`Resumed job: ${jobId}`);
      return true;
    }
    return false;
  }

  getJobStatus(): Map<string, JobScheduleStatus> {
    const status = new Map<string, JobScheduleStatus>();

    for (const [jobId, job] of this.jobs) {
      status.set(jobId, {
        jobId,
        isRunning: job.running,
        nextRun: job.nextDate()?.toDate() || null,
        lastRun: job.lastDate()?.toDate() || null,
      });
    }

    return status;
  }

  private startScheduledJobs(): void {
    if (this.isLeader) {
      for (const job of this.jobs.values()) {
        job.start();
      }
      console.log("Started all scheduled jobs");
    }
  }

  private stopScheduledJobs(): void {
    for (const job of this.jobs.values()) {
      job.stop();
    }
    console.log("Stopped all scheduled jobs");
  }

  private async recordJobExecution(
    jobId: string,
    duration: number,
    success: boolean,
    error?: Error
  ): Promise<void> {
    const record = {
      jobId,
      nodeId: this.nodeId,
      startTime: Date.now() - duration,
      duration,
      success,
      error: error?.message,
    };

    await this.redis.lpush(
      `scheduler:executions:${jobId}`,
      JSON.stringify(record)
    );
    await this.redis.ltrim(`scheduler:executions:${jobId}`, 0, 100); // Keep last 100 executions
  }

  private setupGracefulShutdown(): void {
    const shutdown = async () => {
      console.log("Shutting down scheduler...");

      if (this.leaderElectionInterval) {
        clearInterval(this.leaderElectionInterval);
      }

      // Release leadership
      if (this.isLeader) {
        await this.redis.del("scheduler:leader");
      }

      // Stop all jobs
      this.stopScheduledJobs();

      await this.redis.quit();
      process.exit(0);
    };

    process.on("SIGINT", shutdown);
    process.on("SIGTERM", shutdown);
  }

  // Placeholder methods for actual implementations
  private async getScheduledJobsFromDB(): Promise<DatabaseJobConfig[]> {
    // Load from your database
    return [];
  }

  private async cleanupOldLogs(cutoffDate: Date): Promise<number> {
    // Implement log cleanup
    return 0;
  }

  private async generateReport(reportType: string): Promise<any> {
    // Implement report generation
    return {};
  }

  private async sendReport(
    reportData: any,
    recipients: string[]
  ): Promise<void> {
    // Implement report sending
  }

  private async performBackup(backupType: string): Promise<void> {
    // Implement backup logic
  }

  private async performHealthCheck(): Promise<void> {
    // Implement health check
  }

  private async monitorQueueHealth(): Promise<void> {
    // Implement queue monitoring
  }

  private async cleanupTempFiles(): Promise<void> {
    // Implement temp file cleanup
  }

  private async performDatabaseMaintenance(): Promise<void> {
    // Implement database maintenance
  }

  private async generateWeeklyReports(): Promise<void> {
    // Implement weekly report generation
  }

  private async processMonthlyBilling(): Promise<void> {
    // Implement monthly billing
  }

  private async logJobError(jobId: string, error: Error): Promise<void> {
    // Implement error logging
  }
}

interface ScheduleOptions {
  timeZone?: string;
  timeout?: number;
  onError?: (error: Error) => Promise<void>;
}

interface DatabaseJobConfig {
  id: string;
  type: "cleanup" | "report" | "backup" | "maintenance";
  cronPattern: string;
  timeZone?: string;
  timeout?: number;
  parameters?: Record<string, any>;
}

interface JobScheduleStatus {
  jobId: string;
  isRunning: boolean;
  nextRun: Date | null;
  lastRun: Date | null;
}
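
For orientation, here is a short, hypothetical usage sketch of the scheduler above; the job id, cron pattern, and handler body are illustrative placeholders. Because of the leader election, only one node in the cluster will actually execute each tick.

// Hypothetical usage: schedule a nightly cleanup that only the elected leader runs
const scheduler = new AdvancedScheduler();

scheduler.schedule(
  "nightly-cleanup",
  "0 3 * * *", // every day at 03:00
  async () => {
    // Placeholder handler: delete expired sessions, temp files, etc.
    console.log("Running nightly cleanup...");
  },
  {
    timeZone: "UTC",
    timeout: 10 * 60 * 1000, // distributed lock TTL: 10 minutes
    onError: async (error) => {
      console.error("Nightly cleanup failed:", error);
    },
  }
);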

Background Task Patterns: Organizing Work for Scale

Common Background Processing Patterns

// Producer-Consumer Pattern with multiple workers
class ProducerConsumerJobSystem {
  private producers: Map<string, Producer> = new Map();
  private consumers: Map<string, Consumer> = new Map();
  private jobQueue: ProductionJobQueue;
  private redis: Redis;

  constructor(jobQueue: ProductionJobQueue, redis: Redis) {
    this.jobQueue = jobQueue;
    this.redis = redis;
    this.setupCommonPatterns();
  }

  private setupCommonPatterns(): void {
    this.setupEmailProcessingPattern();
    this.setupImageProcessingPattern();
    this.setupDataProcessingPattern();
    this.setupReportGenerationPattern();
    this.setupChainProcessor(); // registers the "chain-step" processor used by createProcessingChain()
  }

  // Email processing pattern
  private setupEmailProcessingPattern(): void {
    // Producer: Creates email jobs
    this.producers.set("email", {
      name: "email-producer",
      generate: async (data: EmailBatchData) => {
        const { recipients, template, variables } = data;

        // Create individual email jobs for each recipient
        for (const recipient of recipients) {
          await this.jobQueue.addJob(
            "send-email",
            {
              to: recipient.email,
              template,
              variables: { ...variables, ...recipient.customVars },
            },
            {
              priority: "normal",
              attempts: 5,
            }
          );
        }
      },
    });

    // Consumer: Processes email jobs
    this.jobQueue.registerProcessor("send-email", {
      handler: async (data: EmailJobData) => {
        const { to, template, variables } = data;

        // Load email template
        const emailTemplate = await this.loadEmailTemplate(template);

        // Render with variables
        const renderedEmail = this.renderEmailTemplate(
          emailTemplate,
          variables
        );

        // Send email
        await this.sendEmail({
          to,
          subject: renderedEmail.subject,
          html: renderedEmail.html,
          text: renderedEmail.text,
        });

        console.log(`Email sent to ${to}`);
      },
      timeout: 30000, // 30 seconds
    });
  }

  // Image processing pipeline pattern
  private setupImageProcessingPattern(): void {
    // Multi-stage image processing
    this.jobQueue.registerProcessor("process-image", {
      handler: async (data: ImageProcessingData, job) => {
        const { imageUrl, transformations, outputPath } = data;

        await job?.progress(10);

        // Download original image
        const originalImage = await this.downloadImage(imageUrl);

        await job?.progress(30);

        // Apply transformations sequentially
        let processedImage = originalImage;
        for (const [index, transformation] of transformations.entries()) {
          processedImage = await this.applyTransformation(
            processedImage,
            transformation
          );

          const progress = 30 + (index + 1) * (60 / transformations.length);
          await job?.progress(progress);
        }

        await job?.progress(90);

        // Upload processed image
        const finalUrl = await this.uploadProcessedImage(
          processedImage,
          outputPath
        );

        await job?.progress(100);

        return { processedImageUrl: finalUrl };
      },
      timeout: 300000, // 5 minutes
    });

    // Batch image processing producer
    this.producers.set("image-batch", {
      name: "image-batch-producer",
      generate: async (data: ImageBatchData) => {
        const { images, transformations } = data;

        // Create processing job for each image
        const jobs = images.map((image) =>
          this.jobQueue.addJob(
            "process-image",
            {
              imageUrl: image.url,
              transformations,
              outputPath: image.outputPath,
            },
            {
              priority: "normal",
            }
          )
        );

        // Wait for all jobs to be queued
        await Promise.all(jobs);

        return { totalImages: images.length };
      },
    });
  }

  // Data processing with Map-Reduce pattern
  private setupDataProcessingPattern(): void {
    // Map phase: Split large datasets
    this.jobQueue.registerProcessor("data-map", {
      handler: async (data: DataMapData) => {
        const { dataset, chunkSize, mapFunction } = data;

        const chunks = this.chunkArray(dataset, chunkSize);
        const results = [];

        for (const chunk of chunks) {
          const result = await this.executeMapFunction(mapFunction, chunk);
          results.push(result);
        }

        return { mappedResults: results };
      },
      timeout: 600000, // 10 minutes
    });

    // Reduce phase: Combine results
    this.jobQueue.registerProcessor("data-reduce", {
      handler: async (data: DataReduceData) => {
        const { mappedResults, reduceFunction } = data;

        const finalResult = await this.executeReduceFunction(
          reduceFunction,
          mappedResults
        );

        return { reducedResult: finalResult };
      },
      timeout: 300000, // 5 minutes
    });

    // Orchestrator for Map-Reduce jobs
    this.producers.set("map-reduce", {
      name: "map-reduce-orchestrator",
      generate: async (data: MapReduceData) => {
        const { dataset, chunkSize, mapFunction, reduceFunction } = data;

        // Start map jobs
        const mapJobs = [];
        const chunks = this.chunkArray(dataset, chunkSize);

        for (const chunk of chunks) {
          const mapJob = await this.jobQueue.addJob("data-map", {
            dataset: chunk,
            chunkSize: chunkSize,
            mapFunction,
          });
          mapJobs.push(mapJob);
        }

        // Wait for all map jobs to complete
        const mapResults = await Promise.all(
          mapJobs.map((job) => this.waitForJobCompletion(job.id))
        );

        // Start reduce job
        await this.jobQueue.addJob("data-reduce", {
          mappedResults: mapResults.map((r) => r.mappedResults).flat(),
          reduceFunction,
        });

        return { mapJobCount: mapJobs.length };
      },
    });
  }

  // Report generation with dependency management
  private setupReportGenerationPattern(): void {
    this.jobQueue.registerProcessor("generate-report", {
      handler: async (data: ReportGenerationData, job) => {
        const { reportType, dateRange, recipients } = data;

        await job?.progress(10);

        // Collect data from multiple sources
        const dataSources = await this.identifyDataSources(reportType);
        const collectedData = {};

        for (const [index, source] of dataSources.entries()) {
          collectedData[source.name] = await this.collectDataFromSource(
            source,
            dateRange
          );

          const progress = 10 + (index + 1) * (60 / dataSources.length);
          await job?.progress(progress);
        }

        await job?.progress(70);

        // Generate report
        const report = await this.generateReport(
          reportType,
          collectedData,
          dateRange
        );

        await job?.progress(90);

        // Distribute report
        await this.distributeReport(report, recipients);

        await job?.progress(100);

        return { reportId: report.id, recipients: recipients.length };
      },
      timeout: 1800000, // 30 minutes
    });
  }

  // Chain pattern for sequential processing
  async createProcessingChain(steps: ChainStep[]): Promise<string> {
    const chainId = `chain-${Date.now()}-${Math.random()
      .toString(36)
      .substr(2, 9)}`;

    // Create jobs in sequence, each depending on the previous
    let previousJobId: string | null = null;

    for (const [index, step] of steps.entries()) {
      const jobId = `${chainId}-step-${index}`;

      await this.jobQueue.addJob(
        "chain-step",
        {
          stepIndex: index,
          step,
          chainId,
          previousJobId,
        },
        {
          jobId,
          priority: "normal",
          delay: previousJobId ? 1000 : 0, // Small delay to ensure order
        }
      );

      previousJobId = jobId;
    }

    return chainId;
  }

  // Chain step processor
  private setupChainProcessor(): void {
    this.jobQueue.registerProcessor("chain-step", {
      handler: async (data: ChainStepData) => {
        const { stepIndex, step, chainId, previousJobId } = data;

        // Wait for previous step if exists
        if (previousJobId) {
          await this.waitForJobCompletion(previousJobId);
        }

        // Execute current step
        console.log(`Executing step ${stepIndex} of chain ${chainId}`);

        const result = await step.handler(step.data);

        // Store result for next step
        await this.redis.setex(
          `chain:${chainId}:step:${stepIndex}`,
          3600, // 1 hour TTL
          JSON.stringify(result)
        );

        return result;
      },
    });
  }

  // Fan-out/Fan-in pattern
  async createFanOutFanIn(
    fanOutData: any,
    fanOutJobs: FanOutJob[],
    fanInHandler: (results: any[]) => Promise<any>
  ): Promise<string> {
    const fanOutId = `fanout-${Date.now()}-${Math.random()
      .toString(36)
      .substr(2, 9)}`;

    // Create fan-out jobs
    const jobIds = [];
    for (const [index, fanOutJob] of fanOutJobs.entries()) {
      const jobId = `${fanOutId}-fanout-${index}`;

      await this.jobQueue.addJob(
        fanOutJob.jobType,
        {
          ...fanOutJob.data,
          fanOutId,
          fanOutIndex: index,
        },
        {
          jobId,
          priority: fanOutJob.priority || "normal",
        }
      );

      jobIds.push(jobId);
    }

    // Create fan-in job that waits for all fan-out jobs (a matching "fan-in" processor must be registered separately)
    await this.jobQueue.addJob(
      "fan-in",
      {
        fanOutId,
        fanOutJobIds: jobIds,
        fanInHandler: fanInHandler.toString(), // Serialize function
      },
      {
        jobId: `${fanOutId}-fanin`,
        delay: 5000, // Give fan-out jobs time to start
      }
    );

    return fanOutId;
  }

  // Utility methods
  private chunkArray<T>(array: T[], chunkSize: number): T[][] {
    const chunks = [];
    for (let i = 0; i < array.length; i += chunkSize) {
      chunks.push(array.slice(i, i + chunkSize));
    }
    return chunks;
  }

  private async waitForJobCompletion(jobId: string): Promise<any> {
    // Poll job status until completion
    let attempts = 0;
    const maxAttempts = 300; // 5 minutes with 1-second intervals

    while (attempts < maxAttempts) {
      const status = await this.jobQueue.getJobStatus(jobId);

      if (!status) {
        throw new Error(`Job ${jobId} not found`);
      }

      if (status.state === "completed") {
        return status.returnValue;
      }

      if (status.state === "failed") {
        throw new Error(`Job ${jobId} failed: ${status.failedReason}`);
      }

      await new Promise((resolve) => setTimeout(resolve, 1000));
      attempts++;
    }

    throw new Error(`Job ${jobId} timed out waiting for completion`);
  }

  // Placeholder implementations
  private async loadEmailTemplate(template: string): Promise<any> {
    return {};
  }
  private renderEmailTemplate(template: any, variables: any): any {
    return {};
  }
  private async sendEmail(emailData: any): Promise<void> {}
  private async downloadImage(url: string): Promise<any> {
    return null;
  }
  private async applyTransformation(
    image: any,
    transformation: any
  ): Promise<any> {
    return image;
  }
  private async uploadProcessedImage(
    image: any,
    path: string
  ): Promise<string> {
    return "";
  }
  private async executeMapFunction(func: any, data: any): Promise<any> {
    return null;
  }
  private async executeReduceFunction(func: any, data: any): Promise<any> {
    return null;
  }
  private async identifyDataSources(reportType: string): Promise<any[]> {
    return [];
  }
  private async collectDataFromSource(
    source: any,
    dateRange: any
  ): Promise<any> {
    return {};
  }
  private async generateReport(
    type: string,
    data: any,
    dateRange: any
  ): Promise<any> {
    return {};
  }
  private async distributeReport(
    report: any,
    recipients: any[]
  ): Promise<void> {}
}

// Pattern interfaces
interface Producer {
  name: string;
  generate: (data: any) => Promise<any>;
}

interface Consumer {
  name: string;
  process: (data: any) => Promise<any>;
}

interface ChainStep {
  handler: (data: any) => Promise<any>;
  data: any;
}

interface ChainStepData {
  stepIndex: number;
  step: ChainStep;
  chainId: string;
  previousJobId: string | null;
}

interface FanOutJob {
  jobType: string;
  data: any;
  priority?: JobPriority;
}

// Data interfaces for different job types
interface EmailBatchData {
  recipients: Array<{ email: string; customVars?: Record<string, any> }>;
  template: string;
  variables: Record<string, any>;
}

interface EmailJobData {
  to: string;
  template: string;
  variables: Record<string, any>;
}

interface ImageProcessingData {
  imageUrl: string;
  transformations: any[];
  outputPath: string;
}

interface ImageBatchData {
  images: Array<{ url: string; outputPath: string }>;
  transformations: any[];
}

interface DataMapData {
  dataset: any[];
  chunkSize: number;
  mapFunction: any;
}

interface DataReduceData {
  mappedResults: any[];
  reduceFunction: any;
}

interface MapReduceData {
  dataset: any[];
  chunkSize: number;
  mapFunction: any;
  reduceFunction: any;
}

interface ReportGenerationData {
  reportType: string;
  dateRange: { start: Date; end: Date };
  recipients: string[];
}
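
As a quick orientation before moving on to worker processes, here is a hypothetical sketch of driving the image pipeline above. It assumes the ProductionJobQueue and Redis connection from the earlier section; the URL, transformation descriptor, and output path are placeholders whose shape depends on your image library.

// Hypothetical usage: wire up the pattern system, then enqueue one image-processing job
async function enqueueImageJob(
  jobQueue: ProductionJobQueue,
  redis: Redis
): Promise<void> {
  // Constructing the pattern system registers its processors, including "process-image"
  new ProducerConsumerJobSystem(jobQueue, redis);

  const job = await jobQueue.addJob("process-image", {
    imageUrl: "https://example.com/uploads/photo.jpg", // placeholder URL
    transformations: [{ type: "resize", width: 800 }], // placeholder transformation descriptor
    outputPath: "processed/photo-800.jpg",
  });

  console.log(`Queued image job ${job.id}`);
}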

Worker Processes and Threading: Maximizing Concurrent Processing

Building Scalable Worker Architecture

// Advanced worker pool management built on worker_threads
import os from "os";
import { Worker, isMainThread, parentPort, workerData } from "worker_threads";

class WorkerPoolManager {
  private workers: Map<string, WorkerInfo> = new Map();
  private taskQueue: Task[] = [];
  private completedTasks: Map<string, TaskResult> = new Map();
  private workerConfig: WorkerConfig;

  constructor(config: WorkerConfig) {
    this.workerConfig = config;
    this.initializeWorkerPool();
    this.setupTaskDistribution();
    this.setupHealthMonitoring();
  }

  private initializeWorkerPool(): void {
    const numWorkers = this.workerConfig.maxWorkers || os.cpus().length;

    console.log(`Initializing worker pool with ${numWorkers} workers`);

    for (let i = 0; i < numWorkers; i++) {
      this.createWorker(`worker-${i}`);
    }
  }

  private createWorker(workerId: string): void {
    const worker = new Worker(__filename, {
      workerData: {
        workerId,
        workerType: this.workerConfig.workerType,
      },
    });

    const workerInfo: WorkerInfo = {
      id: workerId,
      worker,
      isAvailable: true,
      currentTask: null,
      tasksCompleted: 0,
      tasksErrored: 0,
      startedAt: Date.now(),
      lastActivity: Date.now(),
      memoryUsage: 0,
      cpuUsage: 0,
    };

    // Handle worker messages
    worker.on("message", (message: WorkerMessage) => {
      this.handleWorkerMessage(workerId, message);
    });

    // Handle worker errors
    worker.on("error", (error) => {
      console.error(`Worker ${workerId} error:`, error);
      this.handleWorkerError(workerId, error);
    });

    // Handle worker exit
    worker.on("exit", (code) => {
      console.log(`Worker ${workerId} exited with code ${code}`);
      this.handleWorkerExit(workerId, code);
    });

    this.workers.set(workerId, workerInfo);

    console.log(`Worker ${workerId} created and ready`);
  }

  private handleWorkerMessage(workerId: string, message: WorkerMessage): void {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo) return;

    switch (message.type) {
      case "task-completed":
        this.handleTaskCompleted(workerId, message.payload);
        break;
      case "task-failed":
        this.handleTaskFailed(workerId, message.payload);
        break;
      case "progress-update":
        this.handleProgressUpdate(workerId, message.payload);
        break;
      case "health-stats":
        this.handleHealthStats(workerId, message.payload);
        break;
    }

    workerInfo.lastActivity = Date.now();
  }

  private handleTaskCompleted(workerId: string, payload: any): void {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo || !workerInfo.currentTask) return;

    const task = workerInfo.currentTask;

    // Store result
    this.completedTasks.set(task.id, {
      taskId: task.id,
      result: payload.result,
      completedAt: Date.now(),
      workerId,
      executionTime: payload.executionTime,
    });

    // Update worker stats
    workerInfo.tasksCompleted++;
    workerInfo.currentTask = null;
    workerInfo.isAvailable = true;

    console.log(`Task ${task.id} completed by worker ${workerId}`);

    // Process next task if available
    this.assignTaskToWorker(workerId);
  }

  private handleTaskFailed(workerId: string, payload: any): void {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo || !workerInfo.currentTask) return;

    const task = workerInfo.currentTask;

    // Update worker stats
    workerInfo.tasksErrored++;
    workerInfo.currentTask = null;
    workerInfo.isAvailable = true;

    console.error(
      `Task ${task.id} failed on worker ${workerId}:`,
      payload.error
    );

    // Handle retry logic
    if (task.retryCount < task.maxRetries) {
      task.retryCount++;
      task.lastError = payload.error;
      this.taskQueue.unshift(task); // Add back to front of queue
      console.log(
        `Task ${task.id} queued for retry (attempt ${task.retryCount}/${task.maxRetries})`
      );
    } else {
      // Store failed result
      this.completedTasks.set(task.id, {
        taskId: task.id,
        error: payload.error,
        completedAt: Date.now(),
        workerId,
        failed: true,
      });
    }

    // Process next task
    this.assignTaskToWorker(workerId);
  }

  // Task submission and management
  async submitTask(taskData: TaskData): Promise<string> {
    const task: Task = {
      id: `task-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
      type: taskData.type,
      data: taskData.data,
      priority: taskData.priority || "normal",
      submittedAt: Date.now(),
      retryCount: 0,
      maxRetries: taskData.maxRetries || 3,
      timeout: taskData.timeout || 300000, // 5 minutes default
    };

    // Insert task into queue based on priority
    this.insertTaskByPriority(task);

    // Try to assign immediately to available worker
    this.assignTaskToAvailableWorker();

    console.log(`Task ${task.id} submitted to queue`);

    return task.id;
  }

  private insertTaskByPriority(task: Task): void {
    const priorityValues = { critical: 4, high: 3, normal: 2, low: 1 };
    const taskPriority = priorityValues[task.priority];

    // Find insertion point based on priority
    let insertIndex = this.taskQueue.length;
    for (let i = 0; i < this.taskQueue.length; i++) {
      const queuedTaskPriority = priorityValues[this.taskQueue[i].priority];
      if (taskPriority > queuedTaskPriority) {
        insertIndex = i;
        break;
      }
    }

    this.taskQueue.splice(insertIndex, 0, task);
  }

  private assignTaskToAvailableWorker(): void {
    if (this.taskQueue.length === 0) return;

    for (const [workerId, workerInfo] of this.workers) {
      if (workerInfo.isAvailable) {
        this.assignTaskToWorker(workerId);
        break;
      }
    }
  }

  private assignTaskToWorker(workerId: string): void {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo || !workerInfo.isAvailable || this.taskQueue.length === 0) {
      return;
    }

    const task = this.taskQueue.shift()!;

    workerInfo.currentTask = task;
    workerInfo.isAvailable = false;

    // Send task to worker
    workerInfo.worker.postMessage({
      type: "execute-task",
      payload: {
        task: {
          id: task.id,
          type: task.type,
          data: task.data,
          timeout: task.timeout,
        },
      },
    });

    console.log(`Task ${task.id} assigned to worker ${workerId}`);
  }

  private setupTaskDistribution(): void {
    // Periodically check for pending tasks
    setInterval(() => {
      if (this.taskQueue.length > 0) {
        this.assignTaskToAvailableWorker();
      }
    }, 1000);
  }

  // Health monitoring and auto-scaling
  private setupHealthMonitoring(): void {
    setInterval(async () => {
      await this.monitorWorkerHealth();
      await this.autoScale();
    }, 30000); // Every 30 seconds
  }

  private async monitorWorkerHealth(): Promise<void> {
    for (const [workerId, workerInfo] of this.workers) {
      // Check for stuck workers
      if (
        workerInfo.currentTask &&
        Date.now() - workerInfo.lastActivity > 300000
      ) {
        // 5 minutes
        console.warn(`Worker ${workerId} appears stuck, restarting...`);
        await this.restartWorker(workerId);
      }

      // Request health stats from worker
      workerInfo.worker.postMessage({
        type: "health-check",
        payload: {},
      });
    }
  }

  private async autoScale(): Promise<void> {
    const totalWorkers = this.workers.size;
    const availableWorkers = Array.from(this.workers.values()).filter(
      (w) => w.isAvailable
    ).length;
    const queueLength = this.taskQueue.length;

    // Scale up if queue is building up
    if (
      queueLength > totalWorkers * 2 &&
      totalWorkers < this.workerConfig.maxWorkers!
    ) {
      const newWorkerId = `worker-${Date.now()}`;
      this.createWorker(newWorkerId);
      console.log(`Scaled up: Created worker ${newWorkerId}`);
    }

    // Scale down if too many idle workers
    if (
      availableWorkers > totalWorkers * 0.7 &&
      totalWorkers > this.workerConfig.minWorkers! &&
      queueLength === 0
    ) {
      // Find least busy worker to terminate
      const workerToTerminate = this.findLeastBusyWorker();
      if (workerToTerminate) {
        await this.terminateWorker(workerToTerminate.id);
        console.log(`Scaled down: Terminated worker ${workerToTerminate.id}`);
      }
    }
  }

  private async restartWorker(workerId: string): Promise<void> {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo) return;

    // If worker has a current task, add it back to queue
    if (workerInfo.currentTask) {
      this.taskQueue.unshift(workerInfo.currentTask);
    }

    // Terminate and recreate worker
    await this.terminateWorker(workerId);
    this.createWorker(workerId);
  }

  private async terminateWorker(workerId: string): Promise<void> {
    const workerInfo = this.workers.get(workerId);
    if (!workerInfo) return;

    try {
      await workerInfo.worker.terminate();
    } catch (error) {
      console.error(`Error terminating worker ${workerId}:`, error);
    }

    this.workers.delete(workerId);
  }

  private findLeastBusyWorker(): WorkerInfo | null {
    let leastBusy: WorkerInfo | null = null;

    for (const workerInfo of this.workers.values()) {
      if (
        workerInfo.isAvailable &&
        (!leastBusy || workerInfo.tasksCompleted < leastBusy.tasksCompleted)
      ) {
        leastBusy = workerInfo;
      }
    }

    return leastBusy;
  }

  // Task result retrieval
  async getTaskResult(
    taskId: string,
    timeoutMs: number = 300000
  ): Promise<any> {
    const startTime = Date.now();

    while (Date.now() - startTime < timeoutMs) {
      const result = this.completedTasks.get(taskId);
      if (result) {
        return result;
      }

      // Check if task is still in queue or being processed
      const inQueue = this.taskQueue.some((t) => t.id === taskId);
      const beingProcessed = Array.from(this.workers.values()).some(
        (w) => w.currentTask?.id === taskId
      );

      if (!inQueue && !beingProcessed) {
        throw new Error(`Task ${taskId} not found`);
      }

      // Wait before checking again
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    throw new Error(`Task ${taskId} timed out`);
  }

  // Statistics and monitoring
  getPoolStats(): WorkerPoolStats {
    const workers = Array.from(this.workers.values());

    return {
      totalWorkers: workers.length,
      availableWorkers: workers.filter((w) => w.isAvailable).length,
      busyWorkers: workers.filter((w) => !w.isAvailable).length,
      queueLength: this.taskQueue.length,
      totalTasksCompleted: workers.reduce(
        (sum, w) => sum + w.tasksCompleted,
        0
      ),
      totalTasksErrored: workers.reduce((sum, w) => sum + w.tasksErrored, 0),
      avgTasksPerWorker:
        workers.reduce((sum, w) => sum + w.tasksCompleted, 0) / workers.length,
      oldestWorkerAge: Math.max(
        ...workers.map((w) => Date.now() - w.startedAt)
      ),
    };
  }

  // Graceful shutdown
  async shutdown(): Promise<void> {
    console.log("Shutting down worker pool...");

    // Wait for current tasks to complete (with timeout)
    const shutdownTimeout = 60000; // 1 minute
    const startTime = Date.now();

    while (Date.now() - startTime < shutdownTimeout) {
      const busyWorkers = Array.from(this.workers.values()).filter(
        (w) => !w.isAvailable
      );

      if (busyWorkers.length === 0) {
        break;
      }

      await new Promise((resolve) => setTimeout(resolve, 1000));
    }

    // Terminate all workers
    const terminatePromises = Array.from(this.workers.keys()).map((workerId) =>
      this.terminateWorker(workerId)
    );

    await Promise.all(terminatePromises);

    console.log("Worker pool shut down successfully");
  }
}

// Worker thread implementation
if (!isMainThread) {
  const { workerId, workerType } = workerData;

  // Worker message handler
  parentPort?.on("message", async (message: WorkerMessage) => {
    switch (message.type) {
      case "execute-task":
        await executeTask(message.payload.task);
        break;
      case "health-check":
        sendHealthStats();
        break;
    }
  });

  async function executeTask(task: any): Promise<void> {
    const startTime = Date.now();

    try {
      // Set up task timeout
      const timeoutPromise = new Promise((_, reject) => {
        setTimeout(() => reject(new Error("Task timeout")), task.timeout);
      });

      // Execute task based on type
      const taskPromise = executeTaskByType(task);

      // Race between task execution and timeout
      const result = await Promise.race([taskPromise, timeoutPromise]);

      // Send success message
      parentPort?.postMessage({
        type: "task-completed",
        payload: {
          taskId: task.id,
          result,
          executionTime: Date.now() - startTime,
        },
      });
    } catch (error) {
      // Send error message
      parentPort?.postMessage({
        type: "task-failed",
        payload: {
          taskId: task.id,
          error: error.message,
          executionTime: Date.now() - startTime,
        },
      });
    }
  }

  async function executeTaskByType(task: any): Promise<any> {
    switch (task.type) {
      case "cpu-intensive":
        return executeCPUIntensiveTask(task.data);
      case "io-intensive":
        return executeIOIntensiveTask(task.data);
      case "data-processing":
        return executeDataProcessingTask(task.data);
      default:
        throw new Error(`Unknown task type: ${task.type}`);
    }
  }

  function sendHealthStats(): void {
    const memUsage = process.memoryUsage();

    parentPort?.postMessage({
      type: "health-stats",
      payload: {
        workerId,
        memoryUsage: memUsage,
        uptime: process.uptime(),
      },
    });
  }

  // Task type implementations
  async function executeCPUIntensiveTask(data: any): Promise<any> {
    // Simulate CPU-intensive work
    const { iterations = 1000000 } = data;

    let result = 0;
    for (let i = 0; i < iterations; i++) {
      result += Math.sqrt(i);
    }

    return { result, iterations };
  }

  async function executeIOIntensiveTask(data: any): Promise<any> {
    // Simulate I/O intensive work
    const { url, timeout = 10000 } = data;

    // Simulate network request
    await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000));

    return { url, responseTime: Date.now() };
  }

  async function executeDataProcessingTask(data: any): Promise<any> {
    // Simulate data processing
    const { dataset, operation } = data;

    let processedData;

    switch (operation) {
      case "filter":
        processedData = dataset.filter((item: any) => item.active);
        break;
      case "transform":
        processedData = dataset.map((item: any) => ({
          ...item,
          processed: true,
          timestamp: Date.now(),
        }));
        break;
      case "aggregate":
        processedData = {
          count: dataset.length,
          sum: dataset.reduce(
            (sum: number, item: any) => sum + (item.value || 0),
            0
          ),
        };
        break;
      default:
        throw new Error(`Unknown operation: ${operation}`);
    }

    return { processedData, operation };
  }
}

// Interfaces
interface WorkerConfig {
  maxWorkers?: number;
  minWorkers?: number;
  workerType: string;
}

interface WorkerInfo {
  id: string;
  worker: Worker;
  isAvailable: boolean;
  currentTask: Task | null;
  tasksCompleted: number;
  tasksErrored: number;
  startedAt: number;
  lastActivity: number;
  memoryUsage: number;
  cpuUsage: number;
}

interface Task {
  id: string;
  type: string;
  data: any;
  priority: "critical" | "high" | "normal" | "low";
  submittedAt: number;
  retryCount: number;
  maxRetries: number;
  timeout: number;
  lastError?: string;
}

interface TaskData {
  type: string;
  data: any;
  priority?: "critical" | "high" | "normal" | "low";
  maxRetries?: number;
  timeout?: number;
}

interface TaskResult {
  taskId: string;
  result?: any;
  error?: string;
  completedAt: number;
  workerId: string;
  executionTime?: number;
  failed?: boolean;
}

interface WorkerMessage {
  type: string;
  payload: any;
}

interface WorkerPoolStats {
  totalWorkers: number;
  availableWorkers: number;
  busyWorkers: number;
  queueLength: number;
  totalTasksCompleted: number;
  totalTasksErrored: number;
  avgTasksPerWorker: number;
  oldestWorkerAge: number;
}
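
Before moving on to retries, it's worth showing how a pool like this plugs into process lifecycle management. Below is a minimal sketch of wiring the pool's shutdown() method (defined above) into process signals, so deploys and scale-downs drain in-flight tasks instead of killing workers mid-task. The class name WorkerPoolManager and the constructor options are assumptions for illustration; adjust them to the actual class defined earlier.

// Hypothetical wiring: the pool class name and options below are assumptions
const pool = new WorkerPoolManager({
  workerType: "data-processing",
  minWorkers: 2,
  maxWorkers: 8,
});

const drainAndExit = async (signal: string): Promise<void> => {
  console.log(`Received ${signal}, draining worker pool...`);
  await pool.shutdown(); // waits up to 1 minute for busy workers, then terminates them
  process.exit(0);
};

process.on("SIGTERM", () => void drainAndExit("SIGTERM"));
process.on("SIGINT", () => void drainAndExit("SIGINT"));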

Job Retry and Failure Handling: Building Resilient Processing

Intelligent Retry Strategies and Dead Letter Queues

// Advanced retry mechanisms with exponential backoff and circuit breakers
class RetryHandler {
  private retryConfigs: Map<string, RetryConfig> = new Map();
  private circuitBreakers: Map<string, CircuitBreaker> = new Map();
  private deadLetterQueue: DeadLetterQueue;
  private redis: Redis;

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL);
    this.deadLetterQueue = new DeadLetterQueue(this.redis);
    this.setupDefaultRetryConfigs();
    this.setupCircuitBreakers();
  }

  private setupDefaultRetryConfigs(): void {
    // Email sending - gentle retries
    this.retryConfigs.set("send-email", {
      maxAttempts: 5,
      baseDelayMs: 1000,
      maxDelayMs: 300000, // 5 minutes
      backoffStrategy: "exponential",
      jitterFactor: 0.1,
      retryableErrors: ["TIMEOUT", "RATE_LIMITED", "TEMPORARY_FAILURE"],
      nonRetryableErrors: ["INVALID_EMAIL", "AUTHENTICATION_FAILED"],
    });

    // Payment processing - aggressive retries with strict limits
    this.retryConfigs.set("process-payment", {
      maxAttempts: 3,
      baseDelayMs: 500,
      maxDelayMs: 30000, // 30 seconds
      backoffStrategy: "linear",
      jitterFactor: 0.05,
      retryableErrors: ["NETWORK_ERROR", "GATEWAY_TIMEOUT"],
      nonRetryableErrors: [
        "INSUFFICIENT_FUNDS",
        "INVALID_CARD",
        "FRAUD_DETECTED",
      ],
    });

    // Data processing - patient retries
    this.retryConfigs.set("process-data", {
      maxAttempts: 10,
      baseDelayMs: 2000,
      maxDelayMs: 1800000, // 30 minutes
      backoffStrategy: "exponential",
      jitterFactor: 0.2,
      retryableErrors: ["DATABASE_LOCKED", "RESOURCE_BUSY", "TEMPORARY_ERROR"],
      nonRetryableErrors: ["INVALID_DATA", "SCHEMA_ERROR"],
    });

    // External API calls - circuit breaker friendly
    this.retryConfigs.set("external-api", {
      maxAttempts: 7,
      baseDelayMs: 1500,
      maxDelayMs: 600000, // 10 minutes
      backoffStrategy: "exponential",
      jitterFactor: 0.15,
      retryableErrors: ["API_UNAVAILABLE", "RATE_LIMITED", "TIMEOUT"],
      nonRetryableErrors: ["UNAUTHORIZED", "NOT_FOUND", "BAD_REQUEST"],
      enableCircuitBreaker: true,
    });
  }

  private setupCircuitBreakers(): void {
    // Email service circuit breaker
    this.circuitBreakers.set(
      "email-service",
      new CircuitBreaker({
        failureThreshold: 50, // 50% failure rate
        recoveryTimeout: 60000, // 1 minute
        monitoringPeriod: 300000, // 5 minutes
      })
    );

    // Payment gateway circuit breaker
    this.circuitBreakers.set(
      "payment-gateway",
      new CircuitBreaker({
        failureThreshold: 25, // 25% failure rate (stricter for payments)
        recoveryTimeout: 120000, // 2 minutes
        monitoringPeriod: 180000, // 3 minutes
      })
    );

    // External API circuit breaker
    this.circuitBreakers.set(
      "external-api",
      new CircuitBreaker({
        failureThreshold: 60, // 60% failure rate
        recoveryTimeout: 300000, // 5 minutes
        monitoringPeriod: 600000, // 10 minutes
      })
    );
  }

  async handleJobFailure(
    job: FailedJob,
    error: JobError,
    retryContext: RetryContext
  ): Promise<RetryDecision> {
    const config = this.retryConfigs.get(job.type);
    if (!config) {
      return { shouldRetry: false, reason: "No retry config found" };
    }

    // Check if error is retryable
    if (!this.isRetryableError(error, config)) {
      await this.deadLetterQueue.addJob(job, error, "NON_RETRYABLE_ERROR");
      return {
        shouldRetry: false,
        reason: `Non-retryable error: ${error.code}`,
      };
    }

    // Check circuit breaker if enabled
    if (config.enableCircuitBreaker) {
      const circuitBreaker = this.circuitBreakers.get(job.service || job.type);
      if (circuitBreaker && circuitBreaker.isOpen()) {
        await this.deadLetterQueue.addJob(job, error, "CIRCUIT_BREAKER_OPEN");
        return {
          shouldRetry: false,
          reason: "Circuit breaker is open",
        };
      }
    }

    // Check retry limits
    if (retryContext.attemptNumber >= config.maxAttempts) {
      await this.deadLetterQueue.addJob(job, error, "MAX_RETRIES_EXCEEDED");
      return {
        shouldRetry: false,
        reason: "Maximum retry attempts exceeded",
      };
    }

    // Calculate retry delay
    const delay = this.calculateRetryDelay(
      config,
      retryContext.attemptNumber,
      retryContext.previousDelays
    );

    // Update circuit breaker
    if (config.enableCircuitBreaker) {
      const circuitBreaker = this.circuitBreakers.get(job.service || job.type);
      circuitBreaker?.recordFailure();
    }

    // Store retry context
    await this.storeRetryContext(job.id, {
      ...retryContext,
      attemptNumber: retryContext.attemptNumber + 1,
      previousDelays: [...retryContext.previousDelays, delay],
      lastError: error,
      lastAttemptAt: Date.now(),
    });

    return {
      shouldRetry: true,
      delayMs: delay,
      reason: `Retry attempt ${retryContext.attemptNumber + 1}/${
        config.maxAttempts
      }`,
    };
  }

  private isRetryableError(error: JobError, config: RetryConfig): boolean {
    // Check non-retryable errors first
    if (config.nonRetryableErrors.includes(error.code)) {
      return false;
    }

    // Check retryable errors
    if (config.retryableErrors.length > 0) {
      return config.retryableErrors.includes(error.code);
    }

    // Default behavior - retry unless explicitly non-retryable
    return true;
  }

  private calculateRetryDelay(
    config: RetryConfig,
    attemptNumber: number,
    previousDelays: number[]
  ): number {
    let delay: number;

    switch (config.backoffStrategy) {
      case "linear":
        delay = config.baseDelayMs * attemptNumber;
        break;

      case "exponential":
        delay = config.baseDelayMs * Math.pow(2, attemptNumber - 1);
        break;

      case "fibonacci":
        delay = config.baseDelayMs * this.fibonacci(attemptNumber);
        break;

      case "custom":
        delay = this.calculateCustomDelay(
          config,
          attemptNumber,
          previousDelays
        );
        break;

      default:
        delay = config.baseDelayMs;
    }

    // Apply jitter to prevent thundering herd
    if (config.jitterFactor > 0) {
      const jitter = delay * config.jitterFactor * (Math.random() - 0.5) * 2;
      delay += jitter;
    }

    // Ensure delay is within bounds
    delay = Math.max(config.baseDelayMs, Math.min(delay, config.maxDelayMs));

    return Math.round(delay);
  }

  private fibonacci(n: number): number {
    if (n <= 1) return 1;
    let a = 1,
      b = 1;
    for (let i = 2; i < n; i++) {
      [a, b] = [b, a + b];
    }
    return b;
  }

  private calculateCustomDelay(
    config: RetryConfig,
    attemptNumber: number,
    previousDelays: number[]
  ): number {
    // Custom delay calculation based on specific business logic
    // For example, time-aware delays for email sending
    const now = new Date();
    const hour = now.getHours();

    // Avoid business hours for non-critical emails
    if (hour >= 9 && hour <= 17) {
      return config.baseDelayMs * 3; // Longer delay during business hours
    }

    // Shorter delays during off hours
    return config.baseDelayMs;
  }

  // Dead letter queue management
  async getDeadLetterJobs(
    limit: number = 50,
    offset: number = 0
  ): Promise<DeadLetterJob[]> {
    return await this.deadLetterQueue.getJobs(limit, offset);
  }

  async reprocessDeadLetterJob(jobId: string): Promise<boolean> {
    const deadJob = await this.deadLetterQueue.getJob(jobId);
    if (!deadJob) {
      return false;
    }

    // Reset retry context
    await this.storeRetryContext(deadJob.originalJob.id, {
      attemptNumber: 0,
      previousDelays: [],
      lastError: null,
      lastAttemptAt: Date.now(),
    });

    // Remove from dead letter queue
    await this.deadLetterQueue.removeJob(jobId);

    // Re-queue the job
    await this.requeue(deadJob.originalJob);

    return true;
  }

  // Retry context management
  private async storeRetryContext(
    jobId: string,
    context: RetryContext
  ): Promise<void> {
    await this.redis.setex(
      `retry_context:${jobId}`,
      3600, // 1 hour TTL
      JSON.stringify(context)
    );
  }

  async getRetryContext(jobId: string): Promise<RetryContext | null> {
    const contextData = await this.redis.get(`retry_context:${jobId}`);
    if (!contextData) {
      return {
        attemptNumber: 0,
        previousDelays: [],
        lastError: null,
        lastAttemptAt: Date.now(),
      };
    }

    return JSON.parse(contextData);
  }

  // Monitoring and analytics
  async getRetryStatistics(
    timeframe: string = "24h"
  ): Promise<RetryStatistics> {
    const stats = await this.redis.hgetall(`retry_stats:${timeframe}`);

    return {
      totalJobs: parseInt(stats.total_jobs || "0"),
      succeededJobs: parseInt(stats.succeeded_jobs || "0"),
      failedJobs: parseInt(stats.failed_jobs || "0"),
      retriedJobs: parseInt(stats.retried_jobs || "0"),
      deadLetterJobs: parseInt(stats.dead_letter_jobs || "0"),
      avgRetryAttempts: parseFloat(stats.avg_retry_attempts || "0"),
      successRateAfterRetry: parseFloat(stats.success_rate_after_retry || "0"),
    };
  }

  // Helper methods
  private async requeue(job: FailedJob): Promise<void> {
    // Implementation depends on your job queue system
    console.log(`Re-queuing job ${job.id}`);
  }
}

// Circuit breaker implementation
class CircuitBreaker {
  private failureCount: number = 0;
  private successCount: number = 0;
  private lastFailureTime: number = 0;
  private windowStart: number = Date.now();
  private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";

  constructor(private config: CircuitBreakerConfig) {}

  isOpen(): boolean {
    if (this.state === "OPEN") {
      // Check if we should try half-open
      if (Date.now() - this.lastFailureTime > this.config.recoveryTimeout) {
        this.state = "HALF_OPEN";
        return false;
      }
      return true;
    }

    return false;
  }

  recordSuccess(): void {
    this.resetWindowIfExpired();
    this.successCount++;

    if (this.state === "HALF_OPEN") {
      // Recovery successful
      this.state = "CLOSED";
      this.failureCount = 0;
    }
  }

  recordFailure(): void {
    this.resetWindowIfExpired();
    this.failureCount++;
    this.lastFailureTime = Date.now();

    // Calculate failure rate within the current monitoring window
    const totalRequests = this.failureCount + this.successCount;
    const failureRate = (this.failureCount / totalRequests) * 100;

    if (failureRate >= this.config.failureThreshold) {
      this.state = "OPEN";
    }
  }

  // Drop counters once the monitoring window has elapsed so stale results
  // don't distort the current failure rate
  private resetWindowIfExpired(): void {
    if (Date.now() - this.windowStart > this.config.monitoringPeriod) {
      this.failureCount = 0;
      this.successCount = 0;
      this.windowStart = Date.now();
    }
  }
}

// Dead letter queue implementation
class DeadLetterQueue {
  constructor(private redis: Redis) {}

  async addJob(job: FailedJob, error: JobError, reason: string): Promise<void> {
    const deadLetterJob: DeadLetterJob = {
      id: `dlq-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
      originalJob: job,
      error,
      reason,
      addedAt: Date.now(),
      retryCount: job.retryCount || 0,
    };

    await this.redis.lpush("dead_letter_queue", JSON.stringify(deadLetterJob));

    console.log(`Job ${job.id} added to dead letter queue: ${reason}`);
  }

  async getJobs(limit: number, offset: number): Promise<DeadLetterJob[]> {
    const jobs = await this.redis.lrange(
      "dead_letter_queue",
      offset,
      offset + limit - 1
    );
    return jobs.map((job) => JSON.parse(job));
  }

  async getJob(jobId: string): Promise<DeadLetterJob | null> {
    const jobs = await this.redis.lrange("dead_letter_queue", 0, -1);

    for (const jobData of jobs) {
      const job = JSON.parse(jobData);
      if (job.id === jobId || job.originalJob.id === jobId) {
        return job;
      }
    }

    return null;
  }

  async removeJob(jobId: string): Promise<boolean> {
    const job = await this.getJob(jobId);
    if (!job) return false;

    await this.redis.lrem("dead_letter_queue", 1, JSON.stringify(job));
    return true;
  }
}

// Interfaces
interface RetryConfig {
  maxAttempts: number;
  baseDelayMs: number;
  maxDelayMs: number;
  backoffStrategy: "linear" | "exponential" | "fibonacci" | "custom";
  jitterFactor: number;
  retryableErrors: string[];
  nonRetryableErrors: string[];
  enableCircuitBreaker?: boolean;
}

interface CircuitBreakerConfig {
  failureThreshold: number; // Percentage
  recoveryTimeout: number; // Milliseconds
  monitoringPeriod: number; // Milliseconds
}

interface FailedJob {
  id: string;
  type: string;
  data: any;
  service?: string;
  retryCount?: number;
}

interface JobError {
  code: string;
  message: string;
  stack?: string;
  metadata?: any;
}

interface RetryContext {
  attemptNumber: number;
  previousDelays: number[];
  lastError: JobError | null;
  lastAttemptAt: number;
}

interface RetryDecision {
  shouldRetry: boolean;
  delayMs?: number;
  reason: string;
}

interface DeadLetterJob {
  id: string;
  originalJob: FailedJob;
  error: JobError;
  reason: string;
  addedAt: number;
  retryCount: number;
}

interface RetryStatistics {
  totalJobs: number;
  succeededJobs: number;
  failedJobs: number;
  retriedJobs: number;
  deadLetterJobs: number;
  avgRetryAttempts: number;
  successRateAfterRetry: number;
}
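
The requeue() helper above is intentionally left as a stub because it depends on your queue implementation. As a concrete illustration, here is a minimal sketch of one way to wire RetryHandler into a worker's failure path: retryable jobs go into a Redis sorted set scored by the time they become eligible again, and a small poller promotes due jobs back onto the main queue. The key names (delayed_jobs, job_queue) and the overall wiring are assumptions for illustration, not part of the classes above.

// Illustrative wiring only: key names and the polling loop are assumptions
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);
const retryHandler = new RetryHandler();

async function handleFailure(job: FailedJob, error: JobError): Promise<void> {
  const retryContext =
    (await retryHandler.getRetryContext(job.id)) ?? {
      attemptNumber: 0,
      previousDelays: [],
      lastError: null,
      lastAttemptAt: Date.now(),
    };

  const decision = await retryHandler.handleJobFailure(job, error, retryContext);

  if (decision.shouldRetry && decision.delayMs) {
    // Score = timestamp at which the job becomes eligible to run again
    await redis.zadd("delayed_jobs", Date.now() + decision.delayMs, JSON.stringify(job));
  }
  // Non-retryable failures were already routed to the dead letter queue
  // inside handleJobFailure(), so there is nothing else to do here.
}

// Poller that promotes due jobs back onto the main queue
async function promoteDueJobs(): Promise<void> {
  const due = await redis.zrangebyscore("delayed_jobs", 0, Date.now());
  for (const payload of due) {
    await redis.lpush("job_queue", payload);
    await redis.zrem("delayed_jobs", payload);
  }
}

setInterval(() => promoteDueJobs().catch(console.error), 1000);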

Key Takeaways

Background processing isn’t just about moving slow tasks out of the request cycle—it’s about building resilient, distributed systems that can handle failures gracefully and scale independently of your main application.

Essential background processing patterns:

  • Queue-based architecture with priority-based task distribution and intelligent routing (see the priority-queue sketch after this list)
  • Robust scheduling systems with leader election and distributed coordination
  • Worker pool management with auto-scaling and health monitoring
  • Intelligent retry mechanisms with exponential backoff and circuit breakers
  • Dead letter queues for handling permanently failed jobs
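
To make the first pattern concrete, here is a minimal sketch of priority-based distribution using plain Redis lists: BRPOP checks its keys in the order given, so a worker always drains critical work before touching lower-priority queues. The queue key names are assumptions for illustration.

// Minimal priority consumer sketch; queue key names are illustrative
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);

async function consumeNextJob(): Promise<void> {
  // BRPOP checks keys left to right, so critical always wins; blocks up to 5s
  const entry = await redis.brpop(
    "jobs:critical",
    "jobs:high",
    "jobs:normal",
    "jobs:low",
    5
  );

  if (!entry) return; // timed out with nothing to do

  const [queueKey, payload] = entry;
  const job = JSON.parse(payload);
  console.log(`Picked up ${job.type} job from ${queueKey}`);
  // ...dispatch to the appropriate handler here
}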

The production-ready job processing framework:

  • Use Redis or similar for reliable job persistence and cross-server coordination
  • Implement multiple queue priorities to handle critical vs. bulk operations differently
  • Build comprehensive monitoring with metrics on queue depth, processing time, and failure rates (see the queue-health sketch after this list)
  • Design graceful degradation so job failures don’t cascade to user-facing features
  • Plan horizontal scaling with worker auto-scaling based on queue metrics
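
For the monitoring point above, the simplest useful probe measures queue depth (and dead-letter growth) on an interval and alerts when it crosses a threshold. In this sketch the key names, thresholds, and console-based alerting are placeholders for whatever alerting integration you actually use.

// Queue-health probe sketch; keys, thresholds, and alerting are placeholders
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);

const DEPTH_THRESHOLDS: Record<string, number> = {
  "jobs:critical": 100,
  "jobs:normal": 5000,
  dead_letter_queue: 50,
};

async function checkQueueHealth(): Promise<void> {
  for (const [queue, threshold] of Object.entries(DEPTH_THRESHOLDS)) {
    const depth = await redis.llen(queue);
    if (depth > threshold) {
      // Swap console.warn for PagerDuty/Slack/metrics in production
      console.warn(`Queue ${queue} depth ${depth} exceeds threshold ${threshold}`);
    }
  }
}

setInterval(() => checkQueueHealth().catch(console.error), 30000);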

Background processing best practices:

  • Always implement timeouts for job execution to prevent stuck workers
  • Use exponential backoff for retries to avoid overwhelming failing services
  • Monitor queue health as aggressively as you monitor application health
  • Plan for job isolation so one type of job can’t block others
  • Build idempotent jobs that can be safely retried without side effects (see the idempotency sketch below)
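
Idempotency deserves its own sketch: claim a per-job marker with SET NX before performing the side effect, and release the marker if the side effect fails so a later retry can run it again. The key naming and the sendOrderConfirmation() stub are illustrative assumptions.

// Idempotent handler sketch; key naming and the email stub are assumptions
import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL);

async function sendOrderConfirmationIdempotent(job: {
  id: string;
  orderId: string;
  email: string;
}): Promise<void> {
  // NX means only the first attempt claims the marker; EX expires it after 24h
  const claimed = await redis.set(`job_done:${job.id}`, Date.now().toString(), "EX", 86400, "NX");

  if (!claimed) {
    console.log(`Job ${job.id} already processed, skipping`);
    return;
  }

  try {
    await sendOrderConfirmation(job.orderId, job.email);
  } catch (error) {
    // Release the marker so a retry is allowed to re-run the side effect
    await redis.del(`job_done:${job.id}`);
    throw error;
  }
}

// Stub standing in for your real email-provider call
async function sendOrderConfirmation(orderId: string, email: string): Promise<void> {
  console.log(`Sending confirmation for order ${orderId} to ${email}`);
}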

The architecture decision framework:

  • Use simple cron jobs for basic scheduled tasks where occasional missed runs are tolerable
  • Use Redis/Bull queues for high-throughput job processing with retry logic
  • Use worker threads for CPU-intensive tasks that can be parallelized
  • Use distributed job systems (like Celery) for complex multi-service processing
  • Use cloud functions for sporadic, event-driven background tasks

What’s Next?

In the next post, we’ll complete our background processing journey by diving into distributed job processing, queue monitoring and management, resource management for background jobs, and scaling background processing across multiple servers.

We’ll also explore the operational challenges of maintaining job processing systems at scale—from handling queue backlog during traffic spikes to debugging why critical background jobs aren’t processing.

Because building background processing that works during development is the easy part. Making it reliable enough to handle Black Friday traffic while processing millions of background tasks—that’s where systems engineering becomes art.