Background Processing & Jobs - 1/2
The $3 Million E-commerce Meltdown That Exposed Background Job Hell
Picture this catastrophe: Black Friday morning, 6 AM. One of the largest e-commerce platforms in the world is about to face its biggest traffic day. The team has stress-tested the frontend, optimized the databases, and scaled the servers. Everything looks perfect.
Then the traffic hits.
Within the first hour, users start reporting issues:
- Order confirmations taking 10+ minutes to arrive
- Payment processing hanging indefinitely
- Inventory updates delayed by hours, causing overselling
- Email notifications completely stopped working
- Product recommendation engine showing stale data
- Analytics dashboard frozen with yesterday’s numbers
The frontend was handling millions of requests beautifully. The database was purring along at 30% capacity. But their background job system—the invisible backbone processing orders, sending emails, updating inventory, and generating analytics—had completely collapsed.
Here’s what went wrong:
- All background jobs were running on a single Redis instance that hit memory limits
- Email sending was blocking payment processing in the same queue
- Failed jobs kept retrying without backoff, creating infinite loops
- No monitoring or alerting on job queue health
- Critical order processing jobs were stuck behind thousands of promotional emails
By noon, they had:
- $3 million in abandoned carts due to payment failures
- 50,000 angry customers who never received order confirmations
- 200,000 promotional emails that never sent
- A complete breakdown of their recommendation system
- An emergency all-hands meeting with C-level executives
The brutal truth? Their real-time user-facing features were rock solid, but they had treated background processing like an afterthought. A single point of failure in their job queue brought down features that directly impacted revenue.
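One item in that post-mortem, failed jobs retrying without backoff, has a well-known remedy: exponential backoff with jitter. Here is a minimal, library-agnostic sketch of the "full jitter" variant, where each retry waits a random time between zero and a ceiling that doubles per attempt up to a cap. The function name `backoffDelay` and its defaults (2-second base, 60-second cap) are illustrative assumptions, not values taken from any particular queue library.

```typescript
// Exponential backoff with "full jitter": the ceiling doubles with each retry
// (baseMs, 2*baseMs, 4*baseMs, ...), is capped at capMs, and the actual delay
// is drawn uniformly from [0, ceiling) so failed jobs do not retry in lockstep.
// backoffDelay and its defaults are hypothetical, chosen for illustration.
function backoffDelay(
  attempt: number, // 1 = first retry, 2 = second retry, ...
  baseMs: number = 2000,
  capMs: number = 60000,
  random: () => number = Math.random // injectable for deterministic tests
): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError("attempt must be a positive integer");
  }
  const ceiling = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  return Math.floor(random() * ceiling);
}

// Example: randomized delays for the first six retries of one failing job.
for (let attempt = 1; attempt <= 6; attempt++) {
  console.log(`retry ${attempt}: wait ${backoffDelay(attempt)}ms`);
}
```

The random factor is what breaks up a retry storm: jobs that failed together no longer retry together, and the cap keeps late attempts from drifting into hour-long waits. Bull's built-in `exponential` strategy, used in the queue configuration later in this article, grows the delay deterministically; to get jitter, a function like this one could be registered as a custom strategy through the queue's `settings.backoffStrategies`.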
The Uncomfortable Truth About Background Processing
Here’s what separates applications that scale gracefully from those that crumble under pressure: Background processing isn’t just about moving slow tasks out of the request cycle—it’s about building resilient, distributed systems that can handle failures, prioritize work intelligently, and scale independently of your main application.
Most developers approach background jobs like this:
- Move slow operations to a basic job queue
- Add more workers when things get slow
- Hope retry mechanisms will handle failures
- Realize too late that job processing is a single point of failure
- Panic when the queue becomes a bottleneck during peak traffic
But systems that handle real-world load work differently:
- Design job processing as a distributed system from day one
- Implement intelligent job prioritization and resource allocation
- Build comprehensive monitoring and failure recovery mechanisms
- Plan for job isolation and graceful degradation
- Treat background processing as mission-critical infrastructure
The difference isn’t just performance—it’s the difference between systems that enhance user experience invisibly and systems that become the reason your users leave.
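To make "intelligent job prioritization" concrete: in the opening story, critical order processing sat in the same line as thousands of promotional emails, so a payment had to wait for all of them. The sketch below is a deliberately simplified in-memory model, not a real queue library; the `LaneDispatcher` class is hypothetical and its lane names are borrowed from the queues used later in this article. It shows how separate lanes with strict priority let a free worker skip straight to critical work.

```typescript
// Hypothetical, in-memory model of priority lanes (not a real queue library).
// Each lane is its own FIFO, and a free worker always takes the oldest job
// from the highest-priority non-empty lane.
type Lane = "critical" | "high" | "default" | "low" | "bulk";

// Highest priority first.
const LANE_ORDER: Lane[] = ["critical", "high", "default", "low", "bulk"];

interface PendingJob {
  id: string;
  lane: Lane;
}

class LaneDispatcher {
  private lanes = new Map<Lane, PendingJob[]>(
    LANE_ORDER.map((lane): [Lane, PendingJob[]] => [lane, []])
  );

  enqueue(job: PendingJob): void {
    this.lanes.get(job.lane)!.push(job);
  }

  // Called by a worker with spare capacity; undefined means nothing to do.
  next(): PendingJob | undefined {
    for (const lane of LANE_ORDER) {
      const queue = this.lanes.get(lane)!;
      if (queue.length > 0) return queue.shift();
    }
    return undefined;
  }

  depth(lane: Lane): number {
    return this.lanes.get(lane)!.length;
  }
}

// 1,000 promotional emails are queued before a single payment...
const dispatcher = new LaneDispatcher();
for (let i = 0; i < 1000; i++) {
  dispatcher.enqueue({ id: `promo-${i}`, lane: "bulk" });
}
dispatcher.enqueue({ id: "payment-42", lane: "critical" });

// ...yet the payment is the first job a free worker picks up.
console.log(dispatcher.next()?.id); // payment-42
```

Strict priority has a known downside: under sustained critical load, the bulk lane can starve. Giving each lane its own dedicated workers, which is effectively what the per-queue `concurrency` settings in the Bull-based queue below provide, trades a little efficiency for isolation.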
Ready to build background processing that works like Shopify during Black Friday instead of that side project that crashes when three people use it simultaneously? Let’s dive into the architecture patterns that power reliable job processing at scale.
Job Queue Systems: The Invisible Backbone of Modern Applications
Understanding Job Queue Architecture
// Basic job queue implementation that will fail in production
class BadJobQueue {
private jobs: Job[] = [];
private isProcessing = false;
async addJob(job: Job): Promise<void> {
// This approach has so many problems it's painful to look at
this.jobs.push(job);
if (!this.isProcessing) {
this.processJobs(); // No error handling, no concurrency control
}
}
private async processJobs(): Promise<void> {
this.isProcessing = true;
while (this.jobs.length > 0) {
const job = this.jobs.shift()!;
try {
await this.executeJob(job); // Blocking execution, no timeout
} catch (error) {
console.log("Job failed:", error); // Jobs just disappear on failure
}
}
this.isProcessing = false;
}
private async executeJob(job: Job): Promise<void> {
// What if this takes 10 minutes? What if it crashes?
// What if the server restarts? All jobs lost!
await job.handler(job.data);
}
}
// Production-ready job queue with Redis and Bull
import Queue from "bull";
import Redis from "ioredis";
class ProductionJobQueue {
private queues: Map<string, Queue.Queue> = new Map();
private redis: Redis;
private jobProcessors: Map<string, JobProcessor> = new Map();
constructor() {
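this.redis = new Redis({
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379", 10),
maxRetriesPerRequest: 3,
// Reconnect with a growing delay, capped at 2 seconds. (retryDelayOnFailover
// is a Redis Cluster option and has no effect on a standalone client.)
retryStrategy: (times: number) => Math.min(times * 100, 2000),
lazyConnect: true,
});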
this.setupDefaultQueues();
this.setupMonitoring();
}
private setupDefaultQueues(): void {
// Different queues for different priorities and types
const queueConfigs = [
{
name: "critical",
concurrency: 10,
settings: {
stalledInterval: 30000, // 30 seconds
maxStalledCount: 1, // Fail fast for critical jobs
},
},
{
name: "high",
concurrency: 8,
settings: {
stalledInterval: 60000, // 1 minute
maxStalledCount: 3,
},
},
{
name: "default",
concurrency: 5,
settings: {
stalledInterval: 120000, // 2 minutes
maxStalledCount: 3,
},
},
{
name: "low",
concurrency: 2,
settings: {
stalledInterval: 300000, // 5 minutes
maxStalledCount: 5,
},
},
{
name: "bulk",
concurrency: 1, // Bulk operations run one at a time
settings: {
stalledInterval: 600000, // 10 minutes
maxStalledCount: 2,
},
},
];
queueConfigs.forEach((config) => {
const queue = new Queue(config.name, {
redis: {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT || "6379"),
},
defaultJobOptions: {
removeOnComplete: 50, // Keep last 50 completed jobs
removeOnFail: 100, // Keep last 100 failed jobs
attempts: this.getAttemptsForQueue(config.name),
backoff: {
type: "exponential",
delay: 2000,
},
},
settings: config.settings,
});
this.queues.set(config.name, queue);
this.setupQueueProcessing(queue, config.concurrency);
});
}
private getAttemptsForQueue(queueName: string): number {
const attemptMap: Record<string, number> = {
critical: 3, // Critical jobs retry less to fail fast
high: 5,
default: 5,
low: 10, // Low priority jobs can retry more
bulk: 3, // Bulk jobs usually shouldn't retry much
};
return attemptMap[queueName] || 5;
}
private setupQueueProcessing(queue: Queue.Queue, concurrency: number): void {
// Process jobs with proper error handling and monitoring
queue.process("*", concurrency, async (job: Queue.Job) => {
const startTime = Date.now();
const processor = this.jobProcessors.get(job.name);
if (!processor) {
throw new Error(`No processor found for job type: ${job.name}`);
}
// Bull's own per-job `timeout` option is an alternative to this manual race.
let timer: ReturnType<typeof setTimeout> | undefined;
try {
// Set up job timeout
const timeoutMs = processor.timeout || 300000; // Default 5 minutes
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error("Job timeout")), timeoutMs);
});
// Race between job execution and timeout. Losing the race does not
// cancel the handler; it keeps running until it settles on its own.
const result = await Promise.race([
processor.handler(job.data, job),
timeoutPromise,
]);
// Log successful completion
this.logJobCompletion(job, Date.now() - startTime, true);
return result;
} catch (error) {
// Enhanced error logging
this.logJobCompletion(job, Date.now() - startTime, false, error as Error);
throw error; // Re-throw to let Bull handle retries
} finally {
// Clear the pending timer so finished jobs don't leave timeouts behind
if (timer !== undefined) clearTimeout(timer);
}
});
// Set up event listeners for monitoring
this.setupQueueEventListeners(queue);
}
private setupQueueEventListeners(queue: Queue.Queue): void {
queue.on("completed", (job: Queue.Job, result: any) => {
console.log(`Job ${job.id} completed in queue ${queue.name}`);
this.updateMetrics("completed", queue.name);
});
queue.on("failed", (job: Queue.Job, error: Error) => {
console.error(`Job ${job.id} failed in queue ${queue.name}:`, error);
this.updateMetrics("failed", queue.name);
// Alert on critical job failures
if (queue.name === "critical") {
this.alertOnCriticalJobFailure(job, error).catch((alertError) =>
console.error("Failed to send critical job alert:", alertError)
);
}
});
queue.on("stalled", (job: Queue.Job) => {
console.warn(`Job ${job.id} stalled in queue ${queue.name}`);
this.updateMetrics("stalled", queue.name);
});
queue.on("progress", (job: Queue.Job, progress: number) => {
// Update job progress for long-running tasks
this.updateJobProgress(String(job.id), progress); // Bull job IDs may be numbers
});
}
// Register job processors
registerProcessor(jobType: string, processor: JobProcessor): void {
this.jobProcessors.set(jobType, processor);
}
// Add jobs with intelligent queue selection
async addJob(
jobType: string,
data: any,
options: JobOptions = {}
): Promise<Queue.Job> {
const queueName = this.selectQueue(jobType, options.priority);
const queue = this.queues.get(queueName);
if (!queue) {
throw new Error(`Queue ${queueName} not found`);
}
const jobOptions: Queue.JobOptions = {
priority: this.getPriorityScore(options.priority),
delay: options.delay || 0,
attempts: options.attempts || this.getAttemptsForQueue(queueName),
backoff: options.backoff || {
type: "exponential",
delay: 2000,
},
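// Fall back to the queue-level retention defaults (keep last 50 / 100)
// instead of silently overriding them for every job
removeOnComplete: options.removeOnComplete ?? 50,
removeOnFail: options.removeOnFail ?? 100,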
};
if (options.jobId) {
jobOptions.jobId = options.jobId;
}
if (options.repeat) {
jobOptions.repeat = options.repeat;
}
const job = await queue.add(jobType, data, jobOptions);
this.logJobCreation(job, queueName);
return job;
}
private selectQueue(jobType: string, priority?: JobPriority): string {
if (priority === "critical") return "critical";
if (priority === "high") return "high";
if (priority === "low") return "low";
// Job type based routing
const jobTypeQueues: Record<string, string> = {
"send-email": "default",
"process-payment": "critical",
"generate-report": "bulk",
"resize-image": "default",
"send-notification": "high",
"cleanup-data": "low",
"backup-database": "bulk",
};
return jobTypeQueues[jobType] || "default";
}
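private getPriorityScore(priority?: JobPriority): number {
// Bull treats LOWER numbers as HIGHER priority (1 is the highest), so
// critical work gets the smallest value. Priority only orders jobs
// within a single queue; it does not reorder work across queues.
const priorityScores: Record<JobPriority, number> = {
critical: 1,
high: 2,
normal: 3,
low: 4,
bulk: 5,
};
return priorityScores[priority ?? "normal"];
}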
// Job status and monitoring
async getJobStatus(jobId: string): Promise<JobStatus | null> {
for (const [queueName, queue] of this.queues) {
try {
const job = await queue.getJob(jobId);
if (job) {
return {
id: String(job.id),
name: job.name,
data: job.data,
progress: job.progress(),
state: await job.getState(),
queueName,
attempts: job.attemptsMade,
maxAttempts: job.opts.attempts || 0,
createdAt: new Date(job.timestamp),
processedAt: job.processedOn ? new Date(job.processedOn) : null,
finishedAt: job.finishedOn ? new Date(job.finishedOn) : null,
failedReason: job.failedReason,
};
}
} catch (error) {
// Continue searching other queues
continue;
}
}
return null;
}
// Queue statistics and health monitoring
async getQueueStats(): Promise<QueueStats[]> {
const stats: QueueStats[] = [];
for (const [queueName, queue] of this.queues) {
// getJobCounts() returns counts only. Calling getWaiting(), getCompleted()
// and friends just to read .length loads every job payload from Redis,
// which gets slow exactly when queues are large.
const counts = await queue.getJobCounts();
stats.push({
queueName,
waiting: counts.waiting,
active: counts.active,
completed: counts.completed,
failed: counts.failed,
delayed: counts.delayed,
totalJobs:
counts.waiting +
counts.active +
counts.completed +
counts.failed +
counts.delayed,
});
}
return stats;
}
// Clean up completed and failed jobs
async cleanupJobs(olderThanMs: number = 24 * 60 * 60 * 1000): Promise<void> {
for (const [queueName, queue] of this.queues) {
try {
await queue.clean(olderThanMs, "completed");
await queue.clean(olderThanMs, "failed");
console.log(`Cleaned up old jobs in queue: ${queueName}`);
} catch (error) {
console.error(`Failed to clean queue ${queueName}:`, error);
}
}
}
// Graceful shutdown
async shutdown(): Promise<void> {
console.log("Shutting down job queues...");
const shutdownPromises = Array.from(this.queues.values()).map((queue) =>
queue.close()
);
await Promise.all(shutdownPromises);
await this.redis.disconnect();
console.log("Job queues shut down successfully");
}
private logJobCreation(job: Queue.Job, queueName: string): void {
console.log(`Job ${job.id} (${job.name}) added to queue ${queueName}`);
}
private logJobCompletion(
job: Queue.Job,
duration: number,
success: boolean,
error?: Error
): void {
const message = `Job ${job.id} (${job.name}) ${
success ? "completed" : "failed"
} in ${duration}ms`;
if (success) {
console.log(message);
} else {
console.error(message, error);
}
}
private updateMetrics(event: string, queueName: string): void {
// Update your metrics collection system
// This could be Prometheus, StatsD, etc.
}
private updateJobProgress(jobId: string, progress: number): void {
// Update progress in external system for UI display
}
private async alertOnCriticalJobFailure(
job: Queue.Job,
error: Error
): Promise<void> {
// Send alert to your alerting system (PagerDuty, Slack, etc.)
console.error(`CRITICAL: Job ${job.id} failed:`, error);
}
}
// Job processor interface
interface JobProcessor {
handler: (data: any, job?: Queue.Job) => Promise<any>;
timeout?: number; // Timeout in milliseconds
}
// Job options interface
interface JobOptions {
priority?: JobPriority;
delay?: number;
attempts?: number;
backoff?: {
type: string;
delay: number;
};
removeOnComplete?: boolean;
removeOnFail?: boolean;
jobId?: string;
repeat?: {
cron?: string;
every?: number;
};
}
type JobPriority = "critical" | "high" | "normal" | "low" | "bulk";
interface JobStatus {
id: string;
name: string;
data: any;
progress: number;
state: string;
queueName: string;
attempts: number;
maxAttempts: number;
createdAt: Date;
processedAt: Date | null;
finishedAt: Date | null;
failedReason?: string;
}
interface QueueStats {
queueName: string;
waiting: number;
active: number;
completed: number;
failed: number;
delayed: number;
totalJobs: number;
}
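One failure in the opening story was having no monitoring or alerting on job queue health, and counts like those from `getQueueStats()` only help if something acts on them. Here is a minimal sketch of alert rules, assuming hypothetical threshold values and a `QueueSnapshot` shape that mirrors a subset of the `QueueStats` fields (so, thanks to structural typing, a `QueueStats[]` should be accepted as-is). It flags a backlog that is too deep, too many retained failed jobs, and work that is waiting while nothing is active, which often means workers are down.

```typescript
// Hypothetical alert rules over per-queue counts shaped like QueueStats.
// Threshold values are illustrative, not recommendations.
interface QueueSnapshot {
  queueName: string;
  waiting: number;
  active: number;
  failed: number;
}

interface HealthThresholds {
  maxWaiting: number; // backlog depth that should trigger an alert
  maxFailed: number; // retained failed jobs that should trigger an alert
}

interface HealthAlert {
  queueName: string;
  reason: string;
}

function evaluateQueueHealth(
  snapshots: QueueSnapshot[],
  perQueue: Record<string, HealthThresholds>,
  fallback: HealthThresholds
): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  for (const s of snapshots) {
    const t = perQueue[s.queueName] ?? fallback;
    if (s.waiting > t.maxWaiting) {
      alerts.push({ queueName: s.queueName, reason: `backlog ${s.waiting} exceeds ${t.maxWaiting}` });
    }
    if (s.failed > t.maxFailed) {
      alerts.push({ queueName: s.queueName, reason: `failed count ${s.failed} exceeds ${t.maxFailed}` });
    }
    // Jobs are queued but none are running: workers may be down or stuck.
    if (s.waiting > 0 && s.active === 0) {
      alerts.push({ queueName: s.queueName, reason: "jobs waiting but none active" });
    }
  }
  return alerts;
}

// Tight limits for the critical queue, looser defaults for everything else.
const alerts = evaluateQueueHealth(
  [
    { queueName: "critical", waiting: 120, active: 0, failed: 3 },
    { queueName: "low", waiting: 800, active: 2, failed: 0 },
  ],
  { critical: { maxWaiting: 50, maxFailed: 0 } },
  { maxWaiting: 5000, maxFailed: 100 }
);
for (const alert of alerts) {
  console.log(`[${alert.queueName}] ${alert.reason}`);
}
```

A single snapshot can trip the "none active" rule during a brief lull, so in practice you would likely require a condition to hold across several consecutive checks, for example from the `queue-monitor` schedule in the next section.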
Cron Jobs and Scheduling: Time-Based Task Automation
Building Robust Scheduling Systems
// Advanced cron job management with clustering support
import { CronJob } from "cron";
import Redis from "ioredis";
class AdvancedScheduler {
private jobs: Map<string, CronJob> = new Map();
private redis: Redis;
private nodeId: string;
private isLeader: boolean = false;
private leaderElectionInterval: NodeJS.Timeout | null = null;
constructor() {
this.redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
this.nodeId = `scheduler-${Date.now()}-${Math.random()
.toString(36)
.slice(2, 11)}`;
this.startLeaderElection();
this.setupGracefulShutdown();
}
// Leader election for distributed scheduling
private startLeaderElection(): void {
this.leaderElectionInterval = setInterval(async () => {
try {
const result = await this.redis.set(
"scheduler:leader",
this.nodeId,
"PX",
30000, // 30 second TTL
"NX" // Only set if key doesn't exist
);
if (result === "OK") {
if (!this.isLeader) {
console.log(`Node ${this.nodeId} became leader`);
this.isLeader = true;
this.startScheduledJobs();
}
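} else {
// Renew only if we still own the key. A separate GET followed by
// PEXPIRE is not atomic and could extend a lease that another node
// acquired in between, so check-and-extend runs as one Lua script.
const renewed = await this.redis.eval(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end",
1,
"scheduler:leader",
this.nodeId,
30000
);
if (renewed === 1 && !this.isLeader) {
console.log(`Node ${this.nodeId} regained leadership`);
this.isLeader = true;
this.startScheduledJobs();
} else if (renewed !== 1 && this.isLeader) {
console.log(`Node ${this.nodeId} lost leadership`);
this.isLeader = false;
this.stopScheduledJobs();
}
}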
} catch (error) {
console.error("Leader election error:", error);
if (this.isLeader) {
this.isLeader = false;
this.stopScheduledJobs();
}
}
}, 10000); // Check every 10 seconds
}
// Schedule a new cron job
schedule(
jobId: string,
cronPattern: string,
handler: () => Promise<void>,
options: ScheduleOptions = {}
): void {
const job = new CronJob({
cronTime: cronPattern,
onTick: async () => {
if (!this.isLeader) {
return; // Only leader executes scheduled jobs
}
await this.executeScheduledJob(jobId, handler, options);
},
start: false,
timeZone: options.timeZone || "UTC",
});
this.jobs.set(jobId, job);
// Start immediately if we're already leader
if (this.isLeader) {
job.start();
}
console.log(`Scheduled job ${jobId} with pattern ${cronPattern}`);
}
private async executeScheduledJob(
jobId: string,
handler: () => Promise<void>,
options: ScheduleOptions
): Promise<void> {
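const startTime = Date.now();
const lockKey = `scheduler:lock:${jobId}`;
const lockTTL = options.timeout || 300000; // Default 5 minutes
// Unique per execution, so a late-finishing run cannot release a lock
// that a newer run (even one on this same node) has since acquired
const lockToken = `${this.nodeId}:${startTime}:${Math.random()
.toString(36)
.slice(2)}`;
let lockAcquired = false;
let timer: ReturnType<typeof setTimeout> | undefined;
try {
// Acquire distributed lock to prevent concurrent execution
const result = await this.redis.set(lockKey, lockToken, "PX", lockTTL, "NX");
lockAcquired = result === "OK";
if (!lockAcquired) {
console.log(`Job ${jobId} skipped - already running`);
return;
}
console.log(`Executing scheduled job: ${jobId}`);
// Set up timeout
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error("Job timeout")), lockTTL);
});
// Execute job with timeout
await Promise.race([handler(), timeoutPromise]);
const duration = Date.now() - startTime;
console.log(`Scheduled job ${jobId} completed in ${duration}ms`);
// Track execution metrics
await this.recordJobExecution(jobId, duration, true);
} catch (error) {
const err = error as Error;
const duration = Date.now() - startTime;
console.error(`Scheduled job ${jobId} failed after ${duration}ms:`, err);
await this.recordJobExecution(jobId, duration, false, err);
// Handle job failure based on options
if (options.onError) {
try {
await options.onError(err);
} catch (handlerError) {
console.error(`Error handler for job ${jobId} failed:`, handlerError);
}
}
} finally {
if (timer !== undefined) clearTimeout(timer);
// Release the lock only if this run acquired it and still holds it
// (compare-and-delete in one atomic Lua script). An unconditional DEL
// would also run on the "skipped" path above and remove the lock held
// by the run that is still executing.
if (lockAcquired) {
await this.redis.eval(
"if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
1,
lockKey,
lockToken
);
}
}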
}
// Dynamic scheduling with database persistence
async scheduleFromDatabase(): Promise<void> {
try {
const scheduledJobs = await this.getScheduledJobsFromDB();
for (const jobConfig of scheduledJobs) {
this.schedule(
jobConfig.id,
jobConfig.cronPattern,
() => this.executeDatabaseJob(jobConfig),
{
timeZone: jobConfig.timeZone,
timeout: jobConfig.timeout,
onError: async (error) => {
await this.logJobError(jobConfig.id, error);
},
}
);
}
console.log(`Loaded ${scheduledJobs.length} jobs from database`);
} catch (error) {
console.error("Failed to load scheduled jobs from database:", error);
}
}
private async executeDatabaseJob(
jobConfig: DatabaseJobConfig
): Promise<void> {
// Execute job based on configuration
switch (jobConfig.type) {
case "cleanup":
await this.executeCleanupJob(jobConfig);
break;
case "report":
await this.executeReportJob(jobConfig);
break;
case "backup":
await this.executeBackupJob(jobConfig);
break;
case "maintenance":
await this.executeMaintenanceJob(jobConfig);
break;
default:
throw new Error(`Unknown job type: ${jobConfig.type}`);
}
}
// Specific job implementations
private async executeCleanupJob(config: DatabaseJobConfig): Promise<void> {
console.log(`Running cleanup job: ${config.id}`);
// Example: Clean up old logs
if (config.parameters?.cleanupType === "logs") {
const olderThanDays = config.parameters.olderThanDays || 30;
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
// Your cleanup logic here
const deletedCount = await this.cleanupOldLogs(cutoffDate);
console.log(`Cleaned up ${deletedCount} old log entries`);
}
}
private async executeReportJob(config: DatabaseJobConfig): Promise<void> {
console.log(`Running report job: ${config.id}`);
// Generate and send reports
const reportData = await this.generateReport(config.parameters?.reportType);
await this.sendReport(reportData, config.parameters?.recipients);
}
private async executeBackupJob(config: DatabaseJobConfig): Promise<void> {
console.log(`Running backup job: ${config.id}`);
// Perform backup
await this.performBackup(config.parameters?.backupType);
}
// Advanced scheduling patterns
setupCommonSchedules(): void {
// Every minute health check
this.schedule(
"health-check",
"* * * * *",
async () => {
await this.performHealthCheck();
},
{ timeout: 30000 } // 30 second timeout
);
// Every 5 minutes queue monitoring
this.schedule("queue-monitor", "*/5 * * * *", async () => {
await this.monitorQueueHealth();
});
// Hourly cleanup of temporary files
this.schedule("temp-cleanup", "0 * * * *", async () => {
await this.cleanupTempFiles();
});
// Daily database maintenance at 2 AM
this.schedule(
"db-maintenance",
"0 2 * * *",
async () => {
await this.performDatabaseMaintenance();
},
{ timeout: 1800000 } // 30 minute timeout
);
// Weekly report generation on Sundays at 6 AM
this.schedule(
"weekly-reports",
"0 6 * * 0",
async () => {
await this.generateWeeklyReports();
},
{ timeout: 3600000 } // 1 hour timeout
);
// Monthly billing processing on 1st at midnight
this.schedule(
"monthly-billing",
"0 0 1 * *",
async () => {
await this.processMonthlyBilling();
},
{ timeout: 7200000 } // 2 hour timeout
);
}
// Job management methods
unschedule(jobId: string): boolean {
const job = this.jobs.get(jobId);
if (job) {
job.stop();
this.jobs.delete(jobId);
console.log(`Unscheduled job: ${jobId}`);
return true;
}
return false;
}
pauseJob(jobId: string): boolean {
const job = this.jobs.get(jobId);
if (job) {
job.stop();
console.log(`Paused job: ${jobId}`);
return true;
}
return false;
}
resumeJob(jobId: string): boolean {
const job = this.jobs.get(jobId);
if (job && this.isLeader) {
job.start();
console.log(`Resumed job: ${jobId}`);
return true;
}
return false;
}
getJobStatus(): Map<string, JobScheduleStatus> {
const status = new Map<string, JobScheduleStatus>();
for (const [jobId, job] of this.jobs) {
status.set(jobId, {
jobId,
isRunning: job.running,
nextRun: job.nextDate()?.toDate() || null,
lastRun: job.lastDate() || null, // lastDate() already returns a plain Date
});
}
return status;
}
private startScheduledJobs(): void {
if (this.isLeader) {
for (const job of this.jobs.values()) {
job.start();
}
console.log("Started all scheduled jobs");
}
}
private stopScheduledJobs(): void {
for (const job of this.jobs.values()) {
job.stop();
}
console.log("Stopped all scheduled jobs");
}
private async recordJobExecution(
jobId: string,
duration: number,
success: boolean,
error?: Error
): Promise<void> {
const record = {
jobId,
nodeId: this.nodeId,
startTime: Date.now() - duration,
duration,
success,
error: error?.message,
};
await this.redis.lpush(
`scheduler:executions:${jobId}`,
JSON.stringify(record)
);
await this.redis.ltrim(`scheduler:executions:${jobId}`, 0, 99); // Keep last 100 executions (indices 0-99)
}
private setupGracefulShutdown(): void {
const shutdown = async () => {
console.log("Shutting down scheduler...");
if (this.leaderElectionInterval) {
clearInterval(this.leaderElectionInterval);
}
// Release leadership
if (this.isLeader) {
await this.redis.del("scheduler:leader");
}
// Stop all jobs
this.stopScheduledJobs();
await this.redis.disconnect();
process.exit(0);
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
}
// Placeholder methods for actual implementations
private async getScheduledJobsFromDB(): Promise<DatabaseJobConfig[]> {
// Load from your database
return [];
}
private async cleanupOldLogs(cutoffDate: Date): Promise<number> {
// Implement log cleanup
return 0;
}
private async generateReport(reportType: string): Promise<any> {
// Implement report generation
return {};
}
private async sendReport(
reportData: any,
recipients: string[]
): Promise<void> {
// Implement report sending
}
private async performBackup(backupType: string): Promise<void> {
// Implement backup logic
}
private async performHealthCheck(): Promise<void> {
// Implement health check
}
private async monitorQueueHealth(): Promise<void> {
// Implement queue monitoring
}
private async cleanupTempFiles(): Promise<void> {
// Implement temp file cleanup
}
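private async performDatabaseMaintenance(): Promise<void> {
// Implement database maintenance
}
private async executeMaintenanceJob(config: DatabaseJobConfig): Promise<void> {
// Implement database-configured maintenance jobs
}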
private async generateWeeklyReports(): Promise<void> {
// Implement weekly report generation
}
private async processMonthlyBilling(): Promise<void> {
// Implement monthly billing
}
private async logJobError(jobId: string, error: Error): Promise<void> {
// Implement error logging
}
}
interface ScheduleOptions {
timeZone?: string;
timeout?: number;
onError?: (error: Error) => Promise<void>;
}
interface DatabaseJobConfig {
id: string;
type: "cleanup" | "report" | "backup" | "maintenance";
cronPattern: string;
timeZone?: string;
timeout?: number;
parameters?: Record<string, any>;
}
interface JobScheduleStatus {
jobId: string;
isRunning: boolean;
nextRun: Date | null;
lastRun: Date | null;
}
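Both the leader key (`scheduler:leader`) and the per-job locks (`scheduler:lock:${jobId}`) above are leases: `SET ... NX PX` grants ownership for a limited time, and only the current owner should extend or release it. The `LeaseStore` class below is a hypothetical in-memory model of those semantics, with time passed in explicitly so the scenario is deterministic; it is not a Redis client. In Redis itself the ownership check and the `DEL` or `PEXPIRE` have to happen atomically, which is why the Redis `SET` command documentation describes a small Lua script for releasing such a lock.

```typescript
// Hypothetical in-memory model of Redis-style leases (not a Redis client).
// acquire() mirrors `SET key owner PX ttl NX`; renew() and release() succeed
// only for the current owner, mirroring an atomic compare-and-PEXPIRE /
// compare-and-DEL. Time is passed in explicitly to keep the demo deterministic.
interface Lease {
  owner: string;
  expiresAt: number; // epoch milliseconds
}

class LeaseStore {
  private leases = new Map<string, Lease>();

  // Returns the live lease for key, treating expired leases as absent.
  private current(key: string, now: number): Lease | undefined {
    const lease = this.leases.get(key);
    if (lease !== undefined && lease.expiresAt <= now) {
      this.leases.delete(key);
      return undefined;
    }
    return lease;
  }

  acquire(key: string, owner: string, ttlMs: number, now: number): boolean {
    if (this.current(key, now) !== undefined) return false; // NX: already held
    this.leases.set(key, { owner, expiresAt: now + ttlMs });
    return true;
  }

  renew(key: string, owner: string, ttlMs: number, now: number): boolean {
    const lease = this.current(key, now);
    if (lease === undefined || lease.owner !== owner) return false; // not ours
    lease.expiresAt = now + ttlMs;
    return true;
  }

  release(key: string, owner: string, now: number): boolean {
    const lease = this.current(key, now);
    if (lease === undefined || lease.owner !== owner) return false; // not ours
    this.leases.delete(key);
    return true;
  }

  ownerOf(key: string, now: number): string | undefined {
    return this.current(key, now)?.owner;
  }
}

// A run on node-a takes the 5-minute lock at t=0 and overruns it;
// a run on node-b legitimately acquires the expired lock at t=301s.
const store = new LeaseStore();
const key = "scheduler:lock:db-maintenance";
store.acquire(key, "node-a", 300000, 0);
store.acquire(key, "node-b", 300000, 301000);

// When node-a finally finishes, an owner-checked release refuses to
// free node-b's lock (an unconditional DEL would have removed it).
console.log(store.release(key, "node-a", 302000)); // false
console.log(store.ownerOf(key, 302000)); // node-b
```

This is the scenario that bites long-running scheduled jobs: once a run outlives its lock TTL, another run can take the lock legitimately, and only an ownership-checked release keeps the slow run from freeing a lock it no longer holds.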
Background Task Patterns: Organizing Work for Scale
Common Background Processing Patterns
// Producer-Consumer Pattern with multiple workers
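class ProducerConsumerJobSystem {
private producers: Map<string, Producer> = new Map();
private consumers: Map<string, Consumer> = new Map();
private jobQueue: ProductionJobQueue;
// Redis client for storing chain step results (assumes `import Redis from "ioredis"`)
private redis: Redis;
constructor(jobQueue: ProductionJobQueue) {
this.jobQueue = jobQueue;
this.redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");
this.setupCommonPatterns();
}
private setupCommonPatterns(): void {
this.setupEmailProcessingPattern();
this.setupImageProcessingPattern();
this.setupDataProcessingPattern();
this.setupReportGenerationPattern();
this.setupChainProcessor(); // Registers the "chain-step" processor used by createProcessingChain
}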
// Email processing pattern
private setupEmailProcessingPattern(): void {
// Producer: Creates email jobs
this.producers.set("email", {
name: "email-producer",
generate: async (data: EmailBatchData) => {
const { recipients, template, variables } = data;
// Create individual email jobs for each recipient
for (const recipient of recipients) {
await this.jobQueue.addJob(
"send-email",
{
to: recipient.email,
template,
variables: { ...variables, ...recipient.customVars },
},
{
priority: "normal",
attempts: 5,
}
);
}
},
});
// Consumer: Processes email jobs
this.jobQueue.registerProcessor("send-email", {
handler: async (data: EmailJobData) => {
const { to, template, variables } = data;
// Load email template
const emailTemplate = await this.loadEmailTemplate(template);
// Render with variables
const renderedEmail = this.renderEmailTemplate(
emailTemplate,
variables
);
// Send email
await this.sendEmail({
to,
subject: renderedEmail.subject,
html: renderedEmail.html,
text: renderedEmail.text,
});
console.log(`Email sent to ${to}`);
},
timeout: 30000, // 30 seconds
});
}
// Image processing pipeline pattern
private setupImageProcessingPattern(): void {
// Multi-stage image processing
this.jobQueue.registerProcessor("process-image", {
handler: async (data: ImageProcessingData, job) => {
const { imageUrl, transformations, outputPath } = data;
await job?.progress(10);
// Download original image
const originalImage = await this.downloadImage(imageUrl);
await job?.progress(30);
// Apply transformations sequentially
let processedImage = originalImage;
for (const [index, transformation] of transformations.entries()) {
processedImage = await this.applyTransformation(
processedImage,
transformation
);
const progress = 30 + (index + 1) * (60 / transformations.length);
await job?.progress(progress);
}
await job?.progress(90);
// Upload processed image
const finalUrl = await this.uploadProcessedImage(
processedImage,
outputPath
);
await job?.progress(100);
return { processedImageUrl: finalUrl };
},
timeout: 300000, // 5 minutes
});
// Batch image processing producer
this.producers.set("image-batch", {
name: "image-batch-producer",
generate: async (data: ImageBatchData) => {
const { images, transformations } = data;
// Create processing job for each image
const jobs = images.map((image) =>
this.jobQueue.addJob(
"process-image",
{
imageUrl: image.url,
transformations,
outputPath: image.outputPath,
},
{
priority: "normal",
}
)
);
// Wait for all jobs to be queued
await Promise.all(jobs);
return { totalImages: images.length };
},
});
}
// Data processing with Map-Reduce pattern
private setupDataProcessingPattern(): void {
// Map phase: Split large datasets
this.jobQueue.registerProcessor("data-map", {
handler: async (data: DataMapData) => {
const { dataset, chunkSize, mapFunction } = data;
const chunks = this.chunkArray(dataset, chunkSize);
const results = [];
for (const chunk of chunks) {
const result = await this.executeMapFunction(mapFunction, chunk);
results.push(result);
}
return { mappedResults: results };
},
timeout: 600000, // 10 minutes
});
// Reduce phase: Combine results
this.jobQueue.registerProcessor("data-reduce", {
handler: async (data: DataReduceData) => {
const { mappedResults, reduceFunction } = data;
const finalResult = await this.executeReduceFunction(
reduceFunction,
mappedResults
);
return { reducedResult: finalResult };
},
timeout: 300000, // 5 minutes
});
// Orchestrator for Map-Reduce jobs
this.producers.set("map-reduce", {
name: "map-reduce-orchestrator",
generate: async (data: MapReduceData) => {
const { dataset, chunkSize, mapFunction, reduceFunction } = data;
// Start map jobs
const mapJobs = [];
const chunks = this.chunkArray(dataset, chunkSize);
for (const chunk of chunks) {
const mapJob = await this.jobQueue.addJob("data-map", {
dataset: chunk,
chunkSize: chunkSize,
mapFunction,
});
mapJobs.push(mapJob);
}
// Wait for all map jobs to complete
const mapResults = await Promise.all(
mapJobs.map((job) => this.waitForJobCompletion(String(job.id)))
);
// Start reduce job
await this.jobQueue.addJob("data-reduce", {
mappedResults: mapResults.map((r) => r.mappedResults).flat(),
reduceFunction,
});
return { mapJobCount: mapJobs.length };
},
});
}
// Report generation with dependency management
private setupReportGenerationPattern(): void {
this.jobQueue.registerProcessor("generate-report", {
handler: async (data: ReportGenerationData, job) => {
const { reportType, dateRange, recipients } = data;
await job?.progress(10);
// Collect data from multiple sources
const dataSources = await this.identifyDataSources(reportType);
const collectedData: Record<string, any> = {};
for (const [index, source] of dataSources.entries()) {
collectedData[source.name] = await this.collectDataFromSource(
source,
dateRange
);
const progress = 10 + (index + 1) * (60 / dataSources.length);
await job?.progress(progress);
}
await job?.progress(70);
// Generate report
const report = await this.generateReport(
reportType,
collectedData,
dateRange
);
await job?.progress(90);
// Distribute report
await this.distributeReport(report, recipients);
await job?.progress(100);
return { reportId: report.id, recipients: recipients.length };
},
timeout: 1800000, // 30 minutes
});
}
// Chain pattern for sequential processing
async createProcessingChain(steps: ChainStep[]): Promise<string> {
const chainId = `chain-${Date.now()}-${Math.random()
.toString(36)
.slice(2, 11)}`;
// Create jobs in sequence, each depending on the previous
let previousJobId: string | null = null;
for (const [index, step] of steps.entries()) {
const jobId = `${chainId}-step-${index}`;
await this.jobQueue.addJob(
"chain-step",
{
stepIndex: index,
step,
chainId,
previousJobId,
},
{
jobId,
priority: "normal",
delay: previousJobId ? 1000 : 0, // Ordering comes from waiting on previousJobId; the delay only staggers start-up
}
);
previousJobId = jobId;
}
return chainId;
}
// Chain step processor
private setupChainProcessor(): void {
this.jobQueue.registerProcessor("chain-step", {
handler: async (data: ChainStepData) => {
const { stepIndex, step, chainId, previousJobId } = data;
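// Wait for previous step if it exists. Caution: this occupies a worker
// slot while polling; if waiting steps fill every slot in the queue, the
// steps they depend on cannot start and the chain stalls until
// waitForJobCompletion times out.
if (previousJobId) {
await this.waitForJobCompletion(previousJobId);
}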
// Execute current step
console.log(`Executing step ${stepIndex} of chain ${chainId}`);
const result = await step.handler(step.data);
// Store result for next step
await this.redis.setex(
`chain:${chainId}:step:${stepIndex}`,
3600, // 1 hour TTL
JSON.stringify(result)
);
return result;
},
});
}
// Fan-out/Fan-in pattern
async createFanOutFanIn(
fanOutData: any,
fanOutJobs: FanOutJob[],
fanInHandler: (results: any[]) => Promise<any>
): Promise<string> {
const fanOutId = `fanout-${Date.now()}-${Math.random()
.toString(36)
.slice(2, 11)}`;
// Create fan-out jobs
const jobIds = [];
for (const [index, fanOutJob] of fanOutJobs.entries()) {
const jobId = `${fanOutId}-fanout-${index}`;
await this.jobQueue.addJob(
fanOutJob.jobType,
{
...fanOutJob.data,
fanOutId,
fanOutIndex: index,
},
{
jobId,
priority: fanOutJob.priority || "normal",
}
);
jobIds.push(jobId);
}
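// Create fan-in job that waits for all fan-out jobs
await this.jobQueue.addJob(
"fan-in",
{
fanOutId,
fanOutJobIds: jobIds,
// Caution: toString() drops the function's closure, and running the string
// in a worker would require eval, a code-injection risk; prefer the name of
// a handler registered on the worker side
fanInHandler: fanInHandler.toString(),
},
{
jobId: `${fanOutId}-fanin`,
delay: 5000, // Only postpones the first check; the fan-in job must still wait on fanOutJobIds
}
);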
return fanOutId;
}
// Utility methods
private chunkArray<T>(array: T[], chunkSize: number): T[][] {
const chunks = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
private async waitForJobCompletion(jobId: string): Promise<any> {
// Poll job status until completion
let attempts = 0;
const maxAttempts = 300; // 5 minutes with 1-second intervals
while (attempts < maxAttempts) {
const status = await this.jobQueue.getJobStatus(jobId);
if (!status) {
throw new Error(`Job ${jobId} not found`);
}
if (status.state === "completed") {
return status.data;
}
if (status.state === "failed") {
throw new Error(`Job ${jobId} failed: ${status.failedReason}`);
}
await new Promise((resolve) => setTimeout(resolve, 1000));
attempts++;
}
throw new Error(`Job ${jobId} timed out waiting for completion`);
}
// Placeholder implementations
private async loadEmailTemplate(template: string): Promise<any> {
return {};
}
private renderEmailTemplate(template: any, variables: any): any {
return {};
}
private async sendEmail(emailData: any): Promise<void> {}
private async downloadImage(url: string): Promise<any> {
return null;
}
private async applyTransformation(
image: any,
transformation: any
): Promise<any> {
return image;
}
private async uploadProcessedImage(
image: any,
path: string
): Promise<string> {
return "";
}
private async executeMapFunction(func: any, data: any): Promise<any> {
return null;
}
private async executeReduceFunction(func: any, data: any): Promise<any> {
return null;
}
private async identifyDataSources(reportType: string): Promise<any[]> {
return [];
}
private async collectDataFromSource(
source: any,
dateRange: any
): Promise<any> {
return {};
}
private async generateReport(
type: string,
data: any,
dateRange: any
): Promise<any> {
return {};
}
private async distributeReport(
report: any,
recipients: any[]
): Promise<void> {}
}
// Pattern interfaces
interface Producer {
name: string;
generate: (data: any) => Promise<any>;
}
interface Consumer {
name: string;
process: (data: any) => Promise<any>;
}
interface ChainStep {
handler: (data: any) => Promise<any>;
data: any;
}
interface ChainStepData {
stepIndex: number;
step: ChainStep;
chainId: string;
previousJobId: string | null;
}
interface FanOutJob {
jobType: string;
data: any;
priority?: JobPriority;
}
// Data interfaces for different job types
interface EmailBatchData {
recipients: Array<{ email: string; customVars?: Record<string, any> }>;
template: string;
variables: Record<string, any>;
}
interface EmailJobData {
to: string;
template: string;
variables: Record<string, any>;
}
interface ImageProcessingData {
imageUrl: string;
transformations: any[];
outputPath: string;
}
interface ImageBatchData {
images: Array<{ url: string; outputPath: string }>;
transformations: any[];
}
interface DataMapData {
dataset: any[];
chunkSize: number;
mapFunction: any;
}
interface DataReduceData {
mappedResults: any[];
reduceFunction: any;
}
interface MapReduceData {
dataset: any[];
chunkSize: number;
mapFunction: any;
reduceFunction: any;
}
interface ReportGenerationData {
reportType: string;
dateRange: { start: Date; end: Date };
recipients: string[];
}
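Because job payloads are usually serialized to JSON before they reach Redis, a function stored in `step.handler` does not survive the trip to the worker. A common workaround is to put a handler *name* in the job data and resolve it from a registry on the worker side. The sketch below shows that idea in memory, with each step also receiving the previous step's result; `StepRegistry`, `NamedStep`, and `runChain` are hypothetical names, not part of the job queue used above.

```typescript
// Hypothetical handler signature: each step sees its own data plus the
// previous step's result (undefined for the first step)
type StepHandler = (data: unknown, previousResult: unknown) => Promise<unknown>;

// A chain step as it would be stored in a job payload: only JSON-safe fields
interface NamedStep {
  handlerName: string;
  data: unknown;
}

// Worker-side registry mapping handler names to functions
class StepRegistry {
  private handlers = new Map<string, StepHandler>();

  register(name: string, handler: StepHandler): void {
    this.handlers.set(name, handler);
  }

  resolve(name: string): StepHandler {
    const handler = this.handlers.get(name);
    if (!handler) {
      throw new Error(`No handler registered for "${name}"`);
    }
    return handler;
  }
}

// Runs steps strictly in order, feeding each result into the next step
async function runChain(
  registry: StepRegistry,
  steps: NamedStep[]
): Promise<unknown[]> {
  const results: unknown[] = [];
  let previousResult: unknown = undefined;
  for (const step of steps) {
    // Round-trip through JSON to mimic what a Redis-backed queue does to job data
    const payload: NamedStep = JSON.parse(JSON.stringify(step));
    const handler = registry.resolve(payload.handlerName);
    previousResult = await handler(payload.data, previousResult);
    results.push(previousResult);
  }
  return results;
}
```

For example, registering `"double"` as `(data) => data * 2` and `"addPrevious"` as `(data, prev) => data + prev`, then running `[{ handlerName: "double", data: 5 }, { handlerName: "addPrevious", data: 1 }]`, resolves to `[10, 11]`. Only the names travel with the job, so the payload stays serializable.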
Worker Processes and Threading: Maximizing Concurrent Processing
Building Scalable Worker Architecture
// Worker pool management with worker_threads, auto-scaling, and health monitoring
import os from "os";
import { Worker, isMainThread, parentPort, workerData } from "worker_threads";
class WorkerPoolManager {
private workers: Map<string, WorkerInfo> = new Map();
private taskQueue: Task[] = [];
private completedTasks: Map<string, TaskResult> = new Map();
private workerConfig: WorkerConfig;
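constructor(config: WorkerConfig) {
// Fill in defaults so autoScale() never compares against undefined
const maxWorkers = config.maxWorkers || os.cpus().length;
this.workerConfig = {
...config,
maxWorkers,
minWorkers: Math.min(config.minWorkers ?? 1, maxWorkers),
};
this.initializeWorkerPool();
this.setupTaskDistribution();
this.setupHealthMonitoring();
}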
private initializeWorkerPool(): void {
const numWorkers = this.workerConfig.maxWorkers || os.cpus().length;
console.log(`Initializing worker pool with ${numWorkers} workers`);
for (let i = 0; i < numWorkers; i++) {
this.createWorker(`worker-${i}`);
}
}
private createWorker(workerId: string): void {
const worker = new Worker(__filename, {
workerData: {
workerId,
workerType: this.workerConfig.workerType,
},
});
const workerInfo: WorkerInfo = {
id: workerId,
worker,
isAvailable: true,
currentTask: null,
tasksCompleted: 0,
tasksErrored: 0,
startedAt: Date.now(),
lastActivity: Date.now(),
memoryUsage: 0,
cpuUsage: 0,
};
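// Handle worker messages
worker.on("message", (message: WorkerMessage) => {
this.handleWorkerMessage(workerId, message);
});
// Handle worker errors (an uncaught exception also terminates the thread)
worker.on("error", (error) => {
console.error(`Worker ${workerId} error:`, error);
this.handleWorkerError(workerId, error);
});
// Handle worker exit; ignore exits of workers that have already been
// removed or replaced under the same id
worker.on("exit", (code) => {
console.log(`Worker ${workerId} exited with code ${code}`);
if (this.workers.get(workerId)?.worker !== worker) return;
this.handleWorkerExit(workerId, code);
});
this.workers.set(workerId, workerInfo);
console.log(`Worker ${workerId} created and ready`);
}
private handleWorkerError(workerId: string, error: Error): void {
// The "exit" event follows; count the failure here if a task was running
const workerInfo = this.workers.get(workerId);
if (workerInfo?.currentTask) {
workerInfo.tasksErrored++;
}
}
private handleWorkerExit(workerId: string, code: number): void {
const workerInfo = this.workers.get(workerId);
if (!workerInfo) return;
// Drop the dead worker from the pool (terminateWorker() also does this)
this.workers.delete(workerId);
const task = workerInfo.currentTask;
if (!task) return;
// Skip tasks that restartWorker() already requeued or that finished elsewhere
const alreadyHandled =
this.taskQueue.includes(task) ||
this.completedTasks.has(task.id) ||
Array.from(this.workers.values()).some((w) => w.currentTask === task);
if (alreadyHandled) return;
// Treat a crash as a failed attempt so a task that kills its worker can't loop forever
if (task.retryCount < task.maxRetries) {
task.retryCount++;
this.taskQueue.unshift(task);
} else {
this.completedTasks.set(task.id, {
taskId: task.id,
error: `Worker ${workerId} exited with code ${code}`,
completedAt: Date.now(),
workerId,
failed: true,
});
}
}
private handleProgressUpdate(workerId: string, payload: any): void {
console.log(`Worker ${workerId} progress:`, payload);
}
private handleHealthStats(workerId: string, payload: any): void {
const workerInfo = this.workers.get(workerId);
if (workerInfo) {
// The worker sends process.memoryUsage(); keep the heap figure as a number
workerInfo.memoryUsage = payload.memoryUsage?.heapUsed ?? 0;
}
}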
private handleWorkerMessage(workerId: string, message: WorkerMessage): void {
const workerInfo = this.workers.get(workerId);
if (!workerInfo) return;
switch (message.type) {
case "task-completed":
this.handleTaskCompleted(workerId, message.payload);
break;
case "task-failed":
this.handleTaskFailed(workerId, message.payload);
break;
case "progress-update":
this.handleProgressUpdate(workerId, message.payload);
break;
case "health-stats":
this.handleHealthStats(workerId, message.payload);
break;
}
workerInfo.lastActivity = Date.now();
}
private handleTaskCompleted(workerId: string, payload: any): void {
const workerInfo = this.workers.get(workerId);
if (!workerInfo || !workerInfo.currentTask) return;
const task = workerInfo.currentTask;
// Store result
this.completedTasks.set(task.id, {
taskId: task.id,
result: payload.result,
completedAt: Date.now(),
workerId,
executionTime: payload.executionTime,
});
// Update worker stats
workerInfo.tasksCompleted++;
workerInfo.currentTask = null;
workerInfo.isAvailable = true;
console.log(`Task ${task.id} completed by worker ${workerId}`);
// Process next task if available
this.assignTaskToWorker(workerId);
}
private handleTaskFailed(workerId: string, payload: any): void {
const workerInfo = this.workers.get(workerId);
if (!workerInfo || !workerInfo.currentTask) return;
const task = workerInfo.currentTask;
// Update worker stats
workerInfo.tasksErrored++;
workerInfo.currentTask = null;
workerInfo.isAvailable = true;
console.error(
`Task ${task.id} failed on worker ${workerId}:`,
payload.error
);
// Handle retry logic
if (task.retryCount < task.maxRetries) {
task.retryCount++;
task.lastError = payload.error;
this.taskQueue.unshift(task); // Add back to front of queue
console.log(
`Task ${task.id} queued for retry (attempt ${task.retryCount}/${task.maxRetries})`
);
} else {
// Store failed result
this.completedTasks.set(task.id, {
taskId: task.id,
error: payload.error,
completedAt: Date.now(),
workerId,
failed: true,
});
}
// Process next task
this.assignTaskToWorker(workerId);
}
// Task submission and management
async submitTask(taskData: TaskData): Promise<string> {
const task: Task = {
id: `task-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
type: taskData.type,
data: taskData.data,
priority: taskData.priority || "normal",
submittedAt: Date.now(),
retryCount: 0,
maxRetries: taskData.maxRetries || 3,
timeout: taskData.timeout || 300000, // 5 minutes default
};
// Insert task into queue based on priority
this.insertTaskByPriority(task);
// Try to assign immediately to available worker
this.assignTaskToAvailableWorker();
console.log(`Task ${task.id} submitted to queue`);
return task.id;
}
private insertTaskByPriority(task: Task): void {
const priorityValues = { critical: 4, high: 3, normal: 2, low: 1 };
const taskPriority = priorityValues[task.priority];
// Find insertion point based on priority
let insertIndex = this.taskQueue.length;
for (let i = 0; i < this.taskQueue.length; i++) {
const queuedTaskPriority = priorityValues[this.taskQueue[i].priority];
if (taskPriority > queuedTaskPriority) {
insertIndex = i;
break;
}
}
this.taskQueue.splice(insertIndex, 0, task);
}
private assignTaskToAvailableWorker(): void {
// Hand a task to every idle worker, not just the first one found
for (const [workerId, workerInfo] of this.workers) {
if (this.taskQueue.length === 0) break;
if (workerInfo.isAvailable) {
this.assignTaskToWorker(workerId);
}
}
}
private assignTaskToWorker(workerId: string): void {
const workerInfo = this.workers.get(workerId);
if (!workerInfo || !workerInfo.isAvailable || this.taskQueue.length === 0) {
return;
}
const task = this.taskQueue.shift()!;
workerInfo.currentTask = task;
workerInfo.isAvailable = false;
// Send task to worker
workerInfo.worker.postMessage({
type: "execute-task",
payload: {
task: {
id: task.id,
type: task.type,
data: task.data,
timeout: task.timeout,
},
},
});
console.log(`Task ${task.id} assigned to worker ${workerId}`);
}
private setupTaskDistribution(): void {
// Periodically check for pending tasks
setInterval(() => {
if (this.taskQueue.length > 0) {
this.assignTaskToAvailableWorker();
}
}, 1000);
}
// Health monitoring and auto-scaling
private setupHealthMonitoring(): void {
setInterval(async () => {
await this.monitorWorkerHealth();
await this.autoScale();
}, 30000); // Every 30 seconds
}
private async monitorWorkerHealth(): Promise<void> {
for (const [workerId, workerInfo] of this.workers) {
// Check for stuck workers
if (
workerInfo.currentTask &&
Date.now() - workerInfo.lastActivity > 300000
) {
// 5 minutes
console.warn(`Worker ${workerId} appears stuck, restarting...`);
await this.restartWorker(workerId);
}
// Request health stats from worker
workerInfo.worker.postMessage({
type: "health-check",
payload: {},
});
}
}
private async autoScale(): Promise<void> {
const totalWorkers = this.workers.size;
const availableWorkers = Array.from(this.workers.values()).filter(
(w) => w.isAvailable
).length;
const queueLength = this.taskQueue.length;
// Scale up if queue is building up
if (
queueLength > totalWorkers * 2 &&
totalWorkers < this.workerConfig.maxWorkers!
) {
const newWorkerId = `worker-${Date.now()}`;
this.createWorker(newWorkerId);
console.log(`Scaled up: Created worker ${newWorkerId}`);
}
// Scale down if too many idle workers
if (
availableWorkers > totalWorkers * 0.7 &&
totalWorkers > this.workerConfig.minWorkers! &&
queueLength === 0
) {
// Find least busy worker to terminate
const workerToTerminate = this.findLeastBusyWorker();
if (workerToTerminate) {
await this.terminateWorker(workerToTerminate.id);
console.log(`Scaled down: Terminated worker ${workerToTerminate.id}`);
}
}
}
private async restartWorker(workerId: string): Promise<void> {
const workerInfo = this.workers.get(workerId);
if (!workerInfo) return;
// If worker has a current task, add it back to queue
if (workerInfo.currentTask) {
this.taskQueue.unshift(workerInfo.currentTask);
}
// Terminate and recreate worker
await this.terminateWorker(workerId);
this.createWorker(workerId);
}
private async terminateWorker(workerId: string): Promise<void> {
const workerInfo = this.workers.get(workerId);
if (!workerInfo) return;
try {
await workerInfo.worker.terminate();
} catch (error) {
console.error(`Error terminating worker ${workerId}:`, error);
}
this.workers.delete(workerId);
}
private findLeastBusyWorker(): WorkerInfo | null {
let leastBusy: WorkerInfo | null = null;
for (const workerInfo of this.workers.values()) {
if (
workerInfo.isAvailable &&
(!leastBusy || workerInfo.tasksCompleted < leastBusy.tasksCompleted)
) {
leastBusy = workerInfo;
}
}
return leastBusy;
}
// Task result retrieval
async getTaskResult(
taskId: string,
timeoutMs: number = 300000
): Promise<any> {
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
const result = this.completedTasks.get(taskId);
if (result) {
return result;
}
// Check if task is still in queue or being processed
const inQueue = this.taskQueue.some((t) => t.id === taskId);
const beingProcessed = Array.from(this.workers.values()).some(
(w) => w.currentTask?.id === taskId
);
if (!inQueue && !beingProcessed) {
throw new Error(`Task ${taskId} not found`);
}
// Wait before checking again
await new Promise((resolve) => setTimeout(resolve, 100));
}
throw new Error(`Task ${taskId} timed out`);
}
// Statistics and monitoring
getPoolStats(): WorkerPoolStats {
const workers = Array.from(this.workers.values());
return {
totalWorkers: workers.length,
availableWorkers: workers.filter((w) => w.isAvailable).length,
busyWorkers: workers.filter((w) => !w.isAvailable).length,
queueLength: this.taskQueue.length,
totalTasksCompleted: workers.reduce(
(sum, w) => sum + w.tasksCompleted,
0
),
totalTasksErrored: workers.reduce((sum, w) => sum + w.tasksErrored, 0),
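avgTasksPerWorker:
workers.length > 0
? workers.reduce((sum, w) => sum + w.tasksCompleted, 0) / workers.length
: 0,
// Guard the empty pool: Math.max() of no values is -Infinity
oldestWorkerAge:
workers.length > 0
? Math.max(...workers.map((w) => Date.now() - w.startedAt))
: 0,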
};
}
// Graceful shutdown
async shutdown(): Promise<void> {
console.log("Shutting down worker pool...");
// Wait for current tasks to complete (with timeout)
const shutdownTimeout = 60000; // 1 minute
const startTime = Date.now();
while (Date.now() - startTime < shutdownTimeout) {
const busyWorkers = Array.from(this.workers.values()).filter(
(w) => !w.isAvailable
);
if (busyWorkers.length === 0) {
break;
}
await new Promise((resolve) => setTimeout(resolve, 1000));
}
// Terminate all workers
const terminatePromises = Array.from(this.workers.keys()).map((workerId) =>
this.terminateWorker(workerId)
);
await Promise.all(terminatePromises);
console.log("Worker pool shut down successfully");
}
}
// Worker thread implementation
if (!isMainThread) {
const { workerId, workerType } = workerData;
// Worker message handler
parentPort?.on("message", async (message: WorkerMessage) => {
switch (message.type) {
case "execute-task":
await executeTask(message.payload.task);
break;
case "health-check":
sendHealthStats();
break;
}
});
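async function executeTask(task: any): Promise<void> {
const startTime = Date.now();
let timer: ReturnType<typeof setTimeout> | undefined;
try {
// Set up task timeout. It cannot interrupt synchronous CPU-bound work,
// because the timer only fires once the event loop is free; the manager's
// stuck-worker check covers that case.
const timeoutPromise = new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new Error("Task timeout")), task.timeout);
});
// Execute task based on type
const taskPromise = executeTaskByType(task);
// Race between task execution and timeout
const result = await Promise.race([taskPromise, timeoutPromise]);
// Send success message
parentPort?.postMessage({
type: "task-completed",
payload: {
taskId: task.id,
result,
executionTime: Date.now() - startTime,
},
});
} catch (error) {
// Send error message (catch variables are `unknown` under strict mode)
parentPort?.postMessage({
type: "task-failed",
payload: {
taskId: task.id,
error: error instanceof Error ? error.message : String(error),
executionTime: Date.now() - startTime,
},
});
} finally {
// Clear the timer so it doesn't fire after the task has settled
clearTimeout(timer);
}
}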
async function executeTaskByType(task: any): Promise<any> {
switch (task.type) {
case "cpu-intensive":
return executeCPUIntensiveTask(task.data);
case "io-intensive":
return executeIOIntensiveTask(task.data);
case "data-processing":
return executeDataProcessingTask(task.data);
default:
throw new Error(`Unknown task type: ${task.type}`);
}
}
function sendHealthStats(): void {
const memUsage = process.memoryUsage();
parentPort?.postMessage({
type: "health-stats",
payload: {
workerId,
memoryUsage: memUsage,
uptime: process.uptime(),
},
});
}
// Task type implementations
async function executeCPUIntensiveTask(data: any): Promise<any> {
// Simulate CPU-intensive work
const { iterations = 1000000 } = data;
let result = 0;
for (let i = 0; i < iterations; i++) {
result += Math.sqrt(i);
}
return { result, iterations };
}
async function executeIOIntensiveTask(data: any): Promise<any> {
// Simulate I/O intensive work (no real request is made)
const { url } = data;
const requestStart = Date.now();
// Simulate up to 1 second of network latency
await new Promise((resolve) => setTimeout(resolve, Math.random() * 1000));
// Report elapsed time rather than a raw timestamp
return { url, responseTime: Date.now() - requestStart };
}
async function executeDataProcessingTask(data: any): Promise<any> {
// Simulate data processing
const { dataset, operation } = data;
let processedData;
switch (operation) {
case "filter":
processedData = dataset.filter((item: any) => item.active);
break;
case "transform":
processedData = dataset.map((item: any) => ({
...item,
processed: true,
timestamp: Date.now(),
}));
break;
case "aggregate":
processedData = {
count: dataset.length,
sum: dataset.reduce(
(sum: number, item: any) => sum + (item.value || 0),
0
),
};
break;
default:
throw new Error(`Unknown operation: ${operation}`);
}
return { processedData, operation };
}
}
// Interfaces
interface WorkerConfig {
maxWorkers?: number;
minWorkers?: number;
workerType: string;
}
interface WorkerInfo {
id: string;
worker: Worker;
isAvailable: boolean;
currentTask: Task | null;
tasksCompleted: number;
tasksErrored: number;
startedAt: number;
lastActivity: number;
memoryUsage: number;
cpuUsage: number;
}
interface Task {
id: string;
type: string;
data: any;
priority: "critical" | "high" | "normal" | "low";
submittedAt: number;
retryCount: number;
maxRetries: number;
timeout: number;
lastError?: string;
}
interface TaskData {
type: string;
data: any;
priority?: "critical" | "high" | "normal" | "low";
maxRetries?: number;
timeout?: number;
}
interface TaskResult {
taskId: string;
result?: any;
error?: string;
completedAt: number;
workerId: string;
executionTime?: number;
failed?: boolean;
}
interface WorkerMessage {
type: string;
payload: any;
}
interface WorkerPoolStats {
totalWorkers: number;
availableWorkers: number;
busyWorkers: number;
queueLength: number;
totalTasksCompleted: number;
totalTasksErrored: number;
avgTasksPerWorker: number;
oldestWorkerAge: number;
}
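The scaling rules in `autoScale()` are easier to reason about, and to test, when pulled out into a pure function. The sketch below restates the same thresholds: grow by one worker when the queue holds more than two tasks per worker, shrink by one when more than 70% of workers are idle and the queue is empty, always within `minWorkers`/`maxWorkers`. `PoolSnapshot` and `decideScaling` are hypothetical names introduced for illustration.

```typescript
// Hypothetical snapshot of the pool, taken once per monitoring tick
interface PoolSnapshot {
  totalWorkers: number;
  availableWorkers: number;
  queueLength: number;
  minWorkers: number;
  maxWorkers: number;
}

type ScalingDecision = "scale-up" | "scale-down" | "none";

// Same thresholds as autoScale(): a backlog of more than 2 tasks per worker
// triggers growth; more than 70% idle workers with an empty queue triggers shrinkage
function decideScaling(s: PoolSnapshot): ScalingDecision {
  if (s.queueLength > s.totalWorkers * 2 && s.totalWorkers < s.maxWorkers) {
    return "scale-up";
  }
  if (
    s.availableWorkers > s.totalWorkers * 0.7 &&
    s.totalWorkers > s.minWorkers &&
    s.queueLength === 0
  ) {
    return "scale-down";
  }
  return "none";
}
```

With 4 busy workers and 9 queued tasks this returns `"scale-up"`; with 4 idle workers and an empty queue it returns `"scale-down"`. The two branches can never both apply, since one needs a non-empty queue and the other an empty one, so the monitoring loop only has to act on a single result.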
Job Retry and Failure Handling: Building Resilient Processing
Intelligent Retry Strategies and Dead Letter Queues
// Advanced retry mechanisms with exponential backoff and circuit breakers
class RetryHandler {
private retryConfigs: Map<string, RetryConfig> = new Map();
private circuitBreakers: Map<string, CircuitBreaker> = new Map();
private deadLetterQueue: DeadLetterQueue;
private redis: Redis;
constructor() {
this.redis = new Redis(process.env.REDIS_URL);
this.deadLetterQueue = new DeadLetterQueue(this.redis);
this.setupDefaultRetryConfigs();
this.setupCircuitBreakers();
}
private setupDefaultRetryConfigs(): void {
// Email sending - gentle retries
this.retryConfigs.set("send-email", {
maxAttempts: 5,
baseDelayMs: 1000,
maxDelayMs: 300000, // 5 minutes
backoffStrategy: "exponential",
jitterFactor: 0.1,
retryableErrors: ["TIMEOUT", "RATE_LIMITED", "TEMPORARY_FAILURE"],
nonRetryableErrors: ["INVALID_EMAIL", "AUTHENTICATION_FAILED"],
});
// Payment processing - aggressive retries with strict limits
this.retryConfigs.set("process-payment", {
maxAttempts: 3,
baseDelayMs: 500,
maxDelayMs: 30000, // 30 seconds
backoffStrategy: "linear",
jitterFactor: 0.05,
retryableErrors: ["NETWORK_ERROR", "GATEWAY_TIMEOUT"],
nonRetryableErrors: [
"INSUFFICIENT_FUNDS",
"INVALID_CARD",
"FRAUD_DETECTED",
],
});
// Data processing - patient retries
this.retryConfigs.set("process-data", {
maxAttempts: 10,
baseDelayMs: 2000,
maxDelayMs: 1800000, // 30 minutes
backoffStrategy: "exponential",
jitterFactor: 0.2,
retryableErrors: ["DATABASE_LOCKED", "RESOURCE_BUSY", "TEMPORARY_ERROR"],
nonRetryableErrors: ["INVALID_DATA", "SCHEMA_ERROR"],
});
// External API calls - circuit breaker friendly
this.retryConfigs.set("external-api", {
maxAttempts: 7,
baseDelayMs: 1500,
maxDelayMs: 600000, // 10 minutes
backoffStrategy: "exponential",
jitterFactor: 0.15,
retryableErrors: ["API_UNAVAILABLE", "RATE_LIMITED", "TIMEOUT"],
nonRetryableErrors: ["UNAUTHORIZED", "NOT_FOUND", "BAD_REQUEST"],
enableCircuitBreaker: true,
});
}
private setupCircuitBreakers(): void {
// Email service circuit breaker
this.circuitBreakers.set(
"email-service",
new CircuitBreaker({
failureThreshold: 50, // 50% failure rate
recoveryTimeout: 60000, // 1 minute
monitoringPeriod: 300000, // 5 minutes
})
);
// Payment gateway circuit breaker
this.circuitBreakers.set(
"payment-gateway",
new CircuitBreaker({
failureThreshold: 25, // 25% failure rate (stricter for payments)
recoveryTimeout: 120000, // 2 minutes
monitoringPeriod: 180000, // 3 minutes
})
);
// External API circuit breaker
this.circuitBreakers.set(
"external-api",
new CircuitBreaker({
failureThreshold: 60, // 60% failure rate
recoveryTimeout: 300000, // 5 minutes
monitoringPeriod: 600000, // 10 minutes
})
);
}
async handleJobFailure(
job: FailedJob,
error: JobError,
retryContext: RetryContext
): Promise<RetryDecision> {
const config = this.retryConfigs.get(job.type);
if (!config) {
return { shouldRetry: false, reason: "No retry config found" };
}
// Check if error is retryable
if (!this.isRetryableError(error, config)) {
await this.deadLetterQueue.addJob(job, error, "NON_RETRYABLE_ERROR");
return {
shouldRetry: false,
reason: `Non-retryable error: ${error.code}`,
};
}
// Check circuit breaker if enabled
if (config.enableCircuitBreaker) {
const circuitBreaker = this.circuitBreakers.get(job.service || job.type);
if (circuitBreaker && circuitBreaker.isOpen()) {
await this.deadLetterQueue.addJob(job, error, "CIRCUIT_BREAKER_OPEN");
return {
shouldRetry: false,
reason: "Circuit breaker is open",
};
}
}
// Check retry limits
if (retryContext.attemptNumber >= config.maxAttempts) {
await this.deadLetterQueue.addJob(job, error, "MAX_RETRIES_EXCEEDED");
return {
shouldRetry: false,
reason: "Maximum retry attempts exceeded",
};
}
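// Calculate retry delay. attemptNumber counts retries already scheduled
// (0 on the first failure), while the backoff formulas expect the 1-based
// number of the upcoming retry
const delay = this.calculateRetryDelay(
config,
retryContext.attemptNumber + 1,
retryContext.previousDelays
);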
// Update circuit breaker
if (config.enableCircuitBreaker) {
const circuitBreaker = this.circuitBreakers.get(job.service || job.type);
circuitBreaker?.recordFailure();
}
// Store retry context
await this.storeRetryContext(job.id, {
...retryContext,
attemptNumber: retryContext.attemptNumber + 1,
previousDelays: [...retryContext.previousDelays, delay],
lastError: error,
lastAttemptAt: Date.now(),
});
return {
shouldRetry: true,
delayMs: delay,
reason: `Retry attempt ${retryContext.attemptNumber + 1}/${
config.maxAttempts
}`,
};
}
private isRetryableError(error: JobError, config: RetryConfig): boolean {
// Check non-retryable errors first
if (config.nonRetryableErrors.includes(error.code)) {
return false;
}
// Check retryable errors
if (config.retryableErrors.length > 0) {
return config.retryableErrors.includes(error.code);
}
// Default behavior - retry unless explicitly non-retryable
return true;
}
private calculateRetryDelay(
config: RetryConfig,
attemptNumber: number,
previousDelays: number[]
): number {
let delay: number;
switch (config.backoffStrategy) {
case "linear":
delay = config.baseDelayMs * attemptNumber;
break;
case "exponential":
delay = config.baseDelayMs * Math.pow(2, attemptNumber - 1);
break;
case "fibonacci":
delay = config.baseDelayMs * this.fibonacci(attemptNumber);
break;
case "custom":
delay = this.calculateCustomDelay(
config,
attemptNumber,
previousDelays
);
break;
default:
delay = config.baseDelayMs;
}
// Apply jitter to prevent thundering herd
if (config.jitterFactor > 0) {
const jitter = delay * config.jitterFactor * (Math.random() - 0.5) * 2;
delay += jitter;
}
// Ensure delay is within bounds
delay = Math.max(config.baseDelayMs, Math.min(delay, config.maxDelayMs));
return Math.round(delay);
}
private fibonacci(n: number): number {
if (n <= 1) return 1;
let a = 1,
b = 1;
for (let i = 2; i < n; i++) {
[a, b] = [b, a + b];
}
return b;
}
private calculateCustomDelay(
config: RetryConfig,
attemptNumber: number,
previousDelays: number[]
): number {
// Custom delay calculation based on specific business logic
// For example, time-aware delays for email sending
const now = new Date();
const hour = now.getHours();
// Avoid business hours for non-critical emails
if (hour >= 9 && hour <= 17) {
return config.baseDelayMs * 3; // Longer delay during business hours
}
// Shorter delays during off hours
return config.baseDelayMs;
}
// Dead letter queue management
async getDeadLetterJobs(
limit: number = 50,
offset: number = 0
): Promise<DeadLetterJob[]> {
return await this.deadLetterQueue.getJobs(limit, offset);
}
async reprocessDeadLetterJob(jobId: string): Promise<boolean> {
const deadJob = await this.deadLetterQueue.getJob(jobId);
if (!deadJob) {
return false;
}
// Reset retry context
await this.storeRetryContext(deadJob.originalJob.id, {
attemptNumber: 0,
previousDelays: [],
lastError: null,
lastAttemptAt: Date.now(),
});
// Re-queue first, then remove from the dead letter queue, so a failed
// requeue leaves the job in the DLQ instead of losing it
await this.requeue(deadJob.originalJob);
await this.deadLetterQueue.removeJob(jobId);
return true;
}
// Retry context management
private async storeRetryContext(
jobId: string,
context: RetryContext
): Promise<void> {
await this.redis.setex(
`retry_context:${jobId}`,
86400, // 24h TTL: must outlive the full retry window, or an expired context resets attemptNumber to 0
JSON.stringify(context)
);
}
async getRetryContext(jobId: string): Promise<RetryContext | null> {
const contextData = await this.redis.get(`retry_context:${jobId}`);
if (!contextData) {
return {
attemptNumber: 0,
previousDelays: [],
lastError: null,
lastAttemptAt: Date.now(),
};
}
return JSON.parse(contextData);
}
// Monitoring and analytics
async getRetryStatistics(
timeframe: string = "24h"
): Promise<RetryStatistics> {
const stats = await this.redis.hgetall(`retry_stats:${timeframe}`);
return {
totalJobs: parseInt(stats.total_jobs || "0"),
succeededJobs: parseInt(stats.succeeded_jobs || "0"),
failedJobs: parseInt(stats.failed_jobs || "0"),
retriedJobs: parseInt(stats.retried_jobs || "0"),
deadLetterJobs: parseInt(stats.dead_letter_jobs || "0"),
avgRetryAttempts: parseFloat(stats.avg_retry_attempts || "0"),
successRateAfterRetry: parseFloat(stats.success_rate_after_retry || "0"),
};
}
// Helper methods
private async requeue(job: FailedJob): Promise<void> {
// Implementation depends on your job queue system
console.log(`Re-queuing job ${job.id}`);
}
}
// Circuit breaker implementation
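class CircuitBreaker {
private failureCount: number = 0;
private successCount: number = 0;
private lastFailureTime: number = 0;
private windowStart: number = Date.now();
private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";
// Assumed minimum sample size before the failure rate is trusted; without it,
// the very first failure (1 of 1 = 100%) would trip the breaker
private readonly minimumRequests: number = 10;
constructor(private config: CircuitBreakerConfig) {}
isOpen(): boolean {
if (this.state === "OPEN") {
// After the recovery timeout, let traffic through to probe the service
if (Date.now() - this.lastFailureTime > this.config.recoveryTimeout) {
this.state = "HALF_OPEN";
return false;
}
return true;
}
return false;
}
// Callers must report successes too, or the failure rate is always 100%
recordSuccess(): void {
this.resetWindowIfExpired();
this.successCount++;
if (this.state === "HALF_OPEN") {
// Recovery successful: close the circuit and start a fresh window
this.state = "CLOSED";
this.failureCount = 0;
this.successCount = 0;
this.windowStart = Date.now();
}
}
recordFailure(): void {
this.resetWindowIfExpired();
this.failureCount++;
this.lastFailureTime = Date.now();
// A failed probe while HALF_OPEN reopens the circuit immediately
if (this.state === "HALF_OPEN") {
this.state = "OPEN";
return;
}
// Calculate failure rate over the current monitoring window
const totalRequests = this.failureCount + this.successCount;
const failureRate = (this.failureCount / totalRequests) * 100;
if (
totalRequests >= this.minimumRequests &&
failureRate >= this.config.failureThreshold
) {
this.state = "OPEN";
}
}
// Reset counters once the monitoring period has elapsed. (Comparing against
// lastFailureTime right after updating it, as before, could never trigger.)
private resetWindowIfExpired(): void {
if (Date.now() - this.windowStart > this.config.monitoringPeriod) {
this.failureCount = 0;
this.successCount = 0;
this.windowStart = Date.now();
}
}
}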
// Dead letter queue implementation
class DeadLetterQueue {
constructor(private redis: Redis) {}
async addJob(job: FailedJob, error: JobError, reason: string): Promise<void> {
const deadLetterJob: DeadLetterJob = {
id: `dlq-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
originalJob: job,
error,
reason,
addedAt: Date.now(),
retryCount: job.retryCount || 0,
};
await this.redis.lpush("dead_letter_queue", JSON.stringify(deadLetterJob));
console.log(`Job ${job.id} added to dead letter queue: ${reason}`);
}
async getJobs(limit: number, offset: number): Promise<DeadLetterJob[]> {
const jobs = await this.redis.lrange(
"dead_letter_queue",
offset,
offset + limit - 1
);
return jobs.map((job) => JSON.parse(job));
}
async getJob(jobId: string): Promise<DeadLetterJob | null> {
const jobs = await this.redis.lrange("dead_letter_queue", 0, -1);
for (const jobData of jobs) {
const job = JSON.parse(jobData);
if (job.id === jobId || job.originalJob.id === jobId) {
return job;
}
}
return null;
}
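async removeJob(jobId: string): Promise<boolean> {
// LREM matches the exact stored string, so find the raw entry instead of
// re-serializing a parsed copy
const jobs = await this.redis.lrange("dead_letter_queue", 0, -1);
const raw = jobs.find((jobData) => {
const job = JSON.parse(jobData);
return job.id === jobId || job.originalJob.id === jobId;
});
if (!raw) return false;
const removed = await this.redis.lrem("dead_letter_queue", 1, raw);
return removed > 0;
}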
}
// Interfaces
interface RetryConfig {
maxAttempts: number;
baseDelayMs: number;
maxDelayMs: number;
backoffStrategy: "linear" | "exponential" | "fibonacci" | "custom";
jitterFactor: number;
retryableErrors: string[];
nonRetryableErrors: string[];
enableCircuitBreaker?: boolean;
}
interface CircuitBreakerConfig {
failureThreshold: number; // Percentage
recoveryTimeout: number; // Milliseconds
monitoringPeriod: number; // Milliseconds
}
interface FailedJob {
id: string;
type: string;
data: any;
service?: string;
retryCount?: number;
}
interface JobError {
code: string;
message: string;
stack?: string;
metadata?: any;
}
interface RetryContext {
attemptNumber: number;
previousDelays: number[];
lastError: JobError | null;
lastAttemptAt: number;
}
interface RetryDecision {
shouldRetry: boolean;
delayMs?: number;
reason: string;
}
interface DeadLetterJob {
id: string;
originalJob: FailedJob;
error: JobError;
reason: string;
addedAt: number;
retryCount: number;
}
interface RetryStatistics {
totalJobs: number;
succeededJobs: number;
failedJobs: number;
retriedJobs: number;
deadLetterJobs: number;
avgRetryAttempts: number;
successRateAfterRetry: number;
}
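The delay formulas in `calculateRetryDelay()` can be checked in isolation. This sketch computes exponential backoff for a 1-based retry number, applies symmetric jitter, and clamps the result to the configured bounds, the same shape as the retry handler above. `backoffDelay` and `BackoffOptions` are hypothetical names, and the injectable `random` parameter is an addition so the output can be tested deterministically; it defaults to `Math.random`.

```typescript
interface BackoffOptions {
  baseDelayMs: number;
  maxDelayMs: number;
  jitterFactor: number; // 0.1 means up to +/-10%
}

// Exponential backoff: base * 2^(retry - 1) for retry = 1, 2, 3, ...
function backoffDelay(
  retry: number,
  opts: BackoffOptions,
  random: () => number = Math.random
): number {
  let delay = opts.baseDelayMs * Math.pow(2, retry - 1);
  // Symmetric jitter in [-jitterFactor, +jitterFactor) spreads out retries
  // so many failed jobs don't all hit a recovering service at once
  delay += delay * opts.jitterFactor * (random() - 0.5) * 2;
  // Clamp to the configured bounds
  delay = Math.max(opts.baseDelayMs, Math.min(delay, opts.maxDelayMs));
  return Math.round(delay);
}
```

With the email settings above (1-second base, 5-minute cap), the first five retries wait 1, 2, 4, 8, and 16 seconds before jitter, and by the tenth retry the uncapped value (512 seconds) is clamped to 300 seconds.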
Key Takeaways
Background processing isn’t just about moving slow tasks out of the request cycle—it’s about building resilient, distributed systems that can handle failures gracefully and scale independently of your main application.
Essential background processing patterns:
- Queue-based architecture with priority-based task distribution and intelligent routing
- Robust scheduling systems with leader election and distributed coordination
- Worker pool management with auto-scaling and health monitoring
- Intelligent retry mechanisms with exponential backoff and circuit breakers
- Dead letter queues for handling permanently failed jobs
The production-ready job processing framework:
- Use Redis or similar for reliable job persistence and cross-server coordination
- Implement multiple queue priorities to handle critical vs. bulk operations differently
- Build comprehensive monitoring with metrics on queue depth, processing time, and failure rates
- Design graceful degradation so job failures don’t cascade to user-facing features
- Plan horizontal scaling with worker auto-scaling based on queue metrics
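Separate queues per priority are what keep order-processing jobs from waiting behind a backlog of promotional emails. Below is a minimal in-memory sketch, assuming strict priority: a lower-priority queue is served only when every higher one is empty. `PriorityQueues` is a hypothetical name; in production, each level could instead map to its own Redis-backed queue.

```typescript
type Priority = "critical" | "high" | "normal" | "low";

// Highest priority first
const PRIORITY_ORDER: Priority[] = ["critical", "high", "normal", "low"];

class PriorityQueues<T> {
  private queues = new Map<Priority, T[]>(
    PRIORITY_ORDER.map((p): [Priority, T[]] => [p, []])
  );

  enqueue(priority: Priority, job: T): void {
    this.queues.get(priority)!.push(job);
  }

  // Take the oldest job from the highest-priority non-empty queue
  dequeue(): T | undefined {
    for (const priority of PRIORITY_ORDER) {
      const queue = this.queues.get(priority)!;
      if (queue.length > 0) {
        return queue.shift();
      }
    }
    return undefined;
  }

  // Per-level depth: the queue metric worth alerting on for each priority
  depths(): Record<Priority, number> {
    return {
      critical: this.queues.get("critical")!.length,
      high: this.queues.get("high")!.length,
      normal: this.queues.get("normal")!.length,
      low: this.queues.get("low")!.length,
    };
  }
}
```

Strict priority can starve the low queue under sustained critical load; dedicating separate workers to each level is one way to avoid that, at the cost of some idle capacity.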
Background processing best practices:
- Always implement timeouts for job execution to prevent stuck workers
- Use exponential backoff for retries to avoid overwhelming failing services
- Monitor queue health as aggressively as you monitor application health
- Plan for job isolation so one type of job can’t block others
- Build idempotent jobs that can be safely retried without duplicating side effects
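Retries are only safe when running a job twice has the same effect as running it once. One common approach is an idempotency key: record the first run under a key and skip the side effect on later attempts. The sketch uses an in-memory `Map` as a stand-in for a shared store (with Redis, an atomic `SET key value NX` could play the claiming role). `IdempotentRunner` and the key format are hypothetical.

```typescript
// Hypothetical in-memory stand-in for a shared idempotency store
class IdempotentRunner {
  // Maps idempotency key -> result (or in-flight promise) of the first run
  private results = new Map<string, Promise<unknown>>();

  // Runs `sideEffect` at most once per key; repeated or concurrent calls with
  // the same key share the first call's result instead of repeating the effect
  run<T>(key: string, sideEffect: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key);
    if (existing) {
      return existing as Promise<T>;
    }
    const pending = Promise.resolve()
      .then(sideEffect)
      .catch((error: unknown) => {
        // Forget failed attempts so a later retry can run the side effect again
        this.results.delete(key);
        throw error;
      });
    this.results.set(key, pending);
    return pending;
  }
}
```

A key such as `order-42:charge` makes a retried payment job return the original receipt instead of charging twice. In a distributed setup, the check-and-claim step has to be atomic across workers, which is why an in-process `Map` is only a stand-in here.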
The architecture decision framework:
- Use simple cron jobs for basic scheduled tasks where an occasional failed or missed run is acceptable
- Use Redis/Bull queues for high-throughput job processing with retry logic
- Use worker threads for CPU-intensive tasks that can be parallelized
- Use distributed job systems (like Celery) for complex multi-service processing
- Use cloud functions for sporadic, event-driven background tasks
What’s Next?
In the next blog, we’ll complete our background processing journey by diving into distributed job processing, queue monitoring and management, resource management for background jobs, and scaling background processing across multiple servers.
We’ll also explore the operational challenges of maintaining job processing systems at scale—from handling queue backlog during traffic spikes to debugging why critical background jobs aren’t processing.
Because building background processing that works during development is the easy part. Making it reliable enough to handle Black Friday traffic while processing millions of background tasks—that’s where systems engineering becomes art.