Microservices Architecture - 2/3

From Microservices Foundations to Distributed System Resilience

You’ve mastered the microservices fundamentals: clear service boundaries through domain-driven design, decomposition strategies that break monoliths into maintainable distributed systems, robust inter-service communication over HTTP, messaging, and gRPC, production-grade API gateways that handle routing, security, and cross-cutting concerns, service mesh infrastructure that provides traffic management and security policies, and data consistency patterns built on saga orchestration and event sourcing. Your microservices now operate as a well-architected distributed system with proper boundaries and communication. But here’s the operational reality that separates functional microservices from bulletproof distributed systems: perfect service architecture means nothing if your services can’t find each other dynamically, lack centralized configuration management that enables rapid deployment changes, have no distributed tracing to debug cross-service issues, run without circuit breakers that prevent cascade failures, and depend on fragile synchronous calls instead of resilient event-driven architectures that survive real-world network partitions and service outages.

The distributed systems operational nightmare that destroys scalable architectures:

# Your distributed systems horror story
# VP Engineering: "We have 50 microservices that need to work together flawlessly"

# Disaster 1: Service Discovery Failure
$ kubectl get pods -n production
NAME                    READY   STATUS    RESTARTS   AGE
user-service-64b7f      1/1     Running   0          2d
order-service-97c2a     1/1     Running   0          1d
payment-service-f8d3e   0/1     Pending   0          30m
inventory-service-k2l9  0/1     CrashLoopBackOff   12         45m

# Services hardcoded with IP addresses - chaos when pods restart
$ grep -r "http://10.244" .
./order-service/src/clients/user-client.js:  baseURL: 'http://10.244.1.15:3001'
./payment-service/src/clients/order-client.js:  baseURL: 'http://10.244.1.22:3002'
# Pod restarts get new IPs = broken connections everywhere

# Emergency IP hunting at 3 AM
$ kubectl get pods -o wide | grep user-service
user-service-64b7f   1/1     Running   0          2d    10.244.1.27   node-1
# Old code still pointing to 10.244.1.15, new pod at 10.244.1.27
# Every service restart breaks dependent services

# Disaster 2: Configuration Management Chaos
$ kubectl logs payment-service-f8d3e
Error: DATABASE_URL environment variable not set
Error: STRIPE_API_KEY not found
Error: REDIS_HOST undefined
# Different environments have different configs scattered everywhere

$ find . -name "*.env*" | wc -l
23
# 23 different config files for 8 services across 3 environments
# No one knows which config is actually being used

# Disaster 3: Debugging Distributed Transactions
# Customer: "My order payment failed but my card was charged"
$ kubectl logs deploy/order-service -n production --since=1h | grep order-12345
order-service: Processing order-12345, calling payment service
order-service: Payment service timeout after 30s
order-service: Marked order as failed

$ kubectl logs deploy/payment-service -n production --since=1h | grep order-12345
payment-service: Received payment request for order-12345
payment-service: Stripe payment successful, payment-id: pay_abc123
payment-service: Failed to notify order service (connection refused)
# Payment succeeded, order service never got notification
# No correlation IDs, no distributed tracing, debugging nightmare

# Disaster 4: Cascade Failure Avalanche
$ curl https://api.myapp.com/orders
{"error": "Service Unavailable", "message": "Unable to process request"}

# The cascade begins...
$ kubectl logs deploy/inventory-service
inventory-service: Database connection pool exhausted
inventory-service: 500 concurrent requests queued
inventory-service: CPU usage: 100%, Memory: 95%

$ kubectl logs deploy/order-service
order-service: Inventory service timeout (attempt 1/3)
order-service: Inventory service timeout (attempt 2/3)
order-service: Inventory service timeout (attempt 3/3)
order-service: All inventory requests failing, backing up request queue

$ kubectl logs deploy/api-gateway
gateway: Order service responding slowly (5000ms average)
gateway: Request timeout count: 1,247 in last 5 minutes
gateway: Circuit breaker SHOULD be open but isn't implemented
# One slow service brings down entire system

# Disaster 5: Event-Driven Architecture Breakdown
# When network partitions happen...
$ kubectl describe pod rabbitmq-cluster-0
Events:
  Warning  NetworkNotReady   Node network is not ready

# Services can't communicate via events, fall back to direct calls
# Synchronous calls during network issues = complete system failure
# Eventual consistency becomes "never consistency"

# The brutal distributed systems truth:
# Perfect microservices architecture is worthless without:
# - Dynamic service discovery and health management
# - Centralized configuration with environment-specific overrides
# - Distributed tracing for debugging cross-service transactions
# - Circuit breakers and bulkheads for fault isolation
# - Event-driven patterns that handle network partitions gracefully

# Result: 6-hour production outage during Black Friday
# 50 services, 23 configuration files, 0 observability
# $8M in lost revenue, engineering team works through weekend
# CTO fired, architecture team disbanded
# The painful lesson: Microservices need distributed system patterns

The uncomfortable distributed truth: Perfect microservice boundaries and communication patterns can’t save you from distributed system failures when your services can’t discover each other reliably, have no centralized configuration strategy, lack distributed tracing for debugging complex workflows, operate without fault tolerance patterns, and rely on synchronous communication instead of resilient event-driven architectures. Professional microservices architecture requires mastering distributed system operational patterns.

Real-world distributed systems failure consequences:

// What happens when distributed systems lack operational patterns:
const distributedSystemsFailureImpact = {
  serviceDiscoveryFailures: {
    problem:
      "Service restarts break all dependent services due to hardcoded IPs",
    cause: "No service registry, manual IP management, static configuration",
    impact: "30-minute outages every time Kubernetes reschedules pods",
    cost: "$200K monthly in lost revenue from deployment-induced outages",
  },

  configurationChaos: {
    problem: "Wrong database credentials deployed to production",
    cause: "23 different config files, no centralized management, human error",
    impact: "Data corruption from services connecting to wrong databases",
    consequences: "Compliance violations and customer data integrity issues",
  },

  debuggingNightmares: {
    problem:
      "Customer transaction fails across 8 services, impossible to trace",
    cause: "No distributed tracing, no correlation IDs, fragmented logging",
    impact: "Engineering team spends 40 hours debugging simple payment issue",
    reality: "Mean time to resolution increases 10x with each added service",
  },

  cascadeFailures: {
    problem: "One slow service brings down entire platform",
    cause: "No circuit breakers, no bulkheads, synchronous failure propagation",
    impact: "Complete system outage from single service performance issue",
    prevention:
      "Fault tolerance patterns would isolate failures to affected service",
  },

  networkPartitionProblems: {
    problem: "Services become inconsistent during network issues",
    cause: "Synchronous communication only, no event-driven fallbacks",
    impact:
      "Data inconsistency and business logic failures during network splits",
    solution: "Event-driven architecture provides natural partition tolerance",
  },

  // Microservices without distributed system patterns create
  // more problems than monolithic architecture ever could
};

Distributed microservices mastery requires understanding:

  • Service discovery and registry that enables dynamic service location and health management at scale
  • Configuration management that centralizes settings while supporting environment-specific overrides and runtime updates
  • Distributed tracing and monitoring that provides end-to-end visibility across service boundaries for debugging complex workflows
  • Circuit breakers and fault tolerance that prevent cascade failures and provide graceful degradation during service outages
  • Advanced event-driven architecture that handles network partitions, provides eventual consistency, and enables loose coupling

This article transforms your microservices from fragile distributed components into resilient, observable, and maintainable distributed systems that handle real-world operational challenges with confidence.


Service Discovery and Registry: Dynamic Service Location

From Static IPs to Dynamic Service Mesh

Understanding service discovery evolution:

// Service discovery pattern evolution in distributed systems
const serviceDiscoveryEvolution = {
  // Anti-pattern: Hardcoded service locations
  staticConfiguration: {
    approach: "Hardcode IP addresses and ports in configuration",
    implementation: {
      userService: "http://10.244.1.15:3001",
      orderService: "http://10.244.1.22:3002",
      inventoryService: "http://10.244.1.33:3003",
    },
    problems: [
      "Service restarts get new IP addresses",
      "Load balancing requires manual configuration updates",
      "Service scaling means updating all dependent services",
      "No health checking - calls to dead services",
      "Environment differences require different configs",
    ],
    realityCheck: "Breaks every time containers restart in Kubernetes",
  },

  // Basic pattern: DNS-based discovery
  dnsBasedDiscovery: {
    approach: "Use DNS names for service resolution",
    implementation: {
      userService: "http://user-service.production.svc.cluster.local:3001",
      paymentService:
        "http://payment-service.production.svc.cluster.local:3002",
    },
    advantages: [
      "Services survive pod restarts",
      "Built-in load balancing via DNS round-robin",
      "Standard networking protocol",
    ],
    limitations: [
      "No health-aware load balancing",
      "DNS caching can route to dead services",
      "Limited load balancing algorithms",
      "No service metadata or versioning",
    ],
  },

  // Professional pattern: Service registry with health checking
  serviceRegistryPattern: {
    approach: "Centralized registry with health checking and metadata",
    components: {
      serviceRegistry: "Central database of available services",
      healthChecking: "Continuous monitoring of service health",
      loadBalancing: "Intelligent routing based on service health and load",
      serviceMetadata: "Version information, capabilities, and constraints",
    },

    advantages: [
      "Health-aware load balancing",
      "Automatic service registration and deregistration",
      "Support for multiple service versions",
      "Rich service metadata and discovery features",
      "Circuit breaker integration",
    ],
  },

  // Advanced pattern: Service mesh discovery
  serviceMeshDiscovery: {
    approach: "Infrastructure layer handles all service communication",
    features: [
      "Automatic service discovery via sidecar proxies",
      "Mutual TLS between services",
      "Advanced traffic shaping and routing",
      "Comprehensive observability and tracing",
      "Policy enforcement at network layer",
    ],
    outcome:
      "Services focus on business logic while mesh handles communication",
  },
};

Professional Service Discovery Implementation:

// service-discovery.js - Production-ready service registry with health checking
const EventEmitter = require("events");
const axios = require("axios");

class ServiceRegistry extends EventEmitter {
  constructor(options = {}) {
    super();
    this.services = new Map();
    this.healthCheckInterval = options.healthCheckInterval || 30000; // 30 seconds
    this.healthCheckTimeout = options.healthCheckTimeout || 5000; // 5 seconds
    this.maxHealthCheckFailures = options.maxHealthCheckFailures || 3;

    this.startHealthChecking();

    // Graceful shutdown
    process.on("SIGTERM", () => this.shutdown());
    process.on("SIGINT", () => this.shutdown());
  }

  // Register a service instance
  register(serviceName, serviceInstance) {
    if (!serviceName || !serviceInstance) {
      throw new Error("Service name and instance details are required");
    }

    const instance = {
      id: serviceInstance.id || this.generateInstanceId(serviceName),
      host: serviceInstance.host,
      port: serviceInstance.port,
      health: serviceInstance.health || "/health",
      metadata: serviceInstance.metadata || {},
      version: serviceInstance.version || "1.0.0",
      status: "starting",
      lastHealthCheck: null,
      healthCheckFailures: 0,
      registeredAt: new Date(),
      lastSeen: new Date(),
    };

    // Initialize service entry if doesn't exist
    if (!this.services.has(serviceName)) {
      this.services.set(serviceName, {
        instances: new Map(),
        loadBalancingStrategy: "round-robin",
        lastInstanceIndex: 0,
      });
    }

    const service = this.services.get(serviceName);
    service.instances.set(instance.id, instance);

    console.log(
      `Service registered: ${serviceName}/${instance.id} at ${instance.host}:${instance.port}`
    );

    // Immediate health check
    this.checkInstanceHealth(serviceName, instance);

    this.emit("serviceRegistered", {
      serviceName,
      instanceId: instance.id,
      instance,
    });

    return instance.id;
  }

  // Deregister a service instance
  deregister(serviceName, instanceId) {
    const service = this.services.get(serviceName);
    if (!service || !service.instances.has(instanceId)) {
      console.warn(
        `Instance ${serviceName}/${instanceId} not found for deregistration`
      );
      return false;
    }

    const instance = service.instances.get(instanceId);
    service.instances.delete(instanceId);

    // Remove service entirely if no instances remain
    if (service.instances.size === 0) {
      this.services.delete(serviceName);
    }

    console.log(`Service deregistered: ${serviceName}/${instanceId}`);

    this.emit("serviceDeregistered", {
      serviceName,
      instanceId,
      instance,
    });

    return true;
  }

  // Discover a healthy service instance
  discover(serviceName, options = {}) {
    const service = this.services.get(serviceName);
    if (!service) {
      throw new Error(`Service ${serviceName} not found in registry`);
    }

    // Get healthy instances
    const healthyInstances = Array.from(service.instances.values()).filter(
      (instance) => instance.status === "healthy"
    );

    if (healthyInstances.length === 0) {
      throw new Error(
        `No healthy instances available for service ${serviceName}`
      );
    }

    // Apply filters if provided
    let filteredInstances = healthyInstances;

    if (options.version) {
      filteredInstances = filteredInstances.filter(
        (instance) => instance.version === options.version
      );
    }

    if (options.metadata) {
      filteredInstances = filteredInstances.filter((instance) => {
        return Object.keys(options.metadata).every(
          (key) => instance.metadata[key] === options.metadata[key]
        );
      });
    }

    if (filteredInstances.length === 0) {
      throw new Error(
        `No instances match the specified criteria for ${serviceName}`
      );
    }

    // Load balancing
    const selectedInstance = this.selectInstance(
      service,
      filteredInstances,
      options.strategy
    );

    return {
      id: selectedInstance.id,
      url: `http://${selectedInstance.host}:${selectedInstance.port}`,
      host: selectedInstance.host,
      port: selectedInstance.port,
      metadata: selectedInstance.metadata,
      version: selectedInstance.version,
    };
  }

  // Load balancing strategies
  selectInstance(service, instances, strategy) {
    strategy = strategy || service.loadBalancingStrategy;

    switch (strategy) {
      case "round-robin":
        service.lastInstanceIndex =
          (service.lastInstanceIndex + 1) % instances.length;
        return instances[service.lastInstanceIndex];

      case "random":
        return instances[Math.floor(Math.random() * instances.length)];

      case "least-connections":
        // In a real implementation, this would track active connections
        return instances.reduce((least, current) =>
          (current.activeConnections || 0) < (least.activeConnections || 0)
            ? current
            : least
        );

      case "health-weighted":
        // Prefer instances with better health scores
        return instances.reduce((best, current) =>
          (current.healthScore || 1.0) > (best.healthScore || 1.0)
            ? current
            : best
        );

      default:
        return instances[0];
    }
  }

  // Health checking
  async checkInstanceHealth(serviceName, instance) {
    const healthUrl = `http://${instance.host}:${instance.port}${instance.health}`;

    try {
      const response = await axios.get(healthUrl, {
        timeout: this.healthCheckTimeout,
        headers: {
          "User-Agent": "ServiceRegistry/1.0",
          "X-Health-Check": "true",
        },
      });

      // Health check successful
      if (response.status === 200) {
        const wasUnhealthy = instance.status !== "healthy";

        instance.status = "healthy";
        instance.lastHealthCheck = new Date();
        instance.healthCheckFailures = 0;
        instance.lastSeen = new Date();

        // Extract health metadata if provided
        if (response.data && typeof response.data === "object") {
          instance.healthScore = response.data.score || 1.0;
          instance.activeConnections = response.data.connections || 0;
          instance.systemLoad = response.data.load || 0;
        }

        if (wasUnhealthy) {
          console.log(`Instance recovered: ${serviceName}/${instance.id}`);
          this.emit("instanceHealthy", { serviceName, instance });
        }

        return true;
      }
    } catch (error) {
      // Health check failed
      instance.healthCheckFailures++;
      instance.lastHealthCheck = new Date();

      const wasHealthy = instance.status === "healthy";

      if (instance.healthCheckFailures >= this.maxHealthCheckFailures) {
        instance.status = "unhealthy";

        if (wasHealthy) {
          console.warn(
            `Instance unhealthy: ${serviceName}/${instance.id} (${error.message})`
          );
          this.emit("instanceUnhealthy", {
            serviceName,
            instance,
            error: error.message,
          });
        }
      }

      return false;
    }
  }

  startHealthChecking() {
    this.healthCheckTimer = setInterval(async () => {
      const healthCheckPromises = [];

      for (const [serviceName, service] of this.services) {
        for (const instance of service.instances.values()) {
          healthCheckPromises.push(
            this.checkInstanceHealth(serviceName, instance)
          );
        }
      }

      // Run all health checks concurrently
      await Promise.allSettled(healthCheckPromises);

      // Clean up instances that haven't been seen for a long time
      this.cleanupStaleInstances();
    }, this.healthCheckInterval);
  }

  cleanupStaleInstances() {
    const staleThreshold = this.healthCheckInterval * 5; // 5 missed health checks
    const cutoffTime = new Date(Date.now() - staleThreshold);

    for (const [serviceName, service] of this.services) {
      for (const [instanceId, instance] of service.instances) {
        if (instance.lastSeen < cutoffTime) {
          console.log(
            `Cleaning up stale instance: ${serviceName}/${instanceId}`
          );
          this.deregister(serviceName, instanceId);
        }
      }
    }
  }

  // Service heartbeat - call this from service instances
  heartbeat(serviceName, instanceId) {
    const service = this.services.get(serviceName);
    if (!service) return false;

    const instance = service.instances.get(instanceId);
    if (!instance) return false;

    instance.lastSeen = new Date();
    return true;
  }

  // Get service statistics
  getServiceStats(serviceName) {
    const service = this.services.get(serviceName);
    if (!service) {
      throw new Error(`Service ${serviceName} not found`);
    }

    const instances = Array.from(service.instances.values());
    const healthy = instances.filter((i) => i.status === "healthy");
    const unhealthy = instances.filter((i) => i.status === "unhealthy");

    return {
      serviceName,
      totalInstances: instances.length,
      healthyInstances: healthy.length,
      unhealthyInstances: unhealthy.length,
      loadBalancingStrategy: service.loadBalancingStrategy,
      instances: instances.map((instance) => ({
        id: instance.id,
        host: instance.host,
        port: instance.port,
        status: instance.status,
        version: instance.version,
        healthCheckFailures: instance.healthCheckFailures,
        lastHealthCheck: instance.lastHealthCheck,
        registeredAt: instance.registeredAt,
        metadata: instance.metadata,
      })),
    };
  }

  // Get all services overview
  getAllServices() {
    const services = {};

    for (const serviceName of this.services.keys()) {
      services[serviceName] = this.getServiceStats(serviceName);
    }

    return services;
  }

  generateInstanceId(serviceName) {
    return `${serviceName}-${Date.now()}-${Math.random()
      .toString(36)
      .slice(2, 11)}`;
  }

  async shutdown() {
    console.log("Service registry shutting down...");

    if (this.healthCheckTimer) {
      clearInterval(this.healthCheckTimer);
    }

    this.emit("shutdown");
  }
}

// Service Discovery Client
class ServiceDiscoveryClient {
  constructor(registryUrl, serviceName, serviceInstance) {
    this.registryUrl = registryUrl;
    this.serviceName = serviceName;
    this.serviceInstance = serviceInstance;
    this.instanceId = null;
    this.heartbeatInterval = 15000; // 15 seconds
    this.heartbeatTimer = null;

    // Auto-register on startup
    this.register();
  }

  async register() {
    try {
      const response = await axios.post(
        `${this.registryUrl}/services/${this.serviceName}/register`,
        this.serviceInstance
      );

      this.instanceId = response.data.instanceId;
      console.log(
        `Registered with service registry: ${this.serviceName}/${this.instanceId}`
      );

      // Start heartbeat
      this.startHeartbeat();
    } catch (error) {
      console.error("Failed to register with service registry:", error.message);
      // Retry after delay
      setTimeout(() => this.register(), 5000);
    }
  }

  async deregister() {
    if (!this.instanceId) return;

    try {
      await axios.delete(
        `${this.registryUrl}/services/${this.serviceName}/instances/${this.instanceId}`
      );
      console.log(
        `Deregistered from service registry: ${this.serviceName}/${this.instanceId}`
      );
    } catch (error) {
      console.error(
        "Failed to deregister from service registry:",
        error.message
      );
    }

    this.stopHeartbeat();
  }

  async discover(targetServiceName, options = {}) {
    try {
      const response = await axios.get(
        `${this.registryUrl}/services/${targetServiceName}/discover`,
        {
          params: options,
        }
      );

      return response.data;
    } catch (error) {
      throw new Error(
        `Failed to discover service ${targetServiceName}: ${error.message}`
      );
    }
  }

  startHeartbeat() {
    this.heartbeatTimer = setInterval(async () => {
      try {
        await axios.post(
          `${this.registryUrl}/services/${this.serviceName}/instances/${this.instanceId}/heartbeat`
        );
      } catch (error) {
        console.warn("Heartbeat failed:", error.message);
        // If heartbeat fails consistently, try to re-register
        clearInterval(this.heartbeatTimer);
        setTimeout(() => this.register(), 1000);
      }
    }, this.heartbeatInterval);
  }

  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
}

module.exports = {
  ServiceRegistry,
  ServiceDiscoveryClient,
};

Kubernetes-Native Service Discovery:

# kubernetes-service-discovery.yaml - Native Kubernetes service discovery
apiVersion: v1
kind: Service
metadata:
  name: user-service
  namespace: production
  labels:
    app: user-service
    version: v1.2.0
  annotations:
    service.discovery/health-check-path: "/health"
    service.discovery/metrics-path: "/metrics"
    service.discovery/load-balancing: "least-connections"
spec:
  selector:
    app: user-service
  ports:
    - name: http
      port: 3001
      targetPort: 3001
      protocol: TCP
    - name: metrics
      port: 9090
      targetPort: 9090
      protocol: TCP
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
        version: v1.2.0
    spec:
      containers:
        - name: user-service
          image: myapp/user-service:v1.2.0
          ports:
            - containerPort: 3001
              name: http
            - containerPort: 9090
              name: metrics
          env:
            - name: SERVICE_NAME
              value: "user-service"
            - name: SERVICE_VERSION
              value: "v1.2.0"
            - name: DISCOVERY_ENABLED
              value: "true"
          livenessProbe:
            httpGet:
              path: /health
              port: http
            initialDelaySeconds: 30
            periodSeconds: 10
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: http
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 2
          resources:
            requests:
              cpu: 100m
              memory: 128Mi
            limits:
              cpu: 500m
              memory: 512Mi
---
# Service Monitor for Prometheus service discovery
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: user-service-metrics
  namespace: production
spec:
  selector:
    matchLabels:
      app: user-service
  endpoints:
    - port: metrics
      path: /metrics
      interval: 30s
      scrapeTimeout: 10s
---
# Headless service for direct pod discovery
apiVersion: v1
kind: Service
metadata:
  name: user-service-headless
  namespace: production
spec:
  clusterIP: None
  selector:
    app: user-service
  ports:
    - name: http
      port: 3001
      targetPort: 3001

Configuration Management: Centralized Settings with Environment Flexibility

From Scattered Config Files to Centralized Management

Understanding configuration management evolution:

// Configuration management pattern evolution
const configManagementEvolution = {
  // Anti-pattern: Hardcoded configuration
  hardcodedConfig: {
    problems: [
      "Configuration changes require code deployment",
      "Secrets stored in source code",
      "Different environments need different builds",
      "No runtime configuration updates possible",
    ],
    example: {
      databaseUrl: "postgresql://user:pass@localhost:5432/prod",
      apiKey: "sk_live_abcd1234...",
      timeout: 30000,
    },
    reality: "Impossible to manage across environments and teams",
  },

  // Basic pattern: Environment variables
  environmentVariables: {
    advantages: [
      "Configuration separated from code",
      "Different values per environment",
      "Standard deployment practice",
    ],
    limitations: [
      "No validation or type checking",
      "Difficult to manage complex nested configuration",
      "No audit trail of configuration changes",
      "No runtime updates without restart",
    ],
  },

  // Professional pattern: Configuration service
  configurationService: {
    features: [
      "Centralized configuration storage",
      "Environment-specific overrides",
      "Configuration validation and schemas",
      "Audit trail and change tracking",
      "Runtime configuration updates",
      "Secret management integration",
      "Feature flag support",
    ],

    benefits: [
      "Single source of truth for all configuration",
      "Safe configuration changes with rollback",
      "Environment promotion without rebuilds",
      "Dynamic feature enabling/disabling",
    ],
  },
};

// Configuration service implementation patterns
const configServicePatterns = {
  // Pull-based configuration
  pullBased: {
    description: "Services periodically fetch configuration from central store",
    advantages: [
      "Simple implementation",
      "Centralized control",
      "Consistent configuration",
    ],
    disadvantages: ["Configuration update delay", "Polling overhead"],
  },

  // Push-based configuration
  pushBased: {
    description: "Configuration service pushes updates to services",
    advantages: ["Immediate updates", "No polling overhead", "Event-driven"],
    disadvantages: [
      "More complex implementation",
      "Service availability dependencies",
    ],
  },

  // Hybrid approach
  hybrid: {
    description: "Combination of pull for initial config and push for updates",
    advantages: ["Best of both worlds", "Resilient to network issues"],
    implementation: "Initial pull + WebSocket/webhook for real-time updates",
  },
};

Professional Configuration Management Implementation:

// config-service.js - Production-ready configuration management service
const EventEmitter = require("events");
const crypto = require("crypto");

class ConfigurationService extends EventEmitter {
  constructor(options = {}) {
    super();
    this.configurations = new Map(); // In production: use Redis/database
    this.schemas = new Map();
    this.auditLog = [];
    this.subscribers = new Map(); // Services subscribed to config changes

    this.secretsProvider = options.secretsProvider;
    this.encryptionKey = options.encryptionKey;

    // Initialize with default configurations
    this.initializeDefaults();
  }

  // Set configuration for a service/environment
  async setConfiguration(serviceName, environment, config, options = {}) {
    const configKey = `${serviceName}:${environment}`;

    // Validate against schema if available
    const schema = this.schemas.get(serviceName);
    if (schema) {
      const validation = this.validateConfiguration(config, schema);
      if (!validation.valid) {
        throw new Error(
          `Configuration validation failed: ${validation.errors.join(", ")}`
        );
      }
    }

    // Encrypt sensitive values
    const processedConfig = await this.processConfiguration(config);

    // Store previous version for rollback
    const previousConfig = this.configurations.get(configKey);

    const configEntry = {
      serviceName,
      environment,
      configuration: processedConfig,
      version: this.generateVersion(),
      createdBy: options.user || "system",
      createdAt: new Date(),
      previousVersion: previousConfig?.version,
      checksum: this.calculateChecksum(processedConfig),
    };

    this.configurations.set(configKey, configEntry);

    // Audit log entry
    this.auditLog.push({
      action: "configuration_updated",
      serviceName,
      environment,
      version: configEntry.version,
      user: options.user || "system",
      timestamp: new Date(),
      changes: this.calculateChanges(previousConfig?.configuration, config),
    });

    console.log(`Configuration updated: ${configKey} v${configEntry.version}`);

    // Notify subscribers
    await this.notifySubscribers(serviceName, environment, configEntry);

    this.emit("configurationUpdated", {
      serviceName,
      environment,
      version: configEntry.version,
      configuration: config,
    });

    return configEntry.version;
  }

  // Get configuration for a service/environment
  async getConfiguration(serviceName, environment, options = {}) {
    let configKey = `${serviceName}:${environment}`;
    let configEntry = this.configurations.get(configKey);

    // Fall back to the default environment if not found
    if (!configEntry && environment !== "default") {
      configKey = `${serviceName}:default`;
      configEntry = this.configurations.get(configKey);
    }

    if (!configEntry) {
      throw new Error(
        `No configuration found for ${serviceName}:${environment}`
      );
    }

    // Decrypt sensitive values
    const decryptedConfig = await this.decryptConfiguration(
      configEntry.configuration
    );

    // Resolve references (e.g., to other configs or secrets)
    const resolvedConfig = await this.resolveConfigReferences(decryptedConfig);

    // Apply feature flags if requested
    if (options.includeFeatureFlags) {
      resolvedConfig.features = await this.getFeatureFlags(
        serviceName,
        environment
      );
    }

    return {
      configuration: resolvedConfig,
      version: configEntry.version,
      checksum: configEntry.checksum,
      lastModified: configEntry.createdAt,
    };
  }

  // Register configuration schema for validation
  registerSchema(serviceName, schema) {
    this.schemas.set(serviceName, schema);
    console.log(`Schema registered for service: ${serviceName}`);
  }

  // Validate configuration against schema
  validateConfiguration(config, schema) {
    const errors = [];

    // Required fields
    if (schema.required) {
      schema.required.forEach((field) => {
        if (!(field in config)) {
          errors.push(`Required field missing: ${field}`);
        }
      });
    }

    // Field types and constraints
    if (schema.properties) {
      Object.keys(schema.properties).forEach((field) => {
        if (field in config) {
          const fieldSchema = schema.properties[field];
          const value = config[field];

          // Type checking
          if (fieldSchema.type && typeof value !== fieldSchema.type) {
            errors.push(
              `Field ${field} should be type ${
                fieldSchema.type
              }, got ${typeof value}`
            );
          }

          // String length validation
          if (
            fieldSchema.type === "string" &&
            fieldSchema.minLength &&
            value.length < fieldSchema.minLength
          ) {
            errors.push(
              `Field ${field} should be at least ${fieldSchema.minLength} characters`
            );
          }

          // Number range validation
          if (fieldSchema.type === "number") {
            if (fieldSchema.min !== undefined && value < fieldSchema.min) {
              errors.push(`Field ${field} should be >= ${fieldSchema.min}`);
            }
            if (fieldSchema.max !== undefined && value > fieldSchema.max) {
              errors.push(`Field ${field} should be <= ${fieldSchema.max}`);
            }
          }

          // Pattern validation
          if (
            fieldSchema.pattern &&
            !new RegExp(fieldSchema.pattern).test(value)
          ) {
            errors.push(`Field ${field} does not match required pattern`);
          }
        }
      });
    }

    return {
      valid: errors.length === 0,
      errors,
    };
  }

  // Process configuration (encrypt secrets, resolve templates)
  async processConfiguration(config) {
    const processed = JSON.parse(JSON.stringify(config)); // Deep clone

    return await this.processConfigObject(processed);
  }

  async processConfigObject(obj) {
    for (const key in obj) {
      if (typeof obj[key] === "object" && obj[key] !== null) {
        obj[key] = await this.processConfigObject(obj[key]);
      } else if (typeof obj[key] === "string") {
        // Handle secret references like ${secret:database-password}
        if (obj[key].startsWith("${secret:")) {
          const secretName = obj[key].slice(9, -1);
          obj[key] = await this.getSecret(secretName);
        }

        // Handle environment variable references like ${env:DATABASE_URL}
        else if (obj[key].startsWith("${env:")) {
          const envVar = obj[key].slice(6, -1);
          obj[key] = process.env[envVar];
        }

        // Encrypt sensitive fields
        else if (this.isSensitiveField(key)) {
          obj[key] = this.encrypt(obj[key]);
        }
      }
    }
    return obj;
  }

  // Subscribe to configuration changes
  subscribe(serviceName, environment, callback, options = {}) {
    const subscriptionKey = `${serviceName}:${environment}`;

    if (!this.subscribers.has(subscriptionKey)) {
      this.subscribers.set(subscriptionKey, new Set());
    }

    const subscription = {
      callback,
      options,
      subscribedAt: new Date(),
    };

    this.subscribers.get(subscriptionKey).add(subscription);

    console.log(`Service subscribed to config updates: ${subscriptionKey}`);

    // Return unsubscribe function
    return () => {
      this.subscribers.get(subscriptionKey).delete(subscription);
    };
  }

  // Notify subscribers of configuration changes
  async notifySubscribers(serviceName, environment, configEntry) {
    const subscriptionKey = `${serviceName}:${environment}`;
    const subscribers = this.subscribers.get(subscriptionKey);

    if (!subscribers || subscribers.size === 0) {
      return;
    }

    const notifications = [];

    for (const subscription of subscribers) {
      try {
        const notification = subscription.callback({
          serviceName,
          environment,
          version: configEntry.version,
          configuration: configEntry.configuration,
        });

        if (notification instanceof Promise) {
          notifications.push(notification);
        }
      } catch (error) {
        console.error(
          `Failed to notify subscriber for ${subscriptionKey}:`,
          error
        );
      }
    }

    // Wait for all notifications to complete
    await Promise.allSettled(notifications);
  }

  // Feature flag management
  async setFeatureFlag(
    serviceName,
    environment,
    flagName,
    enabled,
    conditions = {}
  ) {
    // Key must match what setConfiguration produces for "<service>:flags"
    const flagKey = `${serviceName}:flags:${environment}`;
    let flags = this.configurations.get(flagKey)?.configuration || {};

    flags[flagName] = {
      enabled,
      conditions,
      createdAt: new Date(),
      lastModified: new Date(),
    };

    await this.setConfiguration(`${serviceName}:flags`, environment, flags);
  }

  async getFeatureFlags(serviceName, environment) {
    // Key must match what setConfiguration produces for "<service>:flags"
    const flagKey = `${serviceName}:flags:${environment}`;
    const configEntry = this.configurations.get(flagKey);
    return configEntry?.configuration || {};
  }

  // Rollback to previous configuration version
  async rollback(serviceName, environment, targetVersion) {
    const configKey = `${serviceName}:${environment}`;
    const currentConfig = this.configurations.get(configKey);

    if (!currentConfig) {
      throw new Error(
        `No current configuration found for ${serviceName}:${environment}`
      );
    }

    // Find target version in audit log
    const targetEntry = this.auditLog.find(
      (entry) =>
        entry.serviceName === serviceName &&
        entry.environment === environment &&
        entry.version === targetVersion
    );

    if (!targetEntry) {
      throw new Error(
        `Target version ${targetVersion} not found for ${serviceName}:${environment}`
      );
    }

    console.log(
      `Rolling back ${serviceName}:${environment} from v${currentConfig.version} to v${targetVersion}`
    );

    // This is simplified - in production, you'd store full configuration history
    throw new Error(
      "Rollback functionality requires full configuration versioning implementation"
    );
  }

  // Utility methods
  isSensitiveField(fieldName) {
    const sensitiveFields = [
      "password",
      "secret",
      "key",
      "token",
      "credential",
    ];
    return sensitiveFields.some((sensitive) =>
      fieldName.toLowerCase().includes(sensitive)
    );
  }

  encrypt(value) {
    if (!this.encryptionKey) return value;

    // crypto.createCipher is deprecated/removed; derive a key and use a random IV
    const iv = crypto.randomBytes(16);
    const key = crypto.createHash("sha256").update(this.encryptionKey).digest();
    const cipher = crypto.createCipheriv("aes-256-ctr", key, iv);
    let encrypted = cipher.update(value, "utf8", "hex");
    encrypted += cipher.final("hex");
    return `encrypted:${iv.toString("hex")}:${encrypted}`;
  }

  decrypt(encryptedValue) {
    if (!encryptedValue.startsWith("encrypted:") || !this.encryptionKey) {
      return encryptedValue;
    }

    const [ivHex, encrypted] = encryptedValue.slice(10).split(":");
    const iv = Buffer.from(ivHex, "hex");
    const key = crypto.createHash("sha256").update(this.encryptionKey).digest();
    const decipher = crypto.createDecipheriv("aes-256-ctr", key, iv);
    let decrypted = decipher.update(encrypted, "hex", "utf8");
    decrypted += decipher.final("utf8");
    return decrypted;
  }

  async decryptConfiguration(config) {
    const decrypted = JSON.parse(JSON.stringify(config));
    return await this.decryptConfigObject(decrypted);
  }

  async decryptConfigObject(obj) {
    for (const key in obj) {
      if (typeof obj[key] === "object" && obj[key] !== null) {
        obj[key] = await this.decryptConfigObject(obj[key]);
      } else if (typeof obj[key] === "string") {
        obj[key] = this.decrypt(obj[key]);
      }
    }
    return obj;
  }

  async getSecret(secretName) {
    if (this.secretsProvider) {
      return await this.secretsProvider.getSecret(secretName);
    }
    // Fall back to env vars, e.g. "db-password" -> process.env.DB_PASSWORD
    return process.env[secretName.toUpperCase().replace(/-/g, "_")];
  }

  calculateChecksum(config) {
    return crypto
      .createHash("sha256")
      .update(JSON.stringify(config))
      .digest("hex");
  }

  generateVersion() {
    return `v${Date.now()}-${Math.random().toString(36).slice(2, 7)}`;
  }

  calculateChanges(oldConfig, newConfig) {
    // Simplified change detection - in production, use a proper diff library
    const changes = {};

    if (!oldConfig) return { type: "created", fields: Object.keys(newConfig) };

    for (const key in newConfig) {
      if (JSON.stringify(oldConfig[key]) !== JSON.stringify(newConfig[key])) {
        changes[key] = {
          old: oldConfig[key],
          new: newConfig[key],
        };
      }
    }

    return changes;
  }

  initializeDefaults() {
    // Initialize with some default schemas
    this.registerSchema("user-service", {
      required: ["database", "redis"],
      properties: {
        database: {
          type: "object",
          required: ["url", "maxConnections"],
        },
        redis: {
          type: "object",
          required: ["host", "port"],
        },
        timeout: {
          type: "number",
          min: 1000,
          max: 60000,
        },
      },
    });
  }
}

// Configuration client for services
class ConfigurationClient {
  constructor(serviceUrl, serviceName, environment) {
    this.serviceUrl = serviceUrl;
    this.serviceName = serviceName;
    this.environment = environment;
    this.currentConfig = null;
    this.currentVersion = null;
    this.updateCallbacks = new Set();
  }

  async fetchConfiguration() {
    try {
      const response = await axios.get(
        `${this.serviceUrl}/config/${this.serviceName}/${this.environment}`
      );

      const { configuration, version, checksum } = response.data;

      // Check if configuration has changed
      if (version !== this.currentVersion) {
        console.log(
          `Configuration updated: ${this.serviceName}:${this.environment} v${version}`
        );

        const oldConfig = this.currentConfig;
        this.currentConfig = configuration;
        this.currentVersion = version;

        // Notify all registered callbacks
        for (const callback of this.updateCallbacks) {
          try {
            callback(configuration, oldConfig);
          } catch (error) {
            console.error("Configuration update callback failed:", error);
          }
        }
      }

      return configuration;
    } catch (error) {
      console.error("Failed to fetch configuration:", error.message);
      throw error;
    }
  }

  // Subscribe to configuration updates
  onConfigUpdate(callback) {
    this.updateCallbacks.add(callback);

    // Return unsubscribe function
    return () => {
      this.updateCallbacks.delete(callback);
    };
  }

  // Get current configuration
  getConfiguration() {
    return this.currentConfig;
  }

  // Start polling for configuration updates
  startPolling(intervalMs = 30000) {
    this.pollTimer = setInterval(() => {
      this.fetchConfiguration().catch((error) => {
        console.error("Configuration polling failed:", error.message);
      });
    }, intervalMs);
  }

  stopPolling() {
    if (this.pollTimer) {
      clearInterval(this.pollTimer);
      this.pollTimer = null;
    }
  }
}

module.exports = {
  ConfigurationService,
  ConfigurationClient,
};
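The `${secret:…}` and `${env:…}` reference style handled by `processConfigObject` can also be illustrated standalone. This is a minimal synchronous sketch: the `secrets` argument is a plain object standing in for a real secrets provider.

```javascript
// Minimal resolver for ${env:VAR} and ${secret:name} references.
// `secrets` is a plain object standing in for a real secrets provider.
function resolveReferences(obj, secrets = {}) {
  const out = Array.isArray(obj) ? [] : {};
  for (const [key, value] of Object.entries(obj)) {
    if (value !== null && typeof value === "object") {
      out[key] = resolveReferences(value, secrets); // recurse into nested config
    } else if (typeof value === "string" && value.startsWith("${secret:")) {
      out[key] = secrets[value.slice(9, -1)]; // strip "${secret:" and "}"
    } else if (typeof value === "string" && value.startsWith("${env:")) {
      out[key] = process.env[value.slice(6, -1)]; // strip "${env:" and "}"
    } else {
      out[key] = value;
    }
  }
  return out;
}

process.env.DB_HOST = "db.internal";

const resolved = resolveReferences(
  {
    database: { host: "${env:DB_HOST}", password: "${secret:db-password}" },
    timeout: 5000,
  },
  { "db-password": "s3cret" }
);

console.log(resolved.database.host); // "db.internal"
console.log(resolved.database.password); // "s3cret"
```

Because references are resolved at read time, the stored configuration never contains the secret material itself, only pointers to it.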

Distributed Tracing and Monitoring: End-to-End Observability

From Service-Level Logs to Cross-Service Transaction Visibility

Understanding distributed tracing necessity:

// The distributed debugging nightmare without tracing
const distributedDebuggingChallenges = {
  // Without distributed tracing
  traditionalLogging: {
    problem: "Customer payment failed - error spans 8 services",
    debuggingProcess: [
      "Check API gateway logs: 'Payment request received'",
      "Check order service logs: 'Processing order-12345'",
      "Check user service logs: 'User validation successful'",
      "Check inventory service logs: 'Items reserved'",
      "Check payment service logs: 'Stripe API timeout'",
      "Check notification service logs: 'No payment confirmation event'",
      "Check audit service logs: 'Payment status unknown'",
    ],

    realChallenges: [
      "No correlation between logs across services",
      "Different timestamp formats make timeline reconstruction impossible",
      "Logs scattered across multiple systems and containers",
      "No visibility into service dependencies and call chains",
      "Performance bottlenecks invisible without request flow tracing",
    ],

    debuggingTime: "4-8 hours for simple payment flow issue",
    scalabilityProblem:
      "Debugging time increases exponentially with service count",
  },

  // With distributed tracing
  distributedTracing: {
    capability: "Single trace ID follows request across all services",
    benefits: [
      "Complete request flow visualization",
      "Service dependency mapping",
      "Performance bottleneck identification",
      "Error propagation analysis",
      "Database query optimization insights",
    ],

    debuggingProcess: [
      "Search by trace ID: trace-abc123",
      "See complete flow: Gateway → Order → User → Inventory → Payment",
      "Identify bottleneck: Payment service Stripe call took 15 seconds",
      "View error context: Timeout occurred after inventory reservation",
      "Analyze impact: Inventory locked but payment never processed",
    ],

    debuggingTime: "5-15 minutes for complex distributed issues",
    scalabilityBenefit:
      "Debugging time stays constant regardless of service count",
  },
};

// Distributed tracing concepts
const tracingConcepts = {
  trace: {
    definition: "Complete journey of a request through distributed system",
    contains: "Multiple spans representing work done in different services",
    lifespan: "From initial request to final response",
  },

  span: {
    definition: "Individual unit of work within a trace",
    represents:
      "Single operation like HTTP call, database query, or business logic",
    attributes: "Service name, operation name, start/end times, tags, logs",
  },

  context: {
    definition: "Metadata that travels with request across service boundaries",
    contains: "Trace ID, span ID, baggage items, sampling decisions",
    propagation: "Passed via HTTP headers, message queue metadata, etc.",
  },
};

Professional Distributed Tracing Implementation:

// distributed-tracing.js - Production-ready tracing implementation
const opentelemetry = require("@opentelemetry/api");
const { NodeSDK } = require("@opentelemetry/sdk-node");
const { JaegerExporter } = require("@opentelemetry/exporter-jaeger");
const { Resource } = require("@opentelemetry/resources");
const {
  SemanticResourceAttributes,
} = require("@opentelemetry/semantic-conventions");

class DistributedTracingService {
  constructor(serviceName, jaegerEndpoint) {
    this.serviceName = serviceName;
    this.jaegerEndpoint = jaegerEndpoint;
    this.tracer = null;
    this.initialized = false;

    this.initialize();
  }

  initialize() {
    // Configure OpenTelemetry SDK
    const sdk = new NodeSDK({
      resource: new Resource({
        [SemanticResourceAttributes.SERVICE_NAME]: this.serviceName,
        [SemanticResourceAttributes.SERVICE_VERSION]:
          process.env.SERVICE_VERSION || "1.0.0",
        [SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]:
          process.env.NODE_ENV || "development",
      }),

      traceExporter: new JaegerExporter({
        endpoint: this.jaegerEndpoint,
      }),
    });

    // Start the SDK
    sdk.start();

    this.tracer = opentelemetry.trace.getTracer(this.serviceName);
    this.initialized = true;

    console.log(
      `Distributed tracing initialized for service: ${this.serviceName}`
    );
  }

  // Create a new span
  startSpan(operationName, options = {}) {
    if (!this.initialized) {
      console.warn("Tracing not initialized");
      // Return a no-op span so callers can still invoke span methods safely
      return {
        end: () => {},
        setStatus: () => {},
        setAttributes: () => {},
        addEvent: () => {},
      };
    }

    const span = this.tracer.startSpan(operationName, {
      kind: options.kind || opentelemetry.SpanKind.INTERNAL,
      attributes: {
        "service.name": this.serviceName,
        ...options.attributes,
      },
    });

    // Add common attributes
    if (options.userId) {
      span.setAttributes({ "user.id": options.userId });
    }

    if (options.operation) {
      span.setAttributes({ "operation.type": options.operation });
    }

    return span;
  }

  // Trace an HTTP request
  traceHttpRequest(req, res, next) {
    // Extract the upstream trace context first so the new span becomes a
    // child of the caller's span instead of starting a fresh trace
    const parentContext = opentelemetry.propagation.extract(
      opentelemetry.context.active(),
      req.headers
    );

    const span = opentelemetry.context.with(parentContext, () =>
      this.startSpan(`${req.method} ${req.route?.path || req.path}`, {
        kind: opentelemetry.SpanKind.SERVER,
        attributes: {
          "http.method": req.method,
          "http.url": req.url,
          "http.route": req.route?.path,
          "http.user_agent": req.get("User-Agent"),
          "http.request_content_length": req.get("Content-Length"),
        },
      })
    );

    // Add request ID if available
    if (req.headers["x-request-id"]) {
      span.setAttributes({ "request.id": req.headers["x-request-id"] });
    }

    // Override res.end to capture response data
    const originalEnd = res.end;
    res.end = function (...args) {
      span.setAttributes({
        "http.status_code": res.statusCode,
        "http.response_content_length": res.get("Content-Length"),
      });

      if (res.statusCode >= 400) {
        span.setStatus({
          code: opentelemetry.SpanStatusCode.ERROR,
          message: `HTTP ${res.statusCode}`,
        });
      }

      span.end();
      originalEnd.apply(this, args);
    };

    // Make the request span active for downstream handlers
    const requestContext = opentelemetry.trace.setSpan(parentContext, span);
    opentelemetry.context.with(requestContext, next);
  }

  // Trace database operations
  async traceDatabaseOperation(operationName, operation, options = {}) {
    const span = this.startSpan(`db.${operationName}`, {
      kind: opentelemetry.SpanKind.CLIENT,
      attributes: {
        "db.system": options.dbSystem || "postgresql",
        "db.name": options.dbName,
        "db.operation": operationName,
        "db.statement": options.query,
      },
    });

    try {
      const result = await operation();

      // Add result metadata
      if (result && typeof result === "object") {
        if (result.rowCount !== undefined) {
          span.setAttributes({ "db.rows_affected": result.rowCount });
        }
        if (result.length !== undefined) {
          span.setAttributes({ "db.rows_returned": result.length });
        }
      }

      span.setStatus({ code: opentelemetry.SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({
        code: opentelemetry.SpanStatusCode.ERROR,
        message: error.message,
      });

      span.setAttributes({
        "error.name": error.name,
        "error.message": error.message,
      });

      throw error;
    } finally {
      span.end();
    }
  }

  // Trace external API calls
  async traceExternalCall(serviceName, operation, httpCall, options = {}) {
    const span = this.startSpan(`external.${serviceName}.${operation}`, {
      kind: opentelemetry.SpanKind.CLIENT,
      attributes: {
        "http.method": options.method || "GET",
        "http.url": options.url,
        "external.service": serviceName,
        "external.operation": operation,
      },
    });

    // Inject trace context into headers
    const headers = { ...options.headers };
    opentelemetry.propagation.inject(opentelemetry.context.active(), headers);

    try {
      const result = await httpCall({ ...options, headers });

      span.setAttributes({
        "http.status_code": result.status || 200,
        "http.response_size": result.data
          ? JSON.stringify(result.data).length
          : 0,
      });

      span.setStatus({ code: opentelemetry.SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({
        code: opentelemetry.SpanStatusCode.ERROR,
        message: error.message,
      });

      span.setAttributes({
        "error.name": error.name,
        "error.message": error.message,
        "http.status_code": error.response?.status || 0,
      });

      throw error;
    } finally {
      span.end();
    }
  }

  // Trace business operations
  async traceBusinessOperation(operationName, operation, context = {}) {
    const span = this.startSpan(`business.${operationName}`, {
      attributes: {
        "business.operation": operationName,
        "business.entity.type": context.entityType,
        "business.entity.id": context.entityId,
        ...context.attributes,
      },
    });

    try {
      const result = await operation(span);

      // Add business-specific success metrics
      if (context.successMetrics) {
        span.setAttributes(context.successMetrics);
      }

      span.setStatus({ code: opentelemetry.SpanStatusCode.OK });
      return result;
    } catch (error) {
      span.setStatus({
        code: opentelemetry.SpanStatusCode.ERROR,
        message: error.message,
      });

      // Add business-specific error context
      span.setAttributes({
        "business.error.type": error.type || "unknown",
        "business.error.code": error.code || "UNKNOWN_ERROR",
      });

      throw error;
    } finally {
      span.end();
    }
  }

  // Add custom events to current span
  addEvent(eventName, attributes = {}) {
    const span = opentelemetry.trace.getActiveSpan();
    if (span) {
      span.addEvent(eventName, {
        timestamp: Date.now(),
        ...attributes,
      });
    }
  }

  // Create a context carrying a baggage item that propagates across services
  setBaggage(key, value) {
    const activeContext = opentelemetry.context.active();
    const baggage =
      opentelemetry.propagation.getBaggage(activeContext) ||
      opentelemetry.propagation.createBaggage();
    const newBaggage = baggage.setEntry(key, { value });

    // The caller must activate the returned context, e.g. via context.with()
    return opentelemetry.propagation.setBaggage(activeContext, newBaggage);
  }

  // Get baggage item
  getBaggage(key) {
    const baggage = opentelemetry.propagation.getBaggage(
      opentelemetry.context.active()
    );
    return baggage?.getEntry(key)?.value;
  }

  // Get current trace ID (useful for logging correlation)
  getCurrentTraceId() {
    const span = opentelemetry.trace.getActiveSpan();
    return span?.spanContext().traceId;
  }

  // Get current span ID
  getCurrentSpanId() {
    const span = opentelemetry.trace.getActiveSpan();
    return span?.spanContext().spanId;
  }
}

// Enhanced logging with trace correlation
class TracedLogger {
  constructor(serviceName, baseLogger = console) {
    this.serviceName = serviceName;
    this.baseLogger = baseLogger;
  }

  log(level, message, metadata = {}) {
    const traceId = this.getTraceId();
    const spanId = this.getSpanId();

    const logEntry = {
      timestamp: new Date().toISOString(),
      level,
      service: this.serviceName,
      message,
      traceId,
      spanId,
      ...metadata,
    };

    // Add to active span as event
    const span = opentelemetry.trace.getActiveSpan();
    if (span) {
      span.addEvent(`log.${level}`, {
        message,
        ...metadata,
      });
    }

    this.baseLogger.log(JSON.stringify(logEntry));
  }

  info(message, metadata) {
    this.log("info", message, metadata);
  }

  warn(message, metadata) {
    this.log("warn", message, metadata);
  }

  error(message, error, metadata = {}) {
    if (error instanceof Error) {
      metadata.error = {
        name: error.name,
        message: error.message,
        stack: error.stack,
      };
    }
    this.log("error", message, metadata);
  }

  getTraceId() {
    const span = opentelemetry.trace.getActiveSpan();
    return span?.spanContext().traceId || "no-trace";
  }

  getSpanId() {
    const span = opentelemetry.trace.getActiveSpan();
    return span?.spanContext().spanId || "no-span";
  }
}

module.exports = {
  DistributedTracingService,
  TracedLogger,
};

Circuit Breakers and Fault Tolerance: Preventing Cascade Failures

From Synchronous Failures to Resilient System Design

Understanding circuit breaker patterns:

// Circuit breaker pattern for fault tolerance
const circuitBreakerStates = {
  CLOSED: {
    description: "Normal operation - requests pass through",
    behavior: "Monitor failure rate and response times",
    transition: "Opens when failure threshold exceeded",
  },

  OPEN: {
    description: "Failing fast - reject requests immediately",
    behavior: "Return cached response or default value",
    transition: "Goes to HALF_OPEN after timeout period",
  },

  HALF_OPEN: {
    description: "Testing recovery - allow limited requests",
    behavior: "Let few requests through to test service health",
    transition: "CLOSED if success, OPEN if still failing",
  },
};

const faultTolerancePatterns = {
  circuitBreaker: {
    purpose: "Prevent cascade failures by failing fast",
    useCase: "External service calls that might be slow or failing",
  },

  bulkhead: {
    purpose: "Isolate resources to prevent failure spread",
    useCase: "Separate thread pools for different service calls",
  },

  timeout: {
    purpose: "Prevent hanging requests from consuming resources",
    useCase: "All network calls and database operations",
  },

  retry: {
    purpose: "Handle transient failures with smart backoff",
    useCase: "Network blips, temporary service unavailability",
  },

  fallback: {
    purpose: "Provide graceful degradation when services fail",
    useCase: "Cached data, default values, alternative workflows",
  },
};
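The circuit breaker from this list gets a full implementation below; the retry pattern can be sketched on its own as exponential backoff with jitter (the delay values here are illustrative):

```javascript
// Retry a failing async operation with exponential backoff plus jitter
async function retryWithBackoff(operation, { retries = 3, baseDelayMs = 100 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (attempt >= retries) throw error; // retries exhausted, surface error

      // Exponential backoff (base * 2^attempt) with random jitter to avoid
      // synchronized retry storms across many service instances
      const delay = baseDelayMs * 2 ** attempt + Math.random() * baseDelayMs;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}

// Demo: operation fails twice with transient errors, succeeds on attempt 3
let attempts = 0;
const retryPromise = retryWithBackoff(
  async () => {
    attempts++;
    if (attempts < 3) throw new Error("transient failure");
    return "ok";
  },
  { retries: 5, baseDelayMs: 10 }
);

retryPromise.then((result) => console.log(result, attempts)); // "ok" 3
```

Retries and circuit breakers compose: retries absorb transient blips, while the breaker stops retry traffic entirely once a dependency is clearly down.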

Professional Circuit Breaker Implementation:

// circuit-breaker.js - Production-ready circuit breaker with metrics
const EventEmitter = require("events");

class CircuitBreaker extends EventEmitter {
  constructor(operation, options = {}) {
    super();

    this.operation = operation;
    this.state = "CLOSED";
    this.failureCount = 0;
    this.successCount = 0;
    this.requestCount = 0;
    this.lastFailureTime = null;

    // Configuration
    this.failureThreshold = options.failureThreshold || 5;
    this.recoveryTimeout = options.recoveryTimeout || 60000; // 1 minute
    this.monitoringPeriod = options.monitoringPeriod || 10000; // 10 seconds
    this.halfOpenMaxRequests = options.halfOpenMaxRequests || 3;
    this.timeoutDuration = options.timeout || 30000;

    // Metrics
    this.metrics = {
      totalRequests: 0,
      totalFailures: 0,
      totalSuccesses: 0,
      totalTimeouts: 0,
      totalCircuitBreakerRejects: 0,
      averageResponseTime: 0,
      lastResetTime: Date.now(),
    };

    // Fallback options
    this.fallbackFunction = options.fallback;
    this.cacheEnabled = options.enableCache || false;
    this.cache = new Map();
    this.cacheTimeout = options.cacheTimeout || 300000; // 5 minutes

    // Reset metrics periodically
    this.resetTimer = setInterval(
      () => this.resetMetrics(),
      this.monitoringPeriod
    );
  }

  async execute(args) {
    this.metrics.totalRequests++;

    // Check current state
    if (this.state === "OPEN") {
      if (this.shouldAttemptReset()) {
        this.state = "HALF_OPEN";
        this.successCount = 0;
        console.log("Circuit breaker transitioning to HALF_OPEN");
      } else {
        this.metrics.totalCircuitBreakerRejects++;
        return this.handleFallback("Circuit breaker is OPEN", args);
      }
    }

    if (
      this.state === "HALF_OPEN" &&
      this.successCount >= this.halfOpenMaxRequests
    ) {
      this.metrics.totalCircuitBreakerRejects++;
      return this.handleFallback(
        "Circuit breaker HALF_OPEN limit exceeded",
        args
      );
    }

    // Execute the operation with timeout
    const startTime = Date.now();

    try {
      const result = await this.executeWithTimeout(args);

      // Success
      const responseTime = Date.now() - startTime;
      this.onSuccess(responseTime);

      // Cache successful results if enabled
      if (this.cacheEnabled) {
        const cacheKey = this.generateCacheKey(args);
        this.cache.set(cacheKey, {
          result,
          timestamp: Date.now(),
        });
      }

      return result;
    } catch (error) {
      const responseTime = Date.now() - startTime;
      this.onFailure(error, responseTime);

      return this.handleFallback(error.message, args, error);
    }
  }

  async executeWithTimeout(args) {
    return new Promise((resolve, reject) => {
      const timeoutId = setTimeout(() => {
        this.metrics.totalTimeouts++;
        reject(
          new Error(`Operation timed out after ${this.timeoutDuration}ms`)
        );
      }, this.timeoutDuration);

      this.operation(args)
        .then((result) => {
          clearTimeout(timeoutId);
          resolve(result);
        })
        .catch((error) => {
          clearTimeout(timeoutId);
          reject(error);
        });
    });
  }

  onSuccess(responseTime) {
    this.successCount++;
    this.metrics.totalSuccesses++;
    this.updateAverageResponseTime(responseTime);

    if (this.state === "HALF_OPEN") {
      if (this.successCount >= this.halfOpenMaxRequests) {
        this.reset();
        console.log("Circuit breaker closed after successful recovery");
      }
    } else if (this.state === "CLOSED") {
      // A success in CLOSED state clears the consecutive-failure count so
      // sporadic failures do not accumulate and eventually trip the breaker
      this.failureCount = 0;
    }

    this.emit("success", {
      state: this.state,
      successCount: this.successCount,
      responseTime,
    });
  }

  onFailure(error, responseTime) {
    this.failureCount++;
    this.metrics.totalFailures++;
    this.lastFailureTime = Date.now();
    this.updateAverageResponseTime(responseTime);

    if (this.state === "HALF_OPEN") {
      this.trip();
      console.log("Circuit breaker opened - failure during recovery test");
    } else if (this.failureCount >= this.failureThreshold) {
      this.trip();
      console.log(
        `Circuit breaker opened - failure threshold (${this.failureThreshold}) exceeded`
      );
    }

    this.emit("failure", {
      state: this.state,
      failureCount: this.failureCount,
      error: error.message,
      responseTime,
    });
  }

  shouldAttemptReset() {
    return Date.now() - this.lastFailureTime >= this.recoveryTimeout;
  }

  trip() {
    this.state = "OPEN";
    this.emit("open", {
      failureCount: this.failureCount,
      timestamp: new Date(),
    });
  }

  reset() {
    this.state = "CLOSED";
    this.failureCount = 0;
    this.successCount = 0;

    this.emit("closed", {
      timestamp: new Date(),
      metrics: this.getMetrics(),
    });
  }

  async handleFallback(errorMessage, args, originalError) {
    // Try cache first
    if (this.cacheEnabled) {
      const cached = this.getCachedResult(args);
      if (cached) {
        console.log("Returning cached result due to circuit breaker");
        return cached.result;
      }
    }

    // Try fallback function
    if (this.fallbackFunction) {
      try {
        console.log("Executing fallback function");
        return await this.fallbackFunction(args, originalError);
      } catch (fallbackError) {
        console.error("Fallback function failed:", fallbackError.message);
      }
    }

    // No fallback available
    const error = new Error(`Circuit breaker failure: ${errorMessage}`);
    error.circuitBreakerState = this.state;
    error.originalError = originalError;

    throw error;
  }

  getCachedResult(args) {
    if (!this.cacheEnabled) return null;

    const cacheKey = this.generateCacheKey(args);
    const cached = this.cache.get(cacheKey);

    if (cached && Date.now() - cached.timestamp < this.cacheTimeout) {
      return cached;
    }

    // Remove expired cache entry
    if (cached) {
      this.cache.delete(cacheKey);
    }

    return null;
  }

  generateCacheKey(args) {
    return JSON.stringify(args);
  }

  updateAverageResponseTime(responseTime) {
    const totalRequests =
      this.metrics.totalSuccesses + this.metrics.totalFailures;
    this.metrics.averageResponseTime =
      (this.metrics.averageResponseTime * (totalRequests - 1) + responseTime) /
      totalRequests;
  }

  resetMetrics() {
    // Reset counters for the monitoring period
    this.requestCount = 0;
    this.failureCount = 0;
    this.successCount = 0;

    this.emit("metricsReset", this.getMetrics());
  }

  getMetrics() {
    const failureRate =
      this.metrics.totalRequests > 0
        ? (this.metrics.totalFailures / this.metrics.totalRequests) * 100
        : 0;

    return {
      state: this.state,
      totalRequests: this.metrics.totalRequests,
      totalSuccesses: this.metrics.totalSuccesses,
      totalFailures: this.metrics.totalFailures,
      totalTimeouts: this.metrics.totalTimeouts,
      totalCircuitBreakerRejects: this.metrics.totalCircuitBreakerRejects,
      failureRate: Math.round(failureRate * 100) / 100,
      averageResponseTime: Math.round(this.metrics.averageResponseTime),
      uptime: Date.now() - this.metrics.lastResetTime,
      cacheSize: this.cacheEnabled ? this.cache.size : 0, // entries cached, not hit count
    };
  }

  // Manual controls
  forceOpen() {
    this.state = "OPEN";
    console.log("Circuit breaker manually opened");
  }

  forceClosed() {
    this.reset();
    console.log("Circuit breaker manually closed");
  }

  clearCache() {
    if (this.cacheEnabled) {
      this.cache.clear();
      console.log("Circuit breaker cache cleared");
    }
  }

  // Cleanup
  destroy() {
    if (this.resetTimer) {
      clearInterval(this.resetTimer);
    }
    this.removeAllListeners();
  }
}

// Bulkhead pattern implementation
class Bulkhead {
  constructor(name, options = {}) {
    this.name = name;
    this.maxConcurrent = options.maxConcurrent || 10;
    this.queue = [];
    this.activeRequests = 0;

    this.metrics = {
      totalRequests: 0,
      completedRequests: 0,
      rejectedRequests: 0,
      queuedRequests: 0,
      averageWaitTime: 0,
    };
  }

  async execute(operation) {
    this.metrics.totalRequests++;

    if (this.activeRequests >= this.maxConcurrent) {
      this.metrics.rejectedRequests++;
      throw new Error(
        `Bulkhead ${this.name} at capacity (${this.maxConcurrent})`
      );
    }

    this.activeRequests++;
    const startTime = Date.now();

    try {
      const result = await operation();
      this.metrics.completedRequests++;

      const waitTime = Date.now() - startTime;
      this.updateAverageWaitTime(waitTime);

      return result;
    } finally {
      this.activeRequests--;
    }
  }

  updateAverageWaitTime(waitTime) {
    const completed = this.metrics.completedRequests;
    this.metrics.averageWaitTime =
      (this.metrics.averageWaitTime * (completed - 1) + waitTime) / completed;
  }

  getStatus() {
    return {
      name: this.name,
      activeRequests: this.activeRequests,
      maxConcurrent: this.maxConcurrent,
      utilizationRate: (this.activeRequests / this.maxConcurrent) * 100,
      ...this.metrics,
    };
  }
}

// Combined resilience patterns
class ResilientServiceClient {
  constructor(serviceName, options = {}) {
    this.serviceName = serviceName;

    // Circuit breaker
    this.circuitBreaker = new CircuitBreaker(this.makeRequest.bind(this), {
      failureThreshold: options.failureThreshold || 5,
      recoveryTimeout: options.recoveryTimeout || 60000,
      timeout: options.timeout || 30000,
      fallback: options.fallback,
      enableCache: options.enableCache !== false, // `|| true` would force caching on
    });

    // Bulkhead
    this.bulkhead = new Bulkhead(serviceName, {
      maxConcurrent: options.maxConcurrent || 20,
    });

    // Retry configuration
    this.retryAttempts = options.retryAttempts || 3;
    this.retryDelay = options.retryDelay || 1000;

    this.baseUrl = options.baseUrl;
    this.httpClient = options.httpClient || require("axios");
  }

  async call(endpoint, data, options = {}) {
    return await this.bulkhead.execute(async () => {
      return await this.circuitBreaker.execute({ endpoint, data, options });
    });
  }

  async makeRequest({ endpoint, data, options }) {
    let lastError;

    for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
      try {
        const config = {
          url: `${this.baseUrl}${endpoint}`,
          method: options.method || "GET",
          timeout: options.timeout || 30000,
          headers: {
            "X-Service-Name": this.serviceName,
            ...options.headers,
          },
        };

        if (data) {
          config.data = data;
        }

        const response = await this.httpClient(config);
        return response.data;
      } catch (error) {
        lastError = error;

        if (attempt < this.retryAttempts && this.shouldRetry(error)) {
          const delay = this.retryDelay * Math.pow(2, attempt - 1);
          console.log(
            `Retry attempt ${attempt} for ${this.serviceName} in ${delay}ms`
          );
          await this.sleep(delay);
          continue;
        }

        break;
      }
    }

    throw lastError;
  }

  shouldRetry(error) {
    // Retry on network errors or 5xx responses
    if (!error.response) return true;

    const status = error.response.status;
    return status >= 500 || status === 408 || status === 429;
  }

  sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  getHealthStatus() {
    return {
      serviceName: this.serviceName,
      circuitBreaker: this.circuitBreaker.getMetrics(),
      bulkhead: this.bulkhead.getStatus(),
    };
  }
}

module.exports = {
  CircuitBreaker,
  Bulkhead,
  ResilientServiceClient,
};
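The full CircuitBreaker class above mixes the state machine with timeouts, caching, metrics, and fallbacks. To make the CLOSED → OPEN → HALF_OPEN transitions themselves easier to internalize, here is a hypothetical stripped-down sketch (the `MiniBreaker` name and defaults are illustrative, not part of the module above):

```javascript
// mini-breaker.js - state machine only: no timers, metrics, caching, or fallbacks
class MiniBreaker {
  constructor({ failureThreshold = 3, recoveryTimeout = 100 } = {}) {
    this.failureThreshold = failureThreshold;
    this.recoveryTimeout = recoveryTimeout;
    this.state = "CLOSED";
    this.failureCount = 0;
    this.lastFailureTime = 0;
  }

  async execute(operation) {
    if (this.state === "OPEN") {
      if (Date.now() - this.lastFailureTime < this.recoveryTimeout) {
        throw new Error("Circuit open - request rejected");
      }
      this.state = "HALF_OPEN"; // recovery window elapsed: allow a probe request
    }

    try {
      const result = await operation();
      this.state = "CLOSED"; // success (including a probe) closes the circuit
      this.failureCount = 0;
      return result;
    } catch (error) {
      this.failureCount++;
      this.lastFailureTime = Date.now();
      if (this.state === "HALF_OPEN" || this.failureCount >= this.failureThreshold) {
        this.state = "OPEN";
      }
      throw error;
    }
  }
}

// Walk the breaker through a full failure/recovery cycle
async function demoBreaker() {
  const breaker = new MiniBreaker({ failureThreshold: 2, recoveryTimeout: 50 });
  const failing = () => Promise.reject(new Error("boom"));
  const healthy = () => Promise.resolve("ok");

  await breaker.execute(failing).catch(() => {});
  await breaker.execute(failing).catch(() => {}); // threshold hit -> OPEN
  const openState = breaker.state;

  await new Promise((resolve) => setTimeout(resolve, 60)); // wait out recovery
  const result = await breaker.execute(healthy); // probe succeeds -> CLOSED
  return { openState, result, finalState: breaker.state };
}
```

Once this core is clear, everything else in the full class (timeouts, caching, fallbacks) is decoration around the same three transitions.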

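The retry loop inside makeRequest is also worth isolating. This standalone sketch uses the same backoff formula (baseDelay * 2^(attempt - 1)); the `withRetry` name and its parameters are illustrative, not part of the module above. In production you would typically add random jitter to the delay so retrying clients don't synchronize into a thundering herd:

```javascript
// retry-sketch.js - the retry loop from makeRequest, in isolation
async function withRetry(operation, options = {}) {
  const { attempts = 3, baseDelay = 10, shouldRetry = () => true } = options;
  let lastError;

  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < attempts && shouldRetry(error)) {
        const delay = baseDelay * Math.pow(2, attempt - 1); // 10ms, 20ms, 40ms...
        await new Promise((resolve) => setTimeout(resolve, delay));
        continue;
      }
      break;
    }
  }

  throw lastError;
}

// An operation that fails twice, then recovers on the third attempt
async function demoRetry() {
  let calls = 0;
  const flaky = async () => {
    calls++;
    if (calls < 3) throw new Error("transient failure");
    return "recovered";
  };
  const result = await withRetry(flaky, { attempts: 3, baseDelay: 5 });
  return { calls, result };
}
```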
Advanced Event-Driven Architecture: Resilient Asynchronous Communication

From Request-Response to Event-First System Design

Event-driven architecture patterns for resilience:

// Event-driven resilience patterns
const eventDrivenPatterns = {
  eventSourcing: {
    principle: "Store events, not current state",
    benefits: [
      "Complete audit trail of all changes",
      "Can rebuild any past state",
      "Natural integration with event-driven systems",
      "Temporal queries and time-travel debugging",
    ],
    challenges: [
      "Event versioning and schema evolution",
      "Snapshot strategies for performance",
      "Complex queries require projections",
    ],
  },

  cqrs: {
    principle: "Separate read and write models",
    benefits: [
      "Optimized read and write paths",
      "Independent scaling of queries and commands",
      "Multiple read models for different use cases",
    ],
    implementation: "Events bridge command side to query side",
  },

  sagaPattern: {
    principle: "Distributed transactions via compensating actions",
    choreography: "Services react to events and publish new ones",
    orchestration: "Central coordinator manages the workflow",
    resilience: "Natural handling of partial failures and recovery",
  },

  eventualConsistency: {
    principle: "Accept temporary inconsistency for availability",
    benefits: [
      "System remains available during network partitions",
      "Natural horizontal scaling",
      "Loose coupling between services",
    ],
    challenges: [
      "Complex conflict resolution",
      "User experience considerations",
      "Debugging distributed state",
    ],
  },
};

Professional Event-Driven Implementation:

// event-driven-system.js - Production event-driven architecture
const EventEmitter = require("events");

class EventStore {
  constructor(options = {}) {
    this.events = new Map(); // In production: use PostgreSQL/EventStore
    this.snapshots = new Map();
    this.subscriptions = new Map();
    this.streamVersions = new Map();

    this.snapshotFrequency = options.snapshotFrequency || 50;
    this.maxRetries = options.maxRetries || 3;
  }

  // Append events to a stream
  async appendToStream(streamId, expectedVersion, events) {
    const currentVersion = this.streamVersions.get(streamId) || 0;

    if (expectedVersion !== -1 && currentVersion !== expectedVersion) {
      throw new Error(
        `Concurrency conflict: expected version ${expectedVersion}, current version ${currentVersion}`
      );
    }

    const streamEvents = this.events.get(streamId) || [];
    let newVersion = currentVersion;

    for (const event of events) {
      newVersion++;
      const eventWithMetadata = {
        ...event,
        streamId,
        version: newVersion,
        eventId: this.generateEventId(),
        timestamp: new Date().toISOString(),
        correlationId: event.correlationId || this.generateCorrelationId(),
        causationId: event.causationId,
      };

      streamEvents.push(eventWithMetadata);

      // Publish event to subscribers
      await this.publishEvent(eventWithMetadata);
    }

    this.events.set(streamId, streamEvents);
    this.streamVersions.set(streamId, newVersion);

    // Create snapshot if needed
    if (newVersion % this.snapshotFrequency === 0) {
      await this.createSnapshot(streamId, newVersion);
    }

    console.log(
      `Appended ${events.length} events to stream ${streamId}, new version: ${newVersion}`
    );
    return newVersion;
  }

  // Read events from a stream
  async readStream(streamId, fromVersion = 0, maxCount = 1000) {
    const streamEvents = this.events.get(streamId) || [];

    return streamEvents
      .filter((event) => event.version > fromVersion)
      .slice(0, maxCount);
  }

  // Read all events by event type
  async readEventsByType(eventType, fromTimestamp = null) {
    const allEvents = [];

    for (const streamEvents of this.events.values()) {
      for (const event of streamEvents) {
        if (event.eventType === eventType) {
          if (!fromTimestamp || new Date(event.timestamp) >= fromTimestamp) {
            allEvents.push(event);
          }
        }
      }
    }

    return allEvents.sort(
      (a, b) => new Date(a.timestamp) - new Date(b.timestamp)
    );
  }

  // Subscribe to events
  subscribe(eventType, handler, options = {}) {
    if (!this.subscriptions.has(eventType)) {
      this.subscriptions.set(eventType, new Set());
    }

    const subscription = {
      handler,
      options: {
        fromTimestamp: options.fromTimestamp,
        retryPolicy: options.retryPolicy || "exponential",
        maxRetries: options.maxRetries || this.maxRetries,
        ...options,
      },
      failures: 0,
      lastProcessed: null,
    };

    this.subscriptions.get(eventType).add(subscription);

    console.log(`Subscribed to events of type: ${eventType}`);

    return () => {
      this.subscriptions.get(eventType).delete(subscription);
    };
  }

  // Publish event to subscribers
  async publishEvent(event) {
    const subscribers = this.subscriptions.get(event.eventType) || new Set();

    const publishPromises = [];

    for (const subscription of subscribers) {
      publishPromises.push(this.deliverEventToSubscriber(event, subscription));
    }

    await Promise.allSettled(publishPromises);
  }

  async deliverEventToSubscriber(event, subscription) {
    // Skip if we've already processed this event (timestamp comparison is a
    // simplification - distinct events sharing a timestamp could be skipped;
    // production systems track per-subscriber sequence numbers instead)
    if (
      subscription.lastProcessed &&
      new Date(event.timestamp) <= new Date(subscription.lastProcessed)
    ) {
      return;
    }

    try {
      await subscription.handler(event);
      subscription.lastProcessed = event.timestamp;
      subscription.failures = 0;

      console.log(`Event delivered: ${event.eventType} to subscriber`);
    } catch (error) {
      subscription.failures++;

      console.error(`Event delivery failed: ${event.eventType}`, error.message);

      if (subscription.failures < subscription.options.maxRetries) {
        // Schedule retry
        setTimeout(() => {
          this.deliverEventToSubscriber(event, subscription);
        }, this.calculateRetryDelay(subscription.failures, subscription.options.retryPolicy));
      } else {
        console.error(`Max retries exceeded for event: ${event.eventId}`);
        // Send to dead letter queue in production
      }
    }
  }

  calculateRetryDelay(attempt, retryPolicy) {
    switch (retryPolicy) {
      case "linear":
        return attempt * 1000;
      case "exponential":
        return Math.pow(2, attempt) * 1000;
      default:
        return 1000;
    }
  }

  // Create snapshot for performance
  async createSnapshot(streamId, version) {
    // Rebuild aggregate state from events
    const events = await this.readStream(streamId);
    const aggregateState = this.rebuildAggregateState(events);

    const snapshot = {
      streamId,
      version,
      state: aggregateState,
      timestamp: new Date().toISOString(),
    };

    this.snapshots.set(`${streamId}:${version}`, snapshot);
    console.log(
      `Snapshot created for stream ${streamId} at version ${version}`
    );
  }

  // Load aggregate with snapshot optimization
  async loadAggregate(streamId, maxVersion = null) {
    // Find latest snapshot before maxVersion
    let latestSnapshot = null;
    let snapshotVersion = 0;

    for (const [key, snapshot] of this.snapshots) {
      if (
        key.startsWith(`${streamId}:`) &&
        (!maxVersion || snapshot.version <= maxVersion) &&
        snapshot.version > snapshotVersion
      ) {
        latestSnapshot = snapshot;
        snapshotVersion = snapshot.version;
      }
    }

    // Start from snapshot or beginning
    let aggregateState = latestSnapshot?.state || {};
    const fromVersion = snapshotVersion;

    // Apply events after snapshot
    const events = await this.readStream(streamId, fromVersion);
    const eventsToApply = maxVersion
      ? events.filter((e) => e.version <= maxVersion)
      : events;

    for (const event of eventsToApply) {
      aggregateState = this.applyEvent(aggregateState, event);
    }

    return {
      streamId,
      version:
        eventsToApply.length > 0
          ? Math.max(...eventsToApply.map((e) => e.version))
          : snapshotVersion,
      state: aggregateState,
    };
  }

  // Helper methods
  rebuildAggregateState(events) {
    let state = {};
    for (const event of events) {
      state = this.applyEvent(state, event);
    }
    return state;
  }

  applyEvent(state, event) {
    // This would be aggregate-specific logic in production
    switch (event.eventType) {
      case "OrderCreated":
        return {
          ...state,
          id: event.data.orderId,
          status: "created",
          items: event.data.items,
          total: event.data.total,
          createdAt: event.timestamp,
        };

      case "OrderPaid":
        return {
          ...state,
          status: "paid",
          paidAt: event.timestamp,
          paymentId: event.data.paymentId,
        };

      case "OrderShipped":
        return {
          ...state,
          status: "shipped",
          shippedAt: event.timestamp,
          trackingNumber: event.data.trackingNumber,
        };

      default:
        return state;
    }
  }

  generateEventId() {
    return `event-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  generateCorrelationId() {
    return `corr-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  // Projections for read models
  async createProjection(projectionName, eventTypes, projectionHandler) {
    console.log(
      `Creating projection: ${projectionName} for events: ${eventTypes.join(
        ", "
      )}`
    );

    for (const eventType of eventTypes) {
      this.subscribe(eventType, async (event) => {
        try {
          await projectionHandler(projectionName, event);
        } catch (error) {
          console.error(
            `Projection ${projectionName} failed for event ${event.eventId}:`,
            error
          );
        }
      });
    }
  }
}

// Projection management for read models
class ProjectionManager {
  constructor(eventStore) {
    this.eventStore = eventStore;
    this.projections = new Map();
    this.readModels = new Map(); // In production: use appropriate database
  }

  // Register a projection
  registerProjection(name, config) {
    const projection = {
      name,
      eventTypes: config.eventTypes,
      handler: config.handler,
      readModel: config.readModel || name,
      state: { lastProcessedEvent: null, version: 0 },
    };

    this.projections.set(name, projection);

    // Subscribe to relevant events
    for (const eventType of config.eventTypes) {
      this.eventStore.subscribe(eventType, async (event) => {
        await this.processEventForProjection(name, event);
      });
    }

    console.log(`Projection registered: ${name}`);
  }

  async processEventForProjection(projectionName, event) {
    const projection = this.projections.get(projectionName);
    if (!projection) return;

    try {
      // Get current read model state
      const currentReadModel = this.readModels.get(projection.readModel) || {};

      // Apply projection logic
      const updatedReadModel = await projection.handler(
        currentReadModel,
        event
      );

      // Save updated read model
      this.readModels.set(projection.readModel, updatedReadModel);

      // Update projection state
      projection.state.lastProcessedEvent = event.eventId;
      projection.state.version++;

      console.log(
        `Projection ${projectionName} processed event ${event.eventType}`
      );
    } catch (error) {
      console.error(`Projection ${projectionName} failed:`, error);
      throw error;
    }
  }

  // Get read model data
  getReadModel(readModelName) {
    return this.readModels.get(readModelName);
  }

  // Rebuild projection from events
  async rebuildProjection(projectionName, fromTimestamp = null) {
    const projection = this.projections.get(projectionName);
    if (!projection) {
      throw new Error(`Projection ${projectionName} not found`);
    }

    console.log(`Rebuilding projection: ${projectionName}`);

    // Clear current read model
    this.readModels.delete(projection.readModel);

    // Process all relevant events
    for (const eventType of projection.eventTypes) {
      const events = await this.eventStore.readEventsByType(
        eventType,
        fromTimestamp
      );

      for (const event of events) {
        await this.processEventForProjection(projectionName, event);
      }
    }

    console.log(`Projection rebuilt: ${projectionName}`);
  }
}

// Saga orchestrator for distributed transactions
class SagaOrchestrator extends EventEmitter {
  constructor(eventStore) {
    super();
    this.eventStore = eventStore;
    this.activeSagas = new Map();
    this.sagaDefinitions = new Map();
  }

  // Define a saga workflow
  defineSaga(sagaType, definition) {
    this.sagaDefinitions.set(sagaType, {
      steps: definition.steps,
      compensations: definition.compensations,
      timeout: definition.timeout || 300000, // 5 minutes
    });

    console.log(
      `Saga defined: ${sagaType} with ${definition.steps.length} steps`
    );
  }

  // Start a saga instance
  async startSaga(sagaType, sagaId, initialData) {
    const definition = this.sagaDefinitions.get(sagaType);
    if (!definition) {
      throw new Error(`Saga definition not found: ${sagaType}`);
    }

    const sagaInstance = {
      sagaId,
      sagaType,
      currentStep: 0,
      status: "running",
      data: initialData,
      completedSteps: [],
      startedAt: new Date(),
      timeoutAt: new Date(Date.now() + definition.timeout),
    };

    this.activeSagas.set(sagaId, sagaInstance);

    // Store saga start event
    await this.eventStore.appendToStream(`saga-${sagaId}`, -1, [
      {
        eventType: "SagaStarted",
        data: { sagaId, sagaType, initialData },
      },
    ]);

    // Execute first step
    await this.executeNextStep(sagaInstance);

    return sagaId;
  }

  // Execute next step in saga
  async executeNextStep(sagaInstance) {
    const definition = this.sagaDefinitions.get(sagaInstance.sagaType);
    const step = definition.steps[sagaInstance.currentStep];

    if (!step) {
      // Saga completed successfully
      await this.completeSaga(sagaInstance);
      return;
    }

    try {
      console.log(
        `Executing saga step: ${step.name} for ${sagaInstance.sagaId}`
      );

      const result = await step.execute(sagaInstance.data);

      // Update saga state
      sagaInstance.data = { ...sagaInstance.data, ...result };
      sagaInstance.completedSteps.push({
        stepIndex: sagaInstance.currentStep,
        stepName: step.name,
        completedAt: new Date(),
        result,
      });
      sagaInstance.currentStep++;

      // Store step completion event
      await this.eventStore.appendToStream(`saga-${sagaInstance.sagaId}`, -1, [
        {
          eventType: "SagaStepCompleted",
          data: {
            sagaId: sagaInstance.sagaId,
            stepName: step.name,
            result,
          },
        },
      ]);

      // Execute next step
      await this.executeNextStep(sagaInstance);
    } catch (error) {
      console.error(`Saga step failed: ${step.name}`, error.message);
      await this.compensateSaga(sagaInstance, error);
    }
  }

  // Compensate saga on failure
  async compensateSaga(sagaInstance, error) {
    sagaInstance.status = "compensating";

    const definition = this.sagaDefinitions.get(sagaInstance.sagaType);

    // Execute compensations in reverse order (copy before reversing - reverse()
    // mutates in place, which would corrupt the completedSteps order recorded
    // in the SagaFailed event below)
    for (const completedStep of [...sagaInstance.completedSteps].reverse()) {
      const compensation = definition.compensations[completedStep.stepIndex];

      if (compensation) {
        try {
          console.log(`Compensating step: ${completedStep.stepName}`);
          await compensation.execute(sagaInstance.data, completedStep.result);
        } catch (compensationError) {
          console.error(
            `Compensation failed for ${completedStep.stepName}:`,
            compensationError.message
          );
        }
      }
    }

    sagaInstance.status = "failed";
    sagaInstance.failedAt = new Date();
    sagaInstance.error = error.message;

    // Store saga failure event
    await this.eventStore.appendToStream(`saga-${sagaInstance.sagaId}`, -1, [
      {
        eventType: "SagaFailed",
        data: {
          sagaId: sagaInstance.sagaId,
          error: error.message,
          completedSteps: sagaInstance.completedSteps,
        },
      },
    ]);

    this.emit("sagaFailed", sagaInstance);
  }

  // Complete saga successfully
  async completeSaga(sagaInstance) {
    sagaInstance.status = "completed";
    sagaInstance.completedAt = new Date();

    await this.eventStore.appendToStream(`saga-${sagaInstance.sagaId}`, -1, [
      {
        eventType: "SagaCompleted",
        data: {
          sagaId: sagaInstance.sagaId,
          result: sagaInstance.data,
          duration: sagaInstance.completedAt - sagaInstance.startedAt,
        },
      },
    ]);

    this.activeSagas.delete(sagaInstance.sagaId);
    this.emit("sagaCompleted", sagaInstance);

    console.log(`Saga completed: ${sagaInstance.sagaId}`);
  }

  // Get saga status
  getSagaStatus(sagaId) {
    return this.activeSagas.get(sagaId);
  }

  // Get all active sagas
  getAllActiveSagas() {
    return Array.from(this.activeSagas.values());
  }
}

module.exports = {
  EventStore,
  ProjectionManager,
  SagaOrchestrator,
};
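The core idea the EventStore and ProjectionManager above share is that state is a left fold over the event stream: replay the events to get the aggregate, run a different reducer over the same events to get a read model. This hypothetical standalone sketch mirrors the event shapes handled by applyEvent above; the `revenueByDay` read model is an invented example:

```javascript
// fold-sketch.js - aggregates and projections as folds over one event stream
const applyOrderEvent = (state, event) => {
  switch (event.eventType) {
    case "OrderCreated":
      return { ...state, id: event.data.orderId, status: "created", total: event.data.total };
    case "OrderPaid":
      return { ...state, status: "paid", paymentId: event.data.paymentId };
    case "OrderShipped":
      return { ...state, status: "shipped", trackingNumber: event.data.trackingNumber };
    default:
      return state;
  }
};

// A projection is just a different fold over the same events
const revenueByDay = (readModel, event) => {
  if (event.eventType !== "OrderPaid") return readModel;
  const day = event.timestamp.slice(0, 10);
  return { ...readModel, [day]: (readModel[day] || 0) + event.data.amount };
};

const stream = [
  { eventType: "OrderCreated", timestamp: "2024-03-01T10:00:00Z", data: { orderId: "order-1", total: 49.99 } },
  { eventType: "OrderPaid", timestamp: "2024-03-01T10:05:00Z", data: { paymentId: "pay-7", amount: 49.99 } },
  { eventType: "OrderShipped", timestamp: "2024-03-02T08:00:00Z", data: { trackingNumber: "TRK-123" } },
];

const order = stream.reduce(applyOrderEvent, {}); // current aggregate state
const pastOrder = stream.slice(0, 2).reduce(applyOrderEvent, {}); // time travel
const revenue = stream.reduce(revenueByDay, {}); // read model
```

Snapshots (as in createSnapshot above) are just a cached intermediate result of this fold, so replay can start partway through instead of from event one.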

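The failure path in SagaOrchestrator reduces to one rule: when a step fails, compensate every completed step in reverse order. This hypothetical standalone sketch (step names are illustrative) shows that rule on its own:

```javascript
// compensation-sketch.js - the saga failure path, in isolation
async function runSaga(steps) {
  const completed = [];
  const compensated = [];

  try {
    for (const step of steps) {
      await step.execute();
      completed.push(step);
    }
    return { status: "completed", compensated };
  } catch (error) {
    // copy before reversing so the completion log keeps its original order
    for (const step of [...completed].reverse()) {
      if (step.compensate) {
        await step.compensate();
        compensated.push(step.name);
      }
    }
    return { status: "failed", error: error.message, compensated };
  }
}

// Third step fails, so the first two are undone in reverse order
async function demoSaga() {
  const steps = [
    { name: "reserveInventory", execute: async () => {}, compensate: async () => {} },
    { name: "chargePayment", execute: async () => {}, compensate: async () => {} },
    { name: "shipOrder", execute: async () => { throw new Error("carrier down"); } },
  ];
  return runSaga(steps);
}
```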
The Reality Check: When Distributed Systems Actually Work

Here’s the uncomfortable truth nobody talks about in those fancy conference presentations: Most teams implement distributed systems patterns poorly and end up with more problems than they started with.

The difference between success and disaster isn’t just technical implementation—it’s operational maturity.

Ask yourself honestly:

  • Service Discovery: Can your services find each other when pods restart at 3 AM?
  • Configuration Management: Can you change a database connection string without redeploying 12 services?
  • Distributed Tracing: When payments fail, can you trace the entire flow in under 10 minutes?
  • Circuit Breakers: When your recommendation service goes down, does your entire checkout flow break?
  • Event-Driven Architecture: Can your system handle network partitions without losing data?

If you answered “no” to any of these, you’re not ready for production microservices. And that’s okay—most teams aren’t.

The progression that actually works:

  1. Start with a well-structured monolith with clear domain boundaries
  2. Master your operational basics (CI/CD, monitoring, logging)
  3. Extract your first service when you have a clear business need and operational capability
  4. Implement these distributed patterns one at a time, not all at once
  5. Scale your team’s expertise along with your architecture

Remember: Netflix didn’t start with 700+ microservices. They evolved there over years, with massive engineering teams and world-class operational expertise.

The real test isn’t whether you can build these patterns—it’s whether you can sleep peacefully after deploying them to production.


What’s Next?

You’ve now mastered the distributed systems patterns that make microservices actually work in production. You understand service discovery, configuration management, distributed tracing, fault tolerance, and event-driven architecture—the operational backbone of resilient distributed systems.

In our next blog (part 3/3), we’ll cover the final piece of the microservices puzzle: Deployment Strategies, Testing Patterns, and Migration from Monoliths—the practical implementation guide that turns your architectural knowledge into production-ready systems.

But first, go implement these patterns in your development environment. Set up a service registry, add distributed tracing to a few services, implement a simple circuit breaker. Feel the complexity before you need to handle it in production.

The difference between microservices that work and microservices that become your worst nightmare is mastering these distributed system fundamentals. Your future self (and your on-call rotation) will thank you.