Real-time Communications - 1/2
The $200,000 Real-time Disaster That Made Headlines
Picture this catastrophe: A major gaming company launches their highly anticipated multiplayer battle royale game. Within hours, 500,000 players flood their servers. The game has stunning graphics, incredible gameplay mechanics, and one fatal flaw—their real-time communication system.
Players start experiencing:
- 5-10 second delays in seeing other players’ movements
- Messages in team chat arriving 30 seconds late
- Players appearing to teleport across the map
- Complete disconnections during crucial moments
- Server crashes every 20 minutes during peak hours
By day three, their player base had dropped to 50,000. Streamers were mocking the game live on Twitch. Reddit was filled with memes about “the lag simulator.” The company’s stock dropped 15%.
The devastating part? Their backend could handle the load. Their database was optimized. Their API responses were sub-100ms. But they had built their real-time system using HTTP polling every 500ms instead of proper WebSocket connections.
Within two weeks, they had to shut down servers, refund millions in pre-orders, and lay off 30% of their development team. All because they treated real-time communication like regular REST API calls.
The Uncomfortable Truth About Real-time Systems
Here’s what separates applications that feel responsive from those that feel like they’re running underwater: Real-time communication isn’t just about speed—it’s about building persistent, bidirectional connections that can handle thousands of concurrent users without breaking a sweat.
Most developers approach real-time features like this:
- Start with HTTP polling (“just check every second”)
- Realize polling is inefficient
- Try to optimize by polling less frequently
- Users complain about lag
- Switch to WebSockets without understanding the architecture
- Experience connection drops and memory leaks
- Panic and go back to polling
But experienced developers build real-time systems differently:
- Design for persistent connections from day one
- Implement proper connection lifecycle management
- Build in reconnection and state synchronization
- Plan for horizontal scaling across multiple servers
- Monitor connection health and performance continuously
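The lifecycle items in this list can be made concrete as an explicit state machine. The sketch below assumes the four states the client class later in this article uses (`connecting`, `connected`, `disconnected`, `reconnecting`). The transition table and the `ConnectionLifecycle` name are illustrative choices, not something the article prescribes.

```typescript
// Hypothetical transition table for the four client connection states.
type ConnectionState = "connecting" | "connected" | "disconnected" | "reconnecting";

const allowedTransitions: Record<ConnectionState, readonly ConnectionState[]> = {
  disconnected: ["connecting"],
  connecting: ["connected", "reconnecting", "disconnected"],
  connected: ["reconnecting", "disconnected"],
  reconnecting: ["connecting", "disconnected"],
};

class ConnectionLifecycle {
  private state: ConnectionState = "disconnected";

  get current(): ConnectionState {
    return this.state;
  }

  // Moves to `next` only if the table allows it; throws otherwise so bugs surface early.
  transition(next: ConnectionState): void {
    if (!allowedTransitions[this.state].includes(next)) {
      throw new Error(`Illegal transition ${this.state} -> ${next}`);
    }
    this.state = next;
  }
}

const lifecycle = new ConnectionLifecycle();
lifecycle.transition("connecting");
lifecycle.transition("connected");
lifecycle.transition("reconnecting"); // e.g. the network dropped
```

Rejecting illegal transitions turns subtle bugs, such as opening a second socket while one is already connected, into immediate, visible errors.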
The difference isn’t just user experience—it’s the difference between systems that can support real-time collaboration at scale and systems that collapse under the weight of their own connection overhead.
Ready to build real-time features that work like Discord instead of that group chat app you built in college? Let’s dive into the world of WebSockets, connection management, and the architecture patterns that power real-time applications.
WebSocket Fundamentals: Beyond Request-Response
Understanding the WebSocket Protocol
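WebSockets aren’t just “fast HTTP”; they’re a different communication model. A WebSocket connection begins as an ordinary HTTP request that asks the server to upgrade the protocol. Once the server agrees, the same TCP connection stays open and either side can send a message at any time, with no request-response pairing.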
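Every WebSocket connection still starts with a short HTTP handshake. Per RFC 6455, the client sends a request with `Upgrade: websocket` and a random base64 `Sec-WebSocket-Key`; the server replies `101 Switching Protocols` with `Sec-WebSocket-Accept`, the base64-encoded SHA-1 of that key concatenated with a fixed GUID. The sketch below computes that value with Node's `crypto` module, only to show what libraries such as `ws` do for you; `computeAcceptKey` is an illustrative name.

```typescript
import { createHash } from "node:crypto";

// Fixed GUID defined by RFC 6455 for the opening handshake.
const WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// Computes the Sec-WebSocket-Accept header a server returns in its
// "101 Switching Protocols" response to the client's Sec-WebSocket-Key.
function computeAcceptKey(secWebSocketKey: string): string {
  return createHash("sha1")
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest("base64");
}

// Sample key from RFC 6455; the RFC's expected answer is "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=".
console.log(computeAcceptKey("dGhlIHNhbXBsZSBub25jZQ=="));
```

After this exchange, HTTP is out of the picture: both sides read and write WebSocket frames on the same socket until one of them sends a close frame.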
// Poor real-time implementation with HTTP polling
class BadRealTimeService {
  private pollInterval: ReturnType<typeof setInterval> | null = null;
  private lastMessageId: string = "";

  startPolling(userId: string): void {
    // This approach will kill your servers and frustrate users
    this.pollInterval = setInterval(async () => {
      try {
        const response = await fetch(
          `/api/messages?after=${encodeURIComponent(this.lastMessageId)}&userId=${encodeURIComponent(userId)}`
        );
        const newMessages: Array<{ id: string }> = await response.json();
        if (newMessages.length > 0) {
          this.lastMessageId = newMessages[newMessages.length - 1].id;
          this.displayMessages(newMessages);
        }
      } catch (error) {
        console.error("Polling failed:", error);
      }
      // setInterval does not wait for this callback to finish: on a slow network a new
      // request starts every 500ms while older ones are still in flight, so requests pile up
    }, 500); // Every 500ms = 2 requests per second per user
    // 10,000 users = 20,000 requests per second just for chat!
  }

  stopPolling(): void {
    if (this.pollInterval !== null) {
      clearInterval(this.pollInterval);
      this.pollInterval = null;
    }
  }

  sendMessage(message: string): void {
    // Still need a separate HTTP request for sending
    fetch("/api/messages", {
      method: "POST",
      body: JSON.stringify({ message }),
      headers: { "Content-Type": "application/json" },
    }).catch((error) => console.error("Send failed:", error));
  }

  private displayMessages(_messages: Array<{ id: string }>): void {
    // Render messages in the UI (omitted)
  }
}
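The arithmetic in the polling comments generalizes into a simple load model. This sketch assumes every client polls on a fixed interval and, for comparison, that an idle WebSocket client sends only a heartbeat every 30 seconds (the interval the client class below uses). The helper names are illustrative.

```typescript
// Back-of-the-envelope load model for idle clients.
function pollingRequestsPerSecond(users: number, pollIntervalMs: number): number {
  return users * (1000 / pollIntervalMs);
}

function heartbeatFramesPerSecond(users: number, heartbeatIntervalMs: number): number {
  return users * (1000 / heartbeatIntervalMs);
}

const users = 10_000;
console.log(pollingRequestsPerSecond(users, 500)); // 20000 full HTTP requests per second
console.log(heartbeatFramesPerSecond(users, 30_000)); // roughly 333 small frames per second
```

The polling figure understates the real cost: each request carries full HTTP headers, most return an empty list, and a new message still waits up to one polling interval (500ms here) before the client sees it.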
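// Proper WebSocket implementation (browser client)
class WebSocketRealTimeService {
  private ws: WebSocket | null = null;
  private userId = "";
  private token = "";
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 5;
  private reconnectDelay: number = 1000;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private heartbeatInterval: ReturnType<typeof setInterval> | null = null;
  private messageQueue: QueuedMessage[] = [];
  private connectionState: ConnectionState = "disconnected";

  async connect(userId: string, token: string): Promise<void> {
    if (
      this.ws?.readyState === WebSocket.OPEN ||
      this.ws?.readyState === WebSocket.CONNECTING
    ) {
      return; // Already connected or connecting
    }
    // Remember credentials so reconnect() can reuse them
    this.userId = userId;
    this.token = token;
    this.connectionState = "connecting";
    // Establish WebSocket connection with authentication. Tokens in query strings
    // can end up in proxy logs; the server below reads the token from the query.
    const socket = new WebSocket(
      `wss://api.example.com/ws?token=${encodeURIComponent(token)}&userId=${encodeURIComponent(userId)}`
    );
    this.ws = socket;
    this.setupEventHandlers(socket);
    try {
      // Wait for the handshake to complete
      await this.waitForConnection(socket);
    } catch (error) {
      // A failed handshake also fires onclose, which schedules the retry
      console.error("WebSocket connection failed:", error);
      return;
    }
    if (this.ws !== socket) return; // disconnect() was called while waiting
    this.connectionState = "connected";
    this.reconnectAttempts = 0;
    // Start heartbeat to keep connection alive
    this.startHeartbeat();
    // Flush messages queued while offline
    this.processMessageQueue();
    console.log("WebSocket connected successfully");
  }

  private setupEventHandlers(socket: WebSocket): void {
    socket.onmessage = (event: MessageEvent) => {
      this.handleIncomingMessage(event.data);
    };
    socket.onerror = (event: Event) => {
      // Browsers always follow an error with a close event, so recovery happens in onclose
      console.error("WebSocket error:", event);
    };
    socket.onclose = (event: CloseEvent) => {
      // Ignore sockets that disconnect() has already detached
      if (socket !== this.ws) return;
      console.log("WebSocket connection closed:", event.code, event.reason);
      this.handleConnectionClose();
    };
  }

  private waitForConnection(socket: WebSocket): Promise<void> {
    return new Promise((resolve, reject) => {
      socket.addEventListener("open", () => resolve(), { once: true });
      socket.addEventListener(
        "close",
        () => reject(new Error("Socket closed before opening")),
        { once: true }
      );
    });
  }

  private handleConnectionClose(): void {
    this.stopHeartbeat();
    this.ws = null;
    this.connectionState = "disconnected";
    this.reconnect();
  }

  private handleIncomingMessage(data: string): void {
    try {
      const message = JSON.parse(data);
      switch (message.type) {
        case "connection_established":
          break; // Sent by the server on connect; nothing to do here
        case "chat_message":
          this.onChatMessage(message.data);
          break;
        case "user_joined":
          this.onUserJoined(message.data);
          break;
        case "user_left":
          this.onUserLeft(message.data);
          break;
        case "room_state":
          this.onRoomState(message.data);
          break;
        case "typing_indicator":
          this.onTypingIndicator(message.data);
          break;
        case "heartbeat_response":
          this.onHeartbeatResponse(message.data);
          break;
        case "error":
          this.onServerError(message.data);
          break;
        default:
          console.warn("Unknown message type:", message.type);
      }
    } catch (error) {
      console.error("Failed to parse WebSocket message:", error, data);
    }
  }

  sendMessage(type: string, data: any): void {
    const message: QueuedMessage = {
      id: this.generateMessageId(),
      type,
      data,
      timestamp: Date.now(),
    };
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    } else {
      // Queue message if connection is not ready
      this.messageQueue.push(message);
      // Attempt to reconnect if disconnected (only once connect() has supplied credentials)
      if (this.connectionState === "disconnected" && this.token) {
        this.reconnect();
      }
    }
  }

  private processMessageQueue(): void {
    // Send queued messages in their original order
    const pending = this.messageQueue.splice(0);
    for (const message of pending) {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(JSON.stringify(message));
      } else {
        this.messageQueue.push(message); // Socket dropped mid-flush; keep for next connect
      }
    }
  }

  // Reconnection with exponential backoff
  private reconnect(): void {
    if (this.reconnectTimer) return; // A retry is already scheduled
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      this.connectionState = "disconnected";
      this.onMaxReconnectAttemptsReached();
      return;
    }
    this.connectionState = "reconnecting";
    this.reconnectAttempts++;
    const delay = Math.min(
      this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
      30000 // Max 30 seconds
    );
    console.log(
      `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`
    );
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      void this.connect(this.userId, this.token);
    }, delay);
  }

  // Connection health monitoring
  private startHeartbeat(): void {
    this.stopHeartbeat(); // Avoid stacking intervals across reconnects
    this.heartbeatInterval = setInterval(() => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        this.sendMessage("heartbeat", { timestamp: Date.now() });
      }
    }, 30000); // Send heartbeat every 30 seconds
  }

  private stopHeartbeat(): void {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }

  // Graceful disconnection
  disconnect(): void {
    this.stopHeartbeat();
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      const socket = this.ws;
      this.ws = null; // Detach first so onclose does not schedule a reconnect
      socket.close(1000, "Client disconnecting");
    }
    this.connectionState = "disconnected";
  }

  // Connection state management
  getConnectionState(): ConnectionState {
    return this.connectionState;
  }

  isConnected(): boolean {
    return (
      this.ws?.readyState === WebSocket.OPEN &&
      this.connectionState === "connected"
    );
  }

  // Application hooks: override these in a subclass to update the UI
  protected onChatMessage(_data: unknown): void {}
  protected onUserJoined(_data: unknown): void {}
  protected onUserLeft(_data: unknown): void {}
  protected onRoomState(_data: unknown): void {}
  protected onTypingIndicator(_data: unknown): void {}
  protected onHeartbeatResponse(_data: unknown): void {}
  protected onServerError(data: unknown): void {
    console.error("Server error:", data);
  }
  protected onMaxReconnectAttemptsReached(): void {}

  private generateMessageId(): string {
    return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}

type ConnectionState = "connecting" | "connected" | "disconnected" | "reconnecting";

interface QueuedMessage {
  id: string;
  type: string;
  data: any;
  timestamp: number;
}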
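The client's reconnect delay is easier to reason about as a standalone function. With the class defaults (1000ms base, five attempts) the schedule tops out at 16 seconds, so the 30-second cap never actually applies. The jittered variant is an addition not present in the class: it follows the common "full jitter" approach of picking a random delay up to the computed value, so thousands of clients dropped by the same server restart do not all reconnect at the same instant. Function names are illustrative.

```typescript
// Same formula as reconnect(): base * 2^(attempt - 1), capped at maxMs.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}

// "Full jitter": a random delay in [0, backoffDelay). `random` is injectable for tests.
function backoffDelayWithJitter(
  attempt: number,
  random: () => number = Math.random,
  baseMs = 1000,
  maxMs = 30_000
): number {
  return Math.floor(random() * backoffDelay(attempt, baseMs, maxMs));
}

const schedule = [1, 2, 3, 4, 5].map((attempt) => backoffDelay(attempt));
console.log(schedule); // [ 1000, 2000, 4000, 8000, 16000 ]
console.log(backoffDelayWithJitter(3)); // somewhere in [0, 4000)
```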
Server-Side WebSocket Implementation
// Production-ready WebSocket server with Node.js
import WebSocket from "ws";
import { createServer } from "http";
import { parse as parseUrl } from "url";
import jwt from "jsonwebtoken";
class WebSocketServer {
private wss: WebSocket.Server;
private connections: Map<string, ConnectionInfo> = new Map();
private rooms: Map<string, Set<string>> = new Map();
private messageHandlers: Map<string, MessageHandler> = new Map();
constructor(port: number = 8080) {
const server = createServer();
this.wss = new WebSocket.Server({
server,
verifyClient: this.verifyClient.bind(this),
});
this.setupMessageHandlers();
this.setupConnectionHandlers();
server.listen(port, () => {
console.log(`WebSocket server listening on port ${port}`);
});
// Cleanup inactive connections
setInterval(() => this.cleanupConnections(), 30000);
}
private verifyClient(info: any): boolean {
const { query } = parseUrl(info.req.url, true);
const token = query.token as string;
if (!token) {
console.log("WebSocket connection rejected: No token provided");
return false;
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET!) as any;
info.req.userId = decoded.userId;
return true;
} catch (error) {
console.log("WebSocket connection rejected: Invalid token");
return false;
}
}
private setupConnectionHandlers(): void {
this.wss.on("connection", (ws: WebSocket, req: any) => {
const userId = req.userId;
const connectionId = this.generateConnectionId();
// Store connection info
const connectionInfo: ConnectionInfo = {
id: connectionId,
userId,
ws,
lastActivity: Date.now(),
rooms: new Set(),
metadata: {},
};
this.connections.set(connectionId, connectionInfo);
console.log(`User ${userId} connected (${connectionId})`);
// Setup message handling
ws.on("message", (data: WebSocket.RawData) => {
// ws delivers a Buffer (RawData), not a string
this.handleMessage(connectionId, data.toString());
});
ws.on("close", () => {
this.handleDisconnection(connectionId);
});
ws.on("error", (error) => {
console.error(`WebSocket error for ${connectionId}:`, error);
this.handleDisconnection(connectionId);
});
// Send welcome message
this.sendToConnection(connectionId, {
type: "connection_established",
data: { connectionId, timestamp: Date.now() },
});
});
}
private setupMessageHandlers(): void {
// Chat message handler
this.messageHandlers.set("chat_message", async (connectionId, data) => {
const connection = this.connections.get(connectionId);
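if (!connection) return;
// Only members of the target room may post to it, and the payload must be text
if (typeof data?.message !== "string" || !connection.rooms.has(data.roomId)) {
this.sendToConnection(connectionId, {
type: "error",
data: { message: "Invalid message or not a member of this room" },
});
return;
}
// Validate and sanitize message
const sanitizedMessage = this.sanitizeMessage(data.message);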
const messageData = {
id: this.generateMessageId(),
userId: connection.userId,
message: sanitizedMessage,
timestamp: Date.now(),
roomId: data.roomId,
};
// Save to database
await this.saveMessage(messageData);
// Broadcast to room
this.broadcastToRoom(data.roomId, {
type: "chat_message",
data: messageData,
});
// Update user activity
connection.lastActivity = Date.now();
});
// Join room handler
this.messageHandlers.set("join_room", (connectionId, data) => {
const connection = this.connections.get(connectionId);
if (!connection) return;
const roomId = data.roomId;
// Add connection to room
this.joinRoom(connectionId, roomId);
// Notify room about new user
this.broadcastToRoom(
roomId,
{
type: "user_joined",
data: {
userId: connection.userId,
roomId,
timestamp: Date.now(),
},
},
[connectionId]
); // Exclude the user who just joined
// Send room state to the new user
const roomUsers = this.getRoomUsers(roomId);
this.sendToConnection(connectionId, {
type: "room_state",
data: {
roomId,
users: roomUsers,
timestamp: Date.now(),
},
});
});
// Leave room handler
this.messageHandlers.set("leave_room", (connectionId, data) => {
const connection = this.connections.get(connectionId);
if (!connection) return;
const roomId = data.roomId;
// Remove from room
this.leaveRoom(connectionId, roomId);
// Notify room
this.broadcastToRoom(roomId, {
type: "user_left",
data: {
userId: connection.userId,
roomId,
timestamp: Date.now(),
},
});
});
// Typing indicator handler
this.messageHandlers.set("typing_start", (connectionId, data) => {
const connection = this.connections.get(connectionId);
if (!connection) return;
this.broadcastToRoom(
data.roomId,
{
type: "typing_indicator",
data: {
userId: connection.userId,
roomId: data.roomId,
isTyping: true,
timestamp: Date.now(),
},
},
[connectionId]
);
});
this.messageHandlers.set("typing_stop", (connectionId, data) => {
const connection = this.connections.get(connectionId);
if (!connection) return;
this.broadcastToRoom(
data.roomId,
{
type: "typing_indicator",
data: {
userId: connection.userId,
roomId: data.roomId,
isTyping: false,
timestamp: Date.now(),
},
},
[connectionId]
);
});
// Heartbeat handler
this.messageHandlers.set("heartbeat", (connectionId, data) => {
const connection = this.connections.get(connectionId);
if (!connection) return;
connection.lastActivity = Date.now();
this.sendToConnection(connectionId, {
type: "heartbeat_response",
data: { timestamp: Date.now() },
});
});
}
private handleMessage(connectionId: string, data: string): void {
try {
const message = JSON.parse(data);
const handler = this.messageHandlers.get(message.type);
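if (handler) {
// Run inside a promise chain so sync throws and async rejections are both caught;
// an unhandled rejection from an async handler would otherwise crash Node
Promise.resolve()
.then(() => handler(connectionId, message.data))
.catch((error) => {
console.error(`Handler for "${message.type}" failed:`, error);
this.sendToConnection(connectionId, {
type: "error",
data: { message: "Failed to process message" },
});
});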
} else {
console.warn(`Unknown message type: ${message.type}`);
this.sendToConnection(connectionId, {
type: "error",
data: { message: "Unknown message type", type: message.type },
});
}
} catch (error) {
console.error("Failed to parse WebSocket message:", error);
this.sendToConnection(connectionId, {
type: "error",
data: { message: "Invalid message format" },
});
}
}
// Room management
private joinRoom(connectionId: string, roomId: string): void {
const connection = this.connections.get(connectionId);
if (!connection) return;
// Add connection to room
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Set());
}
this.rooms.get(roomId)!.add(connectionId);
connection.rooms.add(roomId);
console.log(`User ${connection.userId} joined room ${roomId}`);
}
private leaveRoom(connectionId: string, roomId: string): void {
const connection = this.connections.get(connectionId);
if (!connection) return;
const room = this.rooms.get(roomId);
if (room) {
room.delete(connectionId);
if (room.size === 0) {
this.rooms.delete(roomId);
}
}
connection.rooms.delete(roomId);
console.log(`User ${connection.userId} left room ${roomId}`);
}
private broadcastToRoom(
roomId: string,
message: any,
excludeConnections: string[] = []
): void {
const room = this.rooms.get(roomId);
if (!room) return;
const messageStr = JSON.stringify(message);
room.forEach((connectionId) => {
if (!excludeConnections.includes(connectionId)) {
const connection = this.connections.get(connectionId);
if (connection && connection.ws.readyState === WebSocket.OPEN) {
connection.ws.send(messageStr);
}
}
});
}
private sendToConnection(connectionId: string, message: any): void {
const connection = this.connections.get(connectionId);
if (connection && connection.ws.readyState === WebSocket.OPEN) {
connection.ws.send(JSON.stringify(message));
}
}
// Connection cleanup
private cleanupConnections(): void {
const now = Date.now();
const staleTimeout = 5 * 60 * 1000; // 5 minutes
for (const [connectionId, connection] of this.connections.entries()) {
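if (now - connection.lastActivity > staleTimeout) {
console.log(`Cleaning up stale connection: ${connectionId}`);
// Close the socket as well; deleting the map entry alone would leave it open.
// handleDisconnection() is idempotent, so the "close" event terminate() triggers is harmless.
connection.ws.terminate();
this.handleDisconnection(connectionId);
}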
}
}
private handleDisconnection(connectionId: string): void {
const connection = this.connections.get(connectionId);
if (!connection) return;
// Leave all rooms
connection.rooms.forEach((roomId) => {
this.leaveRoom(connectionId, roomId);
});
// Remove connection
this.connections.delete(connectionId);
console.log(`User ${connection.userId} disconnected (${connectionId})`);
}
// Utility methods
private getRoomUsers(roomId: string): string[] {
const room = this.rooms.get(roomId);
if (!room) return [];
const users: string[] = [];
room.forEach((connectionId) => {
const connection = this.connections.get(connectionId);
if (connection) {
users.push(connection.userId);
}
});
return users;
}
private generateConnectionId(): string {
return `conn_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
private generateMessageId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
private sanitizeMessage(message: string): string {
// Basic sanitization - in production, use a proper library
return message.trim().slice(0, 1000); // Limit to 1000 characters
}
private async saveMessage(messageData: any): Promise<void> {
// Save to database - implement based on your database choice
try {
await this.database.messages.create(messageData);
} catch (error) {
console.error("Failed to save message to database:", error);
}
}
// Get server statistics
getStats(): ServerStats {
return {
totalConnections: this.connections.size,
totalRooms: this.rooms.size,
connectionsPerRoom: Array.from(this.rooms.entries()).map(
([roomId, connections]) => ({
roomId,
connectionCount: connections.size,
})
),
};
}
}
interface ConnectionInfo {
id: string;
userId: string;
ws: WebSocket;
lastActivity: number;
rooms: Set<string>;
metadata: Record<string, any>;
}
interface ServerStats {
totalConnections: number;
totalRooms: number;
connectionsPerRoom: Array<{ roomId: string; connectionCount: number }>;
}
type MessageHandler = (connectionId: string, data: any) => void;
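The server keeps two indexes in sync: `rooms` (room ID to connection IDs) and each connection's own `rooms` set. Pulling that bookkeeping into a class of its own makes it testable in isolation. The sketch below is a hypothetical `RoomRegistry`, not part of the `ws` library. Its `leaveAll()` returns the rooms a disconnecting connection belonged to, which is what a caller needs to broadcast `user_left`; the `handleDisconnection()` above leaves rooms without notifying anyone.

```typescript
// Hypothetical registry keeping roomId -> connectionIds and connectionId -> roomIds in sync.
class RoomRegistry {
  private readonly rooms = new Map<string, Set<string>>();
  private readonly memberships = new Map<string, Set<string>>();

  join(connectionId: string, roomId: string): void {
    if (!this.rooms.has(roomId)) this.rooms.set(roomId, new Set());
    this.rooms.get(roomId)!.add(connectionId);
    if (!this.memberships.has(connectionId)) this.memberships.set(connectionId, new Set());
    this.memberships.get(connectionId)!.add(roomId);
  }

  leave(connectionId: string, roomId: string): void {
    const room = this.rooms.get(roomId);
    if (room) {
      room.delete(connectionId);
      // Drop empty rooms so the map does not grow without bound
      if (room.size === 0) this.rooms.delete(roomId);
    }
    const joined = this.memberships.get(connectionId);
    if (joined) {
      joined.delete(roomId);
      if (joined.size === 0) this.memberships.delete(connectionId);
    }
  }

  // Call on disconnect; returns the rooms that should be told the user left
  leaveAll(connectionId: string): string[] {
    const joined = Array.from(this.memberships.get(connectionId) ?? []);
    for (const roomId of joined) this.leave(connectionId, roomId);
    return joined;
  }

  members(roomId: string): string[] {
    return Array.from(this.rooms.get(roomId) ?? []);
  }

  roomCount(): number {
    return this.rooms.size;
  }
}

const registry = new RoomRegistry();
registry.join("conn_a", "lobby");
registry.join("conn_b", "lobby");
registry.join("conn_a", "match_1");
console.log(registry.leaveAll("conn_a")); // [ 'lobby', 'match_1' ]
console.log(registry.members("lobby")); // [ 'conn_b' ]
console.log(registry.roomCount()); // 1
```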
Socket.io: WebSockets Made Developer-Friendly
Building Robust Real-time Applications
// Socket.io server implementation with advanced features
import { Server, Socket } from "socket.io";
import { createServer } from "http";
import { createClient } from "redis";
import jwt from "jsonwebtoken";
class SocketIOService {
private io: Server;
private redis: any;
private connectedUsers: Map<string, UserConnection> = new Map();
constructor(port: number = 3000) {
const httpServer = createServer();
this.io = new Server(httpServer, {
cors: {
origin: process.env.CLIENT_URL || "http://localhost:3000",
methods: ["GET", "POST"],
},
// Connection handling options
pingTimeout: 60000, // 60 seconds
pingInterval: 25000, // 25 seconds
transports: ["websocket", "polling"],
});
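// setupRedisAdapter() is async: surface failures instead of leaving an unhandled rejection.
// Until it resolves, this.redis is undefined and checkRateLimit() calls will fail.
this.setupRedisAdapter().catch((error) => {
console.error("Failed to set up Redis adapter:", error);
});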
this.setupAuthentication();
this.setupEventHandlers();
httpServer.listen(port, () => {
console.log(`Socket.io server running on port ${port}`);
});
}
private async setupRedisAdapter(): Promise<void> {
// Redis adapter for horizontal scaling
const pubClient = createClient({
url: process.env.REDIS_URL || "redis://localhost:6379",
});
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
this.redis = pubClient;
// Configure Socket.io to use Redis for multi-server communication
const { createAdapter } = await import("@socket.io/redis-adapter");
this.io.adapter(createAdapter(pubClient, subClient));
}
private setupAuthentication(): void {
this.io.use(async (socket: Socket, next) => {
try {
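// Accept a bare token via handshake auth or an "Authorization: Bearer <token>" header
const rawToken =
socket.handshake.auth.token || socket.handshake.headers.authorization;
const token =
typeof rawToken === "string" ? rawToken.replace(/^Bearer\s+/i, "") : undefined;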
if (!token) {
return next(new Error("Authentication token required"));
}
const decoded = jwt.verify(token, process.env.JWT_SECRET!) as any;
// Attach user info to socket
socket.userId = decoded.userId;
socket.userEmail = decoded.email;
socket.userRole = decoded.role;
next();
} catch (error) {
next(new Error("Authentication failed"));
}
});
}
private setupEventHandlers(): void {
this.io.on("connection", (socket: Socket) => {
console.log(`User connected: ${socket.userId} (${socket.id})`);
// Store user connection info
this.connectedUsers.set(socket.userId, {
socketId: socket.id,
userId: socket.userId,
connectedAt: Date.now(),
currentRooms: new Set(),
lastActivity: Date.now(),
});
this.setupUserEventHandlers(socket);
// Handle disconnection
socket.on("disconnect", (reason) => {
this.handleUserDisconnection(socket, reason);
});
});
}
private setupUserEventHandlers(socket: Socket): void {
// Join chat room
socket.on("join_room", async (data: JoinRoomData) => {
try {
const { roomId, roomType } = data;
// Validate user has permission to join room
const canJoin = await this.validateRoomAccess(
socket.userId,
roomId,
roomType
);
if (!canJoin) {
socket.emit("error", { message: "Access denied to room" });
return;
}
// Join the room
await socket.join(roomId);
// Update user connection info
const userConnection = this.connectedUsers.get(socket.userId);
if (userConnection) {
userConnection.currentRooms.add(roomId);
userConnection.lastActivity = Date.now();
}
// Get room info and recent messages
const roomInfo = await this.getRoomInfo(roomId);
const recentMessages = await this.getRecentMessages(roomId, 50);
// Send room data to user
socket.emit("room_joined", {
roomId,
roomInfo,
recentMessages,
onlineUsers: await this.getRoomOnlineUsers(roomId),
});
// Notify other users in room
socket.to(roomId).emit("user_joined", {
userId: socket.userId,
userEmail: socket.userEmail,
timestamp: Date.now(),
});
console.log(`User ${socket.userId} joined room ${roomId}`);
} catch (error) {
console.error("Error joining room:", error);
socket.emit("error", { message: "Failed to join room" });
}
});
// Leave room
socket.on("leave_room", async (data: { roomId: string }) => {
const { roomId } = data;
await socket.leave(roomId);
// Update user connection info
const userConnection = this.connectedUsers.get(socket.userId);
if (userConnection) {
userConnection.currentRooms.delete(roomId);
}
// Notify other users
socket.to(roomId).emit("user_left", {
userId: socket.userId,
timestamp: Date.now(),
});
console.log(`User ${socket.userId} left room ${roomId}`);
});
// Send chat message
socket.on("send_message", async (data: SendMessageData) => {
try {
const { roomId, message, messageType = "text", metadata = {} } = data;
// Validate message
if (!message || message.trim().length === 0) {
socket.emit("error", { message: "Message cannot be empty" });
return;
}
if (message.length > 2000) {
socket.emit("error", { message: "Message too long" });
return;
}
// Check if user is in room
const rooms = Array.from(socket.rooms);
if (!rooms.includes(roomId)) {
socket.emit("error", { message: "You are not in this room" });
return;
}
// Rate limiting check
const canSend = await this.checkRateLimit(socket.userId);
if (!canSend) {
socket.emit("error", {
message: "Rate limit exceeded. Please slow down.",
});
return;
}
// Create message object
const messageData: ChatMessage = {
id: this.generateMessageId(),
roomId,
userId: socket.userId,
userEmail: socket.userEmail,
message: this.sanitizeMessage(message),
messageType,
metadata,
timestamp: Date.now(),
edited: false,
reactions: {},
};
// Save to database
await this.saveMessage(messageData);
// Broadcast to room
this.io.to(roomId).emit("new_message", messageData);
// Update user activity
const userConnection = this.connectedUsers.get(socket.userId);
if (userConnection) {
userConnection.lastActivity = Date.now();
}
console.log(`Message sent in room ${roomId} by user ${socket.userId}`);
} catch (error) {
console.error("Error sending message:", error);
socket.emit("error", { message: "Failed to send message" });
}
});
// Typing indicators
socket.on("typing_start", (data: { roomId: string }) => {
socket.to(data.roomId).emit("user_typing", {
userId: socket.userId,
userEmail: socket.userEmail,
isTyping: true,
});
});
socket.on("typing_stop", (data: { roomId: string }) => {
socket.to(data.roomId).emit("user_typing", {
userId: socket.userId,
userEmail: socket.userEmail,
isTyping: false,
});
});
// Message reactions
socket.on("add_reaction", async (data: AddReactionData) => {
try {
const { messageId, reaction } = data;
// Validate reaction
const validReactions = ["👍", "👎", "❤️", "😂", "😮", "😢", "😡"];
if (!validReactions.includes(reaction)) {
socket.emit("error", { message: "Invalid reaction" });
return;
}
// Update message with reaction
const updated = await this.addMessageReaction(
messageId,
socket.userId,
reaction
);
if (updated) {
// Get the room ID for this message
const message = await this.getMessage(messageId);
if (message) {
this.io.to(message.roomId).emit("message_reaction_added", {
messageId,
userId: socket.userId,
reaction,
timestamp: Date.now(),
});
}
}
} catch (error) {
console.error("Error adding reaction:", error);
socket.emit("error", { message: "Failed to add reaction" });
}
});
// Private messaging
socket.on("send_private_message", async (data: PrivateMessageData) => {
try {
const { recipientId, message } = data;
// Validate recipient exists and is online
const recipientConnection = this.connectedUsers.get(recipientId);
if (!recipientConnection) {
socket.emit("error", { message: "User is not online" });
return;
}
const messageData: PrivateMessage = {
id: this.generateMessageId(),
senderId: socket.userId,
recipientId,
message: this.sanitizeMessage(message),
timestamp: Date.now(),
read: false,
};
// Save to database
await this.savePrivateMessage(messageData);
// Send to recipient
this.io
.to(recipientConnection.socketId)
.emit("private_message", messageData);
// Confirm to sender
socket.emit("private_message_sent", {
messageId: messageData.id,
recipientId,
timestamp: messageData.timestamp,
});
} catch (error) {
console.error("Error sending private message:", error);
socket.emit("error", { message: "Failed to send private message" });
}
});
// Activity tracking
socket.on("activity", () => {
const userConnection = this.connectedUsers.get(socket.userId);
if (userConnection) {
userConnection.lastActivity = Date.now();
}
});
}
private handleUserDisconnection(socket: Socket, reason: string): void {
console.log(`User disconnected: ${socket.userId} (${reason})`);
// Get user's current rooms and notify them. Skip if a newer socket for the same
// user (e.g. a second tab) has replaced this entry, since that user is still online.
const userConnection = this.connectedUsers.get(socket.userId);
if (userConnection && userConnection.socketId === socket.id) {
userConnection.currentRooms.forEach((roomId) => {
socket.to(roomId).emit("user_left", {
userId: socket.userId,
timestamp: Date.now(),
});
});
// Remove from connected users
this.connectedUsers.delete(socket.userId);
}
}
// Rate limiting implementation
private async checkRateLimit(userId: string): Promise<boolean> {
const key = `rate_limit:${userId}`;
const limit = 10; // 10 messages per minute
const window = 60; // 60 seconds
const count = await this.redis.incr(key);
if (count === 1) {
await this.redis.expire(key, window);
}
return count <= limit;
}
// Utility methods for database operations
private async validateRoomAccess(
userId: string,
roomId: string,
roomType: string
): Promise<boolean> {
// Implement room access validation based on your business logic
return true; // Simplified for example
}
private async getRoomInfo(roomId: string): Promise<RoomInfo> {
// Fetch room information from database
return {
id: roomId,
name: `Room ${roomId}`,
description: "",
createdAt: Date.now(),
memberCount: 0,
};
}
private async getRecentMessages(
roomId: string,
limit: number
): Promise<ChatMessage[]> {
// Fetch recent messages from database
return [];
}
private async getRoomOnlineUsers(roomId: string): Promise<OnlineUser[]> {
// Get list of online users in room
const socketsInRoom = await this.io.in(roomId).fetchSockets();
return socketsInRoom.map((socket) => ({
userId: (socket as any).userId,
userEmail: (socket as any).userEmail,
socketId: socket.id,
}));
}
// Broadcasting methods
broadcastToRoom(roomId: string, event: string, data: any): void {
this.io.to(roomId).emit(event, data);
}
broadcastToUser(userId: string, event: string, data: any): void {
const userConnection = this.connectedUsers.get(userId);
if (userConnection) {
this.io.to(userConnection.socketId).emit(event, data);
}
}
broadcastToAll(event: string, data: any): void {
this.io.emit(event, data);
}
// Server statistics
getServerStats(): ServerStatistics {
return {
connectedUsers: this.connectedUsers.size,
// Local node only; Socket.io also counts each socket's private room (named after its ID)
totalRooms: this.io.sockets.adapter.rooms.size,
serverUptime: process.uptime(),
memoryUsage: process.memoryUsage(),
};
}
}
// Interfaces
interface UserConnection {
socketId: string;
userId: string;
connectedAt: number;
currentRooms: Set<string>;
lastActivity: number;
}
interface JoinRoomData {
roomId: string;
roomType: "public" | "private" | "direct";
}
interface SendMessageData {
roomId: string;
message: string;
messageType?: "text" | "image" | "file" | "system";
metadata?: Record<string, any>;
}
interface ChatMessage {
id: string;
roomId: string;
userId: string;
userEmail: string;
message: string;
messageType: string;
metadata: Record<string, any>;
timestamp: number;
edited: boolean;
reactions: Record<string, string[]>; // reaction -> array of user IDs
}
interface AddReactionData {
messageId: string;
reaction: string;
}
interface PrivateMessageData {
recipientId: string;
message: string;
}
interface PrivateMessage {
id: string;
senderId: string;
recipientId: string;
message: string;
timestamp: number;
read: boolean;
}
interface RoomInfo {
id: string;
name: string;
description: string;
createdAt: number;
memberCount: number;
}
interface OnlineUser {
userId: string;
userEmail: string;
socketId: string;
}
interface ServerStatistics {
connectedUsers: number;
totalRooms: number;
serverUptime: number;
memoryUsage: NodeJS.MemoryUsage;
}
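`checkRateLimit()` implements a fixed-window counter: `INCR` counts messages, and the first increment in a window sets an `EXPIRE` so the counter resets after 60 seconds. The same algorithm in memory is handy for unit tests or a single-process deployment, but its counters are not shared between servers, which is why the service keeps them in Redis once it scales horizontally. `FixedWindowRateLimiter` is a hypothetical name, and the `now` parameter is injectable so tests do not depend on the clock.

```typescript
// In-memory fixed-window limiter mirroring checkRateLimit()'s INCR + EXPIRE logic.
// Single process only; entries are never evicted here (a real version would sweep them).
class FixedWindowRateLimiter {
  private readonly limit: number;
  private readonly windowMs: number;
  private readonly windows = new Map<string, { count: number; resetAt: number }>();

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  tryConsume(key: string, now: number = Date.now()): boolean {
    let entry = this.windows.get(key);
    if (!entry || now >= entry.resetAt) {
      // First hit in a new window, like INCR returning 1 and EXPIRE starting the TTL
      entry = { count: 0, resetAt: now + this.windowMs };
      this.windows.set(key, entry);
    }
    entry.count += 1;
    return entry.count <= this.limit;
  }
}

const limiter = new FixedWindowRateLimiter(10, 60_000);
const results = Array.from({ length: 11 }, () => limiter.tryConsume("user_1", 0));
console.log(results.filter(Boolean).length); // 10
console.log(limiter.tryConsume("user_1", 60_000)); // true: a new window has started
```

One known trade-off of fixed windows: a client can send the full limit at the end of one window and again at the start of the next, so short bursts of up to twice the limit get through. A sliding-window or token-bucket limiter smooths that out at the cost of more state.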
Real-time Chat Applications: Beyond Hello World
Building Production-Ready Chat Systems
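// Complete chat application backend
// Assumptions: Redis is the ioredis client; MessageService, UserService and
// NotificationService are application classes not shown here; onEvent() is a
// helper on SocketIOService (not defined in the class above) that registers a
// handler on every connected socket.
import Redis from "ioredis";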
class ChatApplicationService {
private socketService: SocketIOService;
private messageService: MessageService;
private userService: UserService;
private notificationService: NotificationService;
private redis: Redis;
constructor() {
this.socketService = new SocketIOService();
this.messageService = new MessageService();
this.userService = new UserService();
this.notificationService = new NotificationService();
this.redis = new Redis(process.env.REDIS_URL);
this.setupAdvancedFeatures();
}
private setupAdvancedFeatures(): void {
this.setupMessageThreads();
this.setupMessageSearch();
this.setupFileSharing();
this.setupVoiceMessages();
this.setupMessageScheduling();
this.setupChatBots();
}
// Message threading system
private setupMessageThreads(): void {
this.socketService.onEvent("start_thread", async (socket, data) => {
const { parentMessageId, message } = data;
try {
// Create thread
const thread = await this.messageService.createThread(parentMessageId, {
creatorId: socket.userId,
initialMessage: message,
});
// Notify room about new thread
const parentMessage = await this.messageService.getMessage(
parentMessageId
);
if (parentMessage) {
this.socketService.broadcastToRoom(
parentMessage.roomId,
"thread_created",
{
threadId: thread.id,
parentMessageId,
createdBy: socket.userId,
timestamp: Date.now(),
}
);
}
socket.emit("thread_created", thread);
} catch (error) {
socket.emit("error", { message: "Failed to create thread" });
}
});
this.socketService.onEvent("reply_to_thread", async (socket, data) => {
const { threadId, message } = data;
try {
const reply = await this.messageService.addThreadReply(threadId, {
userId: socket.userId,
message,
timestamp: Date.now(),
});
// Get thread participants
const participants = await this.messageService.getThreadParticipants(
threadId
);
// Notify all participants
participants.forEach((participantId) => {
if (participantId !== socket.userId) {
this.socketService.broadcastToUser(
participantId,
"thread_reply",
reply
);
}
});
socket.emit("thread_reply_sent", reply);
} catch (error) {
socket.emit("error", { message: "Failed to send thread reply" });
}
});
}
// Advanced message search
private setupMessageSearch(): void {
this.socketService.onEvent("search_messages", async (socket, data) => {
const { query, roomId, filters = {} } = data;
try {
const searchResults = await this.messageService.searchMessages({
query,
roomId,
userId: socket.userId,
filters: {
dateFrom: filters.dateFrom,
dateTo: filters.dateTo,
messageType: filters.messageType,
fromUser: filters.fromUser,
hasAttachments: filters.hasAttachments,
inThread: filters.inThread,
},
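limit: Math.min(filters.limit || 20, 100), // cap page size (100 is an arbitrary ceiling) so one request can't pull the whole history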
offset: filters.offset || 0,
});
socket.emit("search_results", {
query,
results: searchResults.messages,
totalCount: searchResults.totalCount,
hasMore: searchResults.hasMore,
});
} catch (error) {
socket.emit("error", { message: "Search failed" });
}
});
}
// File sharing with progress tracking
private setupFileSharing(): void {
this.socketService.onEvent("upload_file", async (socket, data) => {
const { roomId, fileName, fileSize, fileType, checksum } = data;
try {
// Validate file
if (fileSize > 100 * 1024 * 1024) {
// 100MB limit
socket.emit("error", { message: "File too large" });
return;
}
// Create upload session
const uploadSession = await this.createUploadSession({
userId: socket.userId,
roomId,
fileName,
fileSize,
fileType,
checksum,
});
socket.emit("upload_session_created", {
sessionId: uploadSession.id,
uploadUrl: uploadSession.uploadUrl,
chunkSize: uploadSession.chunkSize,
});
} catch (error) {
socket.emit("error", { message: "Failed to create upload session" });
}
});
this.socketService.onEvent("upload_progress", async (socket, data) => {
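const { sessionId, progress } = data;
try {
// Only the user who created the session may report its progress
const session = await this.getUploadSession(sessionId);
if (!session || session.userId !== socket.userId) return;
await this.updateUploadProgress(sessionId, progress);
// Broadcast progress to the room if the session opted in
if (session.broadcastProgress) {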
this.socketService.broadcastToRoom(
session.roomId,
"file_upload_progress",
{
sessionId,
fileName: session.fileName,
progress,
uploadedBy: socket.userId,
}
);
}
} catch (error) {
console.error("Failed to update upload progress:", error);
}
});
this.socketService.onEvent("upload_complete", async (socket, data) => {
const { sessionId } = data;
try {
const fileInfo = await this.completeFileUpload(sessionId);
// Create file message
const messageData = {
roomId: fileInfo.roomId,
userId: socket.userId,
messageType: "file",
message: `Shared file: ${fileInfo.fileName}`,
metadata: {
fileName: fileInfo.fileName,
fileSize: fileInfo.fileSize,
fileType: fileInfo.fileType,
fileUrl: fileInfo.url,
thumbnailUrl: fileInfo.thumbnailUrl,
},
};
const savedMessage = await this.messageService.saveMessage(messageData);
this.socketService.broadcastToRoom(
fileInfo.roomId,
"new_message",
savedMessage
);
socket.emit("file_uploaded", {
messageId: savedMessage.id,
fileUrl: fileInfo.url,
});
} catch (error) {
socket.emit("error", { message: "Failed to complete file upload" });
}
});
}
// Voice message support
private setupVoiceMessages(): void {
this.socketService.onEvent("send_voice_message", async (socket, data) => {
const { roomId, audioBlob, duration } = data;
try {
// Process audio (convert format, compress, etc.)
const processedAudio = await this.processVoiceMessage(audioBlob);
// Create voice message
const voiceMessage = {
roomId,
userId: socket.userId,
messageType: "voice",
message: "🎵 Voice message",
metadata: {
audioUrl: processedAudio.url,
duration: duration,
waveform: processedAudio.waveform, // For visual representation
transcription: null, // Can be added later with speech-to-text
},
};
const savedMessage = await this.messageService.saveMessage(
voiceMessage
);
this.socketService.broadcastToRoom(roomId, "new_message", savedMessage);
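// Optional: generate the transcription in the background; catch errors so a
// failed transcription doesn't surface as an unhandled promise rejection
this.generateTranscription(savedMessage.id, processedAudio.url).catch(
(err) => console.error("Transcription failed:", err)
);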
} catch (error) {
socket.emit("error", { message: "Failed to send voice message" });
}
});
}
// Message scheduling
private setupMessageScheduling(): void {
this.socketService.onEvent("schedule_message", async (socket, data) => {
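const { roomId, message, scheduledAt } = data;
try {
// Reject unparseable or past timestamps before persisting anything
const deliveryTime = new Date(scheduledAt);
if (Number.isNaN(deliveryTime.getTime()) || deliveryTime.getTime() <= Date.now()) {
socket.emit("error", { message: "scheduledAt must be a valid future time" });
return;
}
const scheduledMessage = await this.messageService.scheduleMessage({
roomId,
userId: socket.userId,
message,
scheduledAt: deliveryTime,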
status: "pending",
});
socket.emit("message_scheduled", {
scheduledMessageId: scheduledMessage.id,
scheduledAt: scheduledMessage.scheduledAt,
});
// Schedule the actual sending
this.scheduleMessageDelivery(scheduledMessage.id, scheduledAt);
} catch (error) {
socket.emit("error", { message: "Failed to schedule message" });
}
});
}
// Chat bot integration
private setupChatBots(): void {
// Listen for bot commands
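this.socketService.onMessageReceived(async (messageData) => {
// Ignore the bot's own messages so its replies can't trigger further commands
if (messageData.userId === "bot") return;
if (messageData.message.startsWith("/")) {
await this.handleBotCommand(messageData);
} else if (messageData.message.includes("@bot")) {
// Check for @bot mentions (commands take precedence, so a message gets one reply)
await this.handleBotMention(messageData);
}
});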
}
private async handleBotCommand(messageData: ChatMessage): Promise<void> {
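// Lowercase only the command name; keep the arguments exactly as typed,
// since translate/remind/poll need the user's original text
const [rawName = "", ...args] = messageData.message.substring(1).trim().split(/\s+/);
let botResponse: string;
switch (rawName.toLowerCase()) {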
case "weather":
botResponse = await this.getBotWeatherResponse(
args[0] || "current location"
);
break;
case "translate":
const text = args.join(" ");
botResponse = await this.getBotTranslationResponse(text);
break;
case "remind":
const reminder = args.join(" ");
botResponse = await this.setBotReminder(messageData.userId, reminder);
break;
case "poll":
const question = args.join(" ");
botResponse = await this.createBotPoll(messageData.roomId, question);
break;
default:
botResponse = "Unknown command. Type /help for available commands.";
}
// Send bot response
const botMessage = {
roomId: messageData.roomId,
userId: "bot",
userEmail: "bot@system",
message: botResponse,
messageType: "bot",
metadata: {
isBot: true,
replyTo: messageData.id,
},
};
const savedBotMessage = await this.messageService.saveMessage(botMessage);
this.socketService.broadcastToRoom(
messageData.roomId,
"new_message",
savedBotMessage
);
}
// Message analytics and insights
async generateChatAnalytics(
roomId: string,
timeframe: string
): Promise<ChatAnalytics> {
const analytics = await this.messageService.getChatAnalytics(
roomId,
timeframe
);
return {
messageCount: analytics.totalMessages,
activeUsers: analytics.activeUsers.length,
topUsers: analytics.topUsers,
peakHours: analytics.peakHours,
messageTypes: analytics.messageTypeBreakdown,
averageResponseTime: analytics.averageResponseTime,
sentimentAnalysis: analytics.sentimentAnalysis,
topKeywords: analytics.topKeywords,
engagementMetrics: {
reactionsCount: analytics.totalReactions,
threadsStarted: analytics.threadsStarted,
filesShared: analytics.filesShared,
},
};
}
// Moderation tools
async moderateMessage(
messageId: string,
action: ModerationAction,
moderatorId: string
): Promise<void> {
const message = await this.messageService.getMessage(messageId);
if (!message) throw new Error("Message not found");
switch (action.type) {
case "delete":
await this.messageService.deleteMessage(messageId, moderatorId);
this.socketService.broadcastToRoom(message.roomId, "message_deleted", {
messageId,
deletedBy: moderatorId,
reason: action.reason,
});
break;
case "flag":
await this.messageService.flagMessage(
messageId,
action.reason,
moderatorId
);
break;
case "warn_user":
await this.userService.warnUser(
message.userId,
action.reason,
moderatorId
);
this.socketService.broadcastToUser(message.userId, "user_warned", {
reason: action.reason,
warnedBy: moderatorId,
});
break;
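case "timeout_user":
// duration is optional on ModerationAction, but a timeout cannot proceed without it
if (action.duration === undefined) {
throw new Error("timeout_user requires a duration");
}
await this.userService.timeoutUser(
message.userId,
action.duration,
moderatorId
);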
this.socketService.broadcastToUser(message.userId, "user_timeout", {
duration: action.duration,
reason: action.reason,
});
break;
}
}
}
// Analytics interfaces
interface ChatAnalytics {
messageCount: number;
activeUsers: number;
topUsers: Array<{ userId: string; messageCount: number }>;
peakHours: Array<{ hour: number; messageCount: number }>;
messageTypes: Record<string, number>;
averageResponseTime: number;
sentimentAnalysis: {
positive: number;
neutral: number;
negative: number;
};
topKeywords: Array<{ keyword: string; count: number }>;
engagementMetrics: {
reactionsCount: number;
threadsStarted: number;
filesShared: number;
};
}
interface ModerationAction {
type: "delete" | "flag" | "warn_user" | "timeout_user";
reason: string;
duration?: number; // for timeouts
}
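The upload handlers above return a `chunkSize` but leave the actual chunking to the client. As a rough sketch, assuming fixed-size chunks and whole-number percentage progress (both helper names are hypothetical, not part of the service above), a client could plan its chunks and compute the value it reports in `upload_progress` like this:

```typescript
// Hypothetical client-side helpers for the chunked upload flow.
interface ChunkRange {
  index: number;
  start: number; // inclusive byte offset
  end: number; // exclusive byte offset
}

// Split a file of `fileSize` bytes into consecutive ranges of at most `chunkSize` bytes
function planChunks(fileSize: number, chunkSize: number): ChunkRange[] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError("chunkSize must be a positive integer");
  }
  const chunks: ChunkRange[] = [];
  for (let start = 0, index = 0; start < fileSize; start += chunkSize, index++) {
    chunks.push({ index, start, end: Math.min(start + chunkSize, fileSize) });
  }
  return chunks;
}

// Whole-number percentage (0-100) to send with an "upload_progress" event
function uploadProgress(bytesSent: number, fileSize: number): number {
  if (fileSize <= 0) return 100;
  // Multiply before dividing to avoid floating-point results like 28.999...
  return Math.min(100, Math.floor((bytesSent * 100) / fileSize));
}

// Usage: a 10 MB file split into 4 MB chunks
const MB = 1024 * 1024;
const plan = planChunks(10 * MB, 4 * MB);
console.log(plan.length); // 3
console.log(uploadProgress(plan[1].end, 10 * MB)); // 80
```

Because the final range is clamped to `fileSize`, the last chunk can be shorter than `chunkSize`, which is why progress is computed from byte offsets rather than from a count of chunks sent.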
Live Notifications System: Keeping Users Engaged
Real-time Notification Architecture
// Comprehensive notification system
class LiveNotificationSystem {
private io: Server;
private redis: Redis;
private database: Database;
private emailService: EmailService;
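private pushService: PushNotificationService;
private notificationQueue: Queue; // delayed-job queue used by scheduleNotification (e.g. a BullMQ Queue)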
constructor() {
this.setupNotificationChannels();
this.setupNotificationProcessing();
this.setupNotificationDelivery();
}
// Multi-channel notification delivery
async sendNotification(notification: NotificationData): Promise<void> {
try {
// Determine delivery channels based on user preferences
const user = await this.getUserNotificationPreferences(
notification.recipientId
);
const deliveryChannels = this.determineDeliveryChannels(
notification,
user
);
// Process notification through all applicable channels
const deliveryPromises = deliveryChannels.map((channel) =>
this.deliverThroughChannel(notification, channel, user)
);
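const results = await Promise.allSettled(deliveryPromises);
// Track only the channels whose delivery actually succeeded
const deliveredChannels = deliveryChannels.filter(
(_, index) => results[index].status === "fulfilled"
);
await this.trackNotificationDelivery(notification, deliveredChannels);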
} catch (error) {
console.error("Failed to send notification:", error);
await this.handleNotificationError(notification, error);
}
}
private determineDeliveryChannels(
notification: NotificationData,
user: UserNotificationSettings
): NotificationChannel[] {
const channels: NotificationChannel[] = [];
// Real-time (WebSocket) - always try first if user is online
if (this.isUserOnline(notification.recipientId)) {
channels.push("realtime");
}
// Push notification based on user settings and notification priority
if (this.shouldSendPushNotification(notification, user)) {
channels.push("push");
}
// Email based on user settings and notification type
if (this.shouldSendEmail(notification, user)) {
channels.push("email");
}
// SMS for critical notifications
if (notification.priority === "critical" && user.smsNotifications) {
channels.push("sms");
}
return channels;
}
private async deliverThroughChannel(
notification: NotificationData,
channel: NotificationChannel,
user: UserNotificationSettings
): Promise<void> {
switch (channel) {
case "realtime":
await this.sendRealtimeNotification(notification);
break;
case "push":
await this.sendPushNotification(notification, user);
break;
case "email":
await this.sendEmailNotification(notification, user);
break;
case "sms":
await this.sendSMSNotification(notification, user);
break;
}
}
// Real-time notification delivery
private async sendRealtimeNotification(
notification: NotificationData
): Promise<void> {
const userConnection = await this.getUserConnection(
notification.recipientId
);
if (userConnection) {
// Send notification to user's socket
this.io.to(userConnection.socketId).emit("notification", {
id: notification.id,
type: notification.type,
title: notification.title,
message: notification.message,
data: notification.data,
priority: notification.priority,
timestamp: notification.timestamp,
actions: notification.actions || [],
});
// Update notification status
await this.markNotificationDelivered(notification.id, "realtime");
}
}
// Advanced notification features
async createNotificationTemplate(
template: NotificationTemplate
): Promise<string> {
const templateId = this.generateTemplateId();
// Validate template
this.validateNotificationTemplate(template);
// Store template
await this.database.notificationTemplates.create({
id: templateId,
name: template.name,
type: template.type,
channels: template.channels,
template: {
title: template.titleTemplate,
message: template.messageTemplate,
emailSubject: template.emailSubjectTemplate,
emailBody: template.emailBodyTemplate,
pushTitle: template.pushTitleTemplate,
pushBody: template.pushBodyTemplate,
},
variables: template.variables,
conditions: template.conditions,
createdAt: Date.now(),
updatedAt: Date.now(),
});
return templateId;
}
async sendTemplatedNotification(
templateId: string,
recipientId: string,
variables: Record<string, any>,
options: NotificationOptions = {}
): Promise<void> {
const template = await this.getNotificationTemplate(templateId);
if (!template) {
throw new Error("Notification template not found");
}
// Check conditions if specified
if (
template.conditions &&
!this.evaluateConditions(template.conditions, variables)
) {
console.log(`Notification conditions not met for template ${templateId}`);
return;
}
// Render template with variables
const rendered = this.renderTemplate(template, variables);
const notification: NotificationData = {
id: this.generateNotificationId(),
recipientId,
type: template.type,
title: rendered.title,
message: rendered.message,
data: options.data || {},
priority: options.priority || "normal",
timestamp: Date.now(),
templateId,
channels: options.channels || template.channels,
actions: options.actions || [],
};
await this.sendNotification(notification);
}
// Notification grouping and batching
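async createNotificationGroup(
groupConfig: NotificationGroupConfig
): Promise<string> {
const groupId = this.generateGroupId();
// Store group configuration. The TTL must outlive the batch window (here by an
// arbitrary 60 seconds); otherwise the key expires before processBatchedNotifications reads it.
await this.redis.setex(
`notification_group:${groupId}`,
groupConfig.batchWindow + 60,
JSON.stringify({
...groupConfig,
notifications: [],
createdAt: Date.now(),
})
);
// Schedule batch processing (an in-process timer is lost on restart; a job queue is more durable)
setTimeout(() => {
this.processBatchedNotifications(groupId).catch((err) =>
console.error("Failed to process batched notifications:", err)
);
}, groupConfig.batchWindow * 1000);
// Return the id so callers can append notifications to the group
return groupId;
}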
private async processBatchedNotifications(groupId: string): Promise<void> {
const groupData = await this.redis.get(`notification_group:${groupId}`);
if (!groupData) return;
const group = JSON.parse(groupData);
if (group.notifications.length === 0) return;
// Create batched notification
const batchedNotification: NotificationData = {
id: this.generateNotificationId(),
recipientId: group.recipientId,
type: "batched",
title: this.generateBatchedTitle(group),
message: this.generateBatchedMessage(group),
data: {
notifications: group.notifications,
groupType: group.type,
},
priority: this.calculateBatchPriority(group.notifications),
timestamp: Date.now(),
};
await this.sendNotification(batchedNotification);
// Cleanup group
await this.redis.del(`notification_group:${groupId}`);
}
// Smart notification timing
async scheduleNotification(
notification: NotificationData,
deliveryTime: Date
): Promise<string> {
const scheduleId = this.generateScheduleId();
// Store scheduled notification
await this.database.scheduledNotifications.create({
id: scheduleId,
notification,
deliveryTime,
status: "pending",
createdAt: Date.now(),
});
// Calculate delay
const delay = deliveryTime.getTime() - Date.now();
if (delay > 0) {
// Use job queue for reliability
await this.notificationQueue.add(
"send_scheduled_notification",
{ scheduleId },
{ delay }
);
} else {
// Send immediately if time has passed
await this.sendNotification(notification);
}
return scheduleId;
}
// Intelligent notification frequency management
async manageNotificationFrequency(userId: string): Promise<void> {
const recentNotifications = await this.getRecentNotifications(userId, 24); // Last 24 hours
if (recentNotifications.length > 20) {
// Too many notifications
// Implement intelligent throttling
await this.enableNotificationThrottling(userId, {
throttleLevel: "high",
batchSimilar: true,
suppressLowPriority: true,
digestMode: true,
});
} else if (recentNotifications.length < 3) {
// Too few notifications
// User might be missing important updates
await this.suggestNotificationSettingsOptimization(userId);
}
}
// A/B testing for notifications
async createNotificationExperiment(
experiment: NotificationExperiment
): Promise<string> {
const experimentId = this.generateExperimentId();
await this.database.notificationExperiments.create({
id: experimentId,
name: experiment.name,
variants: experiment.variants,
targetSegment: experiment.targetSegment,
metrics: experiment.metrics,
startDate: experiment.startDate,
endDate: experiment.endDate,
status: "active",
});
return experimentId;
}
async sendExperimentNotification(
experimentId: string,
baseNotification: NotificationData
): Promise<void> {
const experiment = await this.getNotificationExperiment(experimentId);
if (!experiment || experiment.status !== "active") return;
// Determine which variant to use for this user
const variant = this.selectExperimentVariant(
experiment,
baseNotification.recipientId
);
// Apply variant modifications
const modifiedNotification = this.applyVariantModifications(
baseNotification,
variant
);
// Track experiment participation
await this.trackExperimentParticipation(
experimentId,
variant.id,
baseNotification.recipientId
);
// Send notification
await this.sendNotification(modifiedNotification);
}
// Notification analytics and insights
async generateNotificationAnalytics(
timeframe: string
): Promise<NotificationAnalytics> {
// Bind timeframe as a query parameter (assuming a pg-style query(text, values) API)
// rather than interpolating it into the SQL string, which would allow SQL injection.
// COUNT(DISTINCT ...) prevents double counting when a notification has several
// delivery or action rows in the joined tables.
const analytics = await this.database.query(
`
SELECT
n.type,
n.priority,
n.channel,
COUNT(DISTINCT n.id) AS sent_count,
COUNT(DISTINCT CASE WHEN nd.delivered_at IS NOT NULL THEN n.id END) AS delivered_count,
COUNT(DISTINCT CASE WHEN na.action_taken_at IS NOT NULL THEN n.id END) AS interaction_count,
AVG(EXTRACT(EPOCH FROM (nd.delivered_at - n.created_at))) AS avg_delivery_time
FROM notifications n
LEFT JOIN notification_deliveries nd ON n.id = nd.notification_id
LEFT JOIN notification_actions na ON n.id = na.notification_id
WHERE n.created_at >= NOW() - $1::interval
GROUP BY n.type, n.priority, n.channel
`,
[timeframe]
);
// Some drivers (e.g. node-postgres) return COUNT/AVG results as strings, so normalize to numbers
const rows = analytics.map((row: any) => ({
...row,
sent_count: Number(row.sent_count),
delivered_count: Number(row.delivered_count),
interaction_count: Number(row.interaction_count),
avg_delivery_time: Number(row.avg_delivery_time) || 0,
}));
const totalSent = rows.reduce((sum, row) => sum + row.sent_count, 0);
const totalDelivered = rows.reduce((sum, row) => sum + row.delivered_count, 0);
const totalInteractions = rows.reduce(
(sum, row) => sum + row.interaction_count,
0
);
// Weight each group's average by its delivery count instead of averaging the averages
const weightedDeliveryTime = rows.reduce(
(sum, row) => sum + row.avg_delivery_time * row.delivered_count,
0
);
// Aggregate sent/delivered/interaction counts by one grouping column
const breakdownBy = (key: "type" | "priority" | "channel") =>
rows.reduce((acc, row) => {
const bucket = (acc[row[key]] ??= { sent: 0, delivered: 0, interactions: 0 });
bucket.sent += row.sent_count;
bucket.delivered += row.delivered_count;
bucket.interactions += row.interaction_count;
return acc;
}, {} as Record<string, { sent: number; delivered: number; interactions: number }>);
return {
totalSent,
totalDelivered,
totalInteractions,
// Guard against division by zero when the timeframe has no notifications
deliveryRate: totalSent > 0 ? totalDelivered / totalSent : 0,
interactionRate: totalDelivered > 0 ? totalInteractions / totalDelivered : 0,
avgDeliveryTime: totalDelivered > 0 ? weightedDeliveryTime / totalDelivered : 0,
breakdownByType: breakdownBy("type"),
breakdownByPriority: breakdownBy("priority"),
breakdownByChannel: breakdownBy("channel"),
};
}
// Smart notification preferences learning
async learnUserPreferences(userId: string): Promise<void> {
const userBehavior = await this.analyzeUserNotificationBehavior(userId);
const recommendedSettings: Partial<UserNotificationSettings> = {};
// Analyze interaction patterns
if (userBehavior.emailInteractionRate < 0.1) {
recommendedSettings.emailNotifications = false;
}
if (userBehavior.pushInteractionRate > 0.3) {
recommendedSettings.pushNotifications = true;
}
// Analyze timing preferences
const preferredHours = userBehavior.mostActiveHours;
if (preferredHours.length > 0) {
recommendedSettings.quietHours = {
// Wrap around midnight so both hours stay in the 0-23 range
start: (Math.max(...preferredHours) + 1) % 24,
end: (Math.min(...preferredHours) + 23) % 24,
};
}
// Analyze content preferences
const preferredTypes = userBehavior.mostEngagingTypes;
recommendedSettings.notificationTypes = preferredTypes.reduce(
(acc, type) => {
acc[type] = true;
return acc;
},
{} as Record<string, boolean>
);
// Store recommendations
await this.storeNotificationRecommendations(userId, recommendedSettings);
}
}
// Notification interfaces
interface NotificationData {
id: string;
recipientId: string;
type: string;
title: string;
message: string;
data: Record<string, any>;
priority: "low" | "normal" | "high" | "critical";
timestamp: number;
templateId?: string;
channels?: NotificationChannel[];
actions?: NotificationAction[];
}
interface NotificationTemplate {
name: string;
type: string;
channels: NotificationChannel[];
titleTemplate: string;
messageTemplate: string;
emailSubjectTemplate?: string;
emailBodyTemplate?: string;
pushTitleTemplate?: string;
pushBodyTemplate?: string;
variables: string[];
conditions?: Record<string, any>;
}
interface NotificationExperiment {
name: string;
variants: NotificationVariant[];
targetSegment: string;
metrics: string[];
startDate: Date;
endDate: Date;
}
interface NotificationAnalytics {
totalSent: number;
totalDelivered: number;
totalInteractions: number;
deliveryRate: number;
interactionRate: number;
avgDeliveryTime: number;
breakdownByType: Record<string, any>;
breakdownByPriority: Record<string, any>;
breakdownByChannel: Record<string, any>;
}
type NotificationChannel = "realtime" | "push" | "email" | "sms";
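The Redis-backed grouping above depends on timers and key expiry, which makes its behavior hard to reason about. The core batching rule can be sketched in memory with an explicit clock: notifications for the same recipient and group type that arrive within one window collapse into a single notification. This is an illustrative simplification, not the `createNotificationGroup` implementation, and the class and field names are hypothetical.

```typescript
interface PendingNotification {
  recipientId: string;
  groupType: string;
  title: string;
}

interface BatchedNotification {
  recipientId: string;
  groupType: string;
  title: string;
  items: PendingNotification[];
}

class NotificationBatcher {
  private readonly windowMs: number;
  // One open bucket per recipient + group type
  private buckets = new Map<string, { openedAt: number; items: PendingNotification[] }>();

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  add(notification: PendingNotification, now: number): void {
    const key = `${notification.recipientId}:${notification.groupType}`;
    const bucket = this.buckets.get(key);
    if (bucket) {
      bucket.items.push(notification);
    } else {
      // The window opens with the first notification of the group
      this.buckets.set(key, { openedAt: now, items: [notification] });
    }
  }

  // Remove and return every batch whose window has closed by `now`
  flushDue(now: number): BatchedNotification[] {
    const ready: BatchedNotification[] = [];
    for (const [key, bucket] of this.buckets) {
      if (now - bucket.openedAt < this.windowMs) continue;
      const { recipientId, groupType } = bucket.items[0];
      ready.push({
        recipientId,
        groupType,
        // A lone notification keeps its title; several collapse into a count
        title:
          bucket.items.length === 1
            ? bucket.items[0].title
            : `${bucket.items.length} new ${groupType} notifications`,
        items: bucket.items,
      });
      this.buckets.delete(key); // deleting the current entry during Map iteration is safe
    }
    return ready;
  }
}

// Usage with a 60-second window
const batcher = new NotificationBatcher(60_000);
batcher.add({ recipientId: "u1", groupType: "like", title: "Ana liked your post" }, 0);
batcher.add({ recipientId: "u1", groupType: "like", title: "Ben liked your post" }, 10_000);
console.log(batcher.flushDue(30_000).length); // 0 (window still open)
console.log(batcher.flushDue(60_000)[0].title); // "2 new like notifications"
```

Because time is passed in explicitly, the window logic can be unit-tested without fake timers; in production, `flushDue(Date.now())` could be called from a periodic job.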
Connection Management and Scaling: The Infrastructure Reality
Horizontal Scaling for WebSocket Systems
// Production-ready WebSocket scaling architecture
class WebSocketScalingManager {
private loadBalancer: LoadBalancer;
private serverNodes: Map<string, ServerNode> = new Map();
private redis: Redis;
private connectionRouter: ConnectionRouter;
private healthMonitor: HealthMonitor;
constructor() {
this.loadBalancer = new LoadBalancer();
this.redis = new Redis(process.env.REDIS_CLUSTER_URL);
this.connectionRouter = new ConnectionRouter(this.redis);
this.healthMonitor = new HealthMonitor();
this.setupLoadBalancing();
this.setupConnectionRouting();
this.setupAutoScaling();
this.startHealthMonitoring();
}
// Sticky session load balancing for WebSocket connections
private setupLoadBalancing(): void {
this.loadBalancer.configure({
algorithm: 'sticky_session', // Ensure user stays on same server
healthCheck: {
interval: 10000, // 10 seconds
timeout: 5000, // 5 seconds
retries: 3
},
failover: {
enabled: true,
redistributeConnections: true
}
});
}
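// Cross-server message routing
private setupConnectionRouting(): void {
// A Redis connection in subscriber mode can't run other commands (sadd, hset, ...),
// so subscribe on a dedicated duplicate connection
const subscriber = this.redis.duplicate();
const handlers: Record<string, (data: any) => void> = {
'websocket:broadcast': (data) => this.routeMessageToLocalConnections(data),
'websocket:user_message': (data) => this.routeMessageToSpecificUser(data),
'websocket:room_message': (data) => this.routeMessageToRoom(data)
};
subscriber.subscribe(...Object.keys(handlers)).catch((err) =>
console.error('Failed to subscribe to routing channels:', err)
);
// With ioredis, published messages arrive on the 'message' event, not in the subscribe callback
subscriber.on('message', (channel: string, message: string) => {
const handler = handlers[channel];
if (handler) handler(JSON.parse(message));
});
}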
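// Auto-scaling based on connection metrics
private setupAutoScaling(): void {
// Launching or draining a server can outlast the 30-second interval, so skip ticks
// while a scale operation is still running instead of stacking new ones
let scalingInProgress = false;
setInterval(async () => {
if (scalingInProgress) return;
scalingInProgress = true;
try {
const metrics = await this.getClusterMetrics();
if (this.shouldScaleUp(metrics)) {
await this.scaleUp();
} else if (this.shouldScaleDown(metrics)) {
await this.scaleDown();
}
} catch (error) {
console.error('Auto-scaling check failed:', error);
} finally {
scalingInProgress = false;
}
}, 30000); // Check every 30 seconds
}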
private shouldScaleUp(metrics: ClusterMetrics): boolean {
return (
metrics.avgConnectionsPerServer > 8000 || // High connection density
metrics.avgCpuUsage > 70 || // High CPU usage
metrics.avgMemoryUsage > 80 || // High memory usage
metrics.messageQueueBacklog > 1000 // Message backlog
);
}
private shouldScaleDown(metrics: ClusterMetrics): boolean {
return (
metrics.avgConnectionsPerServer < 2000 && // Low connection density
metrics.avgCpuUsage < 30 && // Low CPU usage
metrics.avgMemoryUsage < 40 && // Low memory usage
metrics.activeServerCount > 2 // Keep minimum 2 servers
);
}
async scaleUp(): Promise<void> {
console.log('Scaling up WebSocket cluster...');
try {
// Launch new server instance
const newServer = await this.launchServerInstance();
// Register with load balancer
await this.loadBalancer.registerServer(newServer);
// Add to server nodes
this.serverNodes.set(newServer.id, newServer);
// Update Redis cluster information
await this.redis.sadd('websocket:servers', newServer.id);
await this.redis.hset('websocket:server_info', newServer.id, JSON.stringify({
address: newServer.address,
port: newServer.port,
capacity: newServer.capacity,
startedAt: Date.now()
}));
console.log(`New WebSocket server launched: ${newServer.id}`);
} catch (error) {
console.error('Failed to scale up:', error);
}
}
async scaleDown(): Promise<void> {
console.log('Scaling down WebSocket cluster...');
try {
// Find server with least connections
const serverToRemove = await this.findServerWithLeastConnections();
if (!serverToRemove) return;
// Remove from the load balancer first so no new connections are routed here
await this.loadBalancer.deregisterServer(serverToRemove);
// Then gracefully drain existing connections
await this.drainServerConnections(serverToRemove.id);
// Terminate server instance
await this.terminateServerInstance(serverToRemove.id);
// Cleanup
this.serverNodes.delete(serverToRemove.id);
await this.redis.srem('websocket:servers', serverToRemove.id);
await this.redis.hdel('websocket:server_info', serverToRemove.id);
console.log(`WebSocket server terminated: ${serverToRemove.id}`);
} catch (error) {
console.error('Failed to scale down:', error);
}
}
// Connection migration for server maintenance
async migrateConnections(fromServerId: string, toServerId: string): Promise<void> {
console.log(`Migrating connections from ${fromServerId} to ${toServerId}`);
const connections = await this.getServerConnections(fromServerId);
for (const connection of connections) {
try {
// Store connection state in Redis
await this.redis.hset(`connection:${connection.userId}`, {
state: JSON.stringify(connection.state),
rooms: JSON.stringify(Array.from(connection.rooms)),
lastActivity: connection.lastActivity
});
// Send migration message to client
await this.sendMigrationMessage(connection, toServerId);
// Update connection routing
await this.redis.hset('user:server_mapping', connection.userId, toServerId);
} catch (error) {
console.error(`Failed to migrate connection ${connection.userId}:`, error);
}
}
}
private async sendMigrationMessage(connection: Connection, newServerId: string): Promise<void> {
const serverInfo = this.serverNodes.get(newServerId);
if (!serverInfo) return;
const migrationData = {
type: 'server_migration',
newServerUrl: `wss://${serverInfo.address}:${serverInfo.port}`,
migrationToken: await this.generateMigrationToken(connection.userId),
timestamp: Date.now()
};
// Send through current connection
connection.socket.emit('migrate', migrationData);
}
// Connection state persistence for reliability
async persistConnectionState(connectionId: string, state: ConnectionState): Promise<void> {
const stateData = {
userId: state.userId,
rooms: Array.from(state.rooms),
lastActivity: state.lastActivity,
metadata: state.metadata,
subscriptions: Array.from(state.subscriptions),
queuedMessages: state.queuedMessages
};
await this.redis.setex(
`connection_state:${connectionId}`,
3600, // 1 hour TTL
JSON.stringify(stateData)
);
}
async restoreConnectionState(connectionId: string): Promise<ConnectionState | null> {
const stateData = await this.redis.get(`connection_state:${connectionId}`);
if (!stateData) return null;
const parsed = JSON.parse(stateData);
return {
userId: parsed.userId,
rooms: new Set(parsed.rooms),
lastActivity: parsed.lastActivity,
metadata: parsed.metadata,
subscriptions: new Set(parsed.subscriptions),
queuedMessages: parsed.queuedMessages || []
};
}
// Connection monitoring and alerting
private startHealthMonitoring(): void {
this.healthMonitor.on('server_unhealthy', async (serverId: string) => {
console.log(`Server ${serverId} is unhealthy, initiating failover...`);
await this.handleServerFailure(serverId);
});
this.healthMonitor.on('high_latency_detected', async (serverId: string, latency: number) => {
console.log(`High latency detected on server ${serverId}: ${latency}ms`);
await this.investigateLatencyIssue(serverId);
});
this.healthMonitor.on('connection_spike', async (serverId: string, connectionCount: number) => {
console.log(`Connection spike on server ${serverId}: ${connectionCount} connections`);
await this.handleConnectionSpike(serverId);
});
}
private async handleServerFailure(serverId: string): Promise<void> {
// Remove the failed server from rotation first so no new connections land on it
await this.loadBalancer.deregisterServer({ id: serverId });
// Get all connections on failed server
const connections = await this.getServerConnections(serverId);
// Find healthy servers for migration
const healthyServers = await this.getHealthyServers();
if (healthyServers.length === 0) {
console.error('No healthy servers available for failover!');
// This is exactly when operations needs to know, so alert before giving up
await this.alertOperationsTeam('server_failure', {
serverId,
connectionCount: connections.length,
migratedTo: []
});
return;
}
// Distribute connections across healthy servers
const connectionsPerServer = Math.ceil(connections.length / healthyServers.length);
for (let i = 0; i < connections.length; i++) {
const targetServerIndex = Math.floor(i / connectionsPerServer);
const targetServer = healthyServers[targetServerIndex];
await this.migrateConnection(connections[i], serverId, targetServer.id);
}
// Alert operations team
await this.alertOperationsTeam('server_failure', {
serverId,
connectionCount: connections.length,
migratedTo: healthyServers.map(s => s.id)
});
}
}
// Geographic distribution for global applications.
// Declared at the top level: TypeScript does not allow a class declaration inside another class body.
class GeographicLoadBalancer {
private regions: Map<string, RegionInfo> = new Map();
private geoIpService: GeoIPService;
constructor() {
this.geoIpService = new GeoIPService();
this.setupRegions();
}
private setupRegions(): void {
// Configure geographic regions
// (each region also needs a `location` for findClosestRegion to compute distances)
this.regions.set('us-east', {
servers: ['ws-us-east-1', 'ws-us-east-2'],
latencyWeight: 1.0,
capacityWeight: 0.8
});
this.regions.set('eu-west', {
servers: ['ws-eu-west-1', 'ws-eu-west-2'],
latencyWeight: 1.0,
capacityWeight: 0.9
});
this.regions.set('ap-southeast', {
servers: ['ws-ap-se-1', 'ws-ap-se-2'],
latencyWeight: 1.0,
capacityWeight: 0.7
});
}
async routeConnection(clientIp: string): Promise<string> {
// Get client location
const location = await this.geoIpService.getLocation(clientIp);
// Find closest region
const closestRegion = this.findClosestRegion(location);
// Get region info
const regionInfo = this.regions.get(closestRegion);
if (!regionInfo) {
throw new Error('No available region');
}
// Select best server in region
const selectedServer = await this.selectServerInRegion(regionInfo);
return selectedServer;
}
private findClosestRegion(location: GeoLocation): string {
let closestRegion = 'us-east';
let minDistance = Infinity;
for (const [regionId, regionInfo] of this.regions.entries()) {
// location is optional on RegionInfo; skip regions that don't define one
if (!regionInfo.location) continue;
const distance = this.calculateDistance(location, regionInfo.location);
if (distance < minDistance) {
minDistance = distance;
closestRegion = regionId;
}
}
return closestRegion;
}
private async selectServerInRegion(regionInfo: RegionInfo): Promise<string> {
const serverMetrics = await Promise.all(
regionInfo.servers.map(serverId => this.getServerMetrics(serverId))
);
// Calculate scores based on latency and capacity
const scores = serverMetrics.map((metrics, index) => ({
serverId: regionInfo.servers[index],
score: this.calculateServerScore(metrics, regionInfo)
}));
// Select server with highest score
const bestServer = scores.reduce((best, current) =>
current.score > best.score ? current : best
);
return bestServer.serverId;
}
private calculateServerScore(metrics: ServerMetrics, regionInfo: RegionInfo): number {
const latencyScore = (1 - metrics.averageLatency / 1000) * regionInfo.latencyWeight;
const capacityScore = (1 - metrics.connectionUtilization) * regionInfo.capacityWeight;
return latencyScore + capacityScore;
}
}
// Scaling interfaces
interface ClusterMetrics {
avgConnectionsPerServer: number;
avgCpuUsage: number;
avgMemoryUsage: number;
messageQueueBacklog: number;
activeServerCount: number;
}
interface ServerNode {
id: string;
address: string;
port: number;
capacity: number;
region?: string;
}
interface Connection {
userId: string;
socket: any;
state: ConnectionState;
rooms: Set<string>;
lastActivity: number;
}
interface ConnectionState {
userId: string;
rooms: Set<string>;
lastActivity: number;
metadata: Record<string, any>;
subscriptions: Set<string>;
queuedMessages: any[];
}
interface RegionInfo {
servers: string[];
latencyWeight: number;
capacityWeight: number;
location?: GeoLocation;
}
interface GeoLocation {
latitude: number;
longitude: number;
country: string;
city: string;
}
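The failover loop above assigns connections to healthy servers in contiguous blocks of `Math.ceil(connections / servers)`. A minimal sketch of that arithmetic as a pure function makes it easy to check that the computed index never runs past the last server. `planMigration` is a hypothetical helper for illustration, not part of the original class.

```typescript
// Hypothetical pure helper mirroring the index arithmetic in handleServerFailure:
// connection i goes to server floor(i / ceil(n / h)), which yields contiguous blocks.
function planMigration<T>(connections: T[], serverIds: string[]): Map<string, T[]> {
  if (serverIds.length === 0) {
    throw new Error('No healthy servers available for failover');
  }
  const plan = new Map<string, T[]>();
  for (const id of serverIds) plan.set(id, []);
  const perServer = Math.ceil(connections.length / serverIds.length);
  connections.forEach((conn, i) => {
    // perServer * serverIds.length >= connections.length, so the index stays in range
    const target = serverIds[Math.floor(i / perServer)];
    plan.get(target)!.push(conn);
  });
  return plan;
}

const plan = planMigration(['a', 'b', 'c', 'd', 'e'], ['s1', 's2', 's3']);
console.log(Array.from(plan).map(([id, conns]) => `${id}: ${conns.length}`).join(', '));
// prints "s1: 2, s2: 2, s3: 1"
```

One consequence of contiguous blocks is that the last server can end up under-filled: 4 connections across 3 servers splits as 2, 2, 0. Assigning `serverIds[i % serverIds.length]` instead (round-robin) gives 2, 1, 1, which may be preferable when every server has similar spare capacity.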
Key Takeaways
Real-time communication isn’t just about choosing WebSockets over HTTP polling. It’s about building robust, scalable systems that can handle thousands of concurrent connections while maintaining performance and reliability.
Essential real-time patterns:
- WebSocket lifecycle management with proper connection handling, reconnection logic, and heartbeat monitoring
- Message routing and broadcasting with room management and efficient message distribution
- Connection scaling with sticky sessions, geographic distribution, and automatic failover
- State management with persistence, migration, and recovery mechanisms
- Performance monitoring with connection metrics, latency tracking, and capacity planning
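To make the routing and broadcasting pattern concrete, here is a minimal single-process sketch of room management, assuming each client is represented by a `send` callback rather than a real socket. `RoomRegistry` and its methods are hypothetical names; in a multi-server deployment, `broadcast` would also need to publish to a message broker so that room members connected to other servers receive the message.

```typescript
// Stand-in for a socket: delivers one message to one client.
type Send = (message: string) => void;

class RoomRegistry {
  private rooms = new Map<string, Set<string>>(); // room name -> member userIds
  private clients = new Map<string, Send>(); // userId -> delivery callback

  connect(userId: string, send: Send): void {
    this.clients.set(userId, send);
  }

  // Removes the client and its memberships so disconnected users don't leak memory.
  disconnect(userId: string): void {
    this.clients.delete(userId);
    this.rooms.forEach((_members, room) => this.leave(userId, room));
  }

  join(userId: string, room: string): void {
    const members = this.rooms.get(room) ?? new Set<string>();
    members.add(userId);
    this.rooms.set(room, members);
  }

  leave(userId: string, room: string): void {
    const members = this.rooms.get(room);
    if (!members) return;
    members.delete(userId);
    // Drop empty rooms so long-running servers don't accumulate dead entries
    if (members.size === 0) this.rooms.delete(room);
  }

  // Delivers to every connected member except the optional sender; returns the recipient count.
  broadcast(room: string, message: string, exceptUserId?: string): number {
    const members = this.rooms.get(room);
    if (!members) return 0;
    let delivered = 0;
    members.forEach((userId) => {
      if (userId === exceptUserId) return;
      const send = this.clients.get(userId);
      if (send) {
        send(message);
        delivered++;
      }
    });
    return delivered;
  }
}
```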
The real-time architecture decision framework:
- Use plain WebSockets when you need maximum control and minimal overhead
- Use Socket.io when you need developer productivity and automatic fallbacks
- Use Server-Sent Events for one-way real-time updates (we’ll cover this in part 2)
- Use message queues for reliable delivery and horizontal scaling
- Use geographic distribution for global applications with latency requirements
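Geographic distribution usually starts with sending each user to the nearest region. The sketch below uses the haversine great-circle formula over latitude/longitude pairs shaped like the `GeoLocation` interface above. Physical distance is only a proxy for network latency, and the `latencyWeight` and `capacityWeight` fields of `RegionInfo` are ignored here because their exact semantics aren’t defined in this section; `haversineKm` and `nearestRegion` are hypothetical helpers.

```typescript
// Same latitude/longitude fields as the article's GeoLocation interface,
// redeclared so this sketch compiles on its own.
interface GeoPoint {
  latitude: number; // degrees
  longitude: number; // degrees
}

const EARTH_RADIUS_KM = 6371; // mean Earth radius

// Great-circle distance between two points using the haversine formula.
function haversineKm(a: GeoPoint, b: GeoPoint): number {
  const toRad = (deg: number): number => (deg * Math.PI) / 180;
  const dLat = toRad(b.latitude - a.latitude);
  const dLon = toRad(b.longitude - a.longitude);
  const h =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(toRad(a.latitude)) * Math.cos(toRad(b.latitude)) * Math.sin(dLon / 2) ** 2;
  // Clamp guards against floating-point drift just above 1 for near-antipodal points
  return 2 * EARTH_RADIUS_KM * Math.asin(Math.min(1, Math.sqrt(h)));
}

// Returns the name of the closest region; regions without a known location are skipped.
function nearestRegion(
  user: GeoPoint,
  regions: Record<string, { location?: GeoPoint }>,
): string | undefined {
  let best: string | undefined;
  let bestKm = Infinity;
  for (const name of Object.keys(regions)) {
    const location = regions[name].location;
    if (!location) continue;
    const km = haversineKm(user, location);
    if (km < bestKm) {
      bestKm = km;
      best = name;
    }
  }
  return best;
}
```

Because `GeoLocation` carries the same `latitude` and `longitude` fields, TypeScript’s structural typing lets a `Record<string, RegionInfo>` be passed to `nearestRegion` directly.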
Connection management best practices:
- Always implement reconnection logic with exponential backoff
- Use heartbeat/ping mechanisms to detect connection health
- Persist connection state for seamless failover scenarios
- Monitor connection metrics (connection counts, latency spikes, message backlog) to catch degradation before it turns into an outage
- Plan for scaling from day one with proper load balancing strategies
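The first two practices can be sketched as small, testable pieces. `backoffDelayMs` computes an exponential backoff capped at a maximum and applies “full jitter” (a random delay between zero and the cap), so that thousands of clients dropped by the same outage don’t all reconnect at the same instant. `HeartbeatTracker` records pong timestamps and reports connections that have been silent for more than `missedAllowed` heartbeat intervals. Both names and the defaults (500 ms base, 30 s cap, two missed intervals) are assumptions for illustration.

```typescript
// Exponential backoff with "full jitter": the delay is drawn uniformly from
// [0, min(maxMs, baseMs * 2^attempt)) so clients spread out their retries.
function backoffDelayMs(
  attempt: number, // 0 for the first reconnection attempt
  baseMs = 500,
  maxMs = 30_000,
  random: () => number = Math.random,
): number {
  const cap = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(random() * cap);
}

// Tracks the last pong per connection and reports connections that have been
// silent longer than intervalMs * missedAllowed. Times are passed in explicitly
// (e.g. Date.now()) so the logic can be tested without real timers.
// Call recordPong when a connection opens so it starts being tracked.
class HeartbeatTracker {
  private lastPong = new Map<string, number>();
  private intervalMs: number;
  private missedAllowed: number;

  constructor(intervalMs: number, missedAllowed = 2) {
    this.intervalMs = intervalMs;
    this.missedAllowed = missedAllowed;
  }

  recordPong(connectionId: string, now: number): void {
    this.lastPong.set(connectionId, now);
  }

  staleConnections(now: number): string[] {
    const limit = this.intervalMs * this.missedAllowed;
    const stale: string[] = [];
    this.lastPong.forEach((last, id) => {
      if (now - last > limit) stale.push(id);
    });
    return stale;
  }

  forget(connectionId: string): void {
    this.lastPong.delete(connectionId);
  }
}
```

On the client, the attempt counter should reset to zero after a successful reconnect; on the server, connections returned by `staleConnections` would be terminated and their state persisted for recovery.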
The production reality checklist:
- ✅ WebSocket connections have proper authentication and authorization
- ✅ Reconnection logic handles network failures gracefully
- ✅ Message delivery is reliable with queuing for offline users
- ✅ Connection scaling triggers automatically from real load metrics (connections per server, CPU, memory, queue backlog)
- ✅ Geographic routing minimizes latency for global users
- ✅ State persistence enables seamless failover
- ✅ Monitoring alerts on connection issues before users complain
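For the “queuing for offline users” item, here is a minimal in-memory sketch of the kind of buffer that the `queuedMessages` field in `ConnectionState` suggests. It caps each user’s queue, dropping the oldest messages first (an assumed policy), and flushes in order on reconnect while keeping anything that fails to send. `OfflineMessageQueue` is a hypothetical name; a production system would back this with durable storage so queued messages survive a server restart.

```typescript
// One buffered message; ids increase monotonically so delivery order is explicit.
interface QueuedMessage {
  id: number;
  payload: string;
  enqueuedAt: number; // caller-supplied timestamp, e.g. Date.now()
}

class OfflineMessageQueue {
  private queues = new Map<string, QueuedMessage[]>();
  private nextId = 1;
  private maxPerUser: number;

  constructor(maxPerUser = 100) {
    this.maxPerUser = maxPerUser;
  }

  enqueue(userId: string, payload: string, now: number = Date.now()): void {
    const queue: QueuedMessage[] = this.queues.get(userId) ?? [];
    queue.push({ id: this.nextId++, payload, enqueuedAt: now });
    // Bound memory per user: drop the oldest messages once the cap is exceeded
    if (queue.length > this.maxPerUser) queue.splice(0, queue.length - this.maxPerUser);
    this.queues.set(userId, queue);
  }

  // Sends queued messages in order. `send` returns false if delivery failed;
  // flushing stops there and the undelivered tail stays queued for next time.
  flush(userId: string, send: (message: QueuedMessage) => boolean): number {
    const queue = this.queues.get(userId);
    if (!queue) return 0;
    let delivered = 0;
    while (delivered < queue.length && send(queue[delivered])) delivered++;
    queue.splice(0, delivered);
    if (queue.length === 0) this.queues.delete(userId);
    return delivered;
  }

  pending(userId: string): number {
    return this.queues.get(userId)?.length ?? 0;
  }
}
```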
What’s Next?
In the next post, we’ll complete our real-time communications journey by diving into Server-Sent Events, WebRTC for peer-to-peer communication, real-time collaboration features, and advanced data synchronization patterns.
We’ll also explore the operational challenges of maintaining real-time systems at scale—from debugging connection issues to optimizing for mobile networks and handling the inevitable moments when everything goes wrong.
Because building real-time features that work in your development environment is just the beginning. Making them work reliably for thousands of users across different networks, devices, and time zones—that’s where the real engineering challenges lie.