Real-time Communications - 1/2

The $200,000 Real-time Disaster That Made Headlines

Picture this catastrophe: A major gaming company launches their highly anticipated multiplayer battle royale game. Within hours, 500,000 players flood their servers. The game has stunning graphics, incredible gameplay mechanics, and one fatal flaw—their real-time communication system.

Players start experiencing:

  • 5-10 second delays in seeing other players’ movements
  • Messages in team chat arriving 30 seconds late
  • Players appearing to teleport across the map
  • Complete disconnections during crucial moments
  • Server crashes every 20 minutes during peak hours

By day three, their player base had dropped to 50,000. Streamers were mocking the game live on Twitch. Reddit was filled with memes about “the lag simulator.” The company’s stock dropped 15%.

The devastating part? Their backend could handle the load. Their database was optimized. Their API responses were sub-100ms. But they had built their real-time system using HTTP polling every 500ms instead of proper WebSocket connections.

Within two weeks, they had to shut down servers, refund millions in pre-orders, and lay off 30% of their development team. All because they treated real-time communication like regular REST API calls.

The Uncomfortable Truth About Real-time Systems

Here’s what separates applications that feel responsive from those that feel like they’re running underwater: Real-time communication isn’t just about speed—it’s about building persistent, bidirectional connections that can handle thousands of concurrent users without breaking a sweat.

Most developers approach real-time features like this:

  1. Start with HTTP polling (“just check every second”)
  2. Realize polling is inefficient
  3. Try to optimize by polling less frequently
  4. Users complain about lag
  5. Switch to WebSockets without understanding the architecture
  6. Experience connection drops and memory leaks
  7. Panic and go back to polling

But experienced developers build real-time systems differently:

  1. Design for persistent connections from day one
  2. Implement proper connection lifecycle management
  3. Build in reconnection and state synchronization
  4. Plan for horizontal scaling across multiple servers
  5. Monitor connection health and performance continuously
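
To make "connection lifecycle management" a little more concrete, one option is to treat the connection as a small state machine and refuse transitions that should never happen. The sketch below is only an illustration: the state names match the ones used later in this article, but the `allowedTransitions` table and the `transition` helper are assumptions made for this example, not part of any library.

```typescript
type ConnectionState =
  | "connecting"
  | "connected"
  | "disconnected"
  | "reconnecting";

// Hypothetical transition table: which states may follow each state.
const allowedTransitions: Record<ConnectionState, ConnectionState[]> = {
  disconnected: ["connecting"],
  connecting: ["connected", "reconnecting", "disconnected"],
  connected: ["reconnecting", "disconnected"],
  reconnecting: ["connecting", "disconnected"],
};

// Returns the next state, or throws if the move is not in the table.
function transition(
  from: ConnectionState,
  to: ConnectionState
): ConnectionState {
  if (!allowedTransitions[from].includes(to)) {
    throw new Error(`Invalid connection state transition: ${from} -> ${to}`);
  }
  return to;
}

let state: ConnectionState = "disconnected";
state = transition(state, "connecting");
state = transition(state, "connected");
state = transition(state, "reconnecting"); // connection dropped, retrying
```

Centralizing the transitions like this tends to surface bugs early, such as a reconnect timer firing after the user has already logged out.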

The difference isn’t just user experience—it’s the difference between systems that can support real-time collaboration at scale and systems that collapse under the weight of their own connection overhead.

Ready to build real-time features that work like Discord instead of that group chat app you built in college? Let’s dive into the world of WebSockets, connection management, and the architecture patterns that power real-time applications.


WebSocket Fundamentals: Beyond Request-Response

Understanding the WebSocket Protocol

WebSockets aren’t just “fast HTTP.” After an HTTP upgrade handshake, the client and server share a single long-lived, full-duplex connection, and either side can send a message at any time without waiting for a request.
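
A WebSocket connection begins as an ordinary HTTP request carrying an `Upgrade: websocket` header and a random `Sec-WebSocket-Key`. The server proves it understood the request by returning a `Sec-WebSocket-Accept` value derived from that key, as specified in RFC 6455. Libraries do this for you, so the sketch below is only meant to show what happens under the hood; the function name `computeAcceptKey` is ours.

```typescript
import { createHash } from "node:crypto";

// Fixed GUID defined by RFC 6455 for the opening handshake.
const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Computes the Sec-WebSocket-Accept value the server returns
// for the Sec-WebSocket-Key sent by the client.
function computeAcceptKey(secWebSocketKey: string): string {
  return createHash("sha1")
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest("base64");
}

// Sample key taken from RFC 6455, section 1.3
console.log(computeAcceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
// prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

Once the server replies with `101 Switching Protocols`, the same TCP connection stops speaking HTTP and carries WebSocket frames in both directions.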

// Poor real-time implementation with HTTP polling
class BadRealTimeService {
  private pollInterval?: ReturnType<typeof setInterval>;
  private lastMessageId: string = "";

  startPolling(userId: string): void {
    // This approach will kill your servers and frustrate users
    this.pollInterval = setInterval(async () => {
      try {
        const messages = await fetch(
          `/api/messages?after=${this.lastMessageId}&userId=${userId}`
        );
        const newMessages = await messages.json();

        if (newMessages.length > 0) {
          this.lastMessageId = newMessages[newMessages.length - 1].id;
          this.displayMessages(newMessages);
        }
      } catch (error) {
        console.error("Polling failed:", error);
        // What happens when the network is slow? More requests pile up!
      }
    }, 500); // Every 500ms = 2 requests per second per user

    // 10,000 users = 20,000 requests per second just for chat!
  }

  stopPolling(): void {
    clearInterval(this.pollInterval);
  }

  sendMessage(message: string): void {
    // Still need HTTP for sending
    fetch("/api/messages", {
      method: "POST",
      body: JSON.stringify({ message }),
      headers: { "Content-Type": "application/json" },
    });
  }
}
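
The request-rate comment in the polling class follows from simple arithmetic. Here is a minimal sketch, assuming every client polls on a fixed interval and never backs off (the function name is ours):

```typescript
// Requests per second generated by fixed-interval polling.
function pollingRequestsPerSecond(users: number, intervalMs: number): number {
  return users * (1000 / intervalMs);
}

console.log(pollingRequestsPerSecond(10_000, 500)); // prints 20000
console.log(pollingRequestsPerSecond(500_000, 500)); // prints 1000000
```

Most of those requests come back with an empty list, yet each one still pays for HTTP headers and server-side request handling. A persistent connection sends data only when there is something to deliver.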

// Proper WebSocket implementation
class WebSocketRealTimeService {
  private ws: WebSocket | null = null;
  private userId: string = "";
  private token: string = "";
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 5;
  private reconnectDelay: number = 1000;
  private heartbeatInterval: ReturnType<typeof setInterval> | null = null;
  private messageQueue: QueuedMessage[] = [];
  private connectionState:
    | "connecting"
    | "connected"
    | "disconnected"
    | "reconnecting" = "disconnected";

  async connect(userId: string, token: string): Promise<void> {
    if (this.ws?.readyState === WebSocket.OPEN) {
      return; // Already connected
    }

    // Remember the credentials so reconnect() can reuse them
    this.userId = userId;
    this.token = token;
    this.connectionState = "connecting";

    try {
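      // Establish WebSocket connection with authentication.
      // URLSearchParams encodes the values so special characters cannot break the query string.
      const params = new URLSearchParams({ token, userId });
      this.ws = new WebSocket(`wss://api.example.com/ws?${params}`);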

      this.setupEventHandlers();

      // Wait for connection to establish
      await this.waitForConnection();

      this.connectionState = "connected";
      this.reconnectAttempts = 0;

      // Start heartbeat to keep connection alive
      this.startHeartbeat();

      // Process any queued messages
      await this.processMessageQueue();

      console.log("WebSocket connected successfully");
    } catch (error) {
      console.error("WebSocket connection failed:", error);
      this.handleConnectionError();
    }
  }

  private setupEventHandlers(): void {
    if (!this.ws) return;

    this.ws.onopen = (event) => {
      console.log("WebSocket connection opened");
      this.onConnectionOpen(event);
    };

    this.ws.onmessage = (event) => {
      this.handleIncomingMessage(event.data);
    };

    this.ws.onerror = (error) => {
      console.error("WebSocket error:", error);
      this.handleConnectionError();
    };

    this.ws.onclose = (event) => {
      console.log("WebSocket connection closed:", event.code, event.reason);
      this.handleConnectionClose(event);
    };
  }

  private handleIncomingMessage(data: string): void {
    try {
      const message = JSON.parse(data);

      switch (message.type) {
        case "chat_message":
          this.onChatMessage(message.data);
          break;
        case "user_joined":
          this.onUserJoined(message.data);
          break;
        case "user_left":
          this.onUserLeft(message.data);
          break;
        case "typing_indicator":
          this.onTypingIndicator(message.data);
          break;
        case "heartbeat_response":
          this.onHeartbeatResponse(message.data);
          break;
        case "error":
          this.onServerError(message.data);
          break;
        default:
          console.warn("Unknown message type:", message.type);
      }
    } catch (error) {
      console.error("Failed to parse WebSocket message:", error, data);
    }
  }

  sendMessage(type: string, data: any): void {
    const message = {
      id: this.generateMessageId(),
      type,
      data,
      timestamp: Date.now(),
    };

    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      // Queue message if connection is not ready
      this.messageQueue.push(message);

      // Attempt to reconnect if disconnected
      if (this.connectionState === "disconnected") {
        this.reconnect();
      }
    }
  }

  // Intelligent reconnection with exponential backoff
  private async reconnect(): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      this.onMaxReconnectAttemptsReached();
      return;
    }

    this.connectionState = "reconnecting";
    this.reconnectAttempts++;

    const delay = Math.min(
      this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
      30000 // Max 30 seconds
    );

    console.log(
      `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`
    );

    setTimeout(() => {
      this.connect(this.userId, this.token);
    }, delay);
  }

  // Connection health monitoring
  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.sendMessage("heartbeat", { timestamp: Date.now() });
      }
    }, 30000); // Send heartbeat every 30 seconds
  }

  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  // Graceful disconnection
  disconnect(): void {
    this.stopHeartbeat();

    if (this.ws) {
      this.ws.close(1000, "Client disconnecting");
      this.ws = null;
    }

    this.connectionState = "disconnected";
  }

  // Connection state management
  getConnectionState(): string {
    return this.connectionState;
  }

  isConnected(): boolean {
    return (
      this.ws?.readyState === WebSocket.OPEN &&
      this.connectionState === "connected"
    );
  }
}

interface QueuedMessage {
  id: string;
  type: string;
  data: any;
  timestamp: number;
}
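
The delay calculation inside `reconnect()` can be pulled out into a pure function, which makes it easy to test and to extend with jitter so that thousands of clients do not all retry at the same instant after an outage. This is a sketch rather than part of the class above: `backoffDelay` and `backoffDelayWithJitter` are hypothetical names, and "full jitter" (a random delay between 0 and the capped exponential value) is just one common strategy.

```typescript
// Capped exponential backoff: baseMs * 2^(attempt - 1), never above maxMs.
// `attempt` starts at 1, matching reconnectAttempts in the class above.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  return Math.min(baseMs * Math.pow(2, attempt - 1), maxMs);
}

// Full jitter: a random delay in [0, backoffDelay). `random` is injectable
// so tests can make the result deterministic.
function backoffDelayWithJitter(
  attempt: number,
  random: () => number = Math.random,
  baseMs = 1000,
  maxMs = 30_000
): number {
  return Math.floor(random() * backoffDelay(attempt, baseMs, maxMs));
}

for (let attempt = 1; attempt <= 6; attempt++) {
  console.log(attempt, backoffDelay(attempt));
}
// prints:
// 1 1000
// 2 2000
// 3 4000
// 4 8000
// 5 16000
// 6 30000
```

Without jitter, every client that lost its connection at the same moment retries on the same schedule, which can knock over a server that has only just recovered.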

Server-Side WebSocket Implementation

// Production-ready WebSocket server with Node.js
import WebSocket from "ws";
import { createServer } from "http";
import { parse as parseUrl } from "url";
import jwt from "jsonwebtoken";

class WebSocketServer {
  private wss: WebSocket.Server;
  private connections: Map<string, ConnectionInfo> = new Map();
  private rooms: Map<string, Set<string>> = new Map();
  private messageHandlers: Map<string, MessageHandler> = new Map();

  constructor(port: number = 8080) {
    const server = createServer();

    this.wss = new WebSocket.Server({
      server,
      verifyClient: this.verifyClient.bind(this),
    });

    this.setupMessageHandlers();
    this.setupConnectionHandlers();

    server.listen(port, () => {
      console.log(`WebSocket server listening on port ${port}`);
    });

    // Cleanup inactive connections
    setInterval(() => this.cleanupConnections(), 30000);
  }

  private verifyClient(info: any): boolean {
    const { query } = parseUrl(info.req.url, true);
    const token = query.token as string;

    if (!token) {
      console.log("WebSocket connection rejected: No token provided");
      return false;
    }

    try {
      const decoded = jwt.verify(token, process.env.JWT_SECRET!) as any;
      info.req.userId = decoded.userId;
      return true;
    } catch (error) {
      console.log("WebSocket connection rejected: Invalid token");
      return false;
    }
  }

  private setupConnectionHandlers(): void {
    this.wss.on("connection", (ws: WebSocket, req: any) => {
      const userId = req.userId;
      const connectionId = this.generateConnectionId();

      // Store connection info
      const connectionInfo: ConnectionInfo = {
        id: connectionId,
        userId,
        ws,
        lastActivity: Date.now(),
        rooms: new Set(),
        metadata: {},
      };

      this.connections.set(connectionId, connectionInfo);

      console.log(`User ${userId} connected (${connectionId})`);

      // Setup message handling
      // ws delivers payloads as Buffers by default, so convert to a string first
      ws.on("message", (data: WebSocket.RawData) => {
        this.handleMessage(connectionId, data.toString());
      });

      ws.on("close", () => {
        this.handleDisconnection(connectionId);
      });

      ws.on("error", (error) => {
        console.error(`WebSocket error for ${connectionId}:`, error);
        this.handleDisconnection(connectionId);
      });

      // Send welcome message
      this.sendToConnection(connectionId, {
        type: "connection_established",
        data: { connectionId, timestamp: Date.now() },
      });
    });
  }

  private setupMessageHandlers(): void {
    // Chat message handler
    this.messageHandlers.set("chat_message", async (connectionId, data) => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      // Validate and sanitize message
      const sanitizedMessage = this.sanitizeMessage(data.message);

      const messageData = {
        id: this.generateMessageId(),
        userId: connection.userId,
        message: sanitizedMessage,
        timestamp: Date.now(),
        roomId: data.roomId,
      };

      // Save to database
      await this.saveMessage(messageData);

      // Broadcast to room
      this.broadcastToRoom(data.roomId, {
        type: "chat_message",
        data: messageData,
      });

      // Update user activity
      connection.lastActivity = Date.now();
    });

    // Join room handler
    this.messageHandlers.set("join_room", (connectionId, data) => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      const roomId = data.roomId;

      // Add connection to room
      this.joinRoom(connectionId, roomId);

      // Notify room about new user
      this.broadcastToRoom(
        roomId,
        {
          type: "user_joined",
          data: {
            userId: connection.userId,
            roomId,
            timestamp: Date.now(),
          },
        },
        [connectionId]
      ); // Exclude the user who just joined

      // Send room state to the new user
      const roomUsers = this.getRoomUsers(roomId);
      this.sendToConnection(connectionId, {
        type: "room_state",
        data: {
          roomId,
          users: roomUsers,
          timestamp: Date.now(),
        },
      });
    });

    // Leave room handler
    this.messageHandlers.set("leave_room", (connectionId, data) => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      const roomId = data.roomId;

      // Remove from room
      this.leaveRoom(connectionId, roomId);

      // Notify room
      this.broadcastToRoom(roomId, {
        type: "user_left",
        data: {
          userId: connection.userId,
          roomId,
          timestamp: Date.now(),
        },
      });
    });

    // Typing indicator handler
    this.messageHandlers.set("typing_start", (connectionId, data) => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      this.broadcastToRoom(
        data.roomId,
        {
          type: "typing_indicator",
          data: {
            userId: connection.userId,
            roomId: data.roomId,
            isTyping: true,
            timestamp: Date.now(),
          },
        },
        [connectionId]
      );
    });

    this.messageHandlers.set("typing_stop", (connectionId, data) => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      this.broadcastToRoom(
        data.roomId,
        {
          type: "typing_indicator",
          data: {
            userId: connection.userId,
            roomId: data.roomId,
            isTyping: false,
            timestamp: Date.now(),
          },
        },
        [connectionId]
      );
    });

    // Heartbeat handler
    this.messageHandlers.set("heartbeat", (connectionId, data) => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      connection.lastActivity = Date.now();

      this.sendToConnection(connectionId, {
        type: "heartbeat_response",
        data: { timestamp: Date.now() },
      });
    });
  }

  private handleMessage(connectionId: string, data: string): void {
    try {
      const message = JSON.parse(data);
      const handler = this.messageHandlers.get(message.type);

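      if (handler) {
        // Handlers may be async; catch rejections so a failing handler
        // cannot surface as an unhandled promise rejection.
        Promise.resolve(handler(connectionId, message.data)).catch((error) => {
          console.error(`Handler for ${message.type} failed:`, error);
        });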
      } else {
        console.warn(`Unknown message type: ${message.type}`);

        this.sendToConnection(connectionId, {
          type: "error",
          data: { message: "Unknown message type", type: message.type },
        });
      }
    } catch (error) {
      console.error("Failed to parse WebSocket message:", error);

      this.sendToConnection(connectionId, {
        type: "error",
        data: { message: "Invalid message format" },
      });
    }
  }

  // Room management
  private joinRoom(connectionId: string, roomId: string): void {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    // Add connection to room
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }

    this.rooms.get(roomId)!.add(connectionId);
    connection.rooms.add(roomId);

    console.log(`User ${connection.userId} joined room ${roomId}`);
  }

  private leaveRoom(connectionId: string, roomId: string): void {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    const room = this.rooms.get(roomId);
    if (room) {
      room.delete(connectionId);

      if (room.size === 0) {
        this.rooms.delete(roomId);
      }
    }

    connection.rooms.delete(roomId);

    console.log(`User ${connection.userId} left room ${roomId}`);
  }

  private broadcastToRoom(
    roomId: string,
    message: any,
    excludeConnections: string[] = []
  ): void {
    const room = this.rooms.get(roomId);
    if (!room) return;

    const messageStr = JSON.stringify(message);

    room.forEach((connectionId) => {
      if (!excludeConnections.includes(connectionId)) {
        const connection = this.connections.get(connectionId);
        if (connection && connection.ws.readyState === WebSocket.OPEN) {
          connection.ws.send(messageStr);
        }
      }
    });
  }

  private sendToConnection(connectionId: string, message: any): void {
    const connection = this.connections.get(connectionId);
    if (connection && connection.ws.readyState === WebSocket.OPEN) {
      connection.ws.send(JSON.stringify(message));
    }
  }

  // Connection cleanup
  private cleanupConnections(): void {
    const now = Date.now();
    const staleTimeout = 5 * 60 * 1000; // 5 minutes

    for (const [connectionId, connection] of this.connections.entries()) {
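      if (now - connection.lastActivity > staleTimeout) {
        console.log(`Cleaning up stale connection: ${connectionId}`);
        // Close the underlying socket too; otherwise it stays open but untracked
        connection.ws.terminate();
        this.handleDisconnection(connectionId);
      }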
    }
  }

  private handleDisconnection(connectionId: string): void {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    // Leave all rooms
    connection.rooms.forEach((roomId) => {
      this.leaveRoom(connectionId, roomId);
    });

    // Remove connection
    this.connections.delete(connectionId);

    console.log(`User ${connection.userId} disconnected (${connectionId})`);
  }

  // Utility methods
  private getRoomUsers(roomId: string): string[] {
    const room = this.rooms.get(roomId);
    if (!room) return [];

    const users: string[] = [];
    room.forEach((connectionId) => {
      const connection = this.connections.get(connectionId);
      if (connection) {
        users.push(connection.userId);
      }
    });

    return users;
  }

  private generateConnectionId(): string {
    // slice(2, 11) replaces the deprecated substr(2, 9)
    return `conn_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  private generateMessageId(): string {
    return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  private sanitizeMessage(message: string): string {
    // Basic sanitization - in production, use a proper library
    return message.trim().slice(0, 1000); // Limit to 1000 characters
  }

  private async saveMessage(messageData: any): Promise<void> {
    // Save to database. `this.database` is a placeholder for your own
    // data-access layer; it is not declared in this class.
    try {
      await this.database.messages.create(messageData);
    } catch (error) {
      console.error("Failed to save message to database:", error);
    }
  }

  // Get server statistics
  getStats(): ServerStats {
    return {
      totalConnections: this.connections.size,
      totalRooms: this.rooms.size,
      connectionsPerRoom: Array.from(this.rooms.entries()).map(
        ([roomId, connections]) => ({
          roomId,
          connectionCount: connections.size,
        })
      ),
    };
  }
}

interface ConnectionInfo {
  id: string;
  userId: string;
  ws: WebSocket;
  lastActivity: number;
  rooms: Set<string>;
  metadata: Record<string, any>;
}

interface ServerStats {
  totalConnections: number;
  totalRooms: number;
  connectionsPerRoom: Array<{ roomId: string; connectionCount: number }>;
}

type MessageHandler = (connectionId: string, data: any) => void;
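
`handleMessage` above trusts whatever `JSON.parse` returns. A small runtime check on the message envelope is one way to reject malformed input before it reaches a handler. The sketch below assumes the `{ type, data }` envelope used throughout this article; the `Envelope` type and the `parseEnvelope` helper are illustrative names, not part of the `ws` package.

```typescript
interface Envelope {
  type: string;
  data: Record<string, unknown>;
}

// Returns a validated envelope, or null if the input is not
// a JSON object with a string `type` and an object `data`.
function parseEnvelope(raw: string): Envelope | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }

  if (typeof parsed !== "object" || parsed === null) return null;

  const candidate = parsed as { type?: unknown; data?: unknown };
  if (typeof candidate.type !== "string") return null;

  // Treat a missing payload as an empty object; reject non-object payloads.
  const data = candidate.data ?? {};
  if (typeof data !== "object" || Array.isArray(data)) return null;

  return { type: candidate.type, data: data as Record<string, unknown> };
}

console.log(parseEnvelope('{"type":"join_room","data":{"roomId":"general"}}'));
console.log(parseEnvelope("not json")); // prints null
console.log(parseEnvelope('{"data":{}}')); // prints null (no type)
```

Each handler would still need to validate its own fields (for example, that `roomId` is a string), but checking the envelope first keeps that logic out of the dispatch path.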

Socket.io: WebSockets Made Developer-Friendly

Building Robust Real-time Applications

// Socket.io server implementation with advanced features
import { Server, Socket } from "socket.io";
import { createServer } from "http";
import { createClient } from "redis";
import jwt from "jsonwebtoken";

class SocketIOService {
  private io: Server;
  private redis: any;
  private connectedUsers: Map<string, UserConnection> = new Map();

  constructor(port: number = 3000) {
    const httpServer = createServer();

    this.io = new Server(httpServer, {
      cors: {
        origin: process.env.CLIENT_URL || "http://localhost:3000",
        methods: ["GET", "POST"],
      },
      // Connection handling options
      pingTimeout: 60000, // 60 seconds
      pingInterval: 25000, // 25 seconds
      transports: ["websocket", "polling"],
    });

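    // setupRedisAdapter is async; handle failures instead of leaving an unhandled rejection
    this.setupRedisAdapter().catch((error) => {
      console.error("Failed to set up Redis adapter:", error);
    });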
    this.setupAuthentication();
    this.setupEventHandlers();

    httpServer.listen(port, () => {
      console.log(`Socket.io server running on port ${port}`);
    });
  }

  private async setupRedisAdapter(): Promise<void> {
    // Redis adapter for horizontal scaling
    const pubClient = createClient({
      url: process.env.REDIS_URL || "redis://localhost:6379",
    });

    const subClient = pubClient.duplicate();

    await Promise.all([pubClient.connect(), subClient.connect()]);

    this.redis = pubClient;

    // Configure Socket.io to use Redis for multi-server communication
    const { createAdapter } = await import("@socket.io/redis-adapter");
    this.io.adapter(createAdapter(pubClient, subClient));
  }

  private setupAuthentication(): void {
    this.io.use(async (socket: Socket, next) => {
      try {
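        const rawToken: string | undefined =
          socket.handshake.auth.token || socket.handshake.headers.authorization;

        if (!rawToken) {
          return next(new Error("Authentication token required"));
        }

        // An Authorization header usually carries a "Bearer " prefix; strip it before verifying
        const token = rawToken.replace(/^Bearer\s+/i, "");

        const decoded = jwt.verify(token, process.env.JWT_SECRET!) as any;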

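        // Attach user info to socket.
        // Note: the Socket type does not declare userId, userEmail, or userRole,
        // so these assignments need a type augmentation (or a cast) to compile.
        // Socket.IO v4 also offers socket.data for per-connection state.
        socket.userId = decoded.userId;
        socket.userEmail = decoded.email;
        socket.userRole = decoded.role;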

        next();
      } catch (error) {
        next(new Error("Authentication failed"));
      }
    });
  }

  private setupEventHandlers(): void {
    this.io.on("connection", (socket: Socket) => {
      console.log(`User connected: ${socket.userId} (${socket.id})`);

      // Store user connection info
      this.connectedUsers.set(socket.userId, {
        socketId: socket.id,
        userId: socket.userId,
        connectedAt: Date.now(),
        currentRooms: new Set(),
        lastActivity: Date.now(),
      });

      this.setupUserEventHandlers(socket);

      // Handle disconnection
      socket.on("disconnect", (reason) => {
        this.handleUserDisconnection(socket, reason);
      });
    });
  }

  private setupUserEventHandlers(socket: Socket): void {
    // Join chat room
    socket.on("join_room", async (data: JoinRoomData) => {
      try {
        const { roomId, roomType } = data;

        // Validate user has permission to join room
        const canJoin = await this.validateRoomAccess(
          socket.userId,
          roomId,
          roomType
        );
        if (!canJoin) {
          socket.emit("error", { message: "Access denied to room" });
          return;
        }

        // Join the room
        await socket.join(roomId);

        // Update user connection info
        const userConnection = this.connectedUsers.get(socket.userId);
        if (userConnection) {
          userConnection.currentRooms.add(roomId);
          userConnection.lastActivity = Date.now();
        }

        // Get room info and recent messages
        const roomInfo = await this.getRoomInfo(roomId);
        const recentMessages = await this.getRecentMessages(roomId, 50);

        // Send room data to user
        socket.emit("room_joined", {
          roomId,
          roomInfo,
          recentMessages,
          onlineUsers: await this.getRoomOnlineUsers(roomId),
        });

        // Notify other users in room
        socket.to(roomId).emit("user_joined", {
          userId: socket.userId,
          userEmail: socket.userEmail,
          timestamp: Date.now(),
        });

        console.log(`User ${socket.userId} joined room ${roomId}`);
      } catch (error) {
        console.error("Error joining room:", error);
        socket.emit("error", { message: "Failed to join room" });
      }
    });

    // Leave room
    socket.on("leave_room", async (data: { roomId: string }) => {
      const { roomId } = data;

      await socket.leave(roomId);

      // Update user connection info
      const userConnection = this.connectedUsers.get(socket.userId);
      if (userConnection) {
        userConnection.currentRooms.delete(roomId);
      }

      // Notify other users
      socket.to(roomId).emit("user_left", {
        userId: socket.userId,
        timestamp: Date.now(),
      });

      console.log(`User ${socket.userId} left room ${roomId}`);
    });

    // Send chat message
    socket.on("send_message", async (data: SendMessageData) => {
      try {
        const { roomId, message, messageType = "text", metadata = {} } = data;

        // Validate message
        if (!message || message.trim().length === 0) {
          socket.emit("error", { message: "Message cannot be empty" });
          return;
        }

        if (message.length > 2000) {
          socket.emit("error", { message: "Message too long" });
          return;
        }

        // Check if user is in room
        const rooms = Array.from(socket.rooms);
        if (!rooms.includes(roomId)) {
          socket.emit("error", { message: "You are not in this room" });
          return;
        }

        // Rate limiting check
        const canSend = await this.checkRateLimit(socket.userId);
        if (!canSend) {
          socket.emit("error", {
            message: "Rate limit exceeded. Please slow down.",
          });
          return;
        }

        // Create message object
        const messageData: ChatMessage = {
          id: this.generateMessageId(),
          roomId,
          userId: socket.userId,
          userEmail: socket.userEmail,
          message: this.sanitizeMessage(message),
          messageType,
          metadata,
          timestamp: Date.now(),
          edited: false,
          reactions: {},
        };

        // Save to database
        await this.saveMessage(messageData);

        // Broadcast to room
        this.io.to(roomId).emit("new_message", messageData);

        // Update user activity
        const userConnection = this.connectedUsers.get(socket.userId);
        if (userConnection) {
          userConnection.lastActivity = Date.now();
        }

        console.log(`Message sent in room ${roomId} by user ${socket.userId}`);
      } catch (error) {
        console.error("Error sending message:", error);
        socket.emit("error", { message: "Failed to send message" });
      }
    });

    // Typing indicators
    socket.on("typing_start", (data: { roomId: string }) => {
      socket.to(data.roomId).emit("user_typing", {
        userId: socket.userId,
        userEmail: socket.userEmail,
        isTyping: true,
      });
    });

    socket.on("typing_stop", (data: { roomId: string }) => {
      socket.to(data.roomId).emit("user_typing", {
        userId: socket.userId,
        userEmail: socket.userEmail,
        isTyping: false,
      });
    });

    // Message reactions
    socket.on("add_reaction", async (data: AddReactionData) => {
      try {
        const { messageId, reaction } = data;

        // Validate reaction
        const validReactions = ["👍", "👎", "❤️", "😂", "😮", "😢", "😡"];
        if (!validReactions.includes(reaction)) {
          socket.emit("error", { message: "Invalid reaction" });
          return;
        }

        // Update message with reaction
        const updated = await this.addMessageReaction(
          messageId,
          socket.userId,
          reaction
        );
        if (updated) {
          // Get the room ID for this message
          const message = await this.getMessage(messageId);
          if (message) {
            this.io.to(message.roomId).emit("message_reaction_added", {
              messageId,
              userId: socket.userId,
              reaction,
              timestamp: Date.now(),
            });
          }
        }
      } catch (error) {
        console.error("Error adding reaction:", error);
        socket.emit("error", { message: "Failed to add reaction" });
      }
    });

    // Private messaging
    socket.on("send_private_message", async (data: PrivateMessageData) => {
      try {
        const { recipientId, message } = data;

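        // Validate recipient exists and is online.
        // Note: connectedUsers only tracks sockets on this server process, so in a
        // multi-server deployment a user connected elsewhere looks offline here.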
        const recipientConnection = this.connectedUsers.get(recipientId);
        if (!recipientConnection) {
          socket.emit("error", { message: "User is not online" });
          return;
        }

        const messageData: PrivateMessage = {
          id: this.generateMessageId(),
          senderId: socket.userId,
          recipientId,
          message: this.sanitizeMessage(message),
          timestamp: Date.now(),
          read: false,
        };

        // Save to database
        await this.savePrivateMessage(messageData);

        // Send to recipient
        this.io
          .to(recipientConnection.socketId)
          .emit("private_message", messageData);

        // Confirm to sender
        socket.emit("private_message_sent", {
          messageId: messageData.id,
          recipientId,
          timestamp: messageData.timestamp,
        });
      } catch (error) {
        console.error("Error sending private message:", error);
        socket.emit("error", { message: "Failed to send private message" });
      }
    });

    // Activity tracking
    socket.on("activity", () => {
      const userConnection = this.connectedUsers.get(socket.userId);
      if (userConnection) {
        userConnection.lastActivity = Date.now();
      }
    });
  }

  private handleUserDisconnection(socket: Socket, reason: string): void {
    console.log(`User disconnected: ${socket.userId} (${reason})`);

    // Get user's current rooms and notify them
    const userConnection = this.connectedUsers.get(socket.userId);
    if (userConnection) {
      userConnection.currentRooms.forEach((roomId) => {
        socket.to(roomId).emit("user_left", {
          userId: socket.userId,
          timestamp: Date.now(),
        });
      });
    }

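    // Remove from connected users, unless a newer connection (for example a
    // second tab) has already replaced this socket's entry
    if (userConnection?.socketId === socket.id) {
      this.connectedUsers.delete(socket.userId);
    }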
  }

  // Rate limiting implementation
  private async checkRateLimit(userId: string): Promise<boolean> {
    const key = `rate_limit:${userId}`;
    const limit = 10; // 10 messages per minute
    const window = 60; // 60 seconds

    const count = await this.redis.incr(key);
    if (count === 1) {
      await this.redis.expire(key, window);
    }

    return count <= limit;
  }

  // Utility methods for database operations
  private async validateRoomAccess(
    userId: string,
    roomId: string,
    roomType: string
  ): Promise<boolean> {
    // Implement room access validation based on your business logic
    return true; // Simplified for example
  }

  private async getRoomInfo(roomId: string): Promise<RoomInfo> {
    // Fetch room information from database
    return {
      id: roomId,
      name: `Room ${roomId}`,
      description: "",
      createdAt: Date.now(),
      memberCount: 0,
    };
  }

  private async getRecentMessages(
    roomId: string,
    limit: number
  ): Promise<ChatMessage[]> {
    // Fetch recent messages from database
    return [];
  }

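  private async getRoomOnlineUsers(roomId: string): Promise<OnlineUser[]> {
    // Get list of online users in room
    const socketsInRoom = await this.io.in(roomId).fetchSockets();

    // Custom properties such as userId exist only on sockets local to this
    // process. With a multi-server adapter, keep them on socket.data instead,
    // which Socket.IO makes available on remote sockets.
    return socketsInRoom.map((socket) => ({
      userId: (socket as any).userId,
      userEmail: (socket as any).userEmail,
      socketId: socket.id,
    }));
  }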

  // Broadcasting methods
  broadcastToRoom(roomId: string, event: string, data: any): void {
    this.io.to(roomId).emit(event, data);
  }

  broadcastToUser(userId: string, event: string, data: any): void {
    const userConnection = this.connectedUsers.get(userId);
    if (userConnection) {
      this.io.to(userConnection.socketId).emit(event, data);
    }
  }

  broadcastToAll(event: string, data: any): void {
    this.io.emit(event, data);
  }

  // Server statistics
  getServerStats(): ServerStatistics {
    return {
      connectedUsers: this.connectedUsers.size,
      totalRooms: this.io.sockets.adapter.rooms.size,
      serverUptime: process.uptime(),
      memoryUsage: process.memoryUsage(),
    };
  }
}

// Interfaces
interface UserConnection {
  socketId: string;
  userId: string;
  connectedAt: number;
  currentRooms: Set<string>;
  lastActivity: number;
}

interface JoinRoomData {
  roomId: string;
  roomType: "public" | "private" | "direct";
}

interface SendMessageData {
  roomId: string;
  message: string;
  messageType?: "text" | "image" | "file" | "system";
  metadata?: Record<string, any>;
}

interface ChatMessage {
  id: string;
  roomId: string;
  userId: string;
  userEmail: string;
  message: string;
  messageType: string;
  metadata: Record<string, any>;
  timestamp: number;
  edited: boolean;
  reactions: Record<string, string[]>; // reaction -> array of user IDs
}

interface AddReactionData {
  messageId: string;
  reaction: string;
}

interface PrivateMessageData {
  recipientId: string;
  message: string;
}

interface PrivateMessage {
  id: string;
  senderId: string;
  recipientId: string;
  message: string;
  timestamp: number;
  read: boolean;
}

interface RoomInfo {
  id: string;
  name: string;
  description: string;
  createdAt: number;
  memberCount: number;
}

interface OnlineUser {
  userId: string;
  userEmail: string;
  socketId: string;
}

interface ServerStatistics {
  connectedUsers: number;
  totalRooms: number;
  serverUptime: number;
  memoryUsage: NodeJS.MemoryUsage;
}
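
The checkRateLimit method above is a fixed-window counter: the first message opens a window, every further message increments the count, and the count resets when the window expires. To make that behavior easier to reason about and test without Redis, here is a minimal in-memory sketch of the same idea. The class name FixedWindowRateLimiter and the injectable clock are assumptions made for illustration; they are not part of the service above.

```typescript
// Illustrative in-memory fixed-window limiter (a stand-in for the Redis
// INCR + EXPIRE pattern; hypothetical helper, not part of SocketIOService).
class FixedWindowRateLimiter {
  private readonly windows = new Map<
    string,
    { count: number; expiresAt: number }
  >();
  private readonly limit: number;
  private readonly windowMs: number;
  private readonly now: () => number;

  constructor(
    limit: number,
    windowMs: number,
    now: () => number = () => Date.now() // injectable clock for tests
  ) {
    this.limit = limit;
    this.windowMs = windowMs;
    this.now = now;
  }

  // Returns true while the user is within the quota for the current window.
  allow(userId: string): boolean {
    const current = this.now();
    const entry = this.windows.get(userId);

    // First hit, or the previous window has expired: open a fresh window.
    if (!entry || entry.expiresAt <= current) {
      this.windows.set(userId, {
        count: 1,
        expiresAt: current + this.windowMs,
      });
      return this.limit >= 1;
    }

    entry.count += 1;
    return entry.count <= this.limit;
  }
}

// Usage: 10 messages per 60 seconds, mirroring the values in checkRateLimit.
const limiter = new FixedWindowRateLimiter(10, 60_000);
const firstMessageAllowed = limiter.allow("user-42");
```

One trade-off to keep in mind: a fixed window lets a user send up to twice the limit in a short burst that straddles a window boundary. If that matters, a sliding-window or token-bucket scheme is the usual alternative.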

Real-time Chat Applications: Beyond Hello World

Building Production-Ready Chat Systems

// Complete chat application backend
class ChatApplicationService {
  private socketService: SocketIOService;
  private messageService: MessageService;
  private userService: UserService;
  private notificationService: NotificationService;
  private redis: Redis;

  constructor() {
    this.socketService = new SocketIOService();
    this.messageService = new MessageService();
    this.userService = new UserService();
    this.notificationService = new NotificationService();
    this.redis = new Redis(process.env.REDIS_URL);

    this.setupAdvancedFeatures();
  }

  private setupAdvancedFeatures(): void {
    this.setupMessageThreads();
    this.setupMessageSearch();
    this.setupFileSharing();
    this.setupVoiceMessages();
    this.setupMessageScheduling();
    this.setupChatBots();
  }

  // Message threading system
  private setupMessageThreads(): void {
    this.socketService.onEvent("start_thread", async (socket, data) => {
      const { parentMessageId, message } = data;

      try {
        // Create thread
        const thread = await this.messageService.createThread(parentMessageId, {
          creatorId: socket.userId,
          initialMessage: message,
        });

        // Notify room about new thread
        const parentMessage = await this.messageService.getMessage(
          parentMessageId
        );
        if (parentMessage) {
          this.socketService.broadcastToRoom(
            parentMessage.roomId,
            "thread_created",
            {
              threadId: thread.id,
              parentMessageId,
              createdBy: socket.userId,
              timestamp: Date.now(),
            }
          );
        }

        socket.emit("thread_created", thread);
      } catch (error) {
        socket.emit("error", { message: "Failed to create thread" });
      }
    });

    this.socketService.onEvent("reply_to_thread", async (socket, data) => {
      const { threadId, message } = data;

      try {
        const reply = await this.messageService.addThreadReply(threadId, {
          userId: socket.userId,
          message,
          timestamp: Date.now(),
        });

        // Get thread participants
        const participants = await this.messageService.getThreadParticipants(
          threadId
        );

        // Notify all participants
        participants.forEach((participantId) => {
          if (participantId !== socket.userId) {
            this.socketService.broadcastToUser(
              participantId,
              "thread_reply",
              reply
            );
          }
        });

        socket.emit("thread_reply_sent", reply);
      } catch (error) {
        socket.emit("error", { message: "Failed to send thread reply" });
      }
    });
  }

  // Advanced message search
  private setupMessageSearch(): void {
    this.socketService.onEvent("search_messages", async (socket, data) => {
      const { query, roomId, filters = {} } = data;

      try {
        const searchResults = await this.messageService.searchMessages({
          query,
          roomId,
          userId: socket.userId,
          filters: {
            dateFrom: filters.dateFrom,
            dateTo: filters.dateTo,
            messageType: filters.messageType,
            fromUser: filters.fromUser,
            hasAttachments: filters.hasAttachments,
            inThread: filters.inThread,
          },
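          // Clamp client-supplied paging values so a single request cannot
          // pull an unbounded result set (the cap of 100 is an arbitrary choice)
          limit: Math.min(Math.max(Number(filters.limit) || 20, 1), 100),
          offset: Math.max(Number(filters.offset) || 0, 0),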
        });

        socket.emit("search_results", {
          query,
          results: searchResults.messages,
          totalCount: searchResults.totalCount,
          hasMore: searchResults.hasMore,
        });
      } catch (error) {
        socket.emit("error", { message: "Search failed" });
      }
    });
  }

  // File sharing with progress tracking
  private setupFileSharing(): void {
    this.socketService.onEvent("upload_file", async (socket, data) => {
      const { roomId, fileName, fileSize, fileType, checksum } = data;

      try {
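        // Validate file size (100 MB limit). fileSize is reported by the
        // client, so the real size should be re-checked when the upload completes.
        const maxFileSize = 100 * 1024 * 1024;
        if (!Number.isFinite(fileSize) || fileSize <= 0) {
          socket.emit("error", { message: "Invalid file size" });
          return;
        }
        if (fileSize > maxFileSize) {
          socket.emit("error", { message: "File too large" });
          return;
        }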

        // Create upload session
        const uploadSession = await this.createUploadSession({
          userId: socket.userId,
          roomId,
          fileName,
          fileSize,
          fileType,
          checksum,
        });

        socket.emit("upload_session_created", {
          sessionId: uploadSession.id,
          uploadUrl: uploadSession.uploadUrl,
          chunkSize: uploadSession.chunkSize,
        });
      } catch (error) {
        socket.emit("error", { message: "Failed to create upload session" });
      }
    });

    this.socketService.onEvent("upload_progress", async (socket, data) => {
      const { sessionId, progress } = data;

      try {
        await this.updateUploadProgress(sessionId, progress);

        // Broadcast progress to room if desired
        const session = await this.getUploadSession(sessionId);
        if (session && session.broadcastProgress) {
          this.socketService.broadcastToRoom(
            session.roomId,
            "file_upload_progress",
            {
              sessionId,
              fileName: session.fileName,
              progress,
              uploadedBy: socket.userId,
            }
          );
        }
      } catch (error) {
        console.error("Failed to update upload progress:", error);
      }
    });

    this.socketService.onEvent("upload_complete", async (socket, data) => {
      const { sessionId } = data;

      try {
        const fileInfo = await this.completeFileUpload(sessionId);

        // Create file message
        const messageData = {
          roomId: fileInfo.roomId,
          userId: socket.userId,
          messageType: "file",
          message: `Shared file: ${fileInfo.fileName}`,
          metadata: {
            fileName: fileInfo.fileName,
            fileSize: fileInfo.fileSize,
            fileType: fileInfo.fileType,
            fileUrl: fileInfo.url,
            thumbnailUrl: fileInfo.thumbnailUrl,
          },
        };

        const savedMessage = await this.messageService.saveMessage(messageData);

        this.socketService.broadcastToRoom(
          fileInfo.roomId,
          "new_message",
          savedMessage
        );

        socket.emit("file_uploaded", {
          messageId: savedMessage.id,
          fileUrl: fileInfo.url,
        });
      } catch (error) {
        socket.emit("error", { message: "Failed to complete file upload" });
      }
    });
  }

  // Voice message support
  private setupVoiceMessages(): void {
    this.socketService.onEvent("send_voice_message", async (socket, data) => {
      const { roomId, audioBlob, duration } = data;

      try {
        // Process audio (convert format, compress, etc.)
        const processedAudio = await this.processVoiceMessage(audioBlob);

        // Create voice message
        const voiceMessage = {
          roomId,
          userId: socket.userId,
          messageType: "voice",
          message: "🎵 Voice message",
          metadata: {
            audioUrl: processedAudio.url,
            duration: duration,
            waveform: processedAudio.waveform, // For visual representation
            transcription: null, // Can be added later with speech-to-text
          },
        };

        const savedMessage = await this.messageService.saveMessage(
          voiceMessage
        );

        this.socketService.broadcastToRoom(roomId, "new_message", savedMessage);

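        // Optional: generate transcription in the background. The call is not
        // awaited, so attach a catch handler to avoid an unhandled rejection
        // (assumes generateTranscription returns a Promise).
        this.generateTranscription(savedMessage.id, processedAudio.url).catch(
          (error) => console.error("Failed to generate transcription:", error)
        );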
      } catch (error) {
        socket.emit("error", { message: "Failed to send voice message" });
      }
    });
  }

  // Message scheduling
  private setupMessageScheduling(): void {
    this.socketService.onEvent("schedule_message", async (socket, data) => {
      const { roomId, message, scheduledAt } = data;

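      try {
        // Reject unparseable or past timestamps before persisting anything
        const deliveryDate = new Date(scheduledAt);
        if (
          Number.isNaN(deliveryDate.getTime()) ||
          deliveryDate.getTime() <= Date.now()
        ) {
          socket.emit("error", { message: "Invalid schedule time" });
          return;
        }

        const scheduledMessage = await this.messageService.scheduleMessage({
          roomId,
          userId: socket.userId,
          message,
          scheduledAt: deliveryDate,
          status: "pending",
        });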

        socket.emit("message_scheduled", {
          scheduledMessageId: scheduledMessage.id,
          scheduledAt: scheduledMessage.scheduledAt,
        });

        // Schedule the actual sending
        this.scheduleMessageDelivery(scheduledMessage.id, scheduledAt);
      } catch (error) {
        socket.emit("error", { message: "Failed to schedule message" });
      }
    });
  }

  // Chat bot integration
  private setupChatBots(): void {
    // Listen for bot commands
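    this.socketService.onMessageReceived(async (messageData) => {
      // Never react to the bot's own messages, or a reply could trigger itself
      if (messageData.userId === "bot") return;

      if (messageData.message.startsWith("/")) {
        await this.handleBotCommand(messageData);
      } else if (messageData.message.includes("@bot")) {
        // Handle @bot mentions only for non-command messages, so a command
        // that also mentions the bot does not get two replies
        await this.handleBotMention(messageData);
      }
    });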
  }

  private async handleBotCommand(messageData: ChatMessage): Promise<void> {
    // Lowercase only the command name. Arguments keep their original casing
    // so text passed to /translate or /remind is not altered.
    const [rawCommand, ...args] = messageData.message
      .substring(1)
      .trim()
      .split(/\s+/);
    const command = rawCommand.toLowerCase();

    let botResponse: string;

    switch (command) {
      case "weather":
        botResponse = await this.getBotWeatherResponse(
          args[0] || "current location"
        );
        break;

      case "translate":
        botResponse = await this.getBotTranslationResponse(args.join(" "));
        break;

      case "remind":
        botResponse = await this.setBotReminder(
          messageData.userId,
          args.join(" ")
        );
        break;

      case "poll":
        botResponse = await this.createBotPoll(
          messageData.roomId,
          args.join(" ")
        );
        break;

      default:
        botResponse = "Unknown command. Type /help for available commands.";
    }

    // Send bot response
    const botMessage = {
      roomId: messageData.roomId,
      userId: "bot",
      userEmail: "bot@system",
      message: botResponse,
      messageType: "bot",
      metadata: {
        isBot: true,
        replyTo: messageData.id,
      },
    };

    const savedBotMessage = await this.messageService.saveMessage(botMessage);
    this.socketService.broadcastToRoom(
      messageData.roomId,
      "new_message",
      savedBotMessage
    );
  }

  // Message analytics and insights
  async generateChatAnalytics(
    roomId: string,
    timeframe: string
  ): Promise<ChatAnalytics> {
    const analytics = await this.messageService.getChatAnalytics(
      roomId,
      timeframe
    );

    return {
      messageCount: analytics.totalMessages,
      activeUsers: analytics.activeUsers.length,
      topUsers: analytics.topUsers,
      peakHours: analytics.peakHours,
      messageTypes: analytics.messageTypeBreakdown,
      averageResponseTime: analytics.averageResponseTime,
      sentimentAnalysis: analytics.sentimentAnalysis,
      topKeywords: analytics.topKeywords,
      engagementMetrics: {
        reactionsCount: analytics.totalReactions,
        threadsStarted: analytics.threadsStarted,
        filesShared: analytics.filesShared,
      },
    };
  }

  // Moderation tools
  async moderateMessage(
    messageId: string,
    action: ModerationAction,
    moderatorId: string
  ): Promise<void> {
    const message = await this.messageService.getMessage(messageId);
    if (!message) throw new Error("Message not found");

    switch (action.type) {
      case "delete":
        await this.messageService.deleteMessage(messageId, moderatorId);
        this.socketService.broadcastToRoom(message.roomId, "message_deleted", {
          messageId,
          deletedBy: moderatorId,
          reason: action.reason,
        });
        break;

      case "flag":
        await this.messageService.flagMessage(
          messageId,
          action.reason,
          moderatorId
        );
        break;

      case "warn_user":
        await this.userService.warnUser(
          message.userId,
          action.reason,
          moderatorId
        );
        this.socketService.broadcastToUser(message.userId, "user_warned", {
          reason: action.reason,
          warnedBy: moderatorId,
        });
        break;

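      case "timeout_user":
        // duration is optional on ModerationAction but required for a timeout
        if (action.duration === undefined) {
          throw new Error("Timeout duration is required");
        }
        await this.userService.timeoutUser(
          message.userId,
          action.duration,
          moderatorId
        );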
        this.socketService.broadcastToUser(message.userId, "user_timeout", {
          duration: action.duration,
          reason: action.reason,
        });
        break;
    }
  }
}

// Analytics interfaces
interface ChatAnalytics {
  messageCount: number;
  activeUsers: number;
  topUsers: Array<{ userId: string; messageCount: number }>;
  peakHours: Array<{ hour: number; messageCount: number }>;
  messageTypes: Record<string, number>;
  averageResponseTime: number;
  sentimentAnalysis: {
    positive: number;
    neutral: number;
    negative: number;
  };
  topKeywords: Array<{ keyword: string; count: number }>;
  engagementMetrics: {
    reactionsCount: number;
    threadsStarted: number;
    filesShared: number;
  };
}

interface ModerationAction {
  type: "delete" | "flag" | "warn_user" | "timeout_user";
  reason: string;
  duration?: number; // for timeouts
}
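
Slash-command handling like that in handleBotCommand is easier to test when the parsing step is a pure function, separate from the bot's side effects. The sketch below shows one way to do that. The names parseBotCommand and ParsedBotCommand are hypothetical and are not defined elsewhere in this article.

```typescript
// Hypothetical helper types and function, shown for illustration only.
interface ParsedBotCommand {
  name: string; // lowercased command name, e.g. "translate"
  args: string[]; // arguments with their original casing preserved
}

// Returns null when the message is not a slash command.
function parseBotCommand(message: string): ParsedBotCommand | null {
  const trimmed = message.trim();
  if (!trimmed.startsWith("/")) return null;

  // Split on runs of whitespace so "/poll   Lunch?" yields no empty args.
  const [rawName, ...args] = trimmed.slice(1).split(/\s+/);

  // A bare "/" or "/ text" has no command name.
  if (!rawName) return null;

  return { name: rawName.toLowerCase(), args };
}

// Usage: only the command name is normalized.
const parsed = parseBotCommand("/Translate Hola Mundo");
// parsed is { name: "translate", args: ["Hola", "Mundo"] }
```

Keeping the arguments untouched matters for commands such as /translate and /remind, where the user's original text is the payload.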

Live Notifications System: Keeping Users Engaged

Real-time Notification Architecture

// Comprehensive notification system
class LiveNotificationSystem {
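  // Dependencies are injected so every field is assigned before the setup
  // methods run.
  constructor(
    private io: Server,
    private redis: Redis,
    private database: Database,
    private emailService: EmailService,
    private pushService: PushNotificationService,
    // Delayed-job queue used by scheduleNotification(). The Queue type is an
    // assumption (for example, a Bull or BullMQ queue).
    private notificationQueue: Queue
  ) {
    this.setupNotificationChannels();
    this.setupNotificationProcessing();
    this.setupNotificationDelivery();
  }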

  // Multi-channel notification delivery
  async sendNotification(notification: NotificationData): Promise<void> {
    try {
      // Determine delivery channels based on user preferences
      const user = await this.getUserNotificationPreferences(
        notification.recipientId
      );
      const deliveryChannels = this.determineDeliveryChannels(
        notification,
        user
      );

      // Process notification through all applicable channels
      const deliveryPromises = deliveryChannels.map((channel) =>
        this.deliverThroughChannel(notification, channel, user)
      );

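      const results = await Promise.allSettled(deliveryPromises);

      // allSettled never rejects, so surface per-channel failures explicitly
      results.forEach((result, index) => {
        if (result.status === "rejected") {
          console.error(
            `Delivery via ${deliveryChannels[index]} failed:`,
            result.reason
          );
        }
      });

      // Track notification delivery
      await this.trackNotificationDelivery(notification, deliveryChannels);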
    } catch (error) {
      console.error("Failed to send notification:", error);
      await this.handleNotificationError(notification, error);
    }
  }

  private determineDeliveryChannels(
    notification: NotificationData,
    user: UserNotificationSettings
  ): NotificationChannel[] {
    const channels: NotificationChannel[] = [];

    // Real-time (WebSocket) - always try first if user is online
    if (this.isUserOnline(notification.recipientId)) {
      channels.push("realtime");
    }

    // Push notification based on user settings and notification priority
    if (this.shouldSendPushNotification(notification, user)) {
      channels.push("push");
    }

    // Email based on user settings and notification type
    if (this.shouldSendEmail(notification, user)) {
      channels.push("email");
    }

    // SMS for critical notifications
    if (notification.priority === "critical" && user.smsNotifications) {
      channels.push("sms");
    }

    return channels;
  }

  private async deliverThroughChannel(
    notification: NotificationData,
    channel: NotificationChannel,
    user: UserNotificationSettings
  ): Promise<void> {
    switch (channel) {
      case "realtime":
        await this.sendRealtimeNotification(notification);
        break;

      case "push":
        await this.sendPushNotification(notification, user);
        break;

      case "email":
        await this.sendEmailNotification(notification, user);
        break;

      case "sms":
        await this.sendSMSNotification(notification, user);
        break;
    }
  }

  // Real-time notification delivery
  private async sendRealtimeNotification(
    notification: NotificationData
  ): Promise<void> {
    const userConnection = await this.getUserConnection(
      notification.recipientId
    );

    if (userConnection) {
      // Send notification to user's socket
      this.io.to(userConnection.socketId).emit("notification", {
        id: notification.id,
        type: notification.type,
        title: notification.title,
        message: notification.message,
        data: notification.data,
        priority: notification.priority,
        timestamp: notification.timestamp,
        actions: notification.actions || [],
      });

      // Update notification status
      await this.markNotificationDelivered(notification.id, "realtime");
    }
  }

  // Advanced notification features
  async createNotificationTemplate(
    template: NotificationTemplate
  ): Promise<string> {
    const templateId = this.generateTemplateId();

    // Validate template
    this.validateNotificationTemplate(template);

    // Store template
    await this.database.notificationTemplates.create({
      id: templateId,
      name: template.name,
      type: template.type,
      channels: template.channels,
      template: {
        title: template.titleTemplate,
        message: template.messageTemplate,
        emailSubject: template.emailSubjectTemplate,
        emailBody: template.emailBodyTemplate,
        pushTitle: template.pushTitleTemplate,
        pushBody: template.pushBodyTemplate,
      },
      variables: template.variables,
      conditions: template.conditions,
      createdAt: Date.now(),
      updatedAt: Date.now(),
    });

    return templateId;
  }

  async sendTemplatedNotification(
    templateId: string,
    recipientId: string,
    variables: Record<string, any>,
    options: NotificationOptions = {}
  ): Promise<void> {
    const template = await this.getNotificationTemplate(templateId);
    if (!template) {
      throw new Error("Notification template not found");
    }

    // Check conditions if specified
    if (
      template.conditions &&
      !this.evaluateConditions(template.conditions, variables)
    ) {
      console.log(`Notification conditions not met for template ${templateId}`);
      return;
    }

    // Render template with variables
    const rendered = this.renderTemplate(template, variables);

    const notification: NotificationData = {
      id: this.generateNotificationId(),
      recipientId,
      type: template.type,
      title: rendered.title,
      message: rendered.message,
      data: options.data || {},
      priority: options.priority || "normal",
      timestamp: Date.now(),
      templateId,
      channels: options.channels || template.channels,
      actions: options.actions || [],
    };

    await this.sendNotification(notification);
  }

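  // Notification grouping and batching
  async createNotificationGroup(
    groupConfig: NotificationGroupConfig
  ): Promise<string> {
    const groupId = this.generateGroupId();

    // Store group configuration. The TTL outlives the batch window by a
    // safety margin: with equal values, the key could expire just before
    // processBatchedNotifications() reads it.
    await this.redis.setex(
      `notification_group:${groupId}`,
      groupConfig.batchWindow + 60,
      JSON.stringify({
        ...groupConfig,
        notifications: [],
        createdAt: Date.now(),
      })
    );

    // Schedule batch processing. This is an in-process timer, so pending
    // batches are lost if the server restarts.
    setTimeout(() => {
      this.processBatchedNotifications(groupId).catch((error) =>
        console.error(`Failed to process notification group ${groupId}:`, error)
      );
    }, groupConfig.batchWindow * 1000);

    // Return the ID so callers can add notifications to the group
    return groupId;
  }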

  private async processBatchedNotifications(groupId: string): Promise<void> {
    const groupData = await this.redis.get(`notification_group:${groupId}`);
    if (!groupData) return;

    const group = JSON.parse(groupData);

    if (group.notifications.length === 0) return;

    // Create batched notification
    const batchedNotification: NotificationData = {
      id: this.generateNotificationId(),
      recipientId: group.recipientId,
      type: "batched",
      title: this.generateBatchedTitle(group),
      message: this.generateBatchedMessage(group),
      data: {
        notifications: group.notifications,
        groupType: group.type,
      },
      priority: this.calculateBatchPriority(group.notifications),
      timestamp: Date.now(),
    };

    await this.sendNotification(batchedNotification);

    // Cleanup group
    await this.redis.del(`notification_group:${groupId}`);
  }

  // Smart notification timing
  async scheduleNotification(
    notification: NotificationData,
    deliveryTime: Date
  ): Promise<string> {
    const scheduleId = this.generateScheduleId();

    // Store scheduled notification
    await this.database.scheduledNotifications.create({
      id: scheduleId,
      notification,
      deliveryTime,
      status: "pending",
      createdAt: Date.now(),
    });

    // Calculate delay
    const delay = deliveryTime.getTime() - Date.now();

    if (delay > 0) {
      // Use job queue for reliability
      await this.notificationQueue.add(
        "send_scheduled_notification",
        { scheduleId },
        { delay }
      );
    } else {
      // Send immediately if time has passed
      await this.sendNotification(notification);
    }

    return scheduleId;
  }

  // Intelligent notification frequency management
  async manageNotificationFrequency(userId: string): Promise<void> {
    const recentNotifications = await this.getRecentNotifications(userId, 24); // Last 24 hours

    if (recentNotifications.length > 20) {
      // Too many notifications
      // Implement intelligent throttling
      await this.enableNotificationThrottling(userId, {
        throttleLevel: "high",
        batchSimilar: true,
        suppressLowPriority: true,
        digestMode: true,
      });
    } else if (recentNotifications.length < 3) {
      // Too few notifications
      // User might be missing important updates
      await this.suggestNotificationSettingsOptimization(userId);
    }
  }

  // A/B testing for notifications
  async createNotificationExperiment(
    experiment: NotificationExperiment
  ): Promise<string> {
    const experimentId = this.generateExperimentId();

    await this.database.notificationExperiments.create({
      id: experimentId,
      name: experiment.name,
      variants: experiment.variants,
      targetSegment: experiment.targetSegment,
      metrics: experiment.metrics,
      startDate: experiment.startDate,
      endDate: experiment.endDate,
      status: "active",
    });

    return experimentId;
  }

  async sendExperimentNotification(
    experimentId: string,
    baseNotification: NotificationData
  ): Promise<void> {
    const experiment = await this.getNotificationExperiment(experimentId);
    if (!experiment || experiment.status !== "active") return;

    // Determine which variant to use for this user
    const variant = this.selectExperimentVariant(
      experiment,
      baseNotification.recipientId
    );

    // Apply variant modifications
    const modifiedNotification = this.applyVariantModifications(
      baseNotification,
      variant
    );

    // Track experiment participation
    await this.trackExperimentParticipation(
      experimentId,
      variant.id,
      baseNotification.recipientId
    );

    // Send notification
    await this.sendNotification(modifiedNotification);
  }

  // Notification analytics and insights
  async generateNotificationAnalytics(
    timeframe: string
  ): Promise<NotificationAnalytics> {
    // timeframe is interpolated into the SQL below, so accept only simple
    // interval literals such as "24 hours" or "7 days" to prevent injection.
    if (!/^\d+ (minute|hour|day|week|month)s?$/.test(timeframe)) {
      throw new Error(`Invalid timeframe: ${timeframe}`);
    }

    // COUNT(DISTINCT ...) keeps the two LEFT JOINs from inflating the counts
    // when a notification has several delivery or action rows.
    const rows = await this.database.query(`
      SELECT
        n.type,
        n.priority,
        n.channel,
        COUNT(DISTINCT n.id) AS sent_count,
        COUNT(DISTINCT CASE WHEN nd.delivered_at IS NOT NULL THEN n.id END) AS delivered_count,
        COUNT(DISTINCT CASE WHEN na.action_taken_at IS NOT NULL THEN n.id END) AS interaction_count,
        AVG(EXTRACT(EPOCH FROM (nd.delivered_at - n.created_at))) AS avg_delivery_time
      FROM notifications n
      LEFT JOIN notification_deliveries nd ON n.id = nd.notification_id
      LEFT JOIN notification_actions na ON n.id = na.notification_id
      WHERE n.created_at >= NOW() - INTERVAL '${timeframe}'
      GROUP BY n.type, n.priority, n.channel
    `);

    // SQL drivers often return COUNT/AVG values as strings; coerce them first
    const analytics = rows.map((row: any) => ({
      type: String(row.type),
      priority: String(row.priority),
      channel: String(row.channel),
      sent: Number(row.sent_count),
      delivered: Number(row.delivered_count),
      interactions: Number(row.interaction_count),
      avgDeliveryTime: Number(row.avg_delivery_time) || 0,
    }));

    type Row = (typeof analytics)[number];
    type Totals = { sent: number; delivered: number; interactions: number };

    const sum = (pick: (row: Row) => number): number =>
      analytics.reduce((total: number, row: Row) => total + pick(row), 0);

    // Return 0 instead of NaN when the timeframe contains no rows
    const ratio = (part: number, whole: number): number =>
      whole > 0 ? part / whole : 0;

    const breakdownBy = (key: "type" | "priority" | "channel") =>
      analytics.reduce((acc: Record<string, Totals>, row: Row) => {
        if (!acc[row[key]]) {
          acc[row[key]] = { sent: 0, delivered: 0, interactions: 0 };
        }
        acc[row[key]].sent += row.sent;
        acc[row[key]].delivered += row.delivered;
        acc[row[key]].interactions += row.interactions;
        return acc;
      }, {} as Record<string, Totals>);

    const totalSent = sum((row) => row.sent);
    const totalDelivered = sum((row) => row.delivered);
    const totalInteractions = sum((row) => row.interactions);

    return {
      totalSent,
      totalDelivered,
      totalInteractions,
      deliveryRate: ratio(totalDelivered, totalSent),
      interactionRate: ratio(totalInteractions, totalDelivered),
      // Unweighted mean of the per-group averages
      avgDeliveryTime: ratio(
        sum((row) => row.avgDeliveryTime),
        analytics.length
      ),
      breakdownByType: breakdownBy("type"),
      breakdownByPriority: breakdownBy("priority"),
      breakdownByChannel: breakdownBy("channel"),
    };
  }

  // Smart notification preferences learning
  async learnUserPreferences(userId: string): Promise<void> {
    const userBehavior = await this.analyzeUserNotificationBehavior(userId);

    const recommendedSettings: Partial<UserNotificationSettings> = {};

    // Analyze interaction patterns
    if (userBehavior.emailInteractionRate < 0.1) {
      recommendedSettings.emailNotifications = false;
    }

    if (userBehavior.pushInteractionRate > 0.3) {
      recommendedSettings.pushNotifications = true;
    }

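    // Analyze timing preferences: quiet hours run from the hour after the
    // latest active hour to the hour before the earliest one, wrapped to 0-23
    const preferredHours = userBehavior.mostActiveHours;
    if (preferredHours.length > 0) {
      recommendedSettings.quietHours = {
        start: (Math.max(...preferredHours) + 1) % 24,
        end: (Math.min(...preferredHours) + 23) % 24,
      };
    }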

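    // Analyze content preferences
    const preferredTypes: string[] = userBehavior.mostEngagingTypes;
    recommendedSettings.notificationTypes = preferredTypes.reduce(
      (acc, type) => {
        acc[type] = true;
        return acc;
      },
      // Typed accumulator: a bare {} would not allow indexed assignment
      {} as Record<string, boolean>
    );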

    // Store recommendations
    await this.storeNotificationRecommendations(userId, recommendedSettings);
  }
}

// Notification interfaces
interface NotificationData {
  id: string;
  recipientId: string;
  type: string;
  title: string;
  message: string;
  data: Record<string, any>;
  priority: "low" | "normal" | "high" | "critical";
  timestamp: number;
  templateId?: string;
  channels?: NotificationChannel[];
  actions?: NotificationAction[];
}

interface NotificationTemplate {
  name: string;
  type: string;
  channels: NotificationChannel[];
  titleTemplate: string;
  messageTemplate: string;
  emailSubjectTemplate?: string;
  emailBodyTemplate?: string;
  pushTitleTemplate?: string;
  pushBodyTemplate?: string;
  variables: string[];
  conditions?: Record<string, any>;
}

interface NotificationExperiment {
  name: string;
  variants: NotificationVariant[];
  targetSegment: string;
  metrics: string[];
  startDate: Date;
  endDate: Date;
}

interface NotificationAnalytics {
  totalSent: number;
  totalDelivered: number;
  totalInteractions: number;
  deliveryRate: number;
  interactionRate: number;
  avgDeliveryTime: number;
  breakdownByType: Record<string, any>;
  breakdownByPriority: Record<string, any>;
  breakdownByChannel: Record<string, any>;
}

type NotificationChannel = "realtime" | "push" | "email" | "sms";
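
sendTemplatedNotification relies on a renderTemplate method that is not shown. As a rough idea of what the string-substitution step could look like, here is a minimal sketch. The {{variable}} placeholder syntax and the function name renderTemplateString are assumptions made for this example; a real system might use an established templating library instead.

```typescript
// Hypothetical helper: replaces {{name}} placeholders with values from
// `variables`. Unknown placeholders are left untouched so missing data is
// easy to spot in the rendered output.
function renderTemplateString(
  template: string,
  variables: Record<string, unknown>
): string {
  return template.replace(
    /\{\{\s*(\w+)\s*\}\}/g,
    (placeholder: string, name: string) =>
      Object.prototype.hasOwnProperty.call(variables, name)
        ? String(variables[name])
        : placeholder
  );
}

// Usage: render a title template for one recipient.
const renderedTitle = renderTemplateString(
  "Hi {{ name }}, you have {{count}} new messages",
  { name: "Ada", count: 3 }
);
// renderedTitle is "Hi Ada, you have 3 new messages"
```

Note that this sketch does no escaping. If rendered values end up in HTML email bodies, they would need to be escaped for that context first.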

Connection Management and Scaling: The Infrastructure Reality

Horizontal Scaling for WebSocket Systems

// Production-ready WebSocket scaling architecture
class WebSocketScalingManager {
  private loadBalancer: LoadBalancer;
  private serverNodes: Map<string, ServerNode> = new Map();
  private redis: Redis;
  private connectionRouter: ConnectionRouter;
  private healthMonitor: HealthMonitor;

  constructor() {
    this.loadBalancer = new LoadBalancer();
    this.redis = new Redis(process.env.REDIS_CLUSTER_URL);
    this.connectionRouter = new ConnectionRouter(this.redis);
    this.healthMonitor = new HealthMonitor();

    this.setupLoadBalancing();
    this.setupConnectionRouting();
    this.setupAutoScaling();
    this.startHealthMonitoring();
  }

  // Sticky session load balancing for WebSocket connections
  private setupLoadBalancing(): void {
    this.loadBalancer.configure({
      algorithm: 'sticky_session', // Ensure user stays on same server
      healthCheck: {
        interval: 10000, // 10 seconds
        timeout: 5000,   // 5 seconds
        retries: 3
      },
      failover: {
        enabled: true,
        redistributeConnections: true
      }
    });
  }

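  // Cross-server message routing
  private setupConnectionRouting(): void {
    // Assumes an ioredis-style client: a connection in subscriber mode cannot
    // run regular commands, so pub/sub gets its own connection.
    const subscriber = this.redis.duplicate();

    subscriber
      .subscribe('websocket:broadcast', 'websocket:user_message', 'websocket:room_message')
      .catch((error) => console.error('Failed to subscribe to routing channels:', error));

    // Messages arrive on the 'message' event, not through the subscribe() callback
    subscriber.on('message', (channel: string, message: string) => {
      const data = JSON.parse(message);

      switch (channel) {
        case 'websocket:broadcast':
          this.routeMessageToLocalConnections(data);
          break;
        case 'websocket:user_message':
          this.routeMessageToSpecificUser(data);
          break;
        case 'websocket:room_message':
          this.routeMessageToRoom(data);
          break;
      }
    });
  }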

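  // Auto-scaling based on connection metrics
  private setupAutoScaling(): void {
    let scalingInProgress = false;

    setInterval(async () => {
      // Skip this tick if the previous scaling operation is still running
      if (scalingInProgress) return;
      scalingInProgress = true;

      try {
        const metrics = await this.getClusterMetrics();

        if (this.shouldScaleUp(metrics)) {
          await this.scaleUp();
        } else if (this.shouldScaleDown(metrics)) {
          await this.scaleDown();
        }
      } catch (error) {
        // Without this, a failed metrics fetch becomes an unhandled rejection
        console.error('Auto-scaling check failed:', error);
      } finally {
        scalingInProgress = false;
      }
    }, 30000); // Check every 30 seconds
  }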

  private shouldScaleUp(metrics: ClusterMetrics): boolean {
    return (
      metrics.avgConnectionsPerServer > 8000 || // High connection density
      metrics.avgCpuUsage > 70 ||                // High CPU usage
      metrics.avgMemoryUsage > 80 ||             // High memory usage
      metrics.messageQueueBacklog > 1000         // Message backlog
    );
  }

  private shouldScaleDown(metrics: ClusterMetrics): boolean {
    return (
      metrics.avgConnectionsPerServer < 2000 &&  // Low connection density
      metrics.avgCpuUsage < 30 &&                // Low CPU usage
      metrics.avgMemoryUsage < 40 &&             // Low memory usage
      metrics.activeServerCount > 2              // Keep minimum 2 servers
    );
  }

  async scaleUp(): Promise<void> {
    console.log('Scaling up WebSocket cluster...');

    try {
      // Launch new server instance
      const newServer = await this.launchServerInstance();

      // Register with load balancer
      await this.loadBalancer.registerServer(newServer);

      // Add to server nodes
      this.serverNodes.set(newServer.id, newServer);

      // Update Redis cluster information
      await this.redis.sadd('websocket:servers', newServer.id);
      await this.redis.hset('websocket:server_info', newServer.id, JSON.stringify({
        address: newServer.address,
        port: newServer.port,
        capacity: newServer.capacity,
        startedAt: Date.now()
      }));

      console.log(`New WebSocket server launched: ${newServer.id}`);

    } catch (error) {
      console.error('Failed to scale up:', error);
    }
  }

  async scaleDown(): Promise<void> {
    console.log('Scaling down WebSocket cluster...');

    try {
      // Find server with least connections
      const serverToRemove = await this.findServerWithLeastConnections();

      if (!serverToRemove) return;

      // Remove from load balancer first so no new connections arrive mid-drain
      await this.loadBalancer.deregisterServer(serverToRemove);

      // Gracefully drain the remaining connections
      await this.drainServerConnections(serverToRemove.id);

      // Terminate server instance
      await this.terminateServerInstance(serverToRemove.id);

      // Cleanup
      this.serverNodes.delete(serverToRemove.id);
      await this.redis.srem('websocket:servers', serverToRemove.id);
      await this.redis.hdel('websocket:server_info', serverToRemove.id);

      console.log(`WebSocket server terminated: ${serverToRemove.id}`);

    } catch (error) {
      console.error('Failed to scale down:', error);
    }
  }

  // Connection migration for server maintenance
  async migrateConnections(fromServerId: string, toServerId: string): Promise<void> {
    console.log(`Migrating connections from ${fromServerId} to ${toServerId}`);

    const connections = await this.getServerConnections(fromServerId);

    for (const connection of connections) {
      try {
        // Store connection state in Redis
        await this.redis.hset(`connection:${connection.userId}`, {
          state: JSON.stringify(connection.state),
          rooms: JSON.stringify(Array.from(connection.rooms)),
          lastActivity: connection.lastActivity
        });

        // Send migration message to client
        await this.sendMigrationMessage(connection, toServerId);

        // Update connection routing
        await this.redis.hset('user:server_mapping', connection.userId, toServerId);

      } catch (error) {
        console.error(`Failed to migrate connection ${connection.userId}:`, error);
      }
    }
  }

  private async sendMigrationMessage(connection: Connection, newServerId: string): Promise<void> {
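    const serverInfo = this.serverNodes.get(newServerId);
    if (!serverInfo) {
      // Throw so the caller logs the failure and does not repoint routing at an unknown server
      throw new Error(`Unknown target server: ${newServerId}`);
    }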

    const migrationData = {
      type: 'server_migration',
      newServerUrl: `wss://${serverInfo.address}:${serverInfo.port}`,
      migrationToken: await this.generateMigrationToken(connection.userId),
      timestamp: Date.now()
    };

    // Send through current connection
    connection.socket.emit('migrate', migrationData);
  }

  // Connection state persistence for reliability
  async persistConnectionState(connectionId: string, state: ConnectionState): Promise<void> {
    const stateData = {
      userId: state.userId,
      rooms: Array.from(state.rooms),
      lastActivity: state.lastActivity,
      metadata: state.metadata,
      subscriptions: Array.from(state.subscriptions),
      queuedMessages: state.queuedMessages
    };

    await this.redis.setex(
      `connection_state:${connectionId}`,
      3600, // 1 hour TTL
      JSON.stringify(stateData)
    );
  }

  async restoreConnectionState(connectionId: string): Promise<ConnectionState | null> {
    const stateData = await this.redis.get(`connection_state:${connectionId}`);

    if (!stateData) return null;

    const parsed = JSON.parse(stateData);

    return {
      userId: parsed.userId,
      rooms: new Set(parsed.rooms),
      lastActivity: parsed.lastActivity,
      metadata: parsed.metadata,
      subscriptions: new Set(parsed.subscriptions),
      queuedMessages: parsed.queuedMessages || []
    };
  }

  // Connection monitoring and alerting
  private startHealthMonitoring(): void {
    this.healthMonitor.on('server_unhealthy', async (serverId: string) => {
      console.log(`Server ${serverId} is unhealthy, initiating failover...`);
      await this.handleServerFailure(serverId);
    });

    this.healthMonitor.on('high_latency_detected', async (serverId: string, latency: number) => {
      console.log(`High latency detected on server ${serverId}: ${latency}ms`);
      await this.investigateLatencyIssue(serverId);
    });

    this.healthMonitor.on('connection_spike', async (serverId: string, connectionCount: number) => {
      console.log(`Connection spike on server ${serverId}: ${connectionCount} connections`);
      await this.handleConnectionSpike(serverId);
    });
  }

  private async handleServerFailure(serverId: string): Promise<void> {
    // Get all connections on failed server
    const connections = await this.getServerConnections(serverId);

    // Find healthy servers for migration
    const healthyServers = await this.getHealthyServers();

    if (healthyServers.length === 0) {
      console.error('No healthy servers available for failover!');
      return;
    }

    // Distribute connections across healthy servers
    const connectionsPerServer = Math.ceil(connections.length / healthyServers.length);

    for (let i = 0; i < connections.length; i++) {
      const targetServerIndex = Math.floor(i / connectionsPerServer);
      const targetServer = healthyServers[targetServerIndex];

      await this.migrateConnection(connections[i], serverId, targetServer.id);
    }

    // Remove failed server from rotation
    await this.loadBalancer.deregisterServer({ id: serverId });

    // Alert operations team
    await this.alertOperationsTeam('server_failure', {
      serverId,
      connectionCount: connections.length,
      migratedTo: healthyServers.map(s => s.id)
    });
  }
}

// Geographic distribution for global applications.
// Declared at the top level: a class declaration cannot be nested inside another class body.
class GeographicLoadBalancer {
  private regions: Map<string, RegionInfo> = new Map();
  private geoIpService: GeoIPService;

  constructor() {
    this.geoIpService = new GeoIPService();
    this.setupRegions();
  }

  private setupRegions(): void {
    // Configure geographic regions. Set `location` on each entry to enable
    // distance-based routing; without it, findClosestRegion uses its fallback.
    this.regions.set('us-east', {
      servers: ['ws-us-east-1', 'ws-us-east-2'],
      latencyWeight: 1.0,
      capacityWeight: 0.8
    });

    this.regions.set('eu-west', {
      servers: ['ws-eu-west-1', 'ws-eu-west-2'],
      latencyWeight: 1.0,
      capacityWeight: 0.9
    });

    this.regions.set('ap-southeast', {
      servers: ['ws-ap-se-1', 'ws-ap-se-2'],
      latencyWeight: 1.0,
      capacityWeight: 0.7
    });
  }

  async routeConnection(clientIp: string): Promise<string> {
    // Get client location
    const location = await this.geoIpService.getLocation(clientIp);

    // Find closest region
    const closestRegion = this.findClosestRegion(location);

    // Get region info
    const regionInfo = this.regions.get(closestRegion);
    if (!regionInfo) {
      throw new Error('No available region');
    }

    // Select best server in region
    const selectedServer = await this.selectServerInRegion(regionInfo);

    return selectedServer;
  }

  private findClosestRegion(location: GeoLocation): string {
    let closestRegion = 'us-east'; // Fallback when no region has a known location
    let minDistance = Infinity;

    for (const [regionId, regionInfo] of this.regions.entries()) {
      // `location` is optional on RegionInfo; skip regions that have none
      if (!regionInfo.location) continue;

      const distance = this.calculateDistance(location, regionInfo.location);
      if (distance < minDistance) {
        minDistance = distance;
        closestRegion = regionId;
      }
    }

    return closestRegion;
  }

  private async selectServerInRegion(regionInfo: RegionInfo): Promise<string> {
    const serverMetrics = await Promise.all(
      regionInfo.servers.map(serverId => this.getServerMetrics(serverId))
    );

    // Calculate scores based on latency and capacity
    const scores = serverMetrics.map((metrics, index) => ({
      serverId: regionInfo.servers[index],
      score: this.calculateServerScore(metrics, regionInfo)
    }));

    // Select server with highest score
    const bestServer = scores.reduce((best, current) =>
      current.score > best.score ? current : best
    );

    return bestServer.serverId;
  }

  private calculateServerScore(metrics: ServerMetrics, regionInfo: RegionInfo): number {
    // Clamp so latencies above 1000 ms do not produce a negative score
    const latencyScore = Math.max(0, 1 - metrics.averageLatency / 1000) * regionInfo.latencyWeight;
    const capacityScore = (1 - metrics.connectionUtilization) * regionInfo.capacityWeight;

    return latencyScore + capacityScore;
  }
}

// Scaling interfaces
interface ClusterMetrics {
  avgConnectionsPerServer: number;
  avgCpuUsage: number;
  avgMemoryUsage: number;
  messageQueueBacklog: number;
  activeServerCount: number;
}

interface ServerNode {
  id: string;
  address: string;
  port: number;
  capacity: number;
  region?: string;
}

interface Connection {
  userId: string;
  socket: any;
  state: ConnectionState;
  rooms: Set<string>;
  lastActivity: number;
}

interface ConnectionState {
  userId: string;
  rooms: Set<string>;
  lastActivity: number;
  metadata: Record<string, any>;
  subscriptions: Set<string>;
  queuedMessages: any[];
}

interface RegionInfo {
  servers: string[];
  latencyWeight: number;
  capacityWeight: number;
  location?: GeoLocation;
}

interface GeoLocation {
  latitude: number;
  longitude: number;
  country: string;
  city: string;
}
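
The region-selection code above calls calculateDistance without showing it. One common choice, and an assumption here rather than something the listing specifies, is the haversine great-circle distance. A minimal sketch, with haversineKm and LatLng as hypothetical names and a mean Earth radius of 6371 km:

```typescript
// Hypothetical helper: great-circle distance between two points, in kilometres.
interface LatLng {
  latitude: number;  // degrees
  longitude: number; // degrees
}

const EARTH_RADIUS_KM = 6371; // mean Earth radius

function toRadians(degrees: number): number {
  return (degrees * Math.PI) / 180;
}

function haversineKm(a: LatLng, b: LatLng): number {
  const dLat = toRadians(b.latitude - a.latitude);
  const dLon = toRadians(b.longitude - a.longitude);
  const lat1 = toRadians(a.latitude);
  const lat2 = toRadians(b.latitude);

  const h =
    Math.pow(Math.sin(dLat / 2), 2) +
    Math.cos(lat1) * Math.cos(lat2) * Math.pow(Math.sin(dLon / 2), 2);

  // Clamp to guard against floating-point drift just above 1.
  return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(Math.min(1, h)));
}
```

Straight-line distance is only a proxy for network latency, so a real deployment would likely combine it with measured round-trip times per region.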

Key Takeaways

Real-time communication isn’t just about choosing WebSockets over HTTP polling. It’s about building robust, scalable systems that can handle thousands of concurrent connections while maintaining performance and reliability.

Essential real-time patterns:

  • WebSocket lifecycle management with proper connection handling, reconnection logic, and heartbeat monitoring
  • Message routing and broadcasting with room management and efficient message distribution
  • Connection scaling with sticky sessions, geographic distribution, and automatic failover
  • State management with persistence, migration, and recovery mechanisms
  • Performance monitoring with connection metrics, latency tracking, and capacity planning
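
Of the patterns above, heartbeat monitoring is the easiest to show in isolation. The sketch below records the last time each connection answered a ping and reports the ones that have gone quiet. The class name HeartbeatTracker, its methods, and the 30-second default timeout are illustrative assumptions, not part of the code earlier in this post.

```typescript
// Hypothetical heartbeat bookkeeping, independent of any WebSocket library.
class HeartbeatTracker {
  private lastSeen = new Map<string, number>();

  constructor(private readonly timeoutMs: number = 30000) {}

  // Call when a connection opens or answers a ping.
  recordPong(connectionId: string, now: number = Date.now()): void {
    this.lastSeen.set(connectionId, now);
  }

  // Call when a connection closes cleanly.
  remove(connectionId: string): void {
    this.lastSeen.delete(connectionId);
  }

  // Returns connections with no pong within the timeout and stops tracking them.
  sweep(now: number = Date.now()): string[] {
    const stale: string[] = [];
    this.lastSeen.forEach((seenAt, id) => {
      if (now - seenAt > this.timeoutMs) {
        stale.push(id);
      }
    });
    for (const id of stale) {
      this.lastSeen.delete(id);
    }
    return stale;
  }
}
```

A server would typically call sweep on a timer and close whichever sockets it returns, so that dead connections stop counting against capacity.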

The real-time architecture decision framework:

  • Use plain WebSockets when you need maximum control and minimal overhead
  • Use Socket.IO when you need developer productivity and automatic fallbacks
  • Use Server-Sent Events for one-way real-time updates (we’ll cover this in part 2)
  • Use message queues for reliable delivery and horizontal scaling
  • Use geographic distribution for global applications with latency requirements

Connection management best practices:

  • Always implement reconnection logic with exponential backoff
  • Use heartbeat/ping mechanisms to detect connection health
  • Persist connection state for seamless failover scenarios
  • Monitor connection metrics to detect issues before they become problems
  • Plan for scaling from day one with proper load balancing strategies
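
The first item on that list is worth a concrete sketch. The function below computes how long a client should wait before each reconnect attempt, doubling the ceiling each time and picking a random delay beneath it ("full jitter") so that thousands of clients do not reconnect in lockstep after an outage. The names reconnectDelayMs and BackoffOptions are hypothetical, and the delay values in the usage note are illustrative only.

```typescript
// Hypothetical options for the backoff calculation.
interface BackoffOptions {
  baseDelayMs: number;   // ceiling for the first retry
  maxDelayMs: number;    // ceiling never grows beyond this
  random?: () => number; // injectable for deterministic tests; defaults to Math.random
}

// Delay before reconnect attempt `attempt` (0-based), using full jitter:
// a random value in [0, min(maxDelayMs, baseDelayMs * 2^attempt)).
function reconnectDelayMs(attempt: number, options: BackoffOptions): number {
  const { baseDelayMs, maxDelayMs } = options;
  const random = options.random !== undefined ? options.random : Math.random;
  const ceiling = Math.min(maxDelayMs, baseDelayMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}
```

A client would call reconnectDelayMs(attempt, { baseDelayMs: 1000, maxDelayMs: 30000 }) after each failed connection, increment attempt, and reset it to 0 once a connection succeeds.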

The production reality checklist:

  • ✅ WebSocket connections have proper authentication and authorization
  • ✅ Reconnection logic handles network failures gracefully
  • ✅ Message delivery is reliable with queuing for offline users
  • ✅ Connection scaling is automatic based on real metrics
  • ✅ Geographic routing minimizes latency for global users
  • ✅ State persistence enables seamless failover
  • ✅ Monitoring alerts on connection issues before users complain
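
To make the "queuing for offline users" item concrete, here is a minimal in-memory sketch of a bounded per-user queue with expiry. Everything in it is an assumption for illustration: the OfflineMessageQueue name, the cap of 100 messages, and the one-hour expiry (chosen to mirror the connection-state TTL used earlier). A production system would more plausibly back this with Redis or a message queue so that messages survive a server restart.

```typescript
// Hypothetical in-memory queue for messages sent while a user is offline.
interface QueuedMessage {
  payload: string;
  enqueuedAt: number; // epoch milliseconds
}

class OfflineMessageQueue {
  private queues = new Map<string, QueuedMessage[]>();

  constructor(
    private readonly maxPerUser: number = 100,
    private readonly ttlMs: number = 3600000
  ) {}

  enqueue(userId: string, payload: string, now: number = Date.now()): void {
    const queue = this.queues.get(userId) || [];
    queue.push({ payload, enqueuedAt: now });
    // Drop the oldest messages once the per-user cap is exceeded.
    while (queue.length > this.maxPerUser) {
      queue.shift();
    }
    this.queues.set(userId, queue);
  }

  // Returns unexpired payloads in arrival order and clears the user's queue.
  drain(userId: string, now: number = Date.now()): string[] {
    const queue = this.queues.get(userId) || [];
    this.queues.delete(userId);
    return queue
      .filter((message) => now - message.enqueuedAt <= this.ttlMs)
      .map((message) => message.payload);
  }
}
```

The server would call enqueue when a recipient has no live connection and drain when that user reconnects, delivering the returned payloads before any new traffic.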

What’s Next?

In the next post, we’ll complete our real-time communications journey by diving into Server-Sent Events, WebRTC for peer-to-peer communication, real-time collaboration features, and advanced data synchronization patterns.

We’ll also explore the operational challenges of maintaining real-time systems at scale—from debugging connection issues to optimizing for mobile networks and handling the inevitable moments when everything goes wrong.

Because building real-time features that work in your development environment is just the beginning. Making them work reliably for thousands of users across different networks, devices, and time zones—that’s where the real engineering challenges lie.