Background Processing & Jobs - 2/2

The $8 Million Queue Collapse That Brought Down Prime Day

Picture this hypothetical nightmare: Amazon Prime Day morning, 3 AM PDT. The world’s largest e-commerce platform is about to face 48 hours of unprecedented traffic. Their frontend systems are battle-tested, their databases are replicated across multiple regions, and their CDN can handle petabytes of traffic. Everything looks perfect.

Then the job processing system—the invisible engine powering inventory updates, order processing, recommendation calculations, and payment processing—begins to crack under pressure.

By 6 AM, the symptoms were devastating:

  • Order confirmation emails delayed by hours
  • Inventory counts wildly inaccurate, causing overselling
  • Recommendation engine completely stale, showing last week’s data
  • Payment processing queues backing up, creating a 45-minute delay
  • Search indexing stopped working, making new products invisible
  • Analytics pipelines completely frozen, leaving executives blind

The problem wasn’t a single point of failure. It was a distributed job processing architecture that looked robust on paper but collapsed like a house of cards in practice:

  • No cross-region job coordination: Jobs were duplicating across regions
  • Resource starvation: Critical order-processing jobs were starved by millions of recommendation updates
  • No intelligent load balancing: Some workers sat idle while others were overwhelmed
  • No graceful degradation: When payment processing slowed, it brought down inventory updates
  • No real-time monitoring: They discovered problems from customer complaints, not alerts
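The resource-starvation item is what a single shared queue produces once low-value jobs vastly outnumber critical ones: critical work simply waits its turn behind them. One common remedy is weighted round-robin dispatch across separate per-tier queues. The sketch below is a minimal in-process illustration under assumed names (`WeightedDispatcher`, the three tiers, and the 3/2/1 weights are hypothetical, not taken from any particular queue library).

```typescript
// Hypothetical job tiers for illustration; a real system would map its own queues onto these
type Tier = "critical" | "normal" | "bulk";

// Weighted round-robin: in each cycle, a tier may dispatch up to `weight` jobs.
// A tier with no queued work does not hold up the others: when no tier has both
// work and credit left, a new cycle starts. Weights must be positive.
class WeightedDispatcher<T> {
  private readonly queues = new Map<Tier, T[]>();
  private readonly credits = new Map<Tier, number>();
  private readonly order: Tier[];

  constructor(private readonly weights: Record<Tier, number>) {
    // Visit heavier tiers first within each cycle
    this.order = (Object.keys(weights) as Tier[]).sort(
      (a, b) => weights[b] - weights[a]
    );
    for (const tier of this.order) this.queues.set(tier, []);
    this.refill();
  }

  enqueue(tier: Tier, job: T): void {
    this.queues.get(tier)!.push(job);
  }

  // Returns the next job to run, or undefined when every queue is empty
  next(): T | undefined {
    // At most two passes: the current cycle, then a fresh cycle after refilling credits
    for (let pass = 0; pass < 2; pass++) {
      for (const tier of this.order) {
        const queue = this.queues.get(tier)!;
        const credit = this.credits.get(tier)!;
        if (queue.length > 0 && credit > 0) {
          this.credits.set(tier, credit - 1);
          return queue.shift();
        }
      }
      this.refill();
    }
    return undefined;
  }

  private refill(): void {
    for (const tier of this.order) this.credits.set(tier, this.weights[tier]);
  }
}

// Usage: a flood of recommendation jobs can no longer crowd out order processing
const dispatcher = new WeightedDispatcher<string>({ critical: 3, normal: 2, bulk: 1 });
for (let i = 1; i <= 10; i++) {
  dispatcher.enqueue("critical", `order-${i}`);
  dispatcher.enqueue("bulk", `reco-${i}`);
}
const firstEight = Array.from({ length: 8 }, () => dispatcher.next());
console.log(firstEight.join(",")); // order-1,order-2,order-3,reco-1,order-4,order-5,order-6,reco-2
```

Strict priority ordering would also protect critical jobs, but a sustained critical backlog would then starve bulk work entirely; the weights make that trade-off explicit and tunable.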

By noon, they had:

  • $8 million in lost sales due to checkout failures
  • 2 million customers who couldn’t complete purchases
  • 500,000 duplicate charges from retry storms
  • Complete breakdown of their real-time personalization
  • Emergency shutdown of non-critical background processing
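Retry storms happen when many jobs fail together and retry on the same short, fixed schedule, hitting the struggling dependency again in lockstep. A widely used mitigation is exponential backoff with “full jitter”: the retry bound doubles per attempt and the actual delay is drawn at random within it. The sketch below assumes in-process retries; `backoffDelayMs`, `retryWithBackoff`, and the 100 ms base and 30 s cap are illustrative choices, not any specific library’s defaults.

```typescript
// Full-jitter exponential backoff: the upper bound doubles with each attempt
// (capped at maxMs), and the actual delay is drawn uniformly from [0, bound).
// Randomizing the delay spreads out clients that all failed at the same moment.
function backoffDelayMs(
  attempt: number, // 0 for the first retry
  baseMs = 100,
  maxMs = 30_000,
  random: () => number = Math.random
): number {
  const bound = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * bound);
}

// Runs `operation` up to `maxAttempts` times in total, sleeping between attempts.
// `sleep` is injectable so the behavior can be tested without real timers.
async function retryWithBackoff<T>(
  operation: () => Promise<T>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts - 1) {
        await sleep(backoffDelayMs(attempt));
      }
    }
  }
  throw lastError;
}
```

Backoff limits the load a retry storm generates; it does not, on its own, stop a retried payment from charging twice. That still requires idempotent handlers, and a real retry policy would also skip errors that retrying cannot fix, such as a declined card.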

The brutal truth? They had scaled their user-facing systems to handle global traffic, but their background processing was still architected like a startup’s single-server setup.

The Uncomfortable Truth About Distributed Job Processing

Here’s what separates systems that scale gracefully across multiple servers from those that create exponentially worse problems as they grow: Distributed job processing isn’t just about running jobs on multiple machines—it’s about building coordinated systems that can maintain consistency, manage resources intelligently, and handle partial failures without cascading disasters.

Most developers approach distributed job processing like this:

  1. Run the same job queue on multiple servers
  2. Point them all at the same Redis instance
  3. Hope that more workers automatically means better performance
  4. Discover that coordination overhead often makes things worse
  5. Panic when jobs start duplicating or conflicting with each other
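The duplication in step 5 is usually not a bug in any single worker. Most Redis-backed queues deliver jobs at least once: a worker claims a job under a lease (often called a visibility timeout), and if the lease expires before the worker acknowledges the job, the queue hands it out again. The in-memory model below uses hypothetical names and logical timestamps instead of a real clock to show how a slow worker and a second worker can both end up running the same job.

```typescript
// In-memory model of lease-based job claiming with logical time, for illustration only
interface Lease {
  worker: string;
  expiresAt: number;
}

class LeasedQueue {
  private pending: string[] = [];
  private leases = new Map<string, Lease>();

  constructor(private readonly leaseMs: number) {}

  push(jobId: string): void {
    this.pending.push(jobId);
  }

  // Hand out the next job, first returning any jobs whose lease has expired
  claim(worker: string, now: number): string | undefined {
    for (const [jobId, lease] of this.leases) {
      if (lease.expiresAt <= now) {
        this.leases.delete(jobId);
        this.pending.push(jobId);
      }
    }
    const jobId = this.pending.shift();
    if (jobId !== undefined) {
      this.leases.set(jobId, { worker, expiresAt: now + this.leaseMs });
    }
    return jobId;
  }

  // Only the current lease holder can acknowledge (complete) a job
  ack(jobId: string, worker: string): boolean {
    const lease = this.leases.get(jobId);
    if (!lease || lease.worker !== worker) return false;
    this.leases.delete(jobId);
    return true;
  }
}

const queue = new LeasedQueue(1_000);
queue.push("charge-42");
const first = queue.claim("worker-a", 0); // worker-a starts the job
const early = queue.claim("worker-b", 500); // lease still valid: nothing to claim
const second = queue.claim("worker-b", 1_500); // worker-a was too slow: job handed out again
console.log(first, early, second); // charge-42 undefined charge-42
console.log(queue.ack("charge-42", "worker-a")); // false: worker-a lost its lease
```

Rejecting worker-a’s late acknowledgement does not undo whatever side effect worker-a already performed, which is why adding workers without idempotent handlers turns slow jobs into duplicate jobs.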

But systems that handle real-world distributed load work differently:

  1. Design job coordination and deduplication from day one
  2. Implement intelligent resource allocation across the cluster
  3. Build comprehensive monitoring that shows the health of the entire system
  4. Plan for graceful degradation when parts of the cluster fail
  5. Treat job processing as a distributed system with all the complexity that entails
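Designing deduplication in from day one mostly means making each job’s side effect idempotent, keyed by an identifier of the work rather than of the delivery attempt (the `process-payment` case in `extractKeyFields` later in this section keys on `transaction_id` for this reason). A minimal in-process sketch, assuming a hypothetical `IdempotentExecutor` class; because an in-memory map only deduplicates within one process, a real deployment would keep this state in a shared store such as Redis or a database table with a unique constraint.

```typescript
// Runs each handler at most once per key (per process). Concurrent or later
// calls with the same key share the first call's result; failed calls are
// forgotten so the job can be retried.
class IdempotentExecutor<T> {
  private readonly results = new Map<string, Promise<T>>();

  run(key: string, handler: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key);
    if (existing) return existing;

    const result = handler().catch((error: unknown) => {
      this.results.delete(key); // allow a retry after failure
      throw error;
    });
    this.results.set(key, result);
    return result;
  }
}

let charges = 0;
const chargeCard = async (): Promise<string> => {
  charges++; // the side effect that must not be repeated
  return "receipt-1";
};

const executor = new IdempotentExecutor<string>();
// Two deliveries of the same payment job, keyed by its transaction ID
Promise.all([
  executor.run("txn-123", chargeCard),
  executor.run("txn-123", chargeCard),
]).then(([a, b]) => console.log(a, b, charges)); // receipt-1 receipt-1 1
```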

The difference isn’t just performance—it’s the difference between systems that become more reliable as you add resources and systems where adding more servers just creates new ways to fail.

Ready to build job processing that scales like Netflix’s encoding pipeline instead of that background task system that falls over when you add a second server? Let’s dive into the architecture patterns that power distributed job processing at scale.


Distributed Job Processing: Coordination at Scale

Building Cluster-Aware Job Coordination

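// Distributed job processing with cluster coordination
import Redis, { Cluster } from "ioredis";
import os from "os";

// ClusterManager, JobCoordinator and NodeRegistry are defined below; ResourceManager,
// WorkerPool and JobReplicationStream are assumed to be provided elsewhere
class DistributedJobProcessor {
  private nodeId: string;
  private cluster: ClusterManager;
  private jobCoordinator: JobCoordinator;
  private resourceManager: ResourceManager;
  private redis: Cluster; // Redis.Cluster returns a Cluster client, not a standalone Redis client
  private workers: Map<string, WorkerPool> = new Map();
  private nodeRegistry: NodeRegistry;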

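  constructor(config: DistributedProcessorConfig) {
    // Format: node-<region>-<timestamp>-<random>; the region itself may contain dashes
    this.nodeId = `node-${config.region}-${Date.now()}-${Math.random()
      .toString(36)
      .slice(2, 11)}`;
    this.redis = new Redis.Cluster(config.redisClusterNodes);
    this.cluster = new ClusterManager(this.nodeId, this.redis);
    this.jobCoordinator = new JobCoordinator(this.redis);
    this.resourceManager = new ResourceManager(config.resources);
    this.nodeRegistry = new NodeRegistry(this.redis);

    // Setup is asynchronous and cannot be awaited in a constructor, so report
    // failures explicitly instead of leaving unhandled promise rejections
    Promise.all([
      this.setupClusterMembership(),
      this.setupJobDistribution(),
      this.setupResourceMonitoring(),
      this.setupFailureDetection(),
    ]).catch((error) => console.error(`Node ${this.nodeId} failed to start:`, error));
  }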

  private async setupClusterMembership(): Promise<void> {
    // Register this node in the cluster
    await this.nodeRegistry.registerNode({
      nodeId: this.nodeId,
      // Use the region embedded in the node ID so same-region checks compare like with like
      region: this.getMyRegion(),
      availability_zone: process.env.AWS_AZ || "us-east-1a",
      instance_type: process.env.INSTANCE_TYPE || "c5.xlarge",
      capabilities: ["email", "image-processing", "data-processing", "reports"],
      max_concurrent_jobs: 50,
      resources: {
        cpu_cores: os.cpus().length,
        memory_mb: Math.round(os.totalmem() / 1024 / 1024),
        disk_space_gb: 100,
      },
      status: "healthy",
      joined_at: Date.now(),
    });

    // Send a first heartbeat immediately, then every 10 seconds to maintain
    // cluster membership; a failed heartbeat is logged instead of becoming an
    // unhandled promise rejection
    await this.sendHeartbeat();
    setInterval(() => {
      this.sendHeartbeat().catch((error) =>
        console.error(`Heartbeat failed for node ${this.nodeId}:`, error)
      );
    }, 10000);

    console.log(`Node ${this.nodeId} registered in cluster`);
  }

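  private async setupJobDistribution(): Promise<void> {
    // Keep pub/sub on a dedicated connection so subscriptions never interfere with
    // regular commands. ioredis delivers messages through the "message" event; the
    // callback passed to subscribe() only reports errors and the subscription count.
    const subscriber = this.redis.duplicate();
    const assignmentChannel = `jobs:${this.nodeId}`;

    subscriber.on("message", (channel: string, message: string) => {
      let payload: any;
      try {
        payload = JSON.parse(message);
      } catch {
        console.error(`Ignoring malformed message on ${channel}`);
        return;
      }
      const handled =
        channel === assignmentChannel
          ? this.handleJobAssignment(payload) // job assignments from the coordinator
          : this.handleJobBroadcast(payload); // cluster-wide job broadcasts
      handled.catch((error) => console.error(`Error handling ${channel}:`, error));
    });
    await subscriber.subscribe(assignmentChannel, "jobs:broadcast");

    // Start job coordination loop (every 5 seconds)
    setInterval(() => {
      this.coordinateJobDistribution().catch((error) =>
        console.error("Job coordination failed:", error)
      );
    }, 5000);
  }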

  private async coordinateJobDistribution(): Promise<void> {
    try {
      // Only the cluster leader coordinates job distribution
      // (checked inside the try so a Redis error is logged, not thrown)
      if (!(await this.cluster.isLeader())) {
        return;
      }

      // Get all pending jobs across all queues
      const pendingJobs = await this.jobCoordinator.getPendingJobs();

      if (pendingJobs.length === 0) {
        return;
      }

      // Get cluster topology and capacity
      const clusterNodes = await this.nodeRegistry.getHealthyNodes();
      const clusterCapacity = await this.calculateClusterCapacity(clusterNodes);

      // Distribute jobs based on node capabilities and current load
      const jobDistribution = await this.createJobDistribution(
        pendingJobs,
        clusterNodes,
        clusterCapacity
      );

      // Assign jobs to nodes
      for (const [nodeId, jobs] of jobDistribution.entries()) {
        if (jobs.length > 0) {
          await this.assignJobsToNode(nodeId, jobs);
        }
      }

      console.log(
        `Distributed ${pendingJobs.length} jobs across ${clusterNodes.length} nodes`
      );
    } catch (error) {
      console.error("Error in job distribution coordination:", error);
    }
  }

  private async createJobDistribution(
    pendingJobs: PendingJob[],
    clusterNodes: ClusterNode[],
    clusterCapacity: ClusterCapacity
  ): Promise<Map<string, PendingJob[]>> {
    const distribution = new Map<string, PendingJob[]>();

    // Initialize distribution map
    clusterNodes.forEach((node) => {
      distribution.set(node.nodeId, []);
    });

    // Sort jobs by priority and resource requirements
    const sortedJobs = pendingJobs.sort((a, b) => {
      const priorityWeight =
        this.getPriorityWeight(b.priority) - this.getPriorityWeight(a.priority);
      const resourceWeight =
        b.estimated_resources.cpu - a.estimated_resources.cpu;
      return priorityWeight * 100 + resourceWeight;
    });

    // Distribute jobs using intelligent allocation
    for (const job of sortedJobs) {
      const bestNode = await this.findBestNodeForJob(
        job,
        clusterNodes,
        clusterCapacity
      );

      if (bestNode) {
        distribution.get(bestNode.nodeId)?.push(job);

        // Update capacity calculations
        clusterCapacity.nodes.get(bestNode.nodeId)!.available_slots -= 1;
        clusterCapacity.nodes.get(bestNode.nodeId)!.available_cpu -=
          job.estimated_resources.cpu;
        clusterCapacity.nodes.get(bestNode.nodeId)!.available_memory -=
          job.estimated_resources.memory;
      } else {
        console.warn(`No suitable node found for job ${job.id}`);
      }
    }

    return distribution;
  }

  private async findBestNodeForJob(
    job: PendingJob,
    clusterNodes: ClusterNode[],
    clusterCapacity: ClusterCapacity
  ): Promise<ClusterNode | null> {
    // Filter nodes that can handle this job type
    const capableNodes = clusterNodes.filter(
      (node) =>
        node.capabilities.includes(job.type) && node.status === "healthy"
    );

    if (capableNodes.length === 0) {
      return null;
    }

    // Score each capable node
    const nodeScores = capableNodes.map((node) => {
      const capacity = clusterCapacity.nodes.get(node.nodeId)!;

      // Check if node has sufficient resources
      if (
        capacity.available_slots <= 0 ||
        capacity.available_cpu < job.estimated_resources.cpu ||
        capacity.available_memory < job.estimated_resources.memory
      ) {
        return { node, score: -1 }; // Cannot handle this job
      }

      // Calculate score based on multiple factors
      let score = 0;

      // Resource utilization efficiency (prefer nodes with good fit)
      const cpuUtilization =
        (node.resources.cpu_cores - capacity.available_cpu) /
        node.resources.cpu_cores;
      const memoryUtilization =
        (node.resources.memory_mb - capacity.available_memory) /
        node.resources.memory_mb;
      const avgUtilization = (cpuUtilization + memoryUtilization) / 2;

      // Prefer nodes with moderate utilization (not idle, not overloaded)
      if (avgUtilization >= 0.2 && avgUtilization <= 0.7) {
        score += 50;
      } else if (avgUtilization > 0.7) {
        score += 20; // Slightly penalize high utilization
      } else {
        score += 30; // Slightly penalize idle nodes
      }

      // Geographic/network locality
      if (node.region === this.getMyRegion()) {
        score += 20; // Prefer same region
      }

      // Node performance history
      const recentSuccessRate = capacity.recent_success_rate || 1.0;
      score += recentSuccessRate * 20;

      // Load balancing - slightly prefer less loaded nodes
      const loadFactor = capacity.current_jobs / node.max_concurrent_jobs;
      score += (1 - loadFactor) * 10;

      return { node, score };
    });

    // Filter out nodes that cannot handle the job and find the best one
    const validNodes = nodeScores.filter((ns) => ns.score > 0);

    if (validNodes.length === 0) {
      return null;
    }

    // Return the highest scoring node
    validNodes.sort((a, b) => b.score - a.score);
    return validNodes[0].node;
  }

  private async assignJobsToNode(
    nodeId: string,
    jobs: PendingJob[]
  ): Promise<void> {
    const assignment: JobAssignment = {
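      assignment_id: `assign-${Date.now()}-${Math.random()
        .toString(36)
        .slice(2, 11)}`,
      target_node: nodeId,
      jobs: jobs,
      assigned_at: Date.now(),
      assigned_by: this.nodeId,
    };

    // Send assignment to target node. Redis pub/sub is fire-and-forget: if the
    // target is not subscribed at this moment the message is lost, so assigned
    // jobs need a timeout that returns them to the pending pool.
    await this.redis.publish(`jobs:${nodeId}`, JSON.stringify(assignment));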

    // Mark jobs as assigned
    for (const job of jobs) {
      await this.jobCoordinator.markJobAssigned(job.id, nodeId);
    }
  }

  private async handleJobAssignment(assignment: JobAssignment): Promise<void> {
    console.log(`Received assignment of ${assignment.jobs.length} jobs`);

    for (const job of assignment.jobs) {
      try {
        // Validate that we can still handle this job
        if (!(await this.resourceManager.canHandleJob(job))) {
          await this.rejectJobAssignment(job.id, "Insufficient resources");
          continue;
        }

        // Accept and queue the job
        await this.acceptJobAssignment(job);
      } catch (error) {
        console.error(`Error handling job assignment ${job.id}:`, error);
        await this.rejectJobAssignment(job.id, "Processing error");
      }
    }
  }

  // Job deduplication across the cluster
  private async ensureJobDeduplication(job: Job): Promise<boolean> {
    const deduplicationKey = this.generateDeduplicationKey(job);
    const lockKey = `job_lock:${deduplicationKey}`;

    // Try to acquire distributed lock
    const lockAcquired = await this.redis.set(
      lockKey,
      this.nodeId,
      "PX",
      300000, // 5 minute TTL
      "NX" // Only set if key doesn't exist
    );

    if (lockAcquired === "OK") {
      // We acquired the lock, safe to process
      return true;
    }

    // Someone else may hold the lock, or this node may already hold it
    const lockHolder = await this.redis.get(lockKey);

    if (lockHolder === this.nodeId) {
      // Re-entrant: this node already owns the job
      return true;
    }

    if (lockHolder) {
      console.log(
        `Job ${job.id} is already being processed by node ${lockHolder}`
      );
      return false;
    }

    // The lock's TTL ran out between SET and GET, so try once more to claim it.
    // (Waiting would not help: a crashed holder's lock only frees up when its TTL expires.)
    const retryLock = await this.redis.set(
      lockKey,
      this.nodeId,
      "PX",
      300000,
      "NX"
    );
    return retryLock === "OK";
  }

  private generateDeduplicationKey(job: Job): string {
    // Create a key that uniquely identifies this job's work
    // This could be based on job type + unique identifiers in the job data
    const keyData = {
      type: job.type,
      key_fields: this.extractKeyFields(job),
    };

    return Buffer.from(JSON.stringify(keyData)).toString("base64");
  }

  private extractKeyFields(job: Job): any {
    // Extract fields that make this job unique
    // This is job-type specific logic
    switch (job.type) {
      case "send-email":
        return {
          recipient: job.data.email,
          template: job.data.template,
          timestamp: job.data.send_at,
        };
      case "process-payment":
        return { transaction_id: job.data.transaction_id };
      case "generate-report":
        return { report_type: job.data.type, date_range: job.data.date_range };
      default:
        return { job_id: job.id };
    }
  }

  // Cross-region job replication
  async setupCrossRegionReplication(regions: string[]): Promise<void> {
    for (const region of regions) {
      if (region === this.getMyRegion()) continue;

      // Set up replication stream to other regions
      const replicationStream = new JobReplicationStream(
        this.redis,
        this.getMyRegion(),
        region
      );

      await replicationStream.initialize();

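      // Subscribe to jobs from other regions on a dedicated pub/sub connection;
      // ioredis delivers messages through the "message" event
      const subscriber = this.redis.duplicate();
      subscriber.on("message", (_channel: string, message: string) => {
        try {
          this.handleReplicatedJob(JSON.parse(message)).catch((error) =>
            console.error(`Error handling replicated job from ${region}:`, error)
          );
        } catch {
          console.error(`Ignoring malformed replicated job from ${region}`);
        }
      });
      await subscriber.subscribe(`jobs:replicated:${region}`);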
    }
  }

  private async handleReplicatedJob(
    replicatedJob: ReplicatedJob
  ): Promise<void> {
    // Only process replicated jobs if they're critical and we're in a different region
    if (
      replicatedJob.priority !== "critical" ||
      replicatedJob.source_region === this.getMyRegion()
    ) {
      return;
    }

    // Check if we should process this replicated job
    const shouldProcess = await this.shouldProcessReplicatedJob(replicatedJob);

    if (shouldProcess) {
      await this.processReplicatedJob(replicatedJob);
    }
  }

  // Cluster health monitoring
  private async sendHeartbeat(): Promise<void> {
    const heartbeat: NodeHeartbeat = {
      node_id: this.nodeId,
      timestamp: Date.now(),
      status: "healthy",
      current_jobs: this.getCurrentJobCount(),
      available_slots: this.resourceManager.getAvailableSlots(),
      cpu_usage: await this.resourceManager.getCPUUsage(),
      memory_usage: await this.resourceManager.getMemoryUsage(),
      recent_success_rate: await this.calculateRecentSuccessRate(),
      version: process.env.APP_VERSION || "1.0.0",
    };

    await this.nodeRegistry.updateHeartbeat(heartbeat);
  }

  private async calculateClusterCapacity(
    nodes: ClusterNode[]
  ): Promise<ClusterCapacity> {
    const capacity: ClusterCapacity = {
      total_nodes: nodes.length,
      healthy_nodes: nodes.filter((n) => n.status === "healthy").length,
      total_slots: 0,
      available_slots: 0,
      nodes: new Map(),
    };

    for (const node of nodes) {
      const nodeCapacity: NodeCapacity = {
        node_id: node.nodeId,
        total_slots: node.max_concurrent_jobs,
        available_slots: node.max_concurrent_jobs - (node.current_jobs || 0),
        total_cpu: node.resources.cpu_cores,
        available_cpu: node.resources.cpu_cores - (node.cpu_usage || 0),
        total_memory: node.resources.memory_mb,
        available_memory: node.resources.memory_mb - (node.memory_usage || 0),
        current_jobs: node.current_jobs || 0,
        recent_success_rate: node.recent_success_rate || 1.0,
      };

      capacity.nodes.set(node.nodeId, nodeCapacity);
      capacity.total_slots += nodeCapacity.total_slots;
      capacity.available_slots += nodeCapacity.available_slots;
    }

    return capacity;
  }

  // Graceful node shutdown
  async shutdown(): Promise<void> {
    console.log(`Node ${this.nodeId} beginning graceful shutdown...`);

    // Mark node as draining
    await this.nodeRegistry.markNodeDraining(this.nodeId);

    // Stop accepting new jobs: the leader only assigns work to nodes whose
    // registry status is "healthy", so the draining mark above stops new assignments

    // Wait for current jobs to complete
    await this.waitForJobCompletion(60000); // 1 minute timeout

    // Force stop any remaining jobs
    await this.forceStopRemainingJobs();

    // Unregister from cluster
    await this.nodeRegistry.unregisterNode(this.nodeId);

    console.log(`Node ${this.nodeId} shutdown complete`);
  }

  // Helper methods
  private getPriorityWeight(priority: string): number {
    const weights: Record<string, number> = { critical: 4, high: 3, normal: 2, low: 1 };
    return weights[priority] ?? 2; // unknown priorities rank as "normal"
  }

  private getCurrentJobCount(): number {
    return Array.from(this.workers.values()).reduce(
      (total, pool) => total + pool.getActiveJobCount(),
      0
    );
  }

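  private getMyRegion(): string {
    // nodeId is "node-<region>-<timestamp>-<random>" and regions such as
    // "us-east-1" contain dashes, so drop the prefix and the last two segments
    return this.nodeId.split("-").slice(1, -2).join("-");
  }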

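  private async acceptJobAssignment(job: PendingJob): Promise<void> {
    // Add job to the worker pool for its type; without a matching pool the job
    // would be silently dropped, so hand it back to the coordinator instead
    const pool = this.workers.get(job.type);
    if (!pool) {
      await this.rejectJobAssignment(job.id, `No worker pool for job type ${job.type}`);
      return;
    }
    await pool.addJob(job);
  }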

  private async rejectJobAssignment(
    jobId: string,
    reason: string
  ): Promise<void> {
    await this.jobCoordinator.markJobRejected(jobId, this.nodeId, reason);
  }

  private async calculateRecentSuccessRate(): Promise<number> {
    // Calculate success rate over last hour
    return 0.95; // Placeholder
  }

  private async waitForJobCompletion(timeoutMs: number): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < timeoutMs) {
      if (this.getCurrentJobCount() === 0) {
        return;
      }

      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }

  private async forceStopRemainingJobs(): Promise<void> {
    for (const pool of this.workers.values()) {
      await pool.forceStopAllJobs();
    }
  }

  private async shouldProcessReplicatedJob(
    job: ReplicatedJob
  ): Promise<boolean> {
    // Implement logic to determine if this node should process a replicated job
    return false;
  }

  private async processReplicatedJob(job: ReplicatedJob): Promise<void> {
    // Process a job that was replicated from another region
  }

  private async handleJobBroadcast(broadcast: any): Promise<void> {
    // Handle cluster-wide job broadcasts
  }
}

// Supporting classes and interfaces
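class ClusterManager {
  private static readonly LEADER_KEY = "cluster:leader";
  private static readonly LEADER_TTL_MS = 15000; // 3x the 5-second coordination interval

  constructor(private nodeId: string, private redis: Redis | Cluster) {}

  // Lease-based leader election: claim the key if nobody holds it, and renew it
  // if this node already does. If the leader stops renewing (crash, partition),
  // the key expires and another node takes over on its next coordination cycle.
  async isLeader(): Promise<boolean> {
    const { LEADER_KEY, LEADER_TTL_MS } = ClusterManager;
    const acquired = await this.redis.set(LEADER_KEY, this.nodeId, "PX", LEADER_TTL_MS, "NX");
    if (acquired === "OK") {
      return true;
    }
    if ((await this.redis.get(LEADER_KEY)) === this.nodeId) {
      // GET followed by PEXPIRE is not atomic; a Lua script makes renewal race-free
      await this.redis.pexpire(LEADER_KEY, LEADER_TTL_MS);
      return true;
    }
    return false;
  }
}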

class JobCoordinator {
  constructor(private redis: Redis | Cluster) {}

  async getPendingJobs(): Promise<PendingJob[]> {
    // Get all pending jobs from various queues
    return [];
  }

  async markJobAssigned(jobId: string, nodeId: string): Promise<void> {
    await this.redis.hset(
      `job:${jobId}`,
      "assigned_to",
      nodeId,
      "assigned_at",
      Date.now()
    );
  }

  async markJobRejected(
    jobId: string,
    nodeId: string,
    reason: string
  ): Promise<void> {
    await this.redis.hset(
      `job:${jobId}`,
      "rejected_by",
      nodeId,
      "rejected_reason",
      reason
    );
  }
}

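class NodeRegistry {
  constructor(private redis: Redis | Cluster) {}

  async registerNode(node: ClusterNode): Promise<void> {
    await this.redis.hset("cluster:nodes", node.nodeId, JSON.stringify(node));
  }

  // A node is eligible only if its registry status is "healthy" AND its heartbeat
  // key (30-second TTL) still exists, so crashed nodes drop out automatically.
  // Live load from the heartbeat is merged in for capacity planning; this assumes
  // cpu_usage is reported in cores and memory_usage in MB, matching how
  // calculateClusterCapacity subtracts them from the node's totals.
  async getHealthyNodes(): Promise<ClusterNode[]> {
    const nodes = await this.redis.hgetall("cluster:nodes");
    const registered: ClusterNode[] = Object.values(nodes).map((nodeData) =>
      JSON.parse(nodeData)
    );

    const live = await Promise.all(
      registered.map(async (node): Promise<ClusterNode | null> => {
        const raw = await this.redis.get(`heartbeat:${node.nodeId}`);
        if (!raw) {
          return null; // heartbeat expired: treat the node as down
        }
        const heartbeat: NodeHeartbeat = JSON.parse(raw);
        return {
          ...node,
          current_jobs: heartbeat.current_jobs,
          cpu_usage: heartbeat.cpu_usage,
          memory_usage: heartbeat.memory_usage,
          recent_success_rate: heartbeat.recent_success_rate,
        };
      })
    );

    return live.filter(
      (node): node is ClusterNode => node !== null && node.status === "healthy"
    );
  }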

  async updateHeartbeat(heartbeat: NodeHeartbeat): Promise<void> {
    await this.redis.setex(
      `heartbeat:${heartbeat.node_id}`,
      30,
      JSON.stringify(heartbeat)
    );
  }

  async markNodeDraining(nodeId: string): Promise<void> {
    const nodeData = await this.redis.hget("cluster:nodes", nodeId);
    if (nodeData) {
      const node = JSON.parse(nodeData);
      node.status = "draining";
      await this.redis.hset("cluster:nodes", nodeId, JSON.stringify(node));
    }
  }

  async unregisterNode(nodeId: string): Promise<void> {
    await this.redis.hdel("cluster:nodes", nodeId);
    await this.redis.del(`heartbeat:${nodeId}`);
  }
}

// Interfaces
interface DistributedProcessorConfig {
  redisClusterNodes: string[];
  region: string;
  resources: ResourceConfig;
}

interface ResourceConfig {
  max_concurrent_jobs: number;
  cpu_cores: number;
  memory_mb: number;
}

interface ClusterNode {
  nodeId: string;
  region: string;
  availability_zone: string;
  instance_type: string;
  capabilities: string[];
  max_concurrent_jobs: number;
  resources: {
    cpu_cores: number;
    memory_mb: number;
    disk_space_gb: number;
  };
  status: "healthy" | "unhealthy" | "draining";
  joined_at: number;
  current_jobs?: number;
  cpu_usage?: number;
  memory_usage?: number;
  recent_success_rate?: number;
}

interface PendingJob {
  id: string;
  type: string;
  data: any;
  priority: string;
  estimated_resources: {
    cpu: number;
    memory: number;
    duration_ms: number;
  };
  created_at: number;
}

interface JobAssignment {
  assignment_id: string;
  target_node: string;
  jobs: PendingJob[];
  assigned_at: number;
  assigned_by: string;
}

interface ClusterCapacity {
  total_nodes: number;
  healthy_nodes: number;
  total_slots: number;
  available_slots: number;
  nodes: Map<string, NodeCapacity>;
}

interface NodeCapacity {
  node_id: string;
  total_slots: number;
  available_slots: number;
  total_cpu: number;
  available_cpu: number;
  total_memory: number;
  available_memory: number;
  current_jobs: number;
  recent_success_rate: number;
}

interface NodeHeartbeat {
  node_id: string;
  timestamp: number;
  status: string;
  current_jobs: number;
  available_slots: number;
  cpu_usage: number;
  memory_usage: number;
  recent_success_rate: number;
  version: string;
}

interface ReplicatedJob {
  job: Job;
  source_region: string;
  priority: string;
  replicated_at: number;
}

interface Job {
  id: string;
  type: string;
  data: any;
}
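The coordination loop above only works if exactly one node at a time believes `isLeader()`. That guarantee is commonly built on a lease: a key written with Redis `SET ... NX PX` that the holder keeps renewing and that expires if the holder disappears. The in-memory model below (a hypothetical `LeaderLease` class with logical timestamps) captures those rules without a Redis server, so the failover timing is easy to check.

```typescript
// In-memory model of a leader lease following the same rules as
// SET key value NX PX ttl, plus renewal by the current holder.
// Time is passed in explicitly instead of reading a clock.
class LeaderLease {
  private holder: string | null = null;
  private expiresAt = 0;

  constructor(private readonly ttlMs: number) {}

  // Returns true if `nodeId` holds the lease after this call
  tryAcquireOrRenew(nodeId: string, now: number): boolean {
    const free = this.holder === null || now >= this.expiresAt;
    if (free || this.holder === nodeId) {
      this.holder = nodeId;
      this.expiresAt = now + this.ttlMs;
      return true;
    }
    return false;
  }
}

// Two nodes run their coordination loop every 5 s against a 15 s lease
const lease = new LeaderLease(15_000);
console.log(lease.tryAcquireOrRenew("node-a", 0)); // true: lease was free
console.log(lease.tryAcquireOrRenew("node-b", 5_000)); // false: node-a holds it
console.log(lease.tryAcquireOrRenew("node-a", 10_000)); // true: renewed until t=25000
// node-a crashes and stops renewing
console.log(lease.tryAcquireOrRenew("node-b", 20_000)); // false: not expired yet
console.log(lease.tryAcquireOrRenew("node-b", 25_000)); // true: failover to node-b
```

A lease alone cannot stop a leader that was paused, for example by a long garbage-collection pause, from acting after its lease has expired. Systems that need strict mutual exclusion pair leases with fencing tokens that downstream writes check.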

Queue Monitoring and Management: Observability at Scale

Real-time Queue Health Monitoring

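// Comprehensive queue monitoring and alerting system
// (MetricsCollector, AlertManager and DashboardMetrics are assumed to be defined elsewhere)
import Redis from "ioredis";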
class QueueMonitoringSystem {
  private metrics: MetricsCollector;
  private alertManager: AlertManager;
  private dashboardData: Map<string, DashboardMetrics> = new Map();
  private redis: Redis;
  private monitoringIntervals: Map<string, NodeJS.Timeout> = new Map();

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL);
    this.metrics = new MetricsCollector();
    this.alertManager = new AlertManager();

    this.setupMonitoring();
    this.setupAlerting();
    this.setupMetricsCollection();
  }

  private setupMonitoring(): void {
    // [name, check, interval]: faster cadences for signals that change quickly
    const monitors: Array<[string, () => Promise<void>, number]> = [
      ["queue-depth", () => this.monitorQueueDepth(), 30000], // every 30 seconds
      ["processing-rate", () => this.monitorProcessingRate(), 60000], // every minute
      ["worker-health", () => this.monitorWorkerHealth(), 15000], // every 15 seconds
      ["queue-latency", () => this.monitorQueueLatency(), 45000], // every 45 seconds
      ["resource-util", () => this.monitorResourceUtilization(), 120000], // every 2 minutes
    ];

    for (const [name, check, intervalMs] of monitors) {
      this.monitoringIntervals.set(
        name,
        setInterval(() => {
          // Checks handle their own errors; this catch is a last line of defense
          // against unhandled promise rejections
          check().catch((error) => console.error(`Monitor ${name} failed:`, error));
        }, intervalMs)
      );
    }
  }

  private async monitorQueueDepth(): Promise<void> {
    const queues = ["critical", "high", "default", "low", "bulk"];

    for (const queueName of queues) {
      try {
        const depth = await this.getQueueDepth(queueName);
        const activeJobs = await this.getActiveJobCount(queueName);
        const delayedJobs = await this.getDelayedJobCount(queueName);

        // Record metrics
        await this.metrics.recordGauge(`queue.depth.${queueName}`, depth);
        await this.metrics.recordGauge(`queue.active.${queueName}`, activeJobs);
        await this.metrics.recordGauge(
          `queue.delayed.${queueName}`,
          delayedJobs
        );

        // Check for depth alerts
        await this.checkQueueDepthAlerts(queueName, depth, activeJobs);

        // Update dashboard data
        this.updateDashboardMetrics(queueName, {
          depth,
          activeJobs,
          delayedJobs,
          timestamp: Date.now(),
        });
      } catch (error) {
        console.error(`Error monitoring queue ${queueName}:`, error);
        await this.alertManager.sendAlert({
          severity: "warning",
          title: "Queue Monitoring Error",
          description: `Failed to monitor queue ${queueName}: ${
            error instanceof Error ? error.message : String(error)
          }`,
          queue: queueName,
        });
      }
    }
  }

  private async monitorProcessingRate(): Promise<void> {
    const queues = ["critical", "high", "default", "low", "bulk"];

    for (const queueName of queues) {
      try {
        // Get processing stats for the last minute
        const windowMs = 60000;
        const stats = await this.getProcessingStats(queueName, windowMs);

        const processingRate = stats.completed / (windowMs / 1000); // jobs per second
        const attempted = stats.completed + stats.failed;
        const errorRate = attempted > 0 ? stats.failed / attempted : 0;
        const avgProcessingTime =
          stats.completed > 0 ? stats.totalProcessingTime / stats.completed : 0;

        // Record metrics
        await this.metrics.recordGauge(
          `queue.processing_rate.${queueName}`,
          processingRate
        );
        await this.metrics.recordGauge(
          `queue.error_rate.${queueName}`,
          errorRate
        );
        await this.metrics.recordGauge(
          `queue.avg_processing_time.${queueName}`,
          avgProcessingTime
        );

        // Check for performance alerts
        await this.checkProcessingRateAlerts(
          queueName,
          processingRate,
          errorRate,
          avgProcessingTime
        );

        // Update dashboard
        this.updateDashboardMetrics(queueName, {
          processingRate,
          errorRate,
          avgProcessingTime,
          timestamp: Date.now(),
        });
      } catch (error) {
        console.error(
          `Error monitoring processing rate for ${queueName}:`,
          error
        );
      }
    }
  }

  private async monitorWorkerHealth(): Promise<void> {
    try {
      const workers = await this.getActiveWorkers();

      for (const worker of workers) {
        const health = await this.checkWorkerHealth(worker);

        // Record worker metrics
        await this.metrics.recordGauge(
          `worker.cpu_usage.${worker.id}`,
          health.cpuUsage
        );
        await this.metrics.recordGauge(
          `worker.memory_usage.${worker.id}`,
          health.memoryUsage
        );
        await this.metrics.recordGauge(
          `worker.active_jobs.${worker.id}`,
          health.activeJobs
        );
        await this.metrics.recordGauge(
          `worker.uptime.${worker.id}`,
          health.uptime
        );

        // Check for worker health issues
        await this.checkWorkerHealthAlerts(worker, health);
      }

      // Overall worker metrics
      const totalWorkers = workers.length;
      const healthyWorkers = workers.filter(
        (w) => w.status === "healthy"
      ).length;
      const workerUtilization =
        totalWorkers > 0
          ? workers.reduce((sum, w) => sum + w.currentLoad, 0) / totalWorkers
          : 0; // avoid NaN when no workers are registered

      await this.metrics.recordGauge("workers.total", totalWorkers);
      await this.metrics.recordGauge("workers.healthy", healthyWorkers);
      await this.metrics.recordGauge("workers.utilization", workerUtilization);
    } catch (error) {
      console.error("Error monitoring worker health:", error);
    }
  }

  private async monitorQueueLatency(): Promise<void> {
    const queues = ["critical", "high", "default", "low", "bulk"];

    for (const queueName of queues) {
      try {
        const latencyStats = await this.calculateQueueLatency(queueName);

        // Record latency metrics
        await this.metrics.recordGauge(
          `queue.latency.p50.${queueName}`,
          latencyStats.p50
        );
        await this.metrics.recordGauge(
          `queue.latency.p95.${queueName}`,
          latencyStats.p95
        );
        await this.metrics.recordGauge(
          `queue.latency.p99.${queueName}`,
          latencyStats.p99
        );
        await this.metrics.recordGauge(
          `queue.latency.max.${queueName}`,
          latencyStats.max
        );

        // Check for latency alerts
        await this.checkLatencyAlerts(queueName, latencyStats);
      } catch (error) {
        console.error(`Error monitoring latency for ${queueName}:`, error);
      }
    }
  }

  private async checkQueueDepthAlerts(
    queueName: string,
    depth: number,
    activeJobs: number
  ): Promise<void> {
    const thresholds = this.getQueueThresholds(queueName);

    // Critical queue depth alert
    if (depth > thresholds.critical_depth) {
      await this.alertManager.sendAlert({
        severity: "critical",
        title: `Queue Depth Critical: ${queueName}`,
        description: `Queue ${queueName} has ${depth} pending jobs (threshold: ${thresholds.critical_depth})`,
        queue: queueName,
        metric: "depth",
        value: depth,
        threshold: thresholds.critical_depth,
      });
    }
    // Warning queue depth alert
    else if (depth > thresholds.warning_depth) {
      await this.alertManager.sendAlert({
        severity: "warning",
        title: `Queue Depth Warning: ${queueName}`,
        description: `Queue ${queueName} has ${depth} pending jobs (threshold: ${thresholds.warning_depth})`,
        queue: queueName,
        metric: "depth",
        value: depth,
        threshold: thresholds.warning_depth,
      });
    }

    // Stalled jobs alert
    const stalledJobs = await this.getStalledJobCount(queueName);
    if (stalledJobs > 0) {
      await this.alertManager.sendAlert({
        severity: "warning",
        title: `Stalled Jobs Detected: ${queueName}`,
        description: `Queue ${queueName} has ${stalledJobs} stalled jobs`,
        queue: queueName,
        metric: "stalled_jobs",
        value: stalledJobs,
      });
    }
  }

  private async checkProcessingRateAlerts(
    queueName: string,
    processingRate: number,
    errorRate: number,
    avgProcessingTime: number
  ): Promise<void> {
    const thresholds = this.getQueueThresholds(queueName);

    // Low processing rate alert
    if (processingRate < thresholds.min_processing_rate) {
      await this.alertManager.sendAlert({
        severity: "warning",
        title: `Low Processing Rate: ${queueName}`,
        description: `Queue ${queueName} processing rate is ${processingRate.toFixed(
          2
        )} jobs/sec (min: ${thresholds.min_processing_rate})`,
        queue: queueName,
        metric: "processing_rate",
        value: processingRate,
        threshold: thresholds.min_processing_rate,
      });
    }

    // High error rate alert
    if (errorRate > thresholds.max_error_rate) {
      await this.alertManager.sendAlert({
        severity: "critical",
        title: `High Error Rate: ${queueName}`,
        description: `Queue ${queueName} error rate is ${(
          errorRate * 100
        ).toFixed(1)}% (max: ${(thresholds.max_error_rate * 100).toFixed(1)}%)`,
        queue: queueName,
        metric: "error_rate",
        value: errorRate,
        threshold: thresholds.max_error_rate,
      });
    }

    // High processing time alert
    if (avgProcessingTime > thresholds.max_processing_time) {
      await this.alertManager.sendAlert({
        severity: "warning",
        title: `High Processing Time: ${queueName}`,
        description: `Queue ${queueName} average processing time is ${avgProcessingTime.toFixed(
          0
        )}ms (max: ${thresholds.max_processing_time}ms)`,
        queue: queueName,
        metric: "processing_time",
        value: avgProcessingTime,
        threshold: thresholds.max_processing_time,
      });
    }
  }

  // Queue management operations
  async pauseQueue(queueName: string, reason: string): Promise<void> {
    try {
      await this.redis.hset(
        "queue:status",
        queueName,
        JSON.stringify({
          status: "paused",
          reason,
          paused_at: Date.now(),
          paused_by: "monitoring_system",
        })
      );

      console.log(`Queue ${queueName} paused: ${reason}`);

      await this.alertManager.sendAlert({
        severity: "info",
        title: `Queue Paused: ${queueName}`,
        description: `Queue ${queueName} has been paused: ${reason}`,
        queue: queueName,
      });
    } catch (error) {
      console.error(`Error pausing queue ${queueName}:`, error);
    }
  }

  async resumeQueue(queueName: string): Promise<void> {
    try {
      await this.redis.hset(
        "queue:status",
        queueName,
        JSON.stringify({
          status: "active",
          resumed_at: Date.now(),
          resumed_by: "monitoring_system",
        })
      );

      console.log(`Queue ${queueName} resumed`);

      await this.alertManager.sendAlert({
        severity: "info",
        title: `Queue Resumed: ${queueName}`,
        description: `Queue ${queueName} has been resumed`,
        queue: queueName,
      });
    } catch (error) {
      console.error(`Error resuming queue ${queueName}:`, error);
    }
  }

  async drainQueue(
    queueName: string,
    timeoutMs: number = 300000
  ): Promise<void> {
    console.log(
      `Starting queue drain for ${queueName} (timeout: ${timeoutMs}ms)`
    );

    try {
      // Pause new job acceptance
      await this.pauseQueue(queueName, "Draining queue");

      const startTime = Date.now();

      // Wait for active jobs to complete
      while (Date.now() - startTime < timeoutMs) {
        const activeJobs = await this.getActiveJobCount(queueName);

        if (activeJobs === 0) {
          console.log(`Queue ${queueName} drained successfully`);
          return;
        }

        console.log(
          `Queue ${queueName} still has ${activeJobs} active jobs, waiting...`
        );
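        // Poll every 5 seconds, but never sleep past the drain deadline
        const remainingMs = timeoutMs - (Date.now() - startTime);
        await new Promise((resolve) =>
          setTimeout(resolve, Math.max(0, Math.min(5000, remainingMs)))
        );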
      }

      // Timeout reached: nothing is forcibly stopped here, so alert on any
      // jobs that are still running
      const remainingJobs = await this.getActiveJobCount(queueName);
      if (remainingJobs > 0) {
        console.warn(
          `Queue ${queueName} drain timed out with ${remainingJobs} jobs still active`
        );

        await this.alertManager.sendAlert({
          severity: "warning",
          title: `Queue Drain Timeout: ${queueName}`,
          description: `Queue drain timeout reached with ${remainingJobs} jobs remaining`,
          queue: queueName,
        });
      }
    } catch (error) {
      console.error(`Error draining queue ${queueName}:`, error);
      throw error;
    }
  }

  // Dashboard and reporting
  async generateQueueHealthReport(): Promise<QueueHealthReport> {
    const queues = ["critical", "high", "default", "low", "bulk"];
    const queueReports: QueueReport[] = [];

    for (const queueName of queues) {
      try {
        const depth = await this.getQueueDepth(queueName);
        const activeJobs = await this.getActiveJobCount(queueName);
        const stats = await this.getProcessingStats(queueName, 3600000); // 1 hour
        const latencyStats = await this.calculateQueueLatency(queueName);

        const report: QueueReport = {
          queue: queueName,
          depth,
          activeJobs,
          completedLastHour: stats.completed,
          failedLastHour: stats.failed,
          errorRate: stats.failed / (stats.completed + stats.failed || 1),
          averageProcessingTime:
            stats.totalProcessingTime / (stats.completed || 1),
          latencyP95: latencyStats.p95,
          status: await this.getQueueStatus(queueName),
          healthScore: this.calculateQueueHealthScore(queueName, {
            depth,
            activeJobs,
            errorRate: stats.failed / (stats.completed + stats.failed || 1),
            latency: latencyStats.p95,
          }),
        };

        queueReports.push(report);
      } catch (error) {
        console.error(`Error generating report for queue ${queueName}:`, error);
      }
    }

    return {
      timestamp: Date.now(),
      totalQueues: queues.length,
      healthyQueues: queueReports.filter((r) => r.healthScore > 0.8).length,
      warningQueues: queueReports.filter(
        (r) => r.healthScore > 0.5 && r.healthScore <= 0.8
      ).length,
      criticalQueues: queueReports.filter((r) => r.healthScore <= 0.5).length,
      queues: queueReports,
      overallHealthScore:
        queueReports.length > 0
          ? queueReports.reduce((sum, r) => sum + r.healthScore, 0) /
            queueReports.length
          : 0,
    };
  }

  // Auto-scaling recommendations
  async generateScalingRecommendations(): Promise<ScalingRecommendation[]> {
    const recommendations: ScalingRecommendation[] = [];
    const queues = ["critical", "high", "default", "low", "bulk"];

    for (const queueName of queues) {
      try {
        const depth = await this.getQueueDepth(queueName);
        const activeJobs = await this.getActiveJobCount(queueName);
        const processingRate = await this.getProcessingRate(queueName, 300000); // 5 minutes
        const workers = await this.getQueueWorkers(queueName);

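        // Minutes to clear the backlog at the current rate; Infinity when jobs
        // are pending but nothing was processed in the sampling window
        const drainTimeMinutes =
          depth === 0
            ? 0
            : processingRate > 0
            ? depth / (processingRate * 60)
            : Infinity;

        if (drainTimeMinutes > 30) {
          // More than 30 minutes to drain: add 50% capacity, at least one worker
          const recommendedWorkers = Math.max(
            1,
            Math.ceil(workers.length * 1.5)
          );
          recommendations.push({
            queue: queueName,
            action: "scale_up",
            reason: Number.isFinite(drainTimeMinutes)
              ? `Queue will take ${drainTimeMinutes.toFixed(1)} minutes to drain`
              : `Queue has ${depth} pending jobs but is not processing any`,
            currentWorkers: workers.length,
            recommendedWorkers,
            urgency: drainTimeMinutes > 60 ? "high" : "medium",
          });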
        } else if (drainTimeMinutes < 5 && workers.length > 2) {
          // Less than 5 minutes and multiple workers
          const recommendedWorkers = Math.max(
            2,
            Math.floor(workers.length * 0.7)
          );
          recommendations.push({
            queue: queueName,
            action: "scale_down",
            reason: `Queue can drain in ${drainTimeMinutes.toFixed(
              1
            )} minutes with fewer workers`,
            currentWorkers: workers.length,
            recommendedWorkers,
            urgency: "low",
          });
        }
      } catch (error) {
        console.error(
          `Error generating scaling recommendation for ${queueName}:`,
          error
        );
      }
    }

    return recommendations;
  }

  // Helper methods
  private getQueueThresholds(queueName: string): QueueThresholds {
    const defaultThresholds: Record<string, QueueThresholds> = {
      critical: {
        critical_depth: 100,
        warning_depth: 50,
        min_processing_rate: 2.0,
        max_error_rate: 0.05,
        max_processing_time: 30000,
      },
      high: {
        critical_depth: 500,
        warning_depth: 200,
        min_processing_rate: 1.0,
        max_error_rate: 0.1,
        max_processing_time: 60000,
      },
      default: {
        critical_depth: 1000,
        warning_depth: 500,
        min_processing_rate: 0.5,
        max_error_rate: 0.15,
        max_processing_time: 120000,
      },
      low: {
        critical_depth: 5000,
        warning_depth: 2000,
        min_processing_rate: 0.2,
        max_error_rate: 0.2,
        max_processing_time: 300000,
      },
      bulk: {
        critical_depth: 10000,
        warning_depth: 5000,
        min_processing_rate: 0.1,
        max_error_rate: 0.25,
        max_processing_time: 600000,
      },
    };

    return defaultThresholds[queueName] || defaultThresholds.default;
  }

  private calculateQueueHealthScore(
    queueName: string,
    metrics: {
      depth: number;
      activeJobs: number;
      errorRate: number;
      latency: number;
    }
  ): number {
    const thresholds = this.getQueueThresholds(queueName);
    let score = 1.0;

    // Depth score
    if (metrics.depth > thresholds.critical_depth) {
      score *= 0.3;
    } else if (metrics.depth > thresholds.warning_depth) {
      score *= 0.7;
    }

    // Error rate score
    if (metrics.errorRate > thresholds.max_error_rate) {
      score *= 0.4;
    } else if (metrics.errorRate > thresholds.max_error_rate / 2) {
      score *= 0.8;
    }

    // Latency score
    if (metrics.latency > thresholds.max_processing_time) {
      score *= 0.5;
    } else if (metrics.latency > thresholds.max_processing_time / 2) {
      score *= 0.9;
    }

    return Math.max(0, Math.min(1, score));
  }

  // Placeholder methods for actual implementations
  private async getQueueDepth(queueName: string): Promise<number> {
    return 0;
  }
  private async getActiveJobCount(queueName: string): Promise<number> {
    return 0;
  }
  private async getDelayedJobCount(queueName: string): Promise<number> {
    return 0;
  }
  private async getStalledJobCount(queueName: string): Promise<number> {
    return 0;
  }
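  private async getProcessingStats(
    queueName: string,
    periodMs: number
  ): Promise<any> {
    // Zero-filled stub so callers never turn undefined fields into NaN
    return { completed: 0, failed: 0, totalProcessingTime: 0 };
  }
  private async calculateQueueLatency(queueName: string): Promise<any> {
    return { p50: 0, p95: 0, p99: 0, max: 0 };
  }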
  private async getActiveWorkers(): Promise<any[]> {
    return [];
  }
  private async checkWorkerHealth(worker: any): Promise<any> {
    return {};
  }
  private async checkWorkerHealthAlerts(
    worker: any,
    health: any
  ): Promise<void> {}
  private async checkLatencyAlerts(
    queueName: string,
    latencyStats: any
  ): Promise<void> {}
  private async getQueueStatus(queueName: string): Promise<string> {
    return "active";
  }
  private async getProcessingRate(
    queueName: string,
    periodMs: number
  ): Promise<number> {
    return 0;
  }
  private async getQueueWorkers(queueName: string): Promise<any[]> {
    return [];
  }
  private updateDashboardMetrics(queueName: string, metrics: any): void {}
}

// Supporting classes
class MetricsCollector {
  async recordGauge(metric: string, value: number): Promise<void> {
    // Implement metrics recording (Prometheus, StatsD, etc.)
  }
}

class AlertManager {
  async sendAlert(alert: Alert): Promise<void> {
    console.log(
      `ALERT [${alert.severity}]: ${alert.title} - ${alert.description}`
    );
    // Implement alerting (PagerDuty, Slack, email, etc.)
  }
}

// Interfaces
interface DashboardMetrics {
  depth?: number;
  activeJobs?: number;
  delayedJobs?: number;
  processingRate?: number;
  errorRate?: number;
  avgProcessingTime?: number;
  timestamp: number;
}

interface QueueThresholds {
  critical_depth: number;
  warning_depth: number;
  min_processing_rate: number;
  max_error_rate: number;
  max_processing_time: number;
}

interface Alert {
  severity: "info" | "warning" | "critical";
  title: string;
  description: string;
  queue?: string;
  metric?: string;
  value?: number;
  threshold?: number;
}

interface QueueReport {
  queue: string;
  depth: number;
  activeJobs: number;
  completedLastHour: number;
  failedLastHour: number;
  errorRate: number;
  averageProcessingTime: number;
  latencyP95: number;
  status: string;
  healthScore: number;
}

interface QueueHealthReport {
  timestamp: number;
  totalQueues: number;
  healthyQueues: number;
  warningQueues: number;
  criticalQueues: number;
  queues: QueueReport[];
  overallHealthScore: number;
}

interface ScalingRecommendation {
  queue: string;
  action: "scale_up" | "scale_down" | "maintain";
  reason: string;
  currentWorkers: number;
  recommendedWorkers: number;
  urgency: "low" | "medium" | "high";
}
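The drain-time heuristic in `generateScalingRecommendations` is easy to test in isolation once it is pulled out of the monitor. The sketch below is a hypothetical standalone version (`decideScaling` and `ScalingDecision` are illustrative names, not part of the monitor) that keeps the same 30-minute and 5-minute cut-offs, assumes throughput is measured in jobs per second, and treats a queue that has a backlog but zero throughput as never draining, so it always recommends at least one worker.

```typescript
// Hypothetical standalone sketch of the drain-time scaling heuristic.
// Cut-offs match generateScalingRecommendations: scale up above 30 minutes,
// scale down below 5 minutes when more than two workers are running.

type ScalingAction = "scale_up" | "scale_down" | "maintain";

interface ScalingDecision {
  action: ScalingAction;
  drainTimeMinutes: number;
  recommendedWorkers: number;
}

function decideScaling(
  depth: number, // pending jobs in the queue
  jobsPerSecond: number, // observed throughput over the sampling window
  currentWorkers: number
): ScalingDecision {
  // Minutes to clear the backlog; a backlog with zero throughput never drains.
  const drainTimeMinutes =
    depth === 0 ? 0 : jobsPerSecond > 0 ? depth / (jobsPerSecond * 60) : Infinity;

  if (drainTimeMinutes > 30) {
    // Add 50% capacity, but never recommend zero workers for a backlog.
    const recommendedWorkers = Math.max(1, Math.ceil(currentWorkers * 1.5));
    return { action: "scale_up", drainTimeMinutes, recommendedWorkers };
  }

  if (drainTimeMinutes < 5 && currentWorkers > 2) {
    // Trim roughly 30% of workers, keeping a floor of two.
    const recommendedWorkers = Math.max(2, Math.floor(currentWorkers * 0.7));
    return { action: "scale_down", drainTimeMinutes, recommendedWorkers };
  }

  return { action: "maintain", drainTimeMinutes, recommendedWorkers: currentWorkers };
}

// Example: 3,600 pending jobs at 1 job/s with 4 workers means 60 minutes to drain.
console.log(decideScaling(3600, 1, 4));
```

Returning `Infinity` instead of substituting a fake rate keeps a stalled queue from being misreported as "slow but draining", which is exactly the situation where an operator most needs the scale-up signal.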

Resource Management: Intelligent Job Allocation

Dynamic Resource Allocation and Optimization
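The resource manager in this section routes each job to the pool with the highest suitability score. Stripped of the surrounding infrastructure, that scoring fits in a few lines. The sketch below is a hypothetical, self-contained version (`PoolSnapshot`, `suitability`, and `pickPool` are illustrative names, not part of the manager) that uses the same weights as `calculatePoolSuitabilityScore`: 50 points for a declared job type, up to 30 for free capacity, up to 20 for resource fit, and up to 5 for priority alignment. Because `ResourcePool.canHandleJobType` is not shown, the sketch assumes every pool may compete and lets the 50-point bonus do most of the routing.

```typescript
// Hypothetical sketch of suitability-based pool selection.

interface Profile {
  cpu: number;
  memory: number;
  io: number;
}

interface PoolSnapshot {
  name: string;
  jobTypes: string[];
  weights: Profile; // how strong the pool is on each resource (0-1)
  active: number; // jobs currently running
  maxConcurrency: number;
  priority: string;
}

const PRIORITIES = ["low", "normal", "medium", "high"];

function priorityAlignment(jobPriority: string, poolPriority: string): number {
  const j = PRIORITIES.indexOf(jobPriority);
  const p = PRIORITIES.indexOf(poolPriority);
  if (j < 0 || p < 0) return 0; // unknown label: no alignment bonus
  return Math.max(0, 5 - Math.abs(j - p));
}

function suitability(
  jobType: string,
  need: Profile,
  jobPriority: string,
  pool: PoolSnapshot
): number {
  let score = pool.jobTypes.includes(jobType) ? 50 : 0;
  // Free-capacity share, worth up to 30 points
  const free =
    pool.maxConcurrency > 0
      ? (pool.maxConcurrency - pool.active) / pool.maxConcurrency
      : 0;
  score += free * 30;
  // Weighted dot product of job needs and pool strengths, clamped to [0, 1]
  const fit =
    need.cpu * pool.weights.cpu +
    need.memory * pool.weights.memory +
    need.io * pool.weights.io;
  score += Math.min(1, fit) * 20;
  return score + priorityAlignment(jobPriority, pool.priority);
}

function pickPool(
  jobType: string,
  need: Profile,
  jobPriority: string,
  pools: PoolSnapshot[]
): PoolSnapshot | null {
  let best: PoolSnapshot | null = null;
  let bestScore = -1;
  for (const pool of pools) {
    const s = suitability(jobType, need, jobPriority, pool);
    if (s > bestScore) {
      bestScore = s;
      best = pool;
    }
  }
  return best;
}
```

With a CPU-heavy pool declaring `image-processing` and an I/O-heavy pool declaring `api-calls`, an `image-processing` job lands on the CPU pool and an `api-calls` job on the I/O pool, even when both pools are idle. The capacity and fit terms only break ties between pools that both declare the job type.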

import * as os from "os";

// Advanced resource management for background job processing
class IntelligentResourceManager {
  private resourcePools: Map<string, ResourcePool> = new Map();
  private resourceQuotas: Map<string, ResourceQuota> = new Map();
  private resourceHistory: ResourceHistoryTracker;
  private predictiveScaler: PredictiveScaler;
  private loadBalancer: IntelligentLoadBalancer;

  constructor(config: ResourceManagerConfig) {
    this.resourceHistory = new ResourceHistoryTracker();
    this.predictiveScaler = new PredictiveScaler(this.resourceHistory);
    this.loadBalancer = new IntelligentLoadBalancer();

    this.initializeResourcePools(config);
    this.setupResourceQuotas(config);
    this.startResourceMonitoring();
  }

  private initializeResourcePools(config: ResourceManagerConfig): void {
    // CPU-intensive task pool
    this.resourcePools.set(
      "cpu-intensive",
      new ResourcePool({
        name: "cpu-intensive",
        maxConcurrency: Math.max(1, Math.floor(os.cpus().length * 0.8)),
        resources: {
          cpu_weight: 1.0,
          memory_weight: 0.3,
          io_weight: 0.1,
        },
        jobTypes: ["image-processing", "data-analysis", "report-generation"],
        priority: "high",
      })
    );

    // Memory-intensive task pool
    this.resourcePools.set(
      "memory-intensive",
      new ResourcePool({
        name: "memory-intensive",
        maxConcurrency: Math.max(1, Math.floor(this.getTotalMemoryGB() / 4)),
        resources: {
          cpu_weight: 0.3,
          memory_weight: 1.0,
          io_weight: 0.2,
        },
        jobTypes: [
          "large-dataset-processing",
          "cache-warming",
          "bulk-operations",
        ],
        priority: "medium",
      })
    );

    // I/O-intensive task pool
    this.resourcePools.set(
      "io-intensive",
      new ResourcePool({
        name: "io-intensive",
        maxConcurrency: 20, // Higher concurrency for I/O bound tasks
        resources: {
          cpu_weight: 0.2,
          memory_weight: 0.3,
          io_weight: 1.0,
        },
        jobTypes: ["file-operations", "api-calls", "database-operations"],
        priority: "normal",
      })
    );

    // Background maintenance pool
    this.resourcePools.set(
      "maintenance",
      new ResourcePool({
        name: "maintenance",
        maxConcurrency: 2,
        resources: {
          cpu_weight: 0.5,
          memory_weight: 0.5,
          io_weight: 0.5,
        },
        jobTypes: ["cleanup", "backup", "maintenance"],
        priority: "low",
      })
    );

    console.log(`Initialized ${this.resourcePools.size} resource pools`);
  }

  private setupResourceQuotas(config: ResourceManagerConfig): void {
    // Set quotas to prevent any single job type from consuming all resources
    this.resourceQuotas.set("email-sending", {
      maxConcurrentJobs: 10,
      maxCpuPercent: 20,
      maxMemoryPercent: 15,
      dailyJobLimit: 100000,
      burstLimit: 50,
    });

    this.resourceQuotas.set("image-processing", {
      maxConcurrentJobs: 8,
      maxCpuPercent: 60,
      maxMemoryPercent: 40,
      dailyJobLimit: 10000,
      burstLimit: 15,
    });

    this.resourceQuotas.set("report-generation", {
      maxConcurrentJobs: 3,
      maxCpuPercent: 30,
      maxMemoryPercent: 50,
      dailyJobLimit: 500,
      burstLimit: 5,
    });

    this.resourceQuotas.set("data-processing", {
      maxConcurrentJobs: 5,
      maxCpuPercent: 40,
      maxMemoryPercent: 60,
      dailyJobLimit: 5000,
      burstLimit: 10,
    });
  }

  // Intelligent job allocation based on current system state
  async allocateResources(job: JobRequest): Promise<ResourceAllocation | null> {
    const jobType = job.type;
    const quota = this.resourceQuotas.get(jobType);

    // Check quota limits first
    if (quota && !(await this.checkQuotaLimits(jobType, quota))) {
      return null;
    }

    // Find the best resource pool for this job
    const suitablePool = this.findBestResourcePool(job);
    if (!suitablePool) {
      return null;
    }

    // Check if pool has available capacity
    if (!suitablePool.hasAvailableCapacity()) {
      // Try resource borrowing from other pools
      const borrowedResources = await this.tryResourceBorrowing(
        job,
        suitablePool
      );
      if (!borrowedResources) {
        return null;
      }
    }

    // Estimate resource requirements
    const resourceEstimate = await this.estimateResourceRequirements(job);

    // Create resource allocation
    const allocation: ResourceAllocation = {
      jobId: job.id,
      poolName: suitablePool.name,
      estimatedResources: resourceEstimate,
      allocatedAt: Date.now(),
      priority: job.priority || "normal",
      timeout: job.timeout || 300000,
      resourceLimits: {
        maxCpu: resourceEstimate.cpu * 1.2, // 20% buffer
        maxMemory: resourceEstimate.memory * 1.2,
        maxDuration: job.timeout || 300000,
      },
    };

    // Reserve resources in the pool
    await suitablePool.reserveResources(allocation);

    // Track allocation for monitoring
    await this.trackResourceAllocation(allocation);

    return allocation;
  }

  private findBestResourcePool(job: JobRequest): ResourcePool | null {
    let bestPool: ResourcePool | null = null;
    let bestScore = -1;

    for (const pool of this.resourcePools.values()) {
      // Check if pool can handle this job type
      if (!pool.canHandleJobType(job.type)) {
        continue;
      }

      // Calculate suitability score
      const score = this.calculatePoolSuitabilityScore(job, pool);

      if (score > bestScore) {
        bestScore = score;
        bestPool = pool;
      }
    }

    return bestPool;
  }

  private calculatePoolSuitabilityScore(
    job: JobRequest,
    pool: ResourcePool
  ): number {
    let score = 0;

    // Base score for job type compatibility
    if (pool.jobTypes.includes(job.type)) {
      score += 50;
    }

    // Available capacity score
    const capacityRatio = pool.getAvailableCapacity() / pool.maxConcurrency;
    score += capacityRatio * 30;

    // Resource efficiency score
    const resourceFit = this.calculateResourceFit(job, pool);
    score += resourceFit * 20;

    // Priority alignment score
    const priorityScore = this.calculatePriorityScore(
      job.priority || "normal",
      pool.priority
    );
    score += priorityScore;

    return score;
  }

  private calculateResourceFit(job: JobRequest, pool: ResourcePool): number {
    // This would use historical data to determine how well this job type fits this pool
    // For now, return a simple heuristic based on job type and pool characteristics

    const jobResourceProfile = this.getJobResourceProfile(job.type);
    const poolResourceProfile = pool.resources;

    // Calculate how well the job's resource needs match the pool's strengths
    let fit = 0;
    fit += jobResourceProfile.cpu * poolResourceProfile.cpu_weight;
    fit += jobResourceProfile.memory * poolResourceProfile.memory_weight;
    fit += jobResourceProfile.io * poolResourceProfile.io_weight;

    return Math.min(1, fit); // Clamp to [0, 1]; the weighted sum itself can exceed 1
  }

  private getJobResourceProfile(jobType: string): ResourceProfile {
    const profiles: Record<string, ResourceProfile> = {
      "image-processing": { cpu: 0.9, memory: 0.6, io: 0.3 },
      "data-analysis": { cpu: 0.8, memory: 0.7, io: 0.4 },
      "report-generation": { cpu: 0.6, memory: 0.8, io: 0.5 },
      "email-sending": { cpu: 0.2, memory: 0.3, io: 0.8 },
      "file-operations": { cpu: 0.3, memory: 0.4, io: 0.9 },
      "api-calls": { cpu: 0.2, memory: 0.3, io: 0.7 },
      "database-operations": { cpu: 0.4, memory: 0.5, io: 0.8 },
      cleanup: { cpu: 0.3, memory: 0.4, io: 0.6 },
      backup: { cpu: 0.4, memory: 0.3, io: 0.9 },
    };

    return profiles[jobType] || { cpu: 0.5, memory: 0.5, io: 0.5 };
  }

  // Dynamic resource reallocation
  async reallocateResources(): Promise<void> {
    console.log("Starting resource reallocation...");

    const currentAllocations = await this.getCurrentAllocations();
    const systemMetrics = await this.getSystemMetrics();

    // Identify inefficient allocations
    const inefficientAllocations = this.findInefficientAllocations(
      currentAllocations,
      systemMetrics
    );

    for (const allocation of inefficientAllocations) {
      await this.rebalanceAllocation(allocation, systemMetrics);
    }

    // Check for resource borrowing opportunities
    await this.optimizeResourceBorrowing();

    console.log(
      `Reallocation complete. Processed ${inefficientAllocations.length} inefficient allocations`
    );
  }

  private findInefficientAllocations(
    allocations: ResourceAllocation[],
    metrics: SystemMetrics
  ): ResourceAllocation[] {
    const inefficient: ResourceAllocation[] = [];

    for (const allocation of allocations) {
      const actualUsage = metrics.jobMetrics.get(allocation.jobId);
      if (!actualUsage) continue;

      // Check for significant over-allocation
      const cpuWaste =
        (allocation.estimatedResources.cpu - actualUsage.cpu) /
        allocation.estimatedResources.cpu;
      const memoryWaste =
        (allocation.estimatedResources.memory - actualUsage.memory) /
        allocation.estimatedResources.memory;

      if (cpuWaste > 0.5 || memoryWaste > 0.5) {
        inefficient.push(allocation);
      }
    }

    return inefficient;
  }

  // Predictive resource scaling
  async predictiveScale(): Promise<void> {
    const predictions = await this.predictiveScaler.predictResourceNeeds(
      3600000 // Predict for next hour
    );

    for (const [poolName, prediction] of predictions.entries()) {
      const pool = this.resourcePools.get(poolName);
      if (!pool) continue;

      const currentCapacity = pool.maxConcurrency;
      const recommendedCapacity = prediction.recommendedCapacity;

      if (Math.abs(recommendedCapacity - currentCapacity) > 2) {
        console.log(
          `Predictive scaling recommendation for ${poolName}: ` +
            `${currentCapacity} -> ${recommendedCapacity} ` +
            `(confidence: ${(prediction.confidence * 100).toFixed(1)}%)`
        );

        if (prediction.confidence > 0.7) {
          // High confidence threshold
          await this.scalePool(poolName, recommendedCapacity);
        }
      }
    }
  }

  // Resource borrowing between pools
  private async tryResourceBorrowing(
    job: JobRequest,
    targetPool: ResourcePool
  ): Promise<BorrowedResources | null> {
    // Find pools with available capacity
    const availablePools = Array.from(this.resourcePools.values())
      .filter(
        (pool) =>
          pool.name !== targetPool.name &&
          pool.hasAvailableCapacity() &&
          pool.canLendResources()
      )
      .sort(
        (a, b) =>
          this.calculateBorrowingScore(b, job) -
          this.calculateBorrowingScore(a, job)
      );

    for (const lenderPool of availablePools) {
      const borrowAmount = Math.min(
        lenderPool.getAvailableCapacity(),
        targetPool.getResourceDeficit()
      );

      if (borrowAmount > 0) {
        const borrowedResources: BorrowedResources = {
          lenderPool: lenderPool.name,
          borrowerPool: targetPool.name,
          amount: borrowAmount,
          borrowedAt: Date.now(),
          jobId: job.id,
          expectedReturnAt: Date.now() + (job.timeout || 300000),
        };

        await lenderPool.lendResources(borrowAmount, borrowedResources);
        await targetPool.borrowResources(borrowAmount, borrowedResources);

        console.log(
          `Resource borrowing: ${lenderPool.name} lent ${borrowAmount} slots to ${targetPool.name}`
        );

        return borrowedResources;
      }
    }

    return null;
  }

  private calculateBorrowingScore(pool: ResourcePool, job: JobRequest): number {
    // Score pools based on how suitable they are as lenders
    let score = 0;

    // Prefer pools with more available capacity
    score += pool.getAvailableCapacity() * 10;

    // Prefer pools with lower priority (less critical work)
    const priorityScores: Record<string, number> = {
      low: 3,
      normal: 2,
      medium: 1,
      high: 0,
    };
    score += priorityScores[pool.priority] ?? 0;

    // Prefer pools with different resource characteristics (complementary lending)
    const resourceDistance = this.calculateResourceDistance(
      this.getJobResourceProfile(job.type),
      pool.resources
    );
    score += resourceDistance * 5; // Higher distance is better for borrowing

    return score;
  }

  private calculateResourceDistance(
    profile1: ResourceProfile,
    profile2: any
  ): number {
    const cpuDist = Math.abs(profile1.cpu - profile2.cpu_weight);
    const memoryDist = Math.abs(profile1.memory - profile2.memory_weight);
    const ioDist = Math.abs(profile1.io - profile2.io_weight);

    return Math.sqrt(
      cpuDist * cpuDist + memoryDist * memoryDist + ioDist * ioDist
    );
  }

  // Resource cleanup and optimization
  async optimizeResourceUsage(): Promise<void> {
    // Identify and clean up completed job allocations
    await this.cleanupCompletedAllocations();

    // Return borrowed resources that are no longer needed
    await this.returnUnneededBorrowedResources();

    // Defragment resource allocations
    await this.defragmentResourceAllocations();

    // Update resource pool configurations based on usage patterns
    await this.optimizePoolConfigurations();
  }

  private async cleanupCompletedAllocations(): Promise<void> {
    const cutoffTime = Date.now() - 3600000; // 1 hour ago

    for (const pool of this.resourcePools.values()) {
      const cleaned = await pool.cleanupOldAllocations(cutoffTime);
      if (cleaned > 0) {
        console.log(
          `Cleaned up ${cleaned} old allocations from pool ${pool.name}`
        );
      }
    }
  }

  private async optimizePoolConfigurations(): Promise<void> {
    const usageHistory = await this.resourceHistory.getUsagePatterns(
      Date.now() - 7 * 24 * 60 * 60 * 1000 // Last 7 days
    );

    for (const [poolName, pool] of this.resourcePools.entries()) {
      const poolUsage = usageHistory.get(poolName);
      if (!poolUsage) continue;

      // Adjust pool size based on utilization patterns
      const avgUtilization = poolUsage.averageUtilization;
      const peakUtilization = poolUsage.peakUtilization;

      let newCapacity = pool.maxConcurrency;

      if (peakUtilization > 0.9 && avgUtilization > 0.7) {
        // High utilization - consider increasing capacity
        newCapacity = Math.ceil(pool.maxConcurrency * 1.2);
      } else if (peakUtilization < 0.5 && avgUtilization < 0.3) {
        // Low utilization - consider decreasing capacity
        newCapacity = Math.max(1, Math.floor(pool.maxConcurrency * 0.8));
      }

      if (newCapacity !== pool.maxConcurrency) {
        console.log(
          `Optimizing pool ${poolName}: ${pool.maxConcurrency} -> ${newCapacity} ` +
            `(avg: ${(avgUtilization * 100).toFixed(1)}%, peak: ${(
              peakUtilization * 100
            ).toFixed(1)}%)`
        );

        await pool.adjustCapacity(newCapacity);
      }
    }
  }

  // Monitoring and metrics
  private startResourceMonitoring(): void {
    // Run a periodic task, logging failures instead of leaving rejected
    // promises unhandled (recent Node.js versions exit on unhandled rejections)
    const every = (
      intervalMs: number,
      label: string,
      task: () => Promise<void>
    ) =>
      setInterval(() => {
        task().catch((error) =>
          console.error(`Resource ${label} failed:`, error)
        );
      }, intervalMs);

    every(30000, "metrics collection", () => this.collectResourceMetrics()); // every 30 seconds
    every(300000, "reallocation", () => this.reallocateResources()); // every 5 minutes
    every(900000, "predictive scaling", () => this.predictiveScale()); // every 15 minutes
    every(3600000, "usage optimization", () => this.optimizeResourceUsage()); // every hour
  }

  private async collectResourceMetrics(): Promise<void> {
    const metrics = await this.getSystemMetrics();

    // Record pool utilization metrics
    for (const [poolName, pool] of this.resourcePools.entries()) {
      const utilization = pool.getCurrentUtilization();
      await this.recordMetric(
        `resource.pool.utilization.${poolName}`,
        utilization
      );
      await this.recordMetric(
        `resource.pool.capacity.${poolName}`,
        pool.maxConcurrency
      );
      await this.recordMetric(
        `resource.pool.active.${poolName}`,
        pool.getActiveJobs()
      );
    }

    // Record system-wide metrics
    await this.recordMetric("resource.system.cpu", metrics.cpu);
    await this.recordMetric("resource.system.memory", metrics.memory);
    await this.recordMetric("resource.system.io", metrics.io);
  }

  // Helper methods
  private getTotalMemoryGB(): number {
    return Math.round(os.totalmem() / 1024 / 1024 / 1024);
  }

  private calculatePriorityScore(
    jobPriority: string,
    poolPriority: string
  ): number {
    // Higher score for better priority alignment
    const priorities = ["low", "normal", "medium", "high"];
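    const jobIndex = priorities.indexOf(jobPriority);
    const poolIndex = priorities.indexOf(poolPriority);
    // Unknown priority labels (indexOf === -1) get no alignment bonus
    if (jobIndex === -1 || poolIndex === -1) return 0;

    return Math.max(0, 5 - Math.abs(jobIndex - poolIndex));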
  }

  private async checkQuotaLimits(
    jobType: string,
    quota: ResourceQuota
  ): Promise<boolean> {
    // Implement quota checking logic
    return true; // Placeholder
  }

  private async estimateResourceRequirements(
    job: JobRequest
  ): Promise<ResourceEstimate> {
    // Use machine learning or historical data to estimate resource needs
    return {
      cpu: 1.0,
      memory: 256,
      duration: job.timeout || 300000,
      io: 0.5,
    };
  }

  private async getCurrentAllocations(): Promise<ResourceAllocation[]> {
    // Get current resource allocations across all pools
    return [];
  }

  private async getSystemMetrics(): Promise<SystemMetrics> {
    return {
      cpu: os.loadavg()[0] / os.cpus().length,
      memory: (os.totalmem() - os.freemem()) / os.totalmem(),
      io: 0.5, // Placeholder
      jobMetrics: new Map(),
    };
  }

  private async recordMetric(name: string, value: number): Promise<void> {
    // Record metric to monitoring system
  }

  private async scalePool(
    poolName: string,
    newCapacity: number
  ): Promise<void> {
    const pool = this.resourcePools.get(poolName);
    if (pool) {
      await pool.adjustCapacity(newCapacity);
    }
  }

  private async trackResourceAllocation(
    allocation: ResourceAllocation
  ): Promise<void> {
    await this.resourceHistory.recordAllocation(allocation);
  }

  private async rebalanceAllocation(
    allocation: ResourceAllocation,
    metrics: SystemMetrics
  ): Promise<void> {
    // Rebalance specific allocation based on current metrics
  }

  private async optimizeResourceBorrowing(): Promise<void> {
    // Check if borrowed resources can be returned or reallocated more efficiently
  }

  private async returnUnneededBorrowedResources(): Promise<void> {
    // Return borrowed resources that are no longer needed
  }

  private async defragmentResourceAllocations(): Promise<void> {
    // Consolidate resource allocations to reduce fragmentation
  }
}

// Supporting classes
class ResourcePool {
  constructor(public config: ResourcePoolConfig) {}

  get name(): string {
    return this.config.name;
  }
  get maxConcurrency(): number {
    return this.config.maxConcurrency;
  }
  get jobTypes(): string[] {
    return this.config.jobTypes;
  }
  get priority(): string {
    return this.config.priority;
  }
  get resources(): any {
    return this.config.resources;
  }

  hasAvailableCapacity(): boolean {
    return true; // Placeholder
  }
  getAvailableCapacity(): number {
    return 5; // Placeholder
  }
  canHandleJobType(jobType: string): boolean {
    return this.jobTypes.includes(jobType);
  }
  canLendResources(): boolean {
    return true; // Placeholder
  }
  getResourceDeficit(): number {
    return 2; // Placeholder
  }
  getCurrentUtilization(): number {
    return 0.5; // Placeholder
  }
  getActiveJobs(): number {
    return 3; // Placeholder
  }

  async reserveResources(allocation: ResourceAllocation): Promise<void> {}
  async lendResources(
    amount: number,
    borrowing: BorrowedResources
  ): Promise<void> {}
  async borrowResources(
    amount: number,
    borrowing: BorrowedResources
  ): Promise<void> {}
  async cleanupOldAllocations(cutoffTime: number): Promise<number> {
    return 0;
  }
  async adjustCapacity(newCapacity: number): Promise<void> {}
}

class ResourceHistoryTracker {
  async recordAllocation(allocation: ResourceAllocation): Promise<void> {}
  async getUsagePatterns(
    sinceTimestamp: number
  ): Promise<Map<string, UsagePattern>> {
    return new Map();
  }
}

class PredictiveScaler {
  constructor(private history: ResourceHistoryTracker) {}

  async predictResourceNeeds(
    timeHorizonMs: number
  ): Promise<Map<string, ResourcePrediction>> {
    return new Map();
  }
}

class IntelligentLoadBalancer {
  // Implement intelligent load balancing across resource pools
}

// Interfaces
interface ResourceManagerConfig {
  pools: ResourcePoolConfig[];
  quotas: Record<string, ResourceQuota>;
}

interface ResourcePoolConfig {
  name: string;
  maxConcurrency: number;
  resources: {
    cpu_weight: number;
    memory_weight: number;
    io_weight: number;
  };
  jobTypes: string[];
  priority: string;
}

interface ResourceQuota {
  maxConcurrentJobs: number;
  maxCpuPercent: number;
  maxMemoryPercent: number;
  dailyJobLimit: number;
  burstLimit: number;
}

interface JobRequest {
  id: string;
  type: string;
  data: any;
  priority?: string;
  timeout?: number;
}

interface ResourceAllocation {
  jobId: string;
  poolName: string;
  estimatedResources: ResourceEstimate;
  allocatedAt: number;
  priority: string;
  timeout: number;
  resourceLimits: {
    maxCpu: number;
    maxMemory: number;
    maxDuration: number;
  };
}

interface ResourceEstimate {
  cpu: number;
  memory: number;
  duration: number;
  io: number;
}

interface ResourceProfile {
  cpu: number;
  memory: number;
  io: number;
}

interface BorrowedResources {
  lenderPool: string;
  borrowerPool: string;
  amount: number;
  borrowedAt: number;
  jobId: string;
  expectedReturnAt: number;
}

interface SystemMetrics {
  cpu: number;
  memory: number;
  io: number;
  jobMetrics: Map<string, any>;
}

interface UsagePattern {
  averageUtilization: number;
  peakUtilization: number;
  timeOfPeakUtilization: number[];
}

interface ResourcePrediction {
  recommendedCapacity: number;
  confidence: number;
  reasoning: string;
}
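The `lendResources` and `borrowResources` methods on `ResourcePool` above are empty placeholders. As a minimal sketch of the accounting they might perform, assuming capacity is measured in whole concurrent-job slots and that a lender keeps a small reserve for its own bursts, the hypothetical `SlotPool` class below (not part of the code above) tracks loans in both directions:

```typescript
// Hypothetical slot-based pool used only to illustrate resource borrowing.
// Capacity is counted in whole concurrent-job slots.
class SlotPool {
  private active = 0; // slots currently running jobs
  private lentOut = 0; // own slots on loan to other pools
  private borrowedIn = 0; // slots on loan from other pools

  constructor(readonly name: string, readonly ownSlots: number) {}

  // Effective capacity: own slots, minus loans out, plus loans in
  get capacity(): number {
    return this.ownSlots - this.lentOut + this.borrowedIn;
  }

  get idleSlots(): number {
    return this.capacity - this.active;
  }

  tryStartJob(): boolean {
    if (this.idleSlots <= 0) return false;
    this.active++;
    return true;
  }

  finishJob(): void {
    if (this.active > 0) this.active--;
  }

  // Lend up to `wanted` idle slots to `borrower`, always keeping `reserve`
  // idle slots for local bursts. Lending is capped at this pool's own
  // (not-yet-lent) slots.
  lendTo(borrower: SlotPool, wanted: number, reserve = 1): number {
    const ownIdle = Math.min(this.idleSlots, this.ownSlots - this.lentOut);
    const amount = Math.max(0, Math.min(wanted, ownIdle - reserve));
    this.lentOut += amount;
    borrower.borrowedIn += amount;
    return amount;
  }

  // Hand back borrowed slots that are currently idle; returns how many went back.
  returnTo(lender: SlotPool, wanted: number): number {
    const amount = Math.max(
      0,
      Math.min(wanted, this.borrowedIn, this.idleSlots, lender.lentOut)
    );
    this.borrowedIn -= amount;
    lender.lentOut -= amount;
    return amount;
  }
}
```

The sketch tracks loans as pool-wide totals; an implementation that lends to several pools at once would need per-borrower records, much like the `BorrowedResources` interface above.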

Graceful Shutdowns and Job Cleanup: Reliability Under Pressure

Handling System Shutdowns Without Data Loss

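// Assumes the ioredis client (new Redis(url), set(key, value, "EX", ttl), scanStream)
import Redis from "ioredis";
import { promises as fs } from "fs";
import * as path from "path";

// Comprehensive graceful shutdown and cleanup system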
class GracefulShutdownManager {
  private shutdownInitiated: boolean = false;
  private shutdownStartTime: number = 0;
  private activeJobs: Map<string, ActiveJob> = new Map();
  private shutdownPromise: Promise<void> | null = null;
  private cleanupTasks: CleanupTask[] = [];
  private redis: Redis;
  private jobQueue: any; // Your job queue instance
  private workerPools: Map<string, any> = new Map(); // Your worker pools

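  constructor(private config: ShutdownConfig) {
    // Fall back to the default local instance if REDIS_URL is not set
    this.redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
    this.setupShutdownHandlers();
    // Periodic cleanup stays on unless explicitly disabled in the config
    if (this.config.enablePeriodicCleanup !== false) {
      this.setupPeriodicCleanup();
    }
  }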

  private setupShutdownHandlers(): void {
    // Handle graceful shutdown signals
    process.on("SIGTERM", () => {
      console.log("Received SIGTERM signal. Starting graceful shutdown...");
      this.initiateShutdown("SIGTERM");
    });

    process.on("SIGINT", () => {
      console.log("Received SIGINT signal. Starting graceful shutdown...");
      this.initiateShutdown("SIGINT");
    });

    // Handle uncaught exceptions
    process.on("uncaughtException", (error) => {
      console.error("Uncaught exception:", error);
      this.initiateEmergencyShutdown("UNCAUGHT_EXCEPTION");
    });

    // Handle unhandled promise rejections
    process.on("unhandledRejection", (reason, promise) => {
      console.error("Unhandled promise rejection:", reason);
      this.initiateEmergencyShutdown("UNHANDLED_REJECTION");
    });
  }

  public async initiateShutdown(signal: string): Promise<void> {
    if (this.shutdownInitiated) {
      console.log("Shutdown already in progress...");
      return this.shutdownPromise || Promise.resolve();
    }

    this.shutdownInitiated = true;
    this.shutdownStartTime = Date.now();

    console.log(`Starting graceful shutdown (signal: ${signal})...`);

    this.shutdownPromise = this.performGracefulShutdown();

    return this.shutdownPromise;
  }

  private async performGracefulShutdown(): Promise<void> {
    try {
      // Step 1: Stop accepting new jobs
      console.log("Step 1: Stopping new job acceptance...");
      await this.stopAcceptingNewJobs();

      // Step 2: Wait for current jobs to complete (with timeout)
      console.log("Step 2: Waiting for current jobs to complete...");
      await this.waitForJobCompletion();

      // Step 3: Save job state and cleanup
      console.log("Step 3: Saving job state and performing cleanup...");
      await this.saveJobState();
      await this.performCleanup();

      // Step 4: Close external connections
      console.log("Step 4: Closing external connections...");
      await this.closeExternalConnections();

      // Step 5: Final cleanup
      console.log("Step 5: Final cleanup...");
      await this.finalCleanup();

      console.log("Graceful shutdown completed successfully");
      process.exit(0);
    } catch (error) {
      console.error("Error during graceful shutdown:", error);
      await this.emergencyCleanup();
      process.exit(1);
    }
  }

  private async stopAcceptingNewJobs(): Promise<void> {
    // Mark this node as draining in the cluster
    await this.redis.hset(
      "cluster:nodes",
      this.getNodeId(),
      JSON.stringify({
        status: "draining",
        draining_started_at: Date.now(),
        reason: "graceful_shutdown",
      })
    );

    // Stop job queue processing
    for (const [poolName, pool] of this.workerPools) {
      await pool.pause();
      console.log(`Paused worker pool: ${poolName}`);
    }

    // Unsubscribe from new job notifications
    await this.redis.unsubscribe();

    console.log("Stopped accepting new jobs");
  }

  private async waitForJobCompletion(): Promise<void> {
    const maxWaitTime = this.config.maxWaitTimeMs || 300000; // 5 minutes default
    const checkInterval = 5000; // Check every 5 seconds
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitTime) {
      // Get current active job count
      const activeJobCount = await this.getActiveJobCount();

      if (activeJobCount === 0) {
        console.log("All jobs completed successfully");
        return;
      }

      console.log(
        `Waiting for ${activeJobCount} jobs to complete... (${Math.floor(
          (Date.now() - startTime) / 1000
        )}s elapsed)`
      );

      // Show progress of long-running jobs
      await this.reportJobProgress();

      // Wait before next check
      await new Promise((resolve) => setTimeout(resolve, checkInterval));
    }

    // Timeout reached - handle remaining jobs
    const remainingJobs = await this.getActiveJobCount();
    if (remainingJobs > 0) {
      console.warn(`Timeout reached with ${remainingJobs} jobs still running`);
      await this.handleRemainingJobs();
    }
  }

  private async handleRemainingJobs(): Promise<void> {
    const activeJobs = Array.from(this.activeJobs.values());

    for (const job of activeJobs) {
      try {
        if (job.canBeSafelyInterrupted) {
          // Jobs that can be safely interrupted
          await this.interruptJob(job);
          console.log(`Interrupted job ${job.id} (safe interruption)`);
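        } else if (job.canBeMigrated) {
          // Jobs that can be migrated to another node
          await this.migrateJob(job);
          console.log(`Migrated job ${job.id} to another node`);
          // The job now lives on another node; drop the local copy so
          // saveJobState() does not persist it a second time
          this.unregisterJob(job.id);
        } else {
          // Critical jobs: save state and requeue
          await this.saveJobStateAndRequeue(job);
          console.log(`Saved state for job ${job.id} and requeued`);
          // Already requeued; drop the local copy for the same reason
          this.unregisterJob(job.id);
        }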
      } catch (error) {
        console.error(`Error handling remaining job ${job.id}:`, error);
      }
    }
  }

  private async saveJobState(): Promise<void> {
    const stateToSave: JobState[] = [];

    for (const job of this.activeJobs.values()) {
      const state: JobState = {
        jobId: job.id,
        type: job.type,
        data: job.data,
        progress: job.progress,
        startedAt: job.startedAt,
        checkpoints: job.checkpoints,
        status: "interrupted",
        interruptedAt: Date.now(),
        nodeId: this.getNodeId(),
      };

      stateToSave.push(state);
    }

    if (stateToSave.length > 0) {
      // Save to persistent storage
      await this.redis.set(
        `job_state:${this.getNodeId()}:${Date.now()}`,
        JSON.stringify(stateToSave),
        "EX",
        86400 // 24 hour expiry
      );

      console.log(`Saved state for ${stateToSave.length} jobs`);
    }
  }

  private async performCleanup(): Promise<void> {
    // Execute all registered cleanup tasks
    for (const task of this.cleanupTasks) {
      try {
        console.log(`Executing cleanup task: ${task.name}`);
        await task.execute();
        console.log(`Cleanup task completed: ${task.name}`);
      } catch (error) {
        console.error(`Cleanup task failed: ${task.name}:`, error);
      }
    }

    // Clean up temporary files
    await this.cleanupTempFiles();

    // Clean up job artifacts
    await this.cleanupJobArtifacts();

    // Update job metrics
    await this.updateShutdownMetrics();
  }

  private async cleanupTempFiles(): Promise<void> {
    const tempDir = process.env.TEMP_DIR || "/tmp";
    const nodePrefix = this.getNodeId();

    try {
      const files = await fs.readdir(tempDir);
      const tempFiles = files.filter((file) =>
        file.startsWith(`job_temp_${nodePrefix}`)
      );

      for (const file of tempFiles) {
        try {
          await fs.unlink(path.join(tempDir, file));
          console.log(`Cleaned up temp file: ${file}`);
        } catch (error) {
          console.warn(`Failed to clean up temp file ${file}:`, error);
        }
      }
    } catch (error) {
      console.warn("Error cleaning up temp files:", error);
    }
  }

  private async cleanupJobArtifacts(): Promise<void> {
    // Clean up job-specific artifacts like cached data, intermediate files, etc.
    const activeJobIds = Array.from(this.activeJobs.keys());

    for (const jobId of activeJobIds) {
      try {
        // Clean up job-specific cache entries
        await this.redis.del(`job_cache:${jobId}`);

        // Clean up job-specific temp data
        await this.redis.del(`job_temp:${jobId}`);

        // Clean up job progress tracking
        await this.redis.del(`job_progress:${jobId}`);
      } catch (error) {
        console.warn(`Error cleaning up artifacts for job ${jobId}:`, error);
      }
    }
  }

  private async closeExternalConnections(): Promise<void> {
    const closePromises: Promise<void>[] = [];

    // Close database connections
    closePromises.push(this.closeDatabaseConnections());

    // Close Redis connections
    closePromises.push(this.closeRedisConnections());

    // Close external API connections
    closePromises.push(this.closeExternalApiConnections());

    // Close monitoring connections
    closePromises.push(this.closeMonitoringConnections());

    try {
      await Promise.all(closePromises);
      console.log("All external connections closed");
    } catch (error) {
      console.warn("Some connections failed to close cleanly:", error);
    }
  }

  // Emergency shutdown for critical failures
  private async initiateEmergencyShutdown(reason: string): Promise<void> {
    if (this.shutdownInitiated) {
      return;
    }

    this.shutdownInitiated = true;
    console.error(`EMERGENCY SHUTDOWN: ${reason}`);

    try {
      // Quick save of critical state
      await this.emergencyStateSave();

      // Notify monitoring systems
      await this.sendEmergencyAlert(reason);

      // Quick cleanup
      await this.emergencyCleanup();
    } catch (error) {
      console.error("Error during emergency shutdown:", error);
    }

    process.exit(2); // Exit with error code 2 for emergency shutdown
  }

  private async emergencyStateSave(): Promise<void> {
    try {
      const criticalJobs = Array.from(this.activeJobs.values())
        .filter((job) => job.isCritical)
        .map((job) => ({
          jobId: job.id,
          type: job.type,
          data: job.data,
          progress: job.progress,
          emergencyShutdown: true,
          timestamp: Date.now(),
        }));

      if (criticalJobs.length > 0) {
        await this.redis.set(
          `emergency_job_state:${this.getNodeId()}:${Date.now()}`,
          JSON.stringify(criticalJobs),
          "EX",
          86400
        );

        console.log(
          `Emergency state saved for ${criticalJobs.length} critical jobs`
        );
      }
    } catch (error) {
      console.error("Failed to save emergency state:", error);
    }
  }

  // Periodic cleanup operations
  private setupPeriodicCleanup(): void {
    // Clean up completed jobs every hour
    setInterval(async () => {
      await this.cleanupCompletedJobs();
    }, 3600000); // 1 hour

    // Clean up orphaned job data every 6 hours
    setInterval(async () => {
      await this.cleanupOrphanedData();
    }, 21600000); // 6 hours

    // Clean up old job states daily
    setInterval(async () => {
      await this.cleanupOldJobStates();
    }, 86400000); // 24 hours
  }

  private async cleanupCompletedJobs(): Promise<void> {
    try {
      const completedBefore =
        Date.now() - (this.config.completedJobRetentionMs || 3600000);
      let cleanedCount = 0;

      // SCAN incrementally instead of calling KEYS (here, from a Lua script),
      // which blocks Redis while it walks the entire keyspace
      const stream = this.redis.scanStream({
        match: "job_completed:*",
        count: 100,
      });

      for await (const keys of stream) {
        for (const key of keys as string[]) {
          const job = await this.redis.get(key);
          if (!job) continue; // Expired or deleted since the scan saw it

          const data = JSON.parse(job);
          if (data.completedAt && data.completedAt < completedBefore) {
            await this.redis.del(key);
            cleanedCount++;
          }
        }
      }

      if (cleanedCount > 0) {
        console.log(`Cleaned up ${cleanedCount} completed jobs`);
      }
    } catch (error) {
      console.error("Error cleaning up completed jobs:", error);
    }
  }

  private async cleanupOrphanedData(): Promise<void> {
    try {
      let orphanedCount = 0;

      // Find orphaned cache entries with SCAN rather than the blocking KEYS command
      const stream = this.redis.scanStream({ match: "job_cache:*", count: 100 });

      for await (const keys of stream) {
        for (const key of keys as string[]) {
          // Key format is job_cache:<jobId>; slice so IDs containing ":" survive
          const jobId = key.slice("job_cache:".length);

          // Check if job still exists
          const jobExists = await this.redis.exists(`job:${jobId}`);

          if (!jobExists) {
            await this.redis.del(key);
            orphanedCount++;
          }
        }
      }

      if (orphanedCount > 0) {
        console.log(`Cleaned up ${orphanedCount} orphaned cache entries`);
      }
    } catch (error) {
      console.error("Error cleaning up orphaned data:", error);
    }
  }

  // Registration methods for cleanup tasks
  public registerCleanupTask(task: CleanupTask): void {
    this.cleanupTasks.push(task);
  }

  public registerJob(job: ActiveJob): void {
    this.activeJobs.set(job.id, job);
  }

  public unregisterJob(jobId: string): void {
    this.activeJobs.delete(jobId);
  }

  // Health check for shutdown readiness
  public async checkShutdownReadiness(): Promise<ShutdownReadiness> {
    const activeJobCount = this.activeJobs.size;
    const criticalJobCount = Array.from(this.activeJobs.values()).filter(
      (job) => job.isCritical
    ).length;

    const longestRunningJob = Math.max(
      ...Array.from(this.activeJobs.values()).map(
        (job) => Date.now() - job.startedAt
      ),
      0
    );

    const canShutdownSafely =
      criticalJobCount === 0 && longestRunningJob < 300000; // 5 minutes

    return {
      canShutdownSafely,
      activeJobCount,
      criticalJobCount,
      longestRunningJobDuration: longestRunningJob,
      estimatedShutdownTime: this.estimateShutdownTime(),
    };
  }

  // Helper methods
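  private nodeId?: string;

  private getNodeId(): string {
    // Cache the ID: without NODE_ID, a fresh Date.now() on every call would give
    // each caller (draining flag, state keys, temp-file prefix) a different ID
    if (!this.nodeId) {
      this.nodeId = process.env.NODE_ID || `node-${process.pid}-${Date.now()}`;
    }
    return this.nodeId;
  }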

  private async getActiveJobCount(): Promise<number> {
    return this.activeJobs.size;
  }

  private async reportJobProgress(): Promise<void> {
    const longRunningJobs = Array.from(this.activeJobs.values())
      .filter((job) => Date.now() - job.startedAt > 60000) // Running for more than 1 minute
      .sort((a, b) => a.startedAt - b.startedAt); // Oldest start = longest-running first

    if (longRunningJobs.length > 0) {
      console.log("Long-running jobs:");
      for (const job of longRunningJobs.slice(0, 5)) {
        // Show top 5
        const runtime = Math.floor((Date.now() - job.startedAt) / 1000);
        console.log(
          `  - ${job.id} (${job.type}): ${runtime}s, ${job.progress}% complete`
        );
      }
    }
  }

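  private async interruptJob(job: ActiveJob): Promise<void> {
    // Send interrupt signal to job. worker_threads' Worker.terminate()
    // returns a promise, so wait for the worker to actually stop.
    if (job.worker) {
      await job.worker.terminate();
    }
  }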

  private async migrateJob(job: ActiveJob): Promise<void> {
    // Migrate job to another available node
    const availableNodes = await this.getAvailableNodes();

    if (availableNodes.length === 0) {
      // No healthy node to hand the job to: requeue it instead of dropping it
      console.warn(`No healthy node available for job ${job.id}; requeueing`);
      await this.saveJobStateAndRequeue(job);
      return;
    }

    const targetNode = availableNodes[0];

    // Send job to target node
    await this.redis.lpush(
      `jobs:${targetNode}`,
      JSON.stringify({
        ...job.data,
        migrated: true,
        originalNode: this.getNodeId(),
        migratedAt: Date.now(),
      })
    );
  }

  private async saveJobStateAndRequeue(job: ActiveJob): Promise<void> {
    // Save current job state and requeue for later processing
    const jobState = {
      ...job.data,
      progress: job.progress,
      checkpoints: job.checkpoints,
      interrupted: true,
      interruptedAt: Date.now(),
      originalNode: this.getNodeId(),
    };

    // Requeue with delay
    await this.jobQueue.add(job.type, jobState, {
      delay: 60000, // 1 minute delay
      attempts: 1,
    });
  }

  private async getAvailableNodes(): Promise<string[]> {
    const nodes = await this.redis.hgetall("cluster:nodes");
    return Object.keys(nodes).filter((nodeId) => {
      const nodeData = JSON.parse(nodes[nodeId]);
      return nodeData.status === "healthy";
    });
  }

  private estimateShutdownTime(): number {
    const activeJobs = Array.from(this.activeJobs.values());
    const avgJobDuration =
      activeJobs.reduce((sum, job) => sum + (Date.now() - job.startedAt), 0) /
      (activeJobs.length || 1);

    return Math.min(avgJobDuration, this.config.maxWaitTimeMs || 300000);
  }

  private async updateShutdownMetrics(): Promise<void> {
    const shutdownDuration = Date.now() - this.shutdownStartTime;

    await this.redis.lpush(
      "shutdown_metrics",
      JSON.stringify({
        nodeId: this.getNodeId(),
        shutdownStartTime: this.shutdownStartTime,
        shutdownDuration,
        jobsAtShutdown: this.activeJobs.size,
        timestamp: Date.now(),
      })
    );
  }

  private async sendEmergencyAlert(reason: string): Promise<void> {
    // Send alert to monitoring/alerting system
    console.error(
      `EMERGENCY ALERT: Node ${this.getNodeId()} emergency shutdown: ${reason}`
    );
  }

  private async closeDatabaseConnections(): Promise<void> {
    // Close database connections
  }

  private async closeRedisConnections(): Promise<void> {
    if (this.redis) {
      // quit() waits for pending replies before closing; disconnect() drops them
      await this.redis.quit();
    }
  }

  private async closeExternalApiConnections(): Promise<void> {
    // Close external API connections
  }

  private async closeMonitoringConnections(): Promise<void> {
    // Close monitoring connections
  }

  private async finalCleanup(): Promise<void> {
    // Final cleanup operations
  }

  private async emergencyCleanup(): Promise<void> {
    // Emergency cleanup operations
  }

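  private async cleanupOldJobStates(): Promise<void> {
    // job_state keys are written with a 24-hour TTL, so this sweep is a safety
    // net for any keys that ended up without an expiry
    const cutoffTime = Date.now() - 7 * 24 * 60 * 60 * 1000; // 7 days ago
    let cleanedCount = 0;

    // Catch errors here: this runs from setInterval, and an unhandled rejection
    // would trip the emergency-shutdown handler set up in setupShutdownHandlers()
    try {
      const stream = this.redis.scanStream({ match: "job_state:*", count: 100 });

      for await (const keys of stream) {
        for (const key of keys as string[]) {
          // Key format is job_state:<nodeId>:<timestamp>; take the last segment
          const timestamp = parseInt(key.split(":").pop() ?? "", 10);
          if (!Number.isNaN(timestamp) && timestamp < cutoffTime) {
            await this.redis.del(key);
            cleanedCount++;
          }
        }
      }
    } catch (error) {
      console.error("Error cleaning up old job states:", error);
      return;
    }

    if (cleanedCount > 0) {
      console.log(`Cleaned up ${cleanedCount} old job state entries`);
    }
  }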
}

// Interfaces
interface ShutdownConfig {
  maxWaitTimeMs?: number; // default: 300000 (5 minutes)
  completedJobRetentionMs?: number; // default: 3600000 (1 hour)
  enablePeriodicCleanup?: boolean;
}

interface ActiveJob {
  id: string;
  type: string;
  data: any;
  progress: number;
  startedAt: number;
  checkpoints: any[];
  isCritical: boolean;
  canBeSafelyInterrupted: boolean;
  canBeMigrated: boolean;
  worker?: any;
}

interface JobState {
  jobId: string;
  type: string;
  data: any;
  progress: number;
  startedAt: number;
  checkpoints: any[];
  status: string;
  interruptedAt: number;
  nodeId: string;
}

interface CleanupTask {
  name: string;
  execute: () => Promise<void>;
}

interface ShutdownReadiness {
  canShutdownSafely: boolean;
  activeJobCount: number;
  criticalJobCount: number;
  longestRunningJobDuration: number;
  estimatedShutdownTime: number;
}
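`waitForJobCompletion()` polls the job count every five seconds, so shutdown can sit idle for up to one interval after the last job finishes. One alternative, sketched below under the assumption that jobs run in-process and can be tracked as promises (`JobDrainer` and its methods are hypothetical names, not part of the code above), is to wait on the jobs themselves and race them against a deadline:

```typescript
// Hypothetical in-process drainer: tracks in-flight jobs as promises and,
// on shutdown, waits for them to settle or for a deadline, whichever is first.
// Assumes job IDs are unique while in flight.
class JobDrainer {
  private accepting = true;
  private readonly inFlight = new Map<string, Promise<void>>();

  // Start a job unless draining has begun; returns false if the job was rejected.
  submit(id: string, run: () => Promise<unknown>): boolean {
    if (!this.accepting) return false;
    const settled = Promise.resolve()
      .then(run)
      .then(
        () => {},
        // A failed job still counts as finished for draining purposes
        (err) => {
          console.error(`Job ${id} failed:`, err);
        }
      )
      .finally(() => this.inFlight.delete(id));
    this.inFlight.set(id, settled);
    return true;
  }

  get inFlightCount(): number {
    return this.inFlight.size;
  }

  // Stop accepting work, then wait for in-flight jobs or the deadline.
  // Returns the IDs of jobs still running when the wait ended.
  async drain(timeoutMs: number): Promise<string[]> {
    this.accepting = false;
    let timer: ReturnType<typeof setTimeout> | undefined;
    const deadline = new Promise<void>((resolve) => {
      timer = setTimeout(() => resolve(), timeoutMs);
    });
    await Promise.race([Promise.all(Array.from(this.inFlight.values())), deadline]);
    clearTimeout(timer);
    return Array.from(this.inFlight.keys());
  }
}
```

The IDs returned by `drain()` are the jobs that would go on to a step like `handleRemainingJobs()`.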

Key Takeaways

Distributed background processing isn’t just about running jobs on multiple servers—it’s about building coordinated systems that can maintain consistency, manage resources intelligently, and handle partial failures without cascading disasters.

Essential distributed processing patterns:

  • Cluster coordination with leader election and intelligent job distribution across nodes
  • Resource management with dynamic allocation, borrowing, and predictive scaling
  • Comprehensive monitoring with real-time queue health, worker metrics, and alerting
  • Graceful shutdown handling with job migration, state persistence, and cleanup
  • Intelligent load balancing based on node capabilities and current resource usage
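To make "intelligent load balancing" concrete, here is a minimal sketch. It assumes each node reports its capacity, active job count, normalized CPU load, and supported job types; the `NodeStatus` shape, `pickNode`, and the 50/50 weighting are all illustrative assumptions. The sketch scores each eligible node by how busy it would be after taking one more job:

```typescript
// Hypothetical snapshot of a worker node, e.g. assembled from heartbeats.
interface NodeStatus {
  id: string;
  healthy: boolean;
  capacity: number; // max concurrent jobs
  activeJobs: number; // jobs currently running
  cpuLoad: number; // 0..1, normalized load average
  jobTypes: string[]; // job types this node can run
}

// Pick the node with the lowest combined score of slot utilization (after
// placing one more job) and CPU load. Returns undefined if no node fits.
function pickNode(
  nodes: NodeStatus[],
  jobType: string,
  cpuWeight = 0.5
): NodeStatus | undefined {
  let best: NodeStatus | undefined;
  let bestScore = Infinity;
  for (const node of nodes) {
    if (!node.healthy || !node.jobTypes.includes(jobType)) continue;
    if (node.activeJobs >= node.capacity) continue; // no free slot
    const slotUtilization = (node.activeJobs + 1) / node.capacity;
    const score = (1 - cpuWeight) * slotUtilization + cpuWeight * node.cpuLoad;
    if (score < bestScore) {
      bestScore = score;
      best = node;
    }
  }
  return best;
}
```

Weighting slot utilization against CPU load lets a node with free slots but a saturated CPU lose to a quieter node; the right weight depends on how CPU-bound your jobs are.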

The production-ready distributed job architecture:

  • Use Redis Cluster for distributed coordination and job state management
  • Implement node registry with heartbeat monitoring and automatic failure detection
  • Build resource pools with intelligent allocation based on job characteristics
  • Design cross-region replication for critical jobs and disaster recovery
  • Plan graceful degradation when parts of the cluster become unavailable
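Heartbeat-based failure detection comes down to comparing each node's last heartbeat against a timeout. Here is a minimal sketch with timestamps kept in memory so it runs standalone. The `NodeRegistry` class is hypothetical; a real cluster would keep the same data in shared storage such as a Redis hash, so that every node sees the same view:

```typescript
// Hypothetical heartbeat registry. Timestamps live in memory here; in a
// cluster they would live in shared storage so every node agrees.
class NodeRegistry {
  private readonly lastSeen = new Map<string, number>();

  // failureTimeoutMs should span several heartbeat intervals so a single
  // delayed heartbeat does not mark a healthy node as failed
  constructor(private readonly failureTimeoutMs: number) {}

  heartbeat(nodeId: string, now: number = Date.now()): void {
    this.lastSeen.set(nodeId, now);
  }

  // Nodes whose last heartbeat is older than the timeout are presumed failed
  failedNodes(now: number = Date.now()): string[] {
    const failed: string[] = [];
    this.lastSeen.forEach((seenAt, nodeId) => {
      if (now - seenAt > this.failureTimeoutMs) failed.push(nodeId);
    });
    return failed;
  }

  healthyNodes(now: number = Date.now()): string[] {
    const failed = new Set(this.failedNodes(now));
    return Array.from(this.lastSeen.keys()).filter((id) => !failed.has(id));
  }

  // Call once a failed node's jobs have been reassigned
  remove(nodeId: string): void {
    this.lastSeen.delete(nodeId);
  }
}
```

Passing `now` explicitly keeps the logic deterministic and easy to test; in production the defaults use the wall clock.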

Distributed processing best practices:

  • Always implement job deduplication to prevent duplicate processing across nodes
  • Use resource borrowing to maximize cluster utilization during uneven loads
  • Monitor queue health continuously with automated alerting on depth and error rates
  • Plan for node failures with job migration and state recovery mechanisms
  • Implement predictive scaling based on historical patterns and current trends
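For deduplication, one common approach is an atomic "claim" per job, keyed on the job's business identity (for example, a charge keyed by order ID) rather than its queue message ID, so retries and re-published messages collide on the same key. In Redis this is typically `SET key owner NX PX ttl`. The sketch below mirrors those semantics in memory so that it runs standalone (`JobDeduplicator` and `dedupKey` are hypothetical names):

```typescript
// Hypothetical in-memory deduplicator mirroring Redis `SET key owner NX PX ttl`:
// the first claim wins until it expires.
class JobDeduplicator {
  private readonly claims = new Map<string, { owner: string; expiresAt: number }>();

  constructor(private readonly ttlMs: number) {}

  // Returns true if `owner` now holds the claim and should process the job.
  tryClaim(jobKey: string, owner: string, now: number = Date.now()): boolean {
    const existing = this.claims.get(jobKey);
    if (existing && existing.expiresAt > now) {
      return false; // another node (or an earlier delivery) already owns it
    }
    this.claims.set(jobKey, { owner, expiresAt: now + this.ttlMs });
    return true;
  }

  // Release only when the job failed and should be retryable, and only if the
  // caller still owns the claim. After success, let the claim expire so late
  // duplicates are still rejected.
  release(jobKey: string, owner: string): void {
    const existing = this.claims.get(jobKey);
    if (existing && existing.owner === owner) this.claims.delete(jobKey);
  }
}

// Build the key from business identity, not the queue message ID.
function dedupKey(type: string, businessId: string): string {
  return `dedup:${type}:${businessId}`;
}
```

Against Redis, `release` has to check the owner and delete in one atomic step (for example in a short Lua script), because a separate GET followed by DEL can race with another node's claim.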

The scaling decision framework:

  • Use single-node processing for applications with predictable, low job volumes
  • Use clustered job processing when you need fault tolerance and moderate scaling
  • Use distributed job processing for high-throughput, mission-critical applications
  • Use cloud-managed queues (SQS, Pub/Sub) when operational overhead is a concern
  • Use workflow orchestrators (Airflow, Prefect) for complex, multi-step pipelines with dependencies between jobs

The operational reality checklist:

  • ✅ Job coordination prevents duplicate processing across nodes
  • ✅ Resource management handles load spikes without oversubscription
  • ✅ Monitoring provides real-time visibility into cluster health
  • ✅ Graceful shutdowns never lose in-flight job data
  • ✅ Auto-scaling responds to demand without manual intervention
  • ✅ Failed nodes don’t bring down the entire cluster
  • ✅ Job state persistence enables seamless recovery from failures

The Complete Background Processing Journey

Across these two posts, we’ve covered the full spectrum of background job processing, from simple job queues to distributed processing clusters that can handle millions of jobs across multiple regions.

The key insight? Background processing systems are often the most critical and most overlooked part of modern applications. They’re what turns user actions into actual business outcomes, and when they fail, everything else fails with them.

Your users might never see your background processing system, but it’s what determines whether their orders get processed, their emails get sent, their data gets backed up, and their experience feels seamless or broken.

Build it right from the beginning, because retrofitting reliability into a broken background processing system is like performing surgery on a patient who’s already coding.