Real-time Communications - 2/2

The $5 Million Collaboration Disaster That Changed Everything

Picture this catastrophe: A major design agency lands their biggest client ever—rebranding a Fortune 500 company. The project requires 15 designers, 8 project managers, and 5 clients to collaborate in real-time on hundreds of design assets across multiple time zones.

They built what they thought was a sophisticated collaboration platform:

  • Real-time document editing with live cursors
  • Instant comment threads on design elements
  • Live video calls integrated with screen sharing
  • Automatic version control and conflict resolution
  • Presence indicators showing who’s online and what they’re working on

The first day of the project, everything seemed perfect. By day three, reality hit:

  • Multiple users editing the same element caused data corruption
  • Comments appeared 30 seconds after being posted
  • Live cursors were jumping around randomly, confusing everyone
  • Video calls dropped every few minutes during peak hours
  • Presence indicators showed people as online when they’d left hours ago
  • The collaboration features only worked when users were on the same server region

The client presentation was a disaster. Designs were lost, revisions were overwritten, and the real-time collaboration features made the team less productive than if they’d just emailed files back and forth.

The project was cancelled, the agency lost $5 million in revenue, and their reputation was destroyed. All because they confused “real-time features” with “real-time architecture.”

The Uncomfortable Truth About Real-time Collaboration

Here’s what separates real-time systems that enable seamless collaboration from those that create chaos: Real-time isn’t just about speed—it’s about consistency, conflict resolution, and maintaining data integrity across multiple simultaneous users.

Most developers approach real-time collaboration like this:

  1. Add WebSockets for instant updates
  2. Broadcast every change to all connected users
  3. Hope users don’t edit the same thing at once
  4. Handle conflicts by letting the last edit win
  5. Wonder why users are losing work and getting frustrated

But systems that enable true collaboration work differently:

  1. Design for concurrent editing from day one
  2. Implement operational transformation or CRDTs for conflict-free merging
  3. Build sophisticated presence and awareness systems
  4. Handle network partitions and offline scenarios gracefully
  5. Provide users with clear visibility into what’s happening

The difference isn’t just user experience—it’s the difference between tools that multiply team productivity and tools that become productivity bottlenecks disguised as innovation.
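
To make the contrast between the two lists concrete, here is a minimal sketch. It is an illustration only, not how any particular product works: `DesignDoc`, `lastWriteWins`, and `mergeCommentSets` are hypothetical names, and the grow-only set (G-Set) is used because it is the simplest CRDT, where merging is a plain set union.

```typescript
// Hypothetical document shape used only for this illustration.
interface DesignDoc {
  comments: Set<string>;
}

// Approach 1: broadcast the whole document and let the last edit win.
// Whatever arrives last replaces everything that came before it.
function lastWriteWins(_current: DesignDoc, incoming: DesignDoc): DesignDoc {
  return { comments: new Set(Array.from(incoming.comments)) };
}

// Approach 2: treat the comments as a grow-only set (G-Set).
// Merging is a set union, which is commutative, associative, and idempotent,
// so replicas converge regardless of the order in which updates arrive.
function mergeCommentSets(a: DesignDoc, b: DesignDoc): DesignDoc {
  return {
    comments: new Set([...Array.from(a.comments), ...Array.from(b.comments)]),
  };
}

// Two designers start from the same state and comment at the same time.
const base: DesignDoc = { comments: new Set(["Logo looks good"]) };
const alice: DesignDoc = {
  comments: new Set([...Array.from(base.comments), "Alice: try a darker blue"]),
};
const bob: DesignDoc = {
  comments: new Set([...Array.from(base.comments), "Bob: increase the padding"]),
};

// Last write wins: Bob's update arrives second and silently drops Alice's comment.
const overwritten = lastWriteWins(lastWriteWins(base, alice), bob);
console.log(overwritten.comments.has("Alice: try a darker blue")); // false

// G-Set merge: both comments survive, in either merge order.
const merged = mergeCommentSets(alice, bob);
console.log(merged.comments.size); // 3
```

A G-Set can only grow, so it cannot express deletions or text edits. Collaborative editors need richer CRDTs or operational transformation, but the convergence property they rely on is the one shown here.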

Ready to build collaboration features that work like Google Docs instead of that group editing nightmare you experienced in college? Let’s dive into the patterns that power real-time collaboration at scale.


Server-Sent Events: The Overlooked Real-time Hero

When WebSockets Are Overkill

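Server-Sent Events (SSE) fit scenarios where you need real-time updates from server to client but don’t need bidirectional communication. They run over a plain HTTP response, the browser’s EventSource reconnects automatically and reports the ID of the last event it received, and for one-way feeds they are often a simpler and more appropriate choice than WebSockets.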
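
On the wire, an SSE stream is a long-lived HTTP response made of small text frames. The sketch below shows the fields the event-stream format defines. `SseFrame` and `formatSseFrame` are hypothetical names introduced for illustration, not part of any library.

```typescript
interface SseFrame {
  event?: string; // named event type; frames without it arrive as "message"
  id?: string;    // the browser sends this back as Last-Event-ID on reconnect
  retry?: number; // reconnection delay hint, in milliseconds
  data: string;   // payload; may span several lines
}

function formatSseFrame(frame: SseFrame): string {
  const lines: string[] = [];
  if (frame.event !== undefined) lines.push(`event: ${frame.event}`);
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`);
  if (frame.retry !== undefined) lines.push(`retry: ${frame.retry}`);

  // Every line of the payload needs its own "data:" prefix.
  for (const line of frame.data.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // A blank line terminates the frame and dispatches the event.
  return lines.join("\n") + "\n\n";
}

// Lines that start with a colon are comments. EventSource ignores them,
// which makes them usable as keep-alive traffic.
const SSE_KEEPALIVE = ": keep-alive\n\n";

const frame = formatSseFrame({
  event: "notification",
  id: "42",
  data: JSON.stringify({ text: "Build finished" }),
});
console.log(JSON.stringify(frame));
console.log(JSON.stringify(SSE_KEEPALIVE));
```

Serializing the payload with `JSON.stringify` keeps it on a single line, which is why the service below can get away with a single `data:` line per event.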

// Server-Sent Events implementation for real-time feeds
class ServerSentEventsService {
  private connections: Map<string, SSEConnection> = new Map();
  private eventStreams: Map<string, EventStream> = new Map();
  private redis: Redis;

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL);
    this.setupEventProcessing();
  }

  // Create SSE endpoint
  createEventStream(req: Request, res: Response): void {
    const userId = req.user.id;
    const streamId = this.generateStreamId();

    // Set SSE headers
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
      "Access-Control-Allow-Origin": "*",
      "Access-Control-Allow-Headers": "Cache-Control",
    });

    // Send initial connection event
    this.sendEvent(res, "connected", {
      streamId,
      timestamp: Date.now(),
      message: "Connected to event stream",
    });

    // Create connection info
    const connection: SSEConnection = {
      id: streamId,
      userId,
      response: res,
      lastActivity: Date.now(),
      subscriptions: new Set(),
      filters: new Map(),
    };

    this.connections.set(streamId, connection);

    // Handle client disconnect
    req.on("close", () => {
      this.handleDisconnection(streamId);
    });

    req.on("error", (error) => {
      console.error(`SSE connection error for ${streamId}:`, error);
      this.handleDisconnection(streamId);
    });

    // Send heartbeat to keep connection alive
    this.startHeartbeat(streamId);

    console.log(`SSE connection established: ${streamId} for user ${userId}`);
  }

  // Subscribe to specific event types
  subscribe(streamId: string, eventType: string, filters?: EventFilters): void {
    const connection = this.connections.get(streamId);
    if (!connection) return;

    connection.subscriptions.add(eventType);

    if (filters) {
      connection.filters.set(eventType, filters);
    }

    // Send confirmation
    this.sendEvent(connection.response, "subscribed", {
      eventType,
      filters,
      timestamp: Date.now(),
    });

    console.log(`User ${connection.userId} subscribed to ${eventType}`);
  }

  // Publish event to subscribers
  async publishEvent(
    eventType: string,
    data: any,
    targetFilters?: EventFilters
  ): Promise<void> {
    const event: StreamEvent = {
      id: this.generateEventId(),
      type: eventType,
      data,
      timestamp: Date.now(),
      targetFilters,
    };

    // Store event for replay/recovery
    await this.storeEvent(event);

    // Deliver through Redis only. The "sse:events" subscription set up in
    // setupEventProcessing() fans the event out to the local connections of
    // every server, including this one, so also writing to local connections
    // here would send each event twice to clients on the publishing server.
    await this.redis.publish("sse:events", JSON.stringify(event));
  }

  private shouldSendEventToConnection(
    event: StreamEvent,
    connection: SSEConnection
  ): boolean {
    // Check if subscribed to event type
    if (!connection.subscriptions.has(event.type)) {
      return false;
    }

    // Apply the connection's own filters. Passing them is necessary but not
    // sufficient: the event's target filters below still have to match.
    const filters = connection.filters.get(event.type);
    if (filters && !this.matchesFilters(event.data, filters)) {
      return false;
    }

    // Apply target filters from event
    if (event.targetFilters) {
      const { userId, userRole, dashboardId } = event.targetFilters;

      if (userId && userId !== connection.userId) {
        return false;
      }

      if (userRole && !this.userHasRole(connection.userId, userRole)) {
        return false;
      }

      // Only deliver dashboard-scoped events to connections that subscribed
      // with a filter for that dashboard
      if (dashboardId && filters?.dashboardId !== dashboardId) {
        return false;
      }
    }

    return true;
  }

  private sendEvent(
    res: Response,
    eventType: string,
    data: any,
    eventId?: string
  ): void {
    try {
      let message = `event: ${eventType}\n`;

      if (eventId) {
        message += `id: ${eventId}\n`;
      }

      message += `data: ${JSON.stringify(data)}\n\n`;

      res.write(message);
    } catch (error) {
      console.error("Failed to send SSE event:", error);
    }
  }

  // Heartbeat to keep connections alive
  private startHeartbeat(streamId: string): void {
    const heartbeatInterval = setInterval(() => {
      const connection = this.connections.get(streamId);

      if (!connection) {
        clearInterval(heartbeatInterval);
        return;
      }

      // SSE is one-way, so the server never hears from an idle client. Treat
      // a connection as stale only when its response can no longer be written
      // to, not merely because no event was delivered to it recently.
      if (connection.response.writableEnded || connection.response.destroyed) {
        console.log(`Closing stale SSE connection: ${streamId}`);
        this.handleDisconnection(streamId);
        clearInterval(heartbeatInterval);
        return;
      }

      // Send heartbeat
      this.sendEvent(connection.response, "heartbeat", {
        timestamp: Date.now(),
      });
    }, 30000); // Every 30 seconds
  }

  // Live activity feed implementation
  async createActivityFeed(userId: string): Promise<ActivityFeed> {
    const feedId = this.generateFeedId();

    const feed: ActivityFeed = {
      id: feedId,
      userId,
      subscriptions: {
        userActivity: true,
        projectUpdates: true,
        mentions: true,
        systemNotifications: true,
      },
      filters: {
        priority: ["high", "medium"],
        categories: ["collaboration", "updates", "alerts"],
      },
      createdAt: Date.now(),
    };

    // Store feed configuration
    await this.redis.hset(`activity_feed:${feedId}`, {
      userId,
      config: JSON.stringify(feed),
      createdAt: Date.now(),
    });

    return feed;
  }

  // Real-time dashboard updates
  async setupDashboardUpdates(
    dashboardId: string,
    metrics: string[]
  ): Promise<void> {
    // Start background job to calculate and send updates
    setInterval(async () => {
      const updates: Record<string, any> = {};

      for (const metric of metrics) {
        updates[metric] = await this.calculateMetric(metric);
      }

      await this.publishEvent(
        "dashboard_update",
        {
          dashboardId,
          metrics: updates,
          timestamp: Date.now(),
        },
        {
          dashboardId, // Only send to users viewing this dashboard
        }
      );
    }, 5000); // Update every 5 seconds
  }

  // Event replay for connection recovery
  async replayEvents(streamId: string, fromEventId?: string): Promise<void> {
    const connection = this.connections.get(streamId);
    if (!connection) return;

    const events = await this.getEventsSince(fromEventId);

    for (const event of events) {
      if (this.shouldSendEventToConnection(event, connection)) {
        this.sendEvent(connection.response, event.type, event.data, event.id);
      }
    }

    console.log(`Replayed ${events.length} events for connection ${streamId}`);
  }

  // Cleanup and connection management
  private handleDisconnection(streamId: string): void {
    const connection = this.connections.get(streamId);
    if (!connection) return;

    try {
      connection.response.end();
    } catch (error) {
      // Response already closed
    }

    this.connections.delete(streamId);

    console.log(`SSE connection closed: ${streamId}`);
  }

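  // Cross-server event distribution
  private setupEventProcessing(): void {
    // Assuming ioredis (implied by `new Redis(url)`): a connection that has
    // subscribed can no longer run PUBLISH or HSET, so subscribe on a
    // duplicate connection and keep this.redis for regular commands
    const subscriber = this.redis.duplicate();

    subscriber.subscribe("sse:events").catch((error) => {
      console.error("Failed to subscribe to sse:events:", error);
    });

    // Messages arrive on the "message" event, not in the subscribe() callback
    subscriber.on("message", (_channel: string, message: string) => {
      const event: StreamEvent = JSON.parse(message);

      // Process event for local connections
      for (const connection of this.connections.values()) {
        if (this.shouldSendEventToConnection(event, connection)) {
          this.sendEvent(connection.response, event.type, event.data, event.id);
          connection.lastActivity = Date.now();
        }
      }
    });
  }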

  // Analytics and monitoring
  getConnectionStats(): SSEStats {
    const connections = Array.from(this.connections.values());
    const totalConnections = connections.length;
    const subscriptionBreakdown: Record<string, number> = {};
    let totalSubscriptions = 0;

    for (const connection of connections) {
      for (const subscription of connection.subscriptions) {
        subscriptionBreakdown[subscription] =
          (subscriptionBreakdown[subscription] || 0) + 1;
        totalSubscriptions++;
      }
    }

    return {
      totalConnections,
      subscriptionBreakdown,
      // Avoid 0 / 0 = NaN when nobody is connected
      averageSubscriptionsPerConnection:
        totalConnections > 0 ? totalSubscriptions / totalConnections : 0,
      // Math.min() over an empty list is Infinity, so report 0 instead.
      // Note this is the least recent lastActivity, not the connection time.
      oldestConnection:
        totalConnections > 0
          ? Math.min(...connections.map((c) => c.lastActivity))
          : 0,
    };
  }
}

// Client-side SSE handling with automatic reconnection
class SSEClient {
  private eventSource: EventSource | null = null;
  private url: string;
  private reconnectAttempts: number = 0;
  private maxReconnectAttempts: number = 5;
  private reconnectDelay: number = 1000;
  private listeners: Map<string, Function[]> = new Map();

  constructor(url: string) {
    this.url = url;
  }

  connect(): void {
    if (this.eventSource) {
      this.eventSource.close();
    }

    this.eventSource = new EventSource(this.url);

    this.eventSource.onopen = () => {
      console.log("SSE connection opened");
      this.reconnectAttempts = 0;
      this.emit("connected");
    };

    this.eventSource.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit("message", data);
      } catch (error) {
        console.error("Failed to parse SSE message:", error);
      }
    };

    this.eventSource.onerror = (error) => {
      console.error("SSE error:", error);

      if (this.eventSource?.readyState === EventSource.CLOSED) {
        this.attemptReconnect();
      }
    };

    // Set up specific event listeners
    this.setupEventListeners();
  }

  private setupEventListeners(): void {
    if (!this.eventSource) return;

    // Listen for specific event types
    this.eventSource.addEventListener("heartbeat", (event) => {
      // Handle heartbeat - maybe update UI to show connection status
    });

    this.eventSource.addEventListener("dashboard_update", (event) => {
      const data = JSON.parse(event.data);
      this.emit("dashboard_update", data);
    });

    this.eventSource.addEventListener("notification", (event) => {
      const data = JSON.parse(event.data);
      this.emit("notification", data);
    });

    this.eventSource.addEventListener("user_activity", (event) => {
      const data = JSON.parse(event.data);
      this.emit("user_activity", data);
    });
  }

  private attemptReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      this.emit("max_reconnect_attempts");
      return;
    }

    this.reconnectAttempts++;
    const delay = Math.min(
      this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1),
      30000 // Max 30 seconds
    );

    console.log(
      `Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`
    );

    setTimeout(() => {
      this.connect();
    }, delay);
  }

  on(event: string, callback: Function): void {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event)!.push(callback);
  }

  private emit(event: string, data?: any): void {
    const callbacks = this.listeners.get(event) || [];
    callbacks.forEach((callback) => callback(data));
  }

  disconnect(): void {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}

interface SSEConnection {
  id: string;
  userId: string;
  response: Response;
  lastActivity: number;
  subscriptions: Set<string>;
  filters: Map<string, EventFilters>;
}

interface StreamEvent {
  id: string;
  type: string;
  data: any;
  timestamp: number;
  targetFilters?: EventFilters;
}

interface EventFilters {
  userId?: string;
  userRole?: string;
  dashboardId?: string;
  priority?: string[];
  categories?: string[];
}

interface ActivityFeed {
  id: string;
  userId: string;
  subscriptions: Record<string, boolean>;
  filters: Record<string, any>;
  createdAt: number;
}

interface SSEStats {
  totalConnections: number;
  subscriptionBreakdown: Record<string, number>;
  averageSubscriptionsPerConnection: number;
  oldestConnection: number;
}
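
The service above calls `storeEvent` and `getEventsSince` without showing them. One way they could work is a small bounded replay buffer, sketched here under two assumptions that the original does not state: event IDs are monotonically increasing integers, and keeping recent events in memory is acceptable (a multi-server deployment would more likely keep them in a shared store). `ReplayBuffer` and its method names are hypothetical. On reconnect, the browser's EventSource sends the last ID it saw in the `Last-Event-ID` request header, which is the value to pass to `eventsSince`.

```typescript
interface BufferedEvent {
  id: number;
  type: string;
  data: unknown;
}

class ReplayBuffer {
  private events: BufferedEvent[] = [];
  private nextId = 1;
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Assign the next ID, store the event, and evict the oldest one when full.
  append(type: string, data: unknown): BufferedEvent {
    const event: BufferedEvent = { id: this.nextId++, type, data };
    this.events.push(event);
    if (this.events.length > this.capacity) {
      this.events.shift();
    }
    return event;
  }

  // Returns the events a reconnecting client missed, or null when some of
  // them were already evicted and the client needs a full resync instead.
  eventsSince(lastEventId: string | undefined): BufferedEvent[] | null {
    if (lastEventId === undefined) return []; // fresh connection, nothing to replay

    const last = Number.parseInt(lastEventId, 10);
    if (Number.isNaN(last)) return null;

    const oldestAvailable =
      this.events.length > 0 ? this.events[0].id : this.nextId;
    if (last + 1 < oldestAvailable) return null; // gap: events were evicted

    return this.events.filter((event) => event.id > last);
  }
}

const buffer = new ReplayBuffer(3);
for (let i = 1; i <= 5; i++) {
  buffer.append("notification", { n: i });
}

const missed = buffer.eventsSince("3");
console.log(missed ? missed.map((event) => event.id) : null); // [4, 5]
console.log(buffer.eventsSince("1")); // null, event 2 is no longer buffered
```

Returning `null` for an unfillable gap is a deliberate choice in this sketch: silently replaying a partial history would leave the client with missing updates and no way to know it.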

WebRTC: Peer-to-Peer Real-time Communication

Building Direct Client-to-Client Connections

// WebRTC signaling server for peer-to-peer connections
class WebRTCSignalingServer {
  private io: Server;
  private activeRooms: Map<string, RTCRoom> = new Map();
  private userConnections: Map<string, UserRTCConnection> = new Map();

  constructor(io: Server) {
    this.io = io;
    this.setupSignalingHandlers();
  }

  private setupSignalingHandlers(): void {
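    this.io.on('connection', (socket: Socket) => {
      // socket.userId is not a built-in Socket.IO property; this assumes an
      // authentication middleware attached it during the handshake
      console.log(`WebRTC user connected: ${socket.userId}`);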

      // Store user connection
      this.userConnections.set(socket.userId, {
        userId: socket.userId,
        socketId: socket.id,
        currentRoom: null,
        peerConnections: new Map(),
        mediaStreams: new Set()
      });

      // Handle WebRTC signaling events
      this.setupRTCEventHandlers(socket);

      socket.on('disconnect', () => {
        this.handleUserDisconnection(socket.userId);
      });
    });
  }

  private setupRTCEventHandlers(socket: Socket): void {
    // Join RTC room for video/audio calls
    socket.on('join_rtc_room', async (data: JoinRTCRoomData) => {
      try {
        const { roomId, mediaTypes } = data;

        // Validate room access
        const canJoin = await this.validateRoomAccess(socket.userId, roomId);
        if (!canJoin) {
          socket.emit('rtc_error', { message: 'Access denied to RTC room' });
          return;
        }

        // Create or get room
        let room = this.activeRooms.get(roomId);
        if (!room) {
          room = this.createRTCRoom(roomId);
        }

        // Add user to room
        room.participants.set(socket.userId, {
          userId: socket.userId,
          socketId: socket.id,
          mediaTypes,
          joinedAt: Date.now(),
          isConnected: true,
          peerConnections: new Map()
        });

        // Update user connection
        const userConnection = this.userConnections.get(socket.userId);
        if (userConnection) {
          userConnection.currentRoom = roomId;
        }

        // Notify existing participants about new user
        this.broadcastToRoom(roomId, 'user_joined_rtc', {
          userId: socket.userId,
          mediaTypes
        }, [socket.userId]);

        // Send existing participants to new user
        const existingParticipants = Array.from(room.participants.entries())
          .filter(([userId]) => userId !== socket.userId)
          .map(([userId, participant]) => ({
            userId,
            mediaTypes: participant.mediaTypes
          }));

        socket.emit('rtc_room_joined', {
          roomId,
          participants: existingParticipants
        });

        console.log(`User ${socket.userId} joined RTC room ${roomId}`);
      } catch (error) {
        console.error('Error joining RTC room:', error);
        socket.emit('rtc_error', { message: 'Failed to join RTC room' });
      }
    });

    // WebRTC offer handling
    socket.on('rtc_offer', async (data: RTCOfferData) => {
      const { targetUserId, offer, mediaTypes } = data;

      try {
        // Find target user's socket
        const targetConnection = this.userConnections.get(targetUserId);
        if (!targetConnection) {
          socket.emit('rtc_error', { message: 'Target user not available' });
          return;
        }

        // Forward offer to target user
        this.io.to(targetConnection.socketId).emit('rtc_offer', {
          fromUserId: socket.userId,
          offer,
          mediaTypes
        });

        console.log(`RTC offer sent from ${socket.userId} to ${targetUserId}`);
      } catch (error) {
        console.error('Error handling RTC offer:', error);
        socket.emit('rtc_error', { message: 'Failed to send offer' });
      }
    });

    // WebRTC answer handling
    socket.on('rtc_answer', (data: RTCAnswerData) => {
      const { targetUserId, answer } = data;

      const targetConnection = this.userConnections.get(targetUserId);
      if (targetConnection) {
        this.io.to(targetConnection.socketId).emit('rtc_answer', {
          fromUserId: socket.userId,
          answer
        });

        console.log(`RTC answer sent from ${socket.userId} to ${targetUserId}`);
      }
    });

    // ICE candidate handling
    socket.on('ice_candidate', (data: ICECandidateData) => {
      const { targetUserId, candidate } = data;

      const targetConnection = this.userConnections.get(targetUserId);
      if (targetConnection) {
        this.io.to(targetConnection.socketId).emit('ice_candidate', {
          fromUserId: socket.userId,
          candidate
        });
      }
    });

    // Media stream management
    socket.on('media_stream_started', (data: MediaStreamData) => {
      const { streamId, mediaTypes, roomId } = data;

      const userConnection = this.userConnections.get(socket.userId);
      if (userConnection) {
        userConnection.mediaStreams.add(streamId);

        // Notify room about new stream
        if (roomId) {
          this.broadcastToRoom(roomId, 'media_stream_started', {
            userId: socket.userId,
            streamId,
            mediaTypes
          }, [socket.userId]);
        }
      }
    });

    socket.on('media_stream_stopped', (data: MediaStreamData) => {
      const { streamId, roomId } = data;

      const userConnection = this.userConnections.get(socket.userId);
      if (userConnection) {
        userConnection.mediaStreams.delete(streamId);

        // Notify room about stopped stream
        if (roomId) {
          this.broadcastToRoom(roomId, 'media_stream_stopped', {
            userId: socket.userId,
            streamId
          }, [socket.userId]);
        }
      }
    });

    // Screen sharing
    socket.on('screen_share_started', (data: ScreenShareData) => {
      const { streamId, roomId } = data;

      this.broadcastToRoom(roomId, 'screen_share_started', {
        userId: socket.userId,
        streamId
      }, [socket.userId]);
    });

    socket.on('screen_share_stopped', (data: ScreenShareData) => {
      const { streamId, roomId } = data;

      this.broadcastToRoom(roomId, 'screen_share_stopped', {
        userId: socket.userId,
        streamId
      }, [socket.userId]);
    });

    // Connection quality monitoring
    socket.on('connection_quality', (data: ConnectionQualityData) => {
      const { targetUserId, quality } = data;

      // Store quality metrics for monitoring
      this.storeConnectionQuality(socket.userId, targetUserId, quality);

      // If quality is poor, suggest quality adjustments
      if (quality.score < 0.5) {
        this.suggestQualityAdjustments(socket.userId, targetUserId, quality);
      }
    });
  }

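}

// Client-side WebRTC implementation. This code runs in the browser and cannot
// be declared inside the signaling server class, so the server class is closed
// above and the client is grouped in its own namespace.
namespace WebRTCBrowser {
  export class WebRTCClient {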
    private localStream: MediaStream | null = null;
    private peerConnections: Map<string, RTCPeerConnection> = new Map();
    private socket: Socket;
    private roomId: string | null = null;
    private mediaConstraints: MediaStreamConstraints;

    constructor(socket: Socket) {
      this.socket = socket;
      this.mediaConstraints = {
        video: {
          width: { ideal: 1280 },
          height: { ideal: 720 },
          frameRate: { ideal: 30 }
        },
        audio: {
          echoCancellation: true,
          noiseSuppression: true,
          autoGainControl: true
        }
      };

      this.setupSignalingListeners();
    }

    private setupSignalingListeners(): void {
      this.socket.on('rtc_offer', async (data: any) => {
        await this.handleOffer(data);
      });

      this.socket.on('rtc_answer', async (data: any) => {
        await this.handleAnswer(data);
      });

      this.socket.on('ice_candidate', async (data: any) => {
        await this.handleIceCandidate(data);
      });

      this.socket.on('user_joined_rtc', (data: any) => {
        this.handleUserJoined(data);
      });

      this.socket.on('user_left_rtc', (data: any) => {
        this.handleUserLeft(data);
      });
    }

    async startLocalStream(constraints?: MediaStreamConstraints): Promise<MediaStream> {
      try {
        this.localStream = await navigator.mediaDevices.getUserMedia(
          constraints || this.mediaConstraints
        );

        // Notify server about stream
        this.socket.emit('media_stream_started', {
          streamId: this.localStream.id,
          mediaTypes: {
            video: this.localStream.getVideoTracks().length > 0,
            audio: this.localStream.getAudioTracks().length > 0
          },
          roomId: this.roomId
        });

        return this.localStream;
      } catch (error) {
        console.error('Failed to start local stream:', error);
        throw error;
      }
    }

    async startScreenShare(): Promise<MediaStream> {
      try {
        const screenStream = await navigator.mediaDevices.getDisplayMedia({
          video: {
            width: { ideal: 1920 },
            height: { ideal: 1080 },
            frameRate: { ideal: 15 }
          },
          audio: true
        });

        // Replace video track in existing peer connections
        const videoTrack = screenStream.getVideoTracks()[0];

        for (const [userId, peerConnection] of this.peerConnections) {
          const sender = peerConnection.getSenders().find(s =>
            s.track && s.track.kind === 'video'
          );

          if (sender) {
            await sender.replaceTrack(videoTrack);
          }
        }

        // Notify server about screen share
        this.socket.emit('screen_share_started', {
          streamId: screenStream.id,
          roomId: this.roomId
        });

        // Handle screen share end
        videoTrack.onended = () => {
          this.stopScreenShare();
        };

        return screenStream;
      } catch (error) {
        console.error('Failed to start screen share:', error);
        throw error;
      }
    }

    async stopScreenShare(): Promise<void> {
      try {
        // Switch back to camera
        if (this.localStream) {
          const videoTrack = this.localStream.getVideoTracks()[0];

          for (const [userId, peerConnection] of this.peerConnections) {
            const sender = peerConnection.getSenders().find(s =>
              s.track && s.track.kind === 'video'
            );

            if (sender) {
              await sender.replaceTrack(videoTrack);
            }
          }
        }

        // Notify server
        this.socket.emit('screen_share_stopped', {
          roomId: this.roomId
        });
      } catch (error) {
        console.error('Failed to stop screen share:', error);
      }
    }

    async createPeerConnection(userId: string): Promise<RTCPeerConnection> {
      const configuration: RTCConfiguration = {
        iceServers: [
          { urls: 'stun:stun.l.google.com:19302' },
          { urls: 'stun:stun1.l.google.com:19302' },
          {
            urls: 'turn:your-turn-server.com:3478',
            username: 'your-username',
            credential: 'your-password'
          }
        ],
        iceCandidatePoolSize: 10
      };

      const peerConnection = new RTCPeerConnection(configuration);

      // Add local stream tracks
      if (this.localStream) {
        this.localStream.getTracks().forEach(track => {
          peerConnection.addTrack(track, this.localStream!);
        });
      }

      // Handle remote stream
      peerConnection.ontrack = (event) => {
        const [remoteStream] = event.streams;
        this.onRemoteStream(userId, remoteStream);
      };

      // Handle ICE candidates
      peerConnection.onicecandidate = (event) => {
        if (event.candidate) {
          this.socket.emit('ice_candidate', {
            targetUserId: userId,
            candidate: event.candidate
          });
        }
      };

      // Connection quality monitoring
      this.setupConnectionQualityMonitoring(userId, peerConnection);

      this.peerConnections.set(userId, peerConnection);

      return peerConnection;
    }

    private async handleOffer(data: any): Promise<void> {
      const { fromUserId, offer, mediaTypes } = data;

      try {
        const peerConnection = await this.createPeerConnection(fromUserId);

        await peerConnection.setRemoteDescription(offer);

        const answer = await peerConnection.createAnswer();
        await peerConnection.setLocalDescription(answer);

        this.socket.emit('rtc_answer', {
          targetUserId: fromUserId,
          answer
        });
      } catch (error) {
        console.error('Failed to handle offer:', error);
      }
    }

    private async handleAnswer(data: any): Promise<void> {
      const { fromUserId, answer } = data;

      try {
        const peerConnection = this.peerConnections.get(fromUserId);
        if (peerConnection) {
          await peerConnection.setRemoteDescription(answer);
        }
      } catch (error) {
        console.error('Failed to handle answer:', error);
      }
    }

    private async handleIceCandidate(data: any): Promise<void> {
      const { fromUserId, candidate } = data;

      try {
        const peerConnection = this.peerConnections.get(fromUserId);
        if (peerConnection) {
          await peerConnection.addIceCandidate(candidate);
        }
      } catch (error) {
        console.error('Failed to handle ICE candidate:', error);
      }
    }

    private setupConnectionQualityMonitoring(userId: string, peerConnection: RTCPeerConnection): void {
      const monitorInterval = setInterval(async () => {
        // Stop polling once the connection is closed; otherwise the timer
        // keeps running (and keeps the connection alive in memory) forever
        if (peerConnection.connectionState === 'closed') {
          clearInterval(monitorInterval);
          return;
        }

        try {
          const stats = await peerConnection.getStats();
          const quality = this.analyzeConnectionQuality(stats);

          this.socket.emit('connection_quality', {
            targetUserId: userId,
            quality
          });

          // Auto-adjust quality based on connection
          if (quality.score < 0.3) {
            await this.adjustStreamQuality('low');
          } else if (quality.score > 0.8) {
            await this.adjustStreamQuality('high');
          }

        } catch (error) {
          console.error('Failed to get connection stats:', error);
        }
      }, 5000); // Check every 5 seconds
    }

    private analyzeConnectionQuality(stats: RTCStatsReport): ConnectionQuality {
      let bytesReceived = 0;
      let bytesSent = 0;
      let packetsLost = 0;
      let packetsReceived = 0;
      let roundTripTime = 0;
      let jitter = 0;

      stats.forEach(stat => {
        if (stat.type === 'inbound-rtp') {
          bytesReceived += stat.bytesReceived || 0;
          packetsLost += stat.packetsLost || 0;
          packetsReceived += stat.packetsReceived || 0;
          jitter += stat.jitter || 0;
        } else if (stat.type === 'outbound-rtp') {
          bytesSent += stat.bytesSent || 0;
        } else if (stat.type === 'candidate-pair' && stat.state === 'succeeded') {
          roundTripTime = stat.currentRoundTripTime || 0;
        }
      });

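      // Calculate quality score (0-1)
      // packetsLost and packetsReceived are both cumulative, so the loss rate
      // is lost / (lost + received). packetsLost can go negative when
      // duplicates arrive, so clamp it at zero.
      const lost = Math.max(0, packetsLost);
      const totalPackets = packetsReceived + lost;
      const packetLossRate = totalPackets > 0 ? lost / totalPackets : 0;
      const rttScore = Math.max(0, 1 - (roundTripTime / 0.5)); // 500ms max
      const jitterScore = Math.max(0, 1 - (jitter / 0.05)); // 50ms max
      const lossScore = Math.max(0, 1 - (packetLossRate * 10)); // 10% max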

      const overallScore = (rttScore + jitterScore + lossScore) / 3;

      return {
        score: overallScore,
        roundTripTime: roundTripTime * 1000, // Convert to ms
        packetLossRate,
        jitter: jitter * 1000, // Convert to ms
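        // bytesSent and bytesReceived are cumulative totals, so these values
        // are kilobits transferred since the connection started, not a rate.
        // For kbps, diff two samples and divide by the seconds between them.
        bandwidth: {
          upload: bytesSent * 8 / 1024,
          download: bytesReceived * 8 / 1024
        }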
      };
    }

    private async adjustStreamQuality(quality: 'low' | 'medium' | 'high'): Promise<void> {
      if (!this.localStream) return;

      const videoTrack = this.localStream.getVideoTracks()[0];
      if (!videoTrack) return;

      let constraints: MediaTrackConstraints;

      // Use `ideal` rather than `exact`: applyConstraints() rejects with an
      // OverconstrainedError when the camera cannot satisfy an exact value
      switch (quality) {
        case 'low':
          constraints = {
            width: { ideal: 640 },
            height: { ideal: 360 },
            frameRate: { ideal: 15 }
          };
          break;
        case 'medium':
          constraints = {
            width: { ideal: 1280 },
            height: { ideal: 720 },
            frameRate: { ideal: 24 }
          };
          break;
        case 'high':
          constraints = {
            width: { ideal: 1920 },
            height: { ideal: 1080 },
            frameRate: { ideal: 30 }
          };
          break;
      }

      try {
        await videoTrack.applyConstraints(constraints);
        console.log(`Adjusted video quality to ${quality}`);
      } catch (error) {
        console.error('Failed to adjust video quality:', error);
      }
    }

    private onRemoteStream(userId: string, stream: MediaStream): void {
      // This would be handled by your UI framework
      // For example, add the stream to a video element
      console.log(`Received remote stream from user ${userId}`);
    }

    async leaveRoom(): Promise<void> {
      // Close all peer connections
      for (const peerConnection of this.peerConnections.values()) {
        peerConnection.close();
      }
      this.peerConnections.clear();

      // Stop local stream
      if (this.localStream) {
        this.localStream.getTracks().forEach(track => track.stop());
        this.localStream = null;
      }

      this.roomId = null;
    }
  }
}

// WebRTC interfaces
interface RTCRoom {
  id: string;
  participants: Map<string, RTCParticipant>;
  createdAt: number;
  maxParticipants: number;
}

interface RTCParticipant {
  userId: string;
  socketId: string;
  mediaTypes: MediaTypes;
  joinedAt: number;
  isConnected: boolean;
  peerConnections: Map<string, string>; // userId -> connectionId
}

interface MediaTypes {
  video: boolean;
  audio: boolean;
  screen?: boolean;
}

interface ConnectionQuality {
  score: number;
  roundTripTime: number;
  packetLossRate: number;
  jitter: number;
  bandwidth: {
    upload: number;
    download: number;
  };
}

interface UserRTCConnection {
  userId: string;
  socketId: string;
  currentRoom: string | null;
  peerConnections: Map<string, RTCPeerConnection>;
  mediaStreams: Set<string>;
}
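
The quality scoring in the WebRTC listing above can be tried in isolation. The following is a minimal sketch, not the service's actual method: `RtpTotals`, `scoreConnection`, and `pickQuality` are hypothetical names, and the 0.8 / 0.5 cut-offs for choosing a quality tier are assumptions chosen only for illustration. It reuses the same ceilings as the listing (500 ms round trip, 50 ms jitter, 10% loss).

```typescript
// Hypothetical input: totals already aggregated from the stats report.
interface RtpTotals {
  packetsLost: number;
  packetsReceived: number;
  roundTripTime: number; // seconds
  jitter: number;        // seconds
}

// Clamp a value into the 0..1 range.
function clamp01(x: number): number {
  return Math.min(1, Math.max(0, x));
}

// Average of three sub-scores, each 1 at "perfect" and 0 at its ceiling.
function scoreConnection(t: RtpTotals): number {
  const expected = t.packetsLost + t.packetsReceived;
  const lossRate = expected > 0 ? t.packetsLost / expected : 0;

  const rttScore = clamp01(1 - t.roundTripTime / 0.5); // 500 ms ceiling
  const jitterScore = clamp01(1 - t.jitter / 0.05);    // 50 ms ceiling
  const lossScore = clamp01(1 - lossRate * 10);        // 10% ceiling

  return (rttScore + jitterScore + lossScore) / 3;
}

// Assumed thresholds: tune these against real call data.
function pickQuality(score: number): 'low' | 'medium' | 'high' {
  if (score >= 0.8) return 'high';
  if (score >= 0.5) return 'medium';
  return 'low';
}

const sampleScore = scoreConnection({
  packetsLost: 5,
  packetsReceived: 95,
  roundTripTime: 0.1,
  jitter: 0.01
});
console.log(sampleScore.toFixed(2), pickQuality(sampleScore));
```

Keeping the scoring a pure function makes it easy to unit test and to feed the result into something like `adjustStreamQuality` on a timer.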

Real-time Collaboration Features: Building Google Docs-Level Editing

Operational Transformation and Conflict Resolution
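
The core idea of operational transformation is that an operation written against an old version of the text is rewritten so that it still does what its author meant after someone else's operation has been applied. The service in this section delegates that step to a `TransformationEngine` that is not shown, so here is a minimal sketch of what such a transform could look like. It is deliberately simplified and every name in it is an assumption: deletes remove a single character, and a hypothetical numeric `site` id breaks ties between two inserts at the same position.

```typescript
// Simplified operations: single-character deletes keep the cases small.
type TextOp =
  | { type: 'insert'; position: number; content: string; site: number }
  | { type: 'delete'; position: number; site: number }
  | { type: 'noop'; site: number };

function applyOp(text: string, op: TextOp): string {
  switch (op.type) {
    case 'insert':
      return text.slice(0, op.position) + op.content + text.slice(op.position);
    case 'delete':
      return text.slice(0, op.position) + text.slice(op.position + 1);
    case 'noop':
      return text;
  }
}

// Rewrite `op` so it can be applied AFTER `applied` has already been applied.
function transform(op: TextOp, applied: TextOp): TextOp {
  if (op.type === 'noop' || applied.type === 'noop') return op;

  if (applied.type === 'insert') {
    // Text inserted at or before our position pushes us to the right.
    // Two inserts at the same position: the lower site id goes first.
    const appliedGoesFirst =
      applied.position < op.position ||
      (applied.position === op.position &&
        (op.type === 'delete' || applied.site < op.site));
    return appliedGoesFirst
      ? { ...op, position: op.position + applied.content.length }
      : op;
  }

  // `applied` is a single-character delete.
  if (applied.position < op.position) {
    return { ...op, position: op.position - 1 };
  }
  if (applied.position === op.position && op.type === 'delete') {
    // Both users deleted the same character: nothing left to do.
    return { type: 'noop', site: op.site };
  }
  return op;
}

// Two users edit "abc" concurrently.
const opFromA: TextOp = { type: 'insert', position: 1, content: 'X', site: 1 };
const opFromB: TextOp = { type: 'delete', position: 1, site: 2 };

// Server order A then B, and B then A, should converge on the same text.
const viaA = applyOp(applyOp('abc', opFromA), transform(opFromB, opFromA));
const viaB = applyOp(applyOp('abc', opFromB), transform(opFromA, opFromB));
console.log(viaA === viaB);
```

The property being checked at the end, that both application orders produce the same document, is the convergence guarantee that makes concurrent editing safe. A production engine also has to handle multi-character deletes, which may need to be split when an insert lands inside the deleted range.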

// Operational Transformation for real-time collaborative editing
class OperationalTransformService {
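  // `protected` rather than `private` so subclasses such as
  // CollaborativeEditingEngine can use these members
  protected documents: Map<string, CollaborativeDocument> = new Map();
  private userCursors: Map<string, UserCursor> = new Map();
  protected transformationEngine: TransformationEngine;
  protected socketService: SocketIOService;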

  constructor(socketService: SocketIOService) {
    this.socketService = socketService;
    this.transformationEngine = new TransformationEngine();
    this.setupCollaborationHandlers();
  }

  private setupCollaborationHandlers(): void {
    this.socketService.onEvent('join_document', async (socket, data) => {
      await this.handleJoinDocument(socket, data);
    });

    this.socketService.onEvent('document_operation', async (socket, data) => {
      await this.handleDocumentOperation(socket, data);
    });

    this.socketService.onEvent('cursor_update', (socket, data) => {
      this.handleCursorUpdate(socket, data);
    });

    this.socketService.onEvent('selection_update', (socket, data) => {
      this.handleSelectionUpdate(socket, data);
    });
  }

  private async handleJoinDocument(socket: any, data: JoinDocumentData): Promise<void> {
    const { documentId } = data;

    try {
      // Load or create document
      let document = this.documents.get(documentId);
      if (!document) {
        document = await this.loadDocument(documentId);
        this.documents.set(documentId, document);
      }

      // Add user to document
      document.activeUsers.set(socket.userId, {
        userId: socket.userId,
        userName: socket.userEmail,
        socketId: socket.id,
        joinedAt: Date.now(),
        cursor: { line: 0, character: 0 },
        selection: null,
        color: this.generateUserColor(socket.userId)
      });

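      // Resolve permissions first so no await sits between reading the
      // document state and sending it
      const permissions = await this.getUserDocumentPermissions(socket.userId, documentId);

      // Join the socket room before sending state, so no operation broadcast
      // is missed between the snapshot and the subscription
      await socket.join(documentId);

      // Send document state to user
      socket.emit('document_joined', {
        documentId,
        content: document.content,
        version: document.version,
        activeUsers: Array.from(document.activeUsers.values()),
        permissions
      });

      // Notify other users
      socket.to(documentId).emit('user_joined_document', {
        user: document.activeUsers.get(socket.userId)
      });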

      console.log(`User ${socket.userId} joined document ${documentId}`);
    } catch (error) {
      console.error('Error joining document:', error);
      socket.emit('error', { message: 'Failed to join document' });
    }
  }

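  // `protected` so CollaborativeEditingEngine can route suggestions, restores, and formatting through it
  protected async handleDocumentOperation(socket: any, data: DocumentOperationData): Promise<void> {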
    const { documentId, operation } = data;

    try {
      const document = this.documents.get(documentId);
      if (!document) {
        socket.emit('error', { message: 'Document not found' });
        return;
      }

      // Check permissions
      if (!await this.canUserEdit(socket.userId, documentId)) {
        socket.emit('error', { message: 'Insufficient permissions' });
        return;
      }

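      // Transform operation against the operations applied since its base version.
      // Applied operations are recorded, with their version, in operationHistory;
      // nothing in this service ever writes to pendingOperations.
      const transformedOperation = await this.transformOperation(
        operation,
        document.operationHistory,
        document.version
      );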

      if (!transformedOperation) {
        socket.emit('error', { message: 'Operation could not be applied' });
        return;
      }

      // Apply operation to document
      const result = this.applyOperation(document, transformedOperation);

      if (result.success) {
        // Update document
        document.content = result.newContent;
        document.version++;
        document.lastModified = Date.now();
        document.lastModifiedBy = socket.userId;

        // Store operation in history
        document.operationHistory.push({
          ...transformedOperation,
          appliedAt: Date.now(),
          appliedBy: socket.userId,
          version: document.version
        });

        // Broadcast to other users
        socket.to(documentId).emit('document_operation_applied', {
          operation: transformedOperation,
          version: document.version,
          appliedBy: socket.userId
        });

        // Acknowledge to sender
        socket.emit('operation_acknowledged', {
          operationId: operation.id,
          version: document.version
        });

        // Persist to database
        await this.persistDocumentChange(documentId, transformedOperation);

      } else {
        socket.emit('operation_failed', {
          operationId: operation.id,
          reason: result.error
        });
      }
    } catch (error) {
      console.error('Error handling document operation:', error);
      socket.emit('error', { message: 'Failed to process operation' });
    }
  }

  private async transformOperation(
    operation: DocumentOperation,
    pendingOperations: DocumentOperation[],
    currentVersion: number
  ): Promise<DocumentOperation | null> {
    let transformedOperation = { ...operation };

    // Transform against all operations applied after this operation's base version.
    // `version` is only set on operations the server has already applied.
    const laterOperations = pendingOperations.filter(op =>
      (op.version ?? 0) > operation.baseVersion
    );

    for (const laterOp of laterOperations) {
      const result = this.transformationEngine.transform(
        transformedOperation,
        laterOp
      );

      if (!result) {
        return null; // Transformation failed
      }

      transformedOperation = result;
    }

    transformedOperation.baseVersion = currentVersion;

    return transformedOperation;
  }

  private applyOperation(
    document: CollaborativeDocument,
    operation: DocumentOperation
  ): OperationResult {
    try {
      switch (operation.type) {
        case 'insert':
          return this.applyInsertOperation(document, operation);
        case 'delete':
          return this.applyDeleteOperation(document, operation);
        case 'replace':
          return this.applyReplaceOperation(document, operation);
        case 'format':
          return this.applyFormatOperation(document, operation);
        default:
          return { success: false, error: 'Unknown operation type' };
      }
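    } catch (error) {
      // Caught values are `unknown` in strict TypeScript, so narrow before reading `.message`
      const message = error instanceof Error ? error.message : String(error);
      return { success: false, error: message };
    }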
  }

  private applyInsertOperation(
    document: CollaborativeDocument,
    operation: DocumentOperation
  ): OperationResult {
    // `content` is optional on DocumentOperation; default to an empty insert
    const { position, content = '' } = operation;

    if (position < 0 || position > document.content.length) {
      return { success: false, error: 'Invalid position' };
    }

    const newContent =
      document.content.slice(0, position) +
      content +
      document.content.slice(position);

    return { success: true, newContent };
  }

  private applyDeleteOperation(
    document: CollaborativeDocument,
    operation: DocumentOperation
  ): OperationResult {
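    // `length` is optional on DocumentOperation; without a default,
    // `position + undefined` is NaN and slips past the bounds check
    const { position, length = 0 } = operation;

    if (position < 0 || length < 0 || position + length > document.content.length) {
      return { success: false, error: 'Invalid position or length' };
    }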

    const newContent =
      document.content.slice(0, position) +
      document.content.slice(position + length);

    return { success: true, newContent };
  }

  // Advanced collaborative features
  class CollaborativeEditingEngine extends OperationalTransformService {
    private commentThreads: Map<string, CommentThread[]> = new Map();
    private suggestionMode: Map<string, boolean> = new Map();
    private versionHistory: Map<string, DocumentVersion[]> = new Map();

    setupAdvancedFeatures(): void {
      this.setupCommentSystem();
      this.setupSuggestionMode();
      this.setupVersionHistory();
      this.setupRealTimeFormatting();
    }

    private setupCommentSystem(): void {
      this.socketService.onEvent('add_comment', async (socket, data) => {
        const { documentId, position, content, threadId } = data;

        try {
          const comment: DocumentComment = {
            id: this.generateCommentId(),
            threadId: threadId || this.generateThreadId(),
            documentId,
            authorId: socket.userId,
            authorName: socket.userEmail,
            content,
            position,
            createdAt: Date.now(),
            resolved: false
          };

          // Add to thread
          let threads = this.commentThreads.get(documentId) || [];
          let thread = threads.find(t => t.id === comment.threadId);

          if (!thread) {
            thread = {
              id: comment.threadId,
              documentId,
              position,
              comments: [],
              resolved: false,
              createdAt: Date.now()
            };
            threads.push(thread);
          }

          thread.comments.push(comment);
          this.commentThreads.set(documentId, threads);

          // Broadcast to document users
          this.socketService.broadcastToRoom(documentId, 'comment_added', {
            comment,
            threadId: comment.threadId
          });

          // Persist comment
          await this.persistComment(comment);

        } catch (error) {
          socket.emit('error', { message: 'Failed to add comment' });
        }
      });

      this.socketService.onEvent('resolve_comment_thread', async (socket, data) => {
        const { documentId, threadId } = data;

        const threads = this.commentThreads.get(documentId) || [];
        const thread = threads.find(t => t.id === threadId);

        if (thread) {
          thread.resolved = true;
          thread.resolvedBy = socket.userId;
          thread.resolvedAt = Date.now();

          this.socketService.broadcastToRoom(documentId, 'comment_thread_resolved', {
            threadId,
            resolvedBy: socket.userId
          });

          await this.persistCommentResolution(threadId, socket.userId);
        }
      });
    }

    private setupSuggestionMode(): void {
      this.socketService.onEvent('enable_suggestion_mode', (socket, data) => {
        const { documentId } = data;
        this.suggestionMode.set(`${documentId}:${socket.userId}`, true);

        socket.emit('suggestion_mode_enabled', { documentId });
      });

      this.socketService.onEvent('suggest_edit', async (socket, data) => {
        const { documentId, operation, description } = data;

        const suggestion: EditSuggestion = {
          id: this.generateSuggestionId(),
          documentId,
          authorId: socket.userId,
          authorName: socket.userEmail,
          operation,
          description,
          status: 'pending',
          createdAt: Date.now()
        };

        // Store suggestion without applying
        await this.persistSuggestion(suggestion);

        // Notify document collaborators
        this.socketService.broadcastToRoom(documentId, 'edit_suggestion_created', {
          suggestion
        });
      });

      this.socketService.onEvent('accept_suggestion', async (socket, data) => {
        const { suggestionId } = data;

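        const suggestion = await this.getSuggestion(suggestionId);
        // Only pending suggestions may be accepted; this stops a double-apply
        if (!suggestion || suggestion.status !== 'pending') return;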

        // Apply the suggested operation
        await this.handleDocumentOperation(socket, {
          documentId: suggestion.documentId,
          operation: suggestion.operation
        });

        // Mark suggestion as accepted
        suggestion.status = 'accepted';
        suggestion.acceptedBy = socket.userId;
        suggestion.acceptedAt = Date.now();

        await this.persistSuggestion(suggestion);

        this.socketService.broadcastToRoom(suggestion.documentId, 'suggestion_accepted', {
          suggestionId,
          acceptedBy: socket.userId
        });
      });
    }

    private setupVersionHistory(): void {
      // Create version snapshots periodically
      setInterval(() => {
        this.createVersionSnapshots();
      }, 60000); // Every minute

      this.socketService.onEvent('get_version_history', async (socket, data) => {
        const { documentId } = data;

        const versions = this.versionHistory.get(documentId) || [];

        socket.emit('version_history', {
          documentId,
          versions: versions.slice(-20) // Last 20 versions
        });
      });

      this.socketService.onEvent('restore_version', async (socket, data) => {
        const { documentId, versionId } = data;

        try {
          const versions = this.versionHistory.get(documentId) || [];
          const version = versions.find(v => v.id === versionId);

          if (!version) {
            socket.emit('error', { message: 'Version not found' });
            return;
          }

          // Create restore operation
          const restoreOperation: DocumentOperation = {
            id: this.generateOperationId(),
            type: 'replace',
            position: 0,
            length: this.documents.get(documentId)?.content.length || 0,
            content: version.content,
            baseVersion: this.documents.get(documentId)?.version || 0,
            timestamp: Date.now()
          };

          await this.handleDocumentOperation(socket, {
            documentId,
            operation: restoreOperation
          });

        } catch (error) {
          socket.emit('error', { message: 'Failed to restore version' });
        }
      });
    }

    private setupRealTimeFormatting(): void {
      this.socketService.onEvent('apply_formatting', async (socket, data) => {
        const { documentId, range, formatting } = data;

        const formatOperation: DocumentOperation = {
          id: this.generateOperationId(),
          type: 'format',
          position: range.start,
          length: range.end - range.start,
          formatting,
          baseVersion: this.documents.get(documentId)?.version || 0,
          timestamp: Date.now()
        };

        await this.handleDocumentOperation(socket, {
          documentId,
          operation: formatOperation
        });
      });

      this.socketService.onEvent('toggle_formatting', async (socket, data) => {
        const { documentId, range, formatType } = data;

        // Get current formatting at range
        const document = this.documents.get(documentId);
        if (!document) return;

        const currentFormatting = this.getFormattingAtRange(document, range);
        const isFormatted = currentFormatting[formatType];

        const formatOperation: DocumentOperation = {
          id: this.generateOperationId(),
          type: 'format',
          position: range.start,
          length: range.end - range.start,
          formatting: { [formatType]: !isFormatted },
          baseVersion: document.version,
          timestamp: Date.now()
        };

        await this.handleDocumentOperation(socket, {
          documentId,
          operation: formatOperation
        });
      });
    }

    // Collaborative conflict resolution
    private async resolveConflicts(
      documentId: string,
      conflictingOperations: DocumentOperation[]
    ): Promise<DocumentOperation[]> {
      // Sort a copy by timestamp so the caller's array is left untouched.
      // Client clocks can drift, so treat this ordering as a best effort.
      const ordered = [...conflictingOperations].sort(
        (a, b) => a.timestamp - b.timestamp
      );

      const resolution: DocumentOperation[] = [];

      for (const op of ordered) {
        let currentOp = op;

        // Transform against all previously resolved operations
        for (const prevOp of resolution) {
          const result = this.transformationEngine.transform(currentOp, prevOp);
          if (result) {
            currentOp = result;
          }
        }

        resolution.push(currentOp);
      }

      return resolution;
    }

    // Intelligent merge strategies
    private intelligentMerge(
      baseContent: string,
      userAChanges: DocumentOperation[],
      userBChanges: DocumentOperation[]
    ): MergeResult {
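      // Three-way merge sketch: overlapping edits become conflicts, and every
      // other operation is passed through to the merged result
      const conflicts: ConflictRegion[] = [];
      const mergedOperations: DocumentOperation[] = [];
      const conflictedIds = new Set<string>();

      // Find conflicting regions
      for (const opA of userAChanges) {
        for (const opB of userBChanges) {
          if (this.operationsConflict(opA, opB)) {
            const start = Math.min(opA.position, opB.position);
            const end = Math.max(
              opA.position + (opA.length || 0),
              opB.position + (opB.length || 0)
            );
            conflicts.push({
              position: start,
              length: end - start, // span of the region, not its end offset
              userAOperation: opA,
              userBOperation: opB,
              resolutionStrategy: this.suggestResolutionStrategy(opA, opB)
            });
            conflictedIds.add(opA.id);
            conflictedIds.add(opB.id);
          }
        }
      }

      // Operations outside every conflict region can be merged automatically.
      // applyMergedOperations is assumed to adjust positions as it applies them.
      for (const op of [...userAChanges, ...userBChanges]) {
        if (!conflictedIds.has(op.id)) mergedOperations.push(op);
      }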

      return {
        mergedContent: this.applyMergedOperations(baseContent, mergedOperations),
        conflicts,
        autoResolved: conflicts.length === 0
      };
    }

    private createVersionSnapshots(): void {
      for (const [documentId, document] of this.documents) {
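        // Snapshot any document changed since its last snapshot; the caller's
        // interval already limits this to once per minute
        if (document.lastModified > document.lastSnapshotAt) {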
          const version: DocumentVersion = {
            id: this.generateVersionId(),
            documentId,
            content: document.content,
            version: document.version,
            createdAt: Date.now(),
            createdBy: document.lastModifiedBy,
            description: `Auto-saved version ${document.version}`
          };

          let versions = this.versionHistory.get(documentId) || [];
          versions.push(version);

          // Keep only last 100 versions
          if (versions.length > 100) {
            versions = versions.slice(-100);
          }

          this.versionHistory.set(documentId, versions);
          document.lastSnapshotAt = Date.now();
        }
      }
    }
  }
}

// Collaborative editing interfaces
interface CollaborativeDocument {
  id: string;
  content: string;
  version: number;
  lastModified: number;
  lastModifiedBy: string;
  lastSnapshotAt: number;
  activeUsers: Map<string, DocumentUser>;
  operationHistory: DocumentOperation[];
  pendingOperations: DocumentOperation[];
}

interface DocumentUser {
  userId: string;
  userName: string;
  socketId: string;
  joinedAt: number;
  cursor: CursorPosition;
  selection: SelectionRange | null;
  color: string;
}

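interface DocumentOperation {
  id: string;
  type: 'insert' | 'delete' | 'replace' | 'format';
  position: number;
  length?: number;
  content?: string;
  formatting?: TextFormatting;
  baseVersion: number;
  timestamp: number;
  // Set by the server when the operation is applied and stored in history
  version?: number;
  appliedAt?: number;
  appliedBy?: string;
}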

interface CursorPosition {
  line: number;
  character: number;
}

interface SelectionRange {
  start: CursorPosition;
  end: CursorPosition;
}

interface TextFormatting {
  bold?: boolean;
  italic?: boolean;
  underline?: boolean;
  color?: string;
  backgroundColor?: string;
  fontSize?: number;
}

interface CommentThread {
  id: string;
  documentId: string;
  position: number;
  comments: DocumentComment[];
  resolved: boolean;
  createdAt: number;
  resolvedBy?: string;
  resolvedAt?: number;
}

interface DocumentComment {
  id: string;
  threadId: string;
  documentId: string;
  authorId: string;
  authorName: string;
  content: string;
  position: number;
  createdAt: number;
  resolved: boolean;
}

interface EditSuggestion {
  id: string;
  documentId: string;
  authorId: string;
  authorName: string;
  operation: DocumentOperation;
  description: string;
  status: 'pending' | 'accepted' | 'rejected';
  createdAt: number;
  acceptedBy?: string;
  acceptedAt?: number;
}

interface DocumentVersion {
  id: string;
  documentId: string;
  content: string;
  version: number;
  createdAt: number;
  createdBy: string;
  description: string;
}
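
The merge code above calls an `operationsConflict` helper that is never shown. One plausible reading is a plain range-overlap test on each operation's `position` and `length`. The sketch below makes that assumption explicit: `RangeOp`, `span`, and `conflictRegion` are hypothetical names, and counting ranges that merely touch as conflicting is a conservative choice, not something the original specifies.

```typescript
// Only the fields the overlap test needs; `length` is absent for inserts.
interface RangeOp {
  position: number;
  length?: number;
}

// Half-open span [start, end) covered by an operation.
function span(op: RangeOp): [number, number] {
  return [op.position, op.position + (op.length ?? 0)];
}

// Two operations conflict when their spans overlap or touch.
// Treating touching spans as conflicts is deliberately conservative:
// an insert at the edge of a deleted range is ambiguous.
function operationsConflict(a: RangeOp, b: RangeOp): boolean {
  const [aStart, aEnd] = span(a);
  const [bStart, bEnd] = span(b);
  return aStart <= bEnd && bStart <= aEnd;
}

// Smallest region that covers both operations.
function conflictRegion(a: RangeOp, b: RangeOp): { position: number; length: number } {
  const start = Math.min(a.position, b.position);
  const end = Math.max(span(a)[1], span(b)[1]);
  return { position: start, length: end - start };
}

const deleteWord: RangeOp = { position: 2, length: 3 };  // covers [2, 5)
const reformatRun: RangeOp = { position: 4, length: 4 }; // covers [4, 8)
console.log(operationsConflict(deleteWord, reformatRun), conflictRegion(deleteWord, reformatRun));
```

Note that the region's `length` is the end offset minus the start offset. Storing the end offset in `length` is an easy mistake to make and would overstate every conflict that does not begin at position 0.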

Presence and User Activity Tracking: Knowing Who’s There

Building Awareness Systems
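
Stale presence, where someone shows as online hours after leaving, usually comes from trusting the connection event alone. The service in this section avoids that by deriving status from how recently a user was last seen: away after 5 minutes of inactivity, offline after 30. The sketch below isolates just that rule as a pure function. `derivePresence` and `sweepPresence` are hypothetical names used for illustration; the two thresholds are the ones the service itself uses.

```typescript
type PresenceStatus = 'online' | 'away' | 'offline';

const IDLE_AFTER_MS = 5 * 60 * 1000;     // 5 minutes without activity -> away
const OFFLINE_AFTER_MS = 30 * 60 * 1000; // 30 minutes without activity -> offline

// Derive a status from the last activity timestamp instead of storing it blindly.
function derivePresence(lastSeen: number, now: number): PresenceStatus {
  const inactiveFor = now - lastSeen;
  if (inactiveFor > OFFLINE_AFTER_MS) return 'offline';
  if (inactiveFor > IDLE_AFTER_MS) return 'away';
  return 'online';
}

// Recompute every user's status in one pass, e.g. from a periodic timer.
function sweepPresence(
  lastSeenByUser: Map<string, number>,
  now: number
): Map<string, PresenceStatus> {
  const statuses = new Map<string, PresenceStatus>();
  lastSeenByUser.forEach((lastSeen, userId) => {
    statuses.set(userId, derivePresence(lastSeen, now));
  });
  return statuses;
}

const clock = Date.now();
const lastSeenSample = new Map<string, number>([
  ['alice', clock - 30 * 1000],          // active 30 seconds ago
  ['bob', clock - 10 * 60 * 1000],       // quiet for 10 minutes
  ['carol', clock - 2 * 60 * 60 * 1000]  // quiet for 2 hours
]);
console.log(sweepPresence(lastSeenSample, clock));
```

Passing `now` in as a parameter keeps the function deterministic, so the thresholds can be tested without waiting or mocking timers.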

// Comprehensive presence and activity tracking system
class PresenceTrackingService {
  private userPresence: Map<string, UserPresence> = new Map();
  private activityStreams: Map<string, ActivityStream> = new Map();
  private presenceRooms: Map<string, Set<string>> = new Map();
  private redis: Redis;
  private socketService: SocketIOService;

  constructor(socketService: SocketIOService) {
    this.socketService = socketService;
    this.redis = new Redis(process.env.REDIS_URL);
    this.setupPresenceHandlers();
    this.startPresenceMonitoring();
  }

  private setupPresenceHandlers(): void {
    this.socketService.onConnection((socket) => {
      this.handleUserConnection(socket);
    });

    this.socketService.onEvent("update_presence", (socket, data) => {
      this.updateUserPresence(socket.userId, data);
    });

    this.socketService.onEvent("join_presence_room", (socket, data) => {
      this.joinPresenceRoom(socket.userId, data.roomId);
    });

    this.socketService.onEvent("leave_presence_room", (socket, data) => {
      this.leavePresenceRoom(socket.userId, data.roomId);
    });

    this.socketService.onEvent("activity_update", (socket, data) => {
      this.trackUserActivity(socket.userId, data);
    });

    this.socketService.onDisconnection((socket) => {
      this.handleUserDisconnection(socket.userId);
    });
  }

  private async handleUserConnection(socket: any): Promise<void> {
    const presence: UserPresence = {
      userId: socket.userId,
      status: "online",
      lastSeen: Date.now(),
      connectedAt: Date.now(),
      socketId: socket.id,
      device: this.detectDevice(socket.handshake),
      location: await this.getLocation(socket.handshake.address),
      activity: {
        type: "online",
        details: {},
        startedAt: Date.now(),
      },
      isIdle: false,
      customStatus: null,
    };

    this.userPresence.set(socket.userId, presence);

    // Publish presence update
    await this.publishPresenceUpdate(socket.userId, "online");

    // Start activity tracking for this user
    this.startUserActivityTracking(socket.userId);

    console.log(`User ${socket.userId} is now online`);
  }

  private async updateUserPresence(
    userId: string,
    update: PresenceUpdate
  ): Promise<void> {
    const presence = this.userPresence.get(userId);
    if (!presence) return;

    // Update presence fields
    if (update.status) {
      presence.status = update.status;
    }

    if (update.customStatus !== undefined) {
      presence.customStatus = update.customStatus;
    }

    if (update.activity) {
      presence.activity = {
        ...update.activity,
        startedAt: update.activity.startedAt || Date.now(),
      };
    }

    presence.lastSeen = Date.now();

    // Publish update to interested parties
    await this.publishPresenceUpdate(userId, presence.status, {
      customStatus: presence.customStatus,
      activity: presence.activity,
    });

    // Store in Redis for cross-server consistency
    await this.redis.setex(
      `presence:${userId}`,
      300, // 5 minutes TTL
      JSON.stringify(presence)
    );
  }

  private async joinPresenceRoom(
    userId: string,
    roomId: string
  ): Promise<void> {
    // Add user to presence room
    if (!this.presenceRooms.has(roomId)) {
      this.presenceRooms.set(roomId, new Set());
    }

    this.presenceRooms.get(roomId)!.add(userId);

    // Get current room members
    const roomMembers = Array.from(this.presenceRooms.get(roomId) || []);
    const membersPresence = await this.getBulkPresence(roomMembers);

    // Notify user of current room presence
    this.socketService.sendToUser(userId, "room_presence_update", {
      roomId,
      members: membersPresence,
    });

    // Notify room members of new user
    const userPresence = this.userPresence.get(userId);
    if (userPresence) {
      this.broadcastToPresenceRoom(
        roomId,
        "user_joined_presence",
        {
          userId,
          presence: userPresence,
        },
        [userId]
      );
    }

    console.log(`User ${userId} joined presence room ${roomId}`);
  }

  private async trackUserActivity(
    userId: string,
    activity: UserActivity
  ): Promise<void> {
    const presence = this.userPresence.get(userId);
    if (!presence) return;

    // Update current activity
    presence.activity = {
      type: activity.type,
      details: activity.details,
      startedAt: activity.startedAt || Date.now(),
    };

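    const wasIdle = presence.isIdle;
    presence.lastSeen = Date.now();
    presence.isIdle = false;

    // Returning from idle: the idle timer set the status to "away", so restore it
    if (wasIdle && presence.status === "away") {
      presence.status = "online";
      await this.publishPresenceUpdate(userId, "online");
    }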

    // Store detailed activity in stream
    let stream = this.activityStreams.get(userId);
    if (!stream) {
      stream = {
        userId,
        activities: [],
        createdAt: Date.now(),
      };
      this.activityStreams.set(userId, stream);
    }

    stream.activities.push({
      ...activity,
      timestamp: Date.now(),
    });

    // Keep only last 100 activities
    if (stream.activities.length > 100) {
      stream.activities = stream.activities.slice(-100);
    }

    // Broadcast activity update to relevant rooms
    await this.broadcastActivityUpdate(userId, activity);

    // Store activity for analytics
    await this.storeActivityData(userId, activity);
  }

  private async broadcastActivityUpdate(
    userId: string,
    activity: UserActivity
  ): Promise<void> {
    // Find all rooms where this user's presence matters
    const relevantRooms = this.findUserPresenceRooms(userId);

    const activityUpdate = {
      userId,
      activity: {
        type: activity.type,
        details: this.sanitizeActivityDetails(activity.details),
        startedAt: activity.startedAt || Date.now(),
      },
      timestamp: Date.now(),
    };

    for (const roomId of relevantRooms) {
      this.broadcastToPresenceRoom(
        roomId,
        "user_activity_update",
        activityUpdate,
        [userId]
      );
    }
  }

  // Advanced activity detection
  private startUserActivityTracking(userId: string): void {
    const activityTimer = setInterval(async () => {
      const presence = this.userPresence.get(userId);
      if (!presence) {
        clearInterval(activityTimer);
        return;
      }

      const inactiveTime = Date.now() - presence.lastSeen;

      // Auto-detect idle state
      if (inactiveTime > 5 * 60 * 1000 && !presence.isIdle) {
        // 5 minutes
        presence.isIdle = true;
        presence.status = "away";

        await this.publishPresenceUpdate(userId, "away", {
          reason: "idle",
        });
      }

      // Auto-detect offline state
      if (inactiveTime > 30 * 60 * 1000) {
        // 30 minutes
        presence.status = "offline";
        await this.handleUserDisconnection(userId);
        clearInterval(activityTimer);
      }
    }, 60000); // Check every minute
  }

  // Smart activity inference
  private async inferUserActivity(
    userId: string
  ): Promise<InferredActivity | null> {
    const stream = this.activityStreams.get(userId);
    if (!stream || stream.activities.length < 3) return null;

    const recentActivities = stream.activities.slice(-10);
    const patterns = this.analyzeActivityPatterns(recentActivities);

    return this.inferActivityFromPatterns(patterns);
  }

  private analyzeActivityPatterns(
    activities: UserActivity[]
  ): ActivityPattern[] {
    const patterns: ActivityPattern[] = [];

    // Group by activity type
    const groupedByType = activities.reduce((acc, activity) => {
      if (!acc[activity.type]) acc[activity.type] = [];
      acc[activity.type].push(activity);
      return acc;
    }, {} as Record<string, UserActivity[]>);

    // Analyze each group
    for (const [type, typeActivities] of Object.entries(groupedByType)) {
      const pattern: ActivityPattern = {
        type,
        frequency: typeActivities.length,
        averageDuration: this.calculateAverageDuration(typeActivities),
        timeDistribution: this.analyzeTimeDistribution(typeActivities),
        commonDetails: this.findCommonDetails(typeActivities),
      };

      patterns.push(pattern);
    }

    return patterns;
  }

  private inferActivityFromPatterns(
    patterns: ActivityPattern[]
  ): InferredActivity | null {
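    // reduce() without an initial value throws on an empty array
    if (patterns.length === 0) return null;

    // Find dominant pattern
    const dominantPattern = patterns.reduce((prev, current) =>
      current.frequency > prev.frequency ? current : prev
    );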

    // Infer what user is likely doing
    if (
      dominantPattern.type === "document_edit" &&
      dominantPattern.frequency > 5
    ) {
      return {
        type: "focused_writing",
        confidence: 0.8,
        details: {
          documentType: dominantPattern.commonDetails.documentType,
          wordCount: dominantPattern.commonDetails.totalWords,
        },
      };
    }

    if (
      dominantPattern.type === "video_call" &&
      dominantPattern.averageDuration > 600000
    ) {
      // 10 minutes
      return {
        type: "in_meeting",
        confidence: 0.9,
        details: {
          duration: dominantPattern.averageDuration,
          participants: dominantPattern.commonDetails.participants,
        },
      };
    }

    // Add more inference rules...

    return null;
  }

  // Presence analytics and insights
  async generatePresenceAnalytics(
    timeframe: string
  ): Promise<PresenceAnalytics> {
    const analytics: PresenceAnalytics = {
      totalUsers: this.userPresence.size,
      statusBreakdown: this.getStatusBreakdown(),
      averageSessionDuration: await this.calculateAverageSessionDuration(
        timeframe
      ),
      peakActiveHours: await this.calculatePeakActiveHours(timeframe),
      deviceBreakdown: this.getDeviceBreakdown(),
      locationBreakdown: this.getLocationBreakdown(),
      activityBreakdown: await this.getActivityBreakdown(timeframe),
      engagementMetrics: await this.calculateEngagementMetrics(timeframe),
    };

    return analytics;
  }

  private getStatusBreakdown(): Record<string, number> {
    const breakdown: Record<string, number> = {
      online: 0,
      away: 0,
      busy: 0,
      offline: 0,
    };

    for (const presence of this.userPresence.values()) {
      breakdown[presence.status] = (breakdown[presence.status] || 0) + 1;
    }

    return breakdown;
  }

  private getDeviceBreakdown(): Record<string, number> {
    const breakdown: Record<string, number> = {};

    for (const presence of this.userPresence.values()) {
      const deviceType = presence.device.type;
      breakdown[deviceType] = (breakdown[deviceType] || 0) + 1;
    }

    return breakdown;
  }

  // Real-time collaboration awareness
  async createCollaborationAwareness(
    documentId: string
  ): Promise<CollaborationAwareness> {
    const documentUsers = await this.getDocumentUsers(documentId);
    const awareness: CollaborationAwareness = {
      documentId,
      activeUsers: [],
      recentActivity: [],
      conflictAreas: [],
      collaborationScore: 0,
    };

    for (const userId of documentUsers) {
      const presence = this.userPresence.get(userId);
      const activity = this.activityStreams.get(userId);

      if (presence && presence.status === "online") {
        awareness.activeUsers.push({
          userId,
          presence,
          currentCursor: await this.getUserCursor(userId, documentId),
          recentEdits: await this.getRecentEdits(userId, documentId, 5),
          collaborationStyle: this.analyzeCollaborationStyle(
            userId,
            documentId
          ),
        });
      }

      if (activity) {
        const documentActivities = activity.activities
          .filter((a) => a.details.documentId === documentId)
          .slice(-10);

        awareness.recentActivity.push(...documentActivities);
      }
    }

    // Calculate collaboration score
    awareness.collaborationScore = this.calculateCollaborationScore(awareness);

    // Detect potential conflicts
    awareness.conflictAreas = await this.detectPotentialConflicts(
      documentId,
      awareness.activeUsers
    );

    return awareness;
  }

  private calculateCollaborationScore(
    awareness: CollaborationAwareness
  ): number {
    let score = 0;

    // Base score from active users
    score += Math.min(awareness.activeUsers.length * 20, 60);

    // Bonus for diverse activities
    const activityTypes = new Set(awareness.recentActivity.map((a) => a.type));
    score += activityTypes.size * 5;

    // Penalty for conflicts
    score -= awareness.conflictAreas.length * 10;

    // Bonus for complementary collaboration styles
    const styles = awareness.activeUsers.map((u) => u.collaborationStyle);
    if (styles.includes("leader") && styles.includes("supporter")) {
      score += 15;
    }

    return Math.max(0, Math.min(100, score));
  }

  // Privacy-respecting presence
  private sanitizeActivityDetails(
    details: Record<string, any>
  ): Record<string, any> {
    const sanitized = { ...details };

    // Remove sensitive information
    delete sanitized.privateNotes;
    delete sanitized.personalData;
    delete sanitized.sensitiveContent;

    // Generalize location information
    if (sanitized.location) {
      sanitized.location = this.generalizeLocation(sanitized.location);
    }

    return sanitized;
  }

  private async publishPresenceUpdate(
    userId: string,
    status: string,
    metadata?: Record<string, any>
  ): Promise<void> {
    const update = {
      userId,
      status,
      timestamp: Date.now(),
      metadata: metadata || {},
    };

    // Publish to Redis for cross-server updates
    await this.redis.publish("presence:updates", JSON.stringify(update));

    // Update local presence rooms
    const userRooms = this.findUserPresenceRooms(userId);

    for (const roomId of userRooms) {
      this.broadcastToPresenceRoom(roomId, "presence_update", update, [userId]);
    }
  }

  private broadcastToPresenceRoom(
    roomId: string,
    event: string,
    data: any,
    excludeUsers: string[] = []
  ): void {
    const roomMembers = this.presenceRooms.get(roomId);
    if (!roomMembers) return;

    for (const memberId of roomMembers) {
      if (!excludeUsers.includes(memberId)) {
        this.socketService.sendToUser(memberId, event, data);
      }
    }
  }

  async handleUserDisconnection(userId: string): Promise<void> {
    const presence = this.userPresence.get(userId);
    if (!presence) return;

    // Update status to offline
    presence.status = "offline";
    presence.lastSeen = Date.now();

    // Calculate session duration
    const sessionDuration = Date.now() - presence.connectedAt;

    // Store session data for analytics
    await this.storeSessionData(userId, {
      duration: sessionDuration,
      activities: this.activityStreams.get(userId)?.activities || [],
      disconnectedAt: Date.now(),
    });

    // Publish offline status
    await this.publishPresenceUpdate(userId, "offline");

    // Clean up
    this.userPresence.delete(userId);
    this.activityStreams.delete(userId);

    // Remove from all presence rooms
    for (const members of this.presenceRooms.values()) {
      members.delete(userId);
    }

    console.log(
      `User ${userId} is now offline (session: ${sessionDuration}ms)`
    );
  }
}

// Presence interfaces
interface UserPresence {
  userId: string;
  status: "online" | "away" | "busy" | "offline";
  lastSeen: number;
  connectedAt: number;
  socketId: string;
  device: DeviceInfo;
  location: LocationInfo;
  activity: UserActivity;
  isIdle: boolean;
  customStatus: string | null;
}

interface UserActivity {
  type: string;
  details: Record<string, any>;
  startedAt: number;
}

interface ActivityStream {
  userId: string;
  activities: UserActivity[];
  createdAt: number;
}

interface DeviceInfo {
  type: "desktop" | "mobile" | "tablet";
  os: string;
  browser: string;
}

interface LocationInfo {
  country: string;
  city: string;
  timezone: string;
}

interface InferredActivity {
  type: string;
  confidence: number;
  details: Record<string, any>;
}

interface ActivityPattern {
  type: string;
  frequency: number;
  averageDuration: number;
  timeDistribution: number[];
  commonDetails: Record<string, any>;
}

interface PresenceAnalytics {
  totalUsers: number;
  statusBreakdown: Record<string, number>;
  averageSessionDuration: number;
  peakActiveHours: number[];
  deviceBreakdown: Record<string, number>;
  locationBreakdown: Record<string, number>;
  activityBreakdown: Record<string, number>;
  engagementMetrics: Record<string, number>;
}

interface CollaborationAwareness {
  documentId: string;
  activeUsers: CollaborativeUser[];
  recentActivity: UserActivity[];
  conflictAreas: ConflictArea[];
  collaborationScore: number;
}

interface CollaborativeUser {
  userId: string;
  presence: UserPresence;
  currentCursor: CursorPosition;
  recentEdits: any[];
  collaborationStyle: "leader" | "supporter" | "independent";
}

interface ConflictArea {
  position: number;
  length: number;
  users: string[];
  severity: "low" | "medium" | "high";
}
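
createCollaborationAwareness calls detectPotentialConflicts, which is not shown above. One plausible way to fill that gap is to treat each active user's current edit as a character range and report the regions where ranges from different users overlap. The sketch below works under that assumption; EditRange, OverlapArea, findOverlappingEdits, and the severity thresholds are hypothetical names and values chosen for illustration, not part of the service above.

```typescript
// Hypothetical shape: the part of the document a user is currently editing.
interface EditRange {
  userId: string;
  position: number; // start offset in the document
  length: number; // number of characters covered
}

// Mirrors the ConflictArea shape from the listing above.
interface OverlapArea {
  position: number;
  length: number;
  users: string[];
  severity: "low" | "medium" | "high";
}

// Sweep over ranges sorted by start offset and group those that overlap.
// Grouping is transitive: if A overlaps B and B overlaps C, all three users
// end up in one contested region.
function findOverlappingEdits(ranges: EditRange[]): OverlapArea[] {
  const sorted = [...ranges].sort((a, b) => a.position - b.position);
  const areas: OverlapArea[] = [];

  let groupStart = 0;
  let groupEnd = 0;
  let groupUsers = new Set<string>();

  const flush = (): void => {
    // Only a region touched by two or more distinct users is a potential conflict.
    if (groupUsers.size > 1) {
      areas.push({
        position: groupStart,
        length: groupEnd - groupStart,
        users: Array.from(groupUsers),
        // Assumed thresholds: more simultaneous editors means higher severity.
        severity:
          groupUsers.size >= 4 ? "high" : groupUsers.size === 3 ? "medium" : "low",
      });
    }
  };

  for (const range of sorted) {
    const end = range.position + range.length;
    if (groupUsers.size > 0 && range.position < groupEnd) {
      // Starts inside the current group: extend the group.
      groupEnd = Math.max(groupEnd, end);
      groupUsers.add(range.userId);
    } else {
      // Starts after the current group: close it and open a new one.
      flush();
      groupStart = range.position;
      groupEnd = end;
      groupUsers = new Set([range.userId]);
    }
  }
  flush();

  return areas;
}

// Usage: alice and bob overlap on offsets 0-15, carol works alone.
const overlaps = findOverlappingEdits([
  { userId: "alice", position: 0, length: 10 },
  { userId: "bob", position: 5, length: 10 },
  { userId: "carol", position: 40, length: 5 },
]);
```

Sorting first keeps the sweep at O(n log n), which matters if the check runs on every cursor move.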

Real-time Data Synchronization: Keeping Everything in Sync

Advanced Data Synchronization Patterns

// Comprehensive data synchronization system
class DataSynchronizationService {
  private syncNodes: Map<string, SyncNode> = new Map();
  private subscriptions: Map<string, DataSubscription[]> = new Map();
  private conflictResolver: ConflictResolver;
  private changeLog: ChangeLogService;
  private redis: Redis;

  constructor() {
    this.conflictResolver = new ConflictResolver();
    this.changeLog = new ChangeLogService();
    this.redis = new Redis(process.env.REDIS_URL);
    this.setupSynchronization();
  }

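  private setupSynchronization(): void {
    // Set up cross-server sync. Assuming `Redis` is the ioredis client:
    // subscribe() only registers the channel, and payloads arrive through the
    // 'message' event. A connection in subscriber mode cannot issue other
    // commands, so publishing needs a separate connection.
    this.redis.subscribe('data:sync').catch((error) => {
      console.error('Failed to subscribe to data:sync:', error);
    });

    this.redis.on('message', (channel: string, message: string) => {
      if (channel !== 'data:sync') return;

      try {
        this.processSyncEvent(JSON.parse(message));
      } catch (error) {
        console.error('Failed to process sync event:', error);
      }
    });

    // Start periodic sync health checks
    setInterval(() => {
      this.performSyncHealthCheck();
    }, 30000); // Every 30 seconds
  }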

  // Subscribe to data changes with filtering
  async subscribeToData(
    nodeId: string,
    dataType: string,
    filter: DataFilter
  ): Promise<string> {
    const subscriptionId = this.generateSubscriptionId();

    const subscription: DataSubscription = {
      id: subscriptionId,
      nodeId,
      dataType,
      filter,
      lastSyncTimestamp: Date.now(),
      state: 'active',
      createdAt: Date.now()
    };

    if (!this.subscriptions.has(dataType)) {
      this.subscriptions.set(dataType, []);
    }

    this.subscriptions.get(dataType)!.push(subscription);

    // Send initial data
    const initialData = await this.getInitialData(dataType, filter);
    await this.sendSyncUpdate(nodeId, {
      type: 'initial_data',
      dataType,
      data: initialData,
      timestamp: Date.now(),
      subscriptionId
    });

    return subscriptionId;
  }

  // Real-time data update with conflict detection
  async updateData(
    dataType: string,
    entityId: string,
    changes: DataChange[],
    metadata: UpdateMetadata
  ): Promise<SyncResult> {
    const updateId = this.generateUpdateId();

    try {
      // Get current data for conflict detection
      const currentData = await this.getCurrentData(dataType, entityId);

      // Detect conflicts
      const conflicts = await this.detectConflicts(currentData, changes, metadata);

      if (conflicts.length > 0) {
        // Attempt automatic resolution
        const resolution = await this.resolveConflicts(conflicts, changes, metadata);

        if (resolution.requiresManualIntervention) {
          return {
            success: false,
            updateId,
            conflicts,
            requiresResolution: true
          };
        }

        // Use resolved changes
        changes = resolution.resolvedChanges;
      }

      // Apply changes locally
      const result = await this.applyChanges(dataType, entityId, changes, metadata);

      if (result.success) {
        // Log change for audit trail
        await this.changeLog.recordChange({
          updateId,
          dataType,
          entityId,
          changes,
          metadata,
          timestamp: Date.now(),
          conflictsResolved: conflicts.length > 0
        });

        // Broadcast to subscribers
        await this.broadcastUpdate({
          updateId,
          dataType,
          entityId,
          changes,
          metadata,
          timestamp: Date.now()
        });

        return {
          success: true,
          updateId,
          appliedChanges: changes,
          conflictsResolved: conflicts.length
        };
      } else {
        return {
          success: false,
          updateId,
          error: result.error
        };
      }
    } catch (error) {
      console.error('Data update failed:', error);
      return {
        success: false,
        updateId,
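        // `error` is `unknown` under strict settings, so narrow before reading .message
        error: error instanceof Error ? error.message : String(error)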
      };
    }
  }

  // Optimistic updates with rollback capability
  async optimisticUpdate(
    nodeId: string,
    dataType: string,
    entityId: string,
    changes: DataChange[],
    metadata: UpdateMetadata
  ): Promise<OptimisticResult> {
    const optimisticId = this.generateOptimisticId();

    // Capture the current state first so the optimistic changes can be rolled back
    const rollbackData = await this.captureRollbackData(dataType, entityId);

    try {
      // Apply changes locally immediately
      await this.applyChangesOptimistically(dataType, entityId, changes);

      // Send update to server for validation
      const serverResult = await this.validateOptimisticUpdate({
        optimisticId,
        nodeId,
        dataType,
        entityId,
        changes,
        metadata,
        timestamp: Date.now()
      });

      if (serverResult.accepted) {
        // Server accepted - commit optimistic changes
        await this.commitOptimisticChanges(optimisticId);

        return {
          optimisticId,
          accepted: true,
          finalData: serverResult.data
        };
      } else {
        // Server rejected - rollback optimistic changes
        await this.rollbackOptimisticChanges(dataType, entityId, rollbackData);

        return {
          optimisticId,
          accepted: false,
          conflicts: serverResult.conflicts,
          serverData: serverResult.data,
          rollbackApplied: true
        };
      }
    } catch (error) {
      // Error occurred - rollback
      await this.rollbackOptimisticChanges(dataType, entityId, rollbackData);

      return {
        optimisticId,
        accepted: false,
        error: error instanceof Error ? error.message : String(error),
        rollbackApplied: true
      };
    }
  }

  // Multi-way merge for complex data structures
  async performMultiWayMerge(
    dataType: string,
    entityId: string,
    versions: DataVersion[]
  ): Promise<MergeResult> {
    // Find common ancestor
    const commonAncestor = this.findCommonAncestor(versions);

    if (!commonAncestor) {
      return {
        success: false,
        error: 'No common ancestor found'
      };
    }

    // Perform three-way merge for each version pair
    const mergeResults: PartialMergeResult[] = [];

    for (let i = 0; i < versions.length; i++) {
      for (let j = i + 1; j < versions.length; j++) {
        const result = await this.performThreeWayMerge(
          commonAncestor,
          versions[i],
          versions[j]
        );

        mergeResults.push(result);
      }
    }

    // Combine all merge results
    const finalMerge = await this.combineMergeResults(mergeResults);

    return finalMerge;
  }

  private async performThreeWayMerge(
    base: DataVersion,
    versionA: DataVersion,
    versionB: DataVersion
  ): Promise<PartialMergeResult> {
    const conflicts: DataConflict[] = [];
    const mergedChanges: DataChange[] = [];

    // Get changes from base to each version
    const changesA = this.calculateChanges(base.data, versionA.data);
    const changesB = this.calculateChanges(base.data, versionB.data);

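    // Classify each change from A exactly once: it either conflicts with one
    // or more changes from B, or it can be merged as-is
    for (const changeA of changesA) {
      const conflictingChanges = changesB.filter(changeB =>
        this.changesConflict(changeA, changeB)
      );

      if (conflictingChanges.length === 0) {
        // Non-conflicting changes can be merged
        mergedChanges.push(changeA);
        continue;
      }

      for (const changeB of conflictingChanges) {
        conflicts.push({
          path: changeA.path,
          type: 'concurrent_edit',
          versionA: {
            value: changeA.newValue,
            timestamp: versionA.timestamp,
            author: versionA.author
          },
          versionB: {
            value: changeB.newValue,
            timestamp: versionB.timestamp,
            author: versionB.author
          },
          base: {
            value: this.getValueAtPath(base.data, changeA.path),
            timestamp: base.timestamp,
            author: base.author
          }
        });
      }
    }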

    // Add non-conflicting changes from B
    for (const changeB of changesB) {
      if (!changesA.some(changeA => this.changesConflict(changeA, changeB))) {
        mergedChanges.push(changeB);
      }
    }

    return {
      mergedChanges,
      conflicts,
      autoResolved: conflicts.length === 0
    };
  }

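  // Smart conflict resolution with ML-based suggestions.
  // A class declaration cannot appear inside a class body, so the resolver is
  // exposed here as a static class expression; it could equally be declared
  // at module level.
  static MLConflictResolver = class MLConflictResolver extends ConflictResolver {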
    private model: ConflictResolutionModel;

    constructor() {
      super();
      this.model = new ConflictResolutionModel();
    }

    async suggestResolution(conflict: DataConflict): Promise<ConflictResolution> {
      // Analyze conflict context
      const context = await this.analyzeConflictContext(conflict);

      // Get ML model prediction
      const prediction = await this.model.predict({
        conflictType: conflict.type,
        dataType: conflict.dataType,
        authors: [conflict.versionA.author, conflict.versionB.author],
        values: [conflict.versionA.value, conflict.versionB.value],
        timestamps: [conflict.versionA.timestamp, conflict.versionB.timestamp],
        context
      });

      const suggestions: ResolutionSuggestion[] = [];

      // Rule-based suggestions
      if (this.isSimpleValueConflict(conflict)) {
        suggestions.push(this.createTimestampBasedSuggestion(conflict));
        suggestions.push(this.createAuthorBasedSuggestion(conflict));
      }

      if (this.isArrayConflict(conflict)) {
        suggestions.push(this.createMergeSuggestion(conflict));
        suggestions.push(this.createUnionSuggestion(conflict));
      }

      if (this.isObjectConflict(conflict)) {
        suggestions.push(this.createDeepMergeSuggestion(conflict));
      }

      // ML-based suggestion
      if (prediction.confidence > 0.7) {
        suggestions.unshift({
          type: 'ml_suggested',
          description: prediction.description,
          resolvedValue: prediction.resolvedValue,
          confidence: prediction.confidence,
          reasoning: prediction.reasoning
        });
      }

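      // Rank a copy once so the recommendation is explicitly the
      // highest-confidence entry rather than a side effect of in-place sorting
      const ranked = [...suggestions].sort((a, b) => b.confidence - a.confidence);

      return {
        conflict,
        suggestions: ranked,
        autoResolvable: ranked.some(s => s.confidence > 0.9),
        recommendedSuggestion: ranked[0]
      };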
    }

    private createTimestampBasedSuggestion(conflict: DataConflict): ResolutionSuggestion {
      const latest = conflict.versionA.timestamp > conflict.versionB.timestamp
        ? conflict.versionA
        : conflict.versionB;

      return {
        type: 'timestamp_based',
        description: 'Use the most recent value',
        resolvedValue: latest.value,
        confidence: 0.6,
        reasoning: `Selected value from ${latest.author} as it was modified more recently`
      };
    }

    private createAuthorBasedSuggestion(conflict: DataConflict): ResolutionSuggestion {
      // Could be enhanced with user role/permission analysis
      return {
        type: 'author_based',
        description: 'Prefer changes from specific author based on context',
        resolvedValue: conflict.versionA.value, // Placeholder
        confidence: 0.4,
        reasoning: 'Based on author authority and context'
      };
    }

    private createMergeSuggestion(conflict: DataConflict): ResolutionSuggestion {
      if (Array.isArray(conflict.versionA.value) && Array.isArray(conflict.versionB.value)) {
        const merged = this.intelligentArrayMerge(
          conflict.versionA.value,
          conflict.versionB.value
        );

        return {
          type: 'array_merge',
          description: 'Merge both arrays intelligently',
          resolvedValue: merged,
          confidence: 0.8,
          reasoning: 'Combined unique elements from both versions'
        };
      }

      return {
        type: 'merge',
        description: 'Merge operation not applicable',
        resolvedValue: conflict.versionA.value,
        confidence: 0.1,
        reasoning: 'Values are not mergeable'
      };
    }

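    private intelligentArrayMerge(arrayA: any[], arrayB: any[]): any[] {
      // Union merge: keep arrayA's order, then append the items from arrayB
      // that arrayA does not already contain
      const result = [...arrayA];

      for (const item of arrayB) {
        if (!this.arrayContains(result, item)) {
          result.push(item);
        }
      }

      return result;
    }

    private arrayContains(array: any[], item: any): boolean {
      // Approximate deep equality via JSON serialization. This is sensitive to
      // key order and drops undefined values and functions, so swap in a real
      // deep-equal helper if items are not plain, consistently shaped data
      return array.some(existing =>
        JSON.stringify(existing) === JSON.stringify(item)
      );
    }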
  }

  // Data synchronization with eventual consistency
  async establishEventualConsistency(
    dataType: string,
    partitionStrategy: PartitionStrategy
  ): Promise<void> {
    // Set up partition-aware synchronization
    for (const partition of partitionStrategy.partitions) {
      await this.setupPartitionSync(dataType, partition);
    }

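    // Start consistency monitoring. Errors are caught inside the callback
    // because a rejected promise in a timer would otherwise go unhandled
    setInterval(async () => {
      try {
        const inconsistencies = await this.detectInconsistencies(dataType);

        if (inconsistencies.length > 0) {
          await this.resolveInconsistencies(dataType, inconsistencies);
        }
      } catch (error) {
        console.error(`Consistency check failed for ${dataType}:`, error);
      }
    }, 60000); // Check every minute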
  }

  private async detectInconsistencies(dataType: string): Promise<DataInconsistency[]> {
    const inconsistencies: DataInconsistency[] = [];

    // Compare data across all nodes
    const nodes = await this.getActiveNodes(dataType);
    const data = await Promise.all(
      nodes.map(node => this.getDataFromNode(node.id, dataType))
    );

    // Find differences
    for (let i = 0; i < data.length; i++) {
      for (let j = i + 1; j < data.length; j++) {
        const diff = this.compareData(data[i], data[j]);

        if (diff.hasDifferences) {
          inconsistencies.push({
            nodeA: nodes[i].id,
            nodeB: nodes[j].id,
            dataType,
            differences: diff.differences,
            severity: this.calculateInconsistencySeverity(diff)
          });
        }
      }
    }

    return inconsistencies;
  }

  // Offline-capable synchronization
  async enableOfflineSync(
    nodeId: string,
    dataTypes: string[]
  ): Promise<OfflineSyncConfig> {
    const config: OfflineSyncConfig = {
      nodeId,
      dataTypes,
      storageQuota: 100 * 1024 * 1024, // 100MB
      syncStrategy: 'incremental',
      conflictResolution: 'server_wins',
      compressionEnabled: true,
      encryptionEnabled: true
    };

    // Set up local storage
    await this.setupOfflineStorage(nodeId, config);

    // Start offline sync monitoring
    this.startOfflineSyncMonitoring(nodeId, config);

    return config;
  }

  private async syncOfflineChanges(nodeId: string): Promise<OfflineSyncResult> {
    const offlineChanges = await this.getOfflineChanges(nodeId);
    const syncResults: OfflineChangeResult[] = [];

    for (const change of offlineChanges) {
      try {
        const result = await this.syncOfflineChange(change);
        syncResults.push(result);
      } catch (error) {
        syncResults.push({
          changeId: change.id,
          success: false,
          error: error instanceof Error ? error.message : String(error)
        });
      }
    }

    return {
      nodeId,
      totalChanges: offlineChanges.length,
      successful: syncResults.filter(r => r.success).length,
      failed: syncResults.filter(r => !r.success).length,
      results: syncResults
    };
  }

  // Performance monitoring for synchronization
  async monitorSyncPerformance(): Promise<SyncPerformanceMetrics> {
    const metrics: SyncPerformanceMetrics = {
      averageSyncLatency: 0,
      syncThroughput: 0,
      conflictRate: 0,
      consistency: 0,
      nodeHealth: new Map(),
      networkMetrics: {
        bandwidth: 0,
        packetLoss: 0,
        jitter: 0
      }
    };

    // Calculate metrics over a fixed window so latency and throughput agree
    const windowSeconds = 300; // Last 5 minutes
    const recentSyncs = await this.getRecentSyncOperations(windowSeconds);

    if (recentSyncs.length > 0) {
      metrics.averageSyncLatency = recentSyncs.reduce((sum, sync) =>
        sum + sync.latency, 0) / recentSyncs.length;

      metrics.syncThroughput = recentSyncs.length / (windowSeconds / 60); // Operations per minute

      const conflicts = recentSyncs.filter(sync => sync.hadConflicts);
      metrics.conflictRate = conflicts.length / recentSyncs.length;
    }

    // Check node health
    for (const nodeId of this.syncNodes.keys()) {
      const health = await this.checkNodeHealth(nodeId);
      metrics.nodeHealth.set(nodeId, health);
    }

    // Calculate overall consistency
    metrics.consistency = await this.calculateConsistencyScore();

    return metrics;
  }
}

// Data synchronization interfaces
interface DataSubscription {
  id: string;
  nodeId: string;
  dataType: string;
  filter: DataFilter;
  lastSyncTimestamp: number;
  state: 'active' | 'paused' | 'error';
  createdAt: number;
}

interface DataChange {
  path: string;
  operation: 'set' | 'delete' | 'insert' | 'move';
  oldValue?: any;
  newValue?: any;
  timestamp: number;
}

interface DataConflict {
  path: string;
  type: 'value' | 'structure' | 'concurrent_edit';
  // Optional: the three-way merge detects conflicts without knowing the data type
  dataType?: string;
  versionA: ConflictVersion;
  versionB: ConflictVersion;
  base?: ConflictVersion;
}

interface ConflictVersion {
  value: any;
  timestamp: number;
  author: string;
}

interface SyncResult {
  success: boolean;
  updateId: string;
  appliedChanges?: DataChange[];
  conflicts?: DataConflict[];
  conflictsResolved?: number;
  requiresResolution?: boolean;
  error?: string;
}

interface OptimisticResult {
  optimisticId: string;
  accepted: boolean;
  finalData?: any;
  conflicts?: DataConflict[];
  serverData?: any;
  rollbackApplied?: boolean;
  error?: string;
}

interface MergeResult {
  success: boolean;
  mergedData?: any;
  conflicts?: DataConflict[];
  autoResolved?: boolean;
  error?: string;
}

interface ResolutionSuggestion {
  type: string;
  description: string;
  resolvedValue: any;
  confidence: number;
  reasoning: string;
}

interface SyncPerformanceMetrics {
  averageSyncLatency: number;
  syncThroughput: number;
  conflictRate: number;
  consistency: number;
  nodeHealth: Map<string, NodeHealth>;
  networkMetrics: {
    bandwidth: number;
    packetLoss: number;
    jitter: number;
  };
}

interface NodeHealth {
  status: 'healthy' | 'degraded' | 'unhealthy';
  lastHeartbeat: number;
  responseTime: number;
  errorRate: number;
}
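
performThreeWayMerge leans on helpers such as calculateChanges and changesConflict that are left undefined above. To make the idea concrete, here is a minimal sketch of a three-way merge over flat key/value records. It assumes a simple conflict rule (both sides changed the same key to different values), and the names FlatRecord, diffRecord, and threeWayMerge are hypothetical rather than the helpers the service actually uses.

```typescript
type FieldValue = string | number | boolean;
type FlatRecord = Record<string, FieldValue>;

interface FieldChange {
  path: string;
  newValue: FieldValue | undefined; // undefined means the key was deleted
}

interface FieldConflict {
  path: string;
  base: FieldValue | undefined;
  valueA: FieldValue | undefined;
  valueB: FieldValue | undefined;
}

// List the keys whose value differs between `base` and `next`.
function diffRecord(base: FlatRecord, next: FlatRecord): FieldChange[] {
  const keys = new Set([...Object.keys(base), ...Object.keys(next)]);
  const changes: FieldChange[] = [];
  keys.forEach((key) => {
    if (base[key] !== next[key]) {
      changes.push({ path: key, newValue: next[key] });
    }
  });
  return changes;
}

function threeWayMerge(
  base: FlatRecord,
  a: FlatRecord,
  b: FlatRecord
): { merged: FlatRecord; conflicts: FieldConflict[] } {
  const changesA = new Map(diffRecord(base, a).map((c) => [c.path, c] as const));
  const changesB = new Map(diffRecord(base, b).map((c) => [c.path, c] as const));
  const merged: FlatRecord = { ...base };
  const conflicts: FieldConflict[] = [];

  const apply = (change: FieldChange): void => {
    if (change.newValue === undefined) {
      delete merged[change.path];
    } else {
      merged[change.path] = change.newValue;
    }
  };

  changesA.forEach((changeA, path) => {
    const changeB = changesB.get(path);
    if (changeB !== undefined && changeB.newValue !== changeA.newValue) {
      // Both sides changed the same key differently: keep the base value
      // in `merged` and report the conflict for later resolution.
      conflicts.push({
        path,
        base: base[path],
        valueA: changeA.newValue,
        valueB: changeB.newValue,
      });
    } else {
      apply(changeA);
    }
  });

  // Changes only B made (identical changes were already applied via A).
  changesB.forEach((changeB, path) => {
    if (!changesA.has(path)) apply(changeB);
  });

  return { merged, conflicts };
}

// Usage: A renames and recolors, B recolors and resizes.
const mergeOutcome = threeWayMerge(
  { title: "Logo", color: "red", width: 100 },
  { title: "Logo v2", color: "blue", width: 100 },
  { title: "Logo", color: "green", width: 120 }
);
```

In this example the title and width changes merge cleanly, while color is reported as a conflict and keeps its base value until something (a user or a resolver) picks a winner.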

Key Takeaways

Real-time communication at scale requires more than just WebSocket connections. It demands sophisticated architecture that handles conflict resolution, maintains data consistency, and provides seamless user experiences across different network conditions.

Essential real-time collaboration patterns:

  • Server-Sent Events for efficient one-way real-time updates with automatic reconnection
  • WebRTC for direct peer-to-peer communication with quality adaptation
  • Operational Transformation for collaborative editing that converges despite concurrent changes
  • Presence systems that provide intelligent activity awareness
  • Data synchronization with eventual consistency and offline support
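
To make the Operational Transformation item less abstract, here is a minimal sketch of the core move: rewriting one insert so it still lands in the right place after a concurrent insert has been applied. It covers insert-versus-insert only; deletes and longer operation histories need additional cases. InsertOp, applyInsert, transformInsert, and the site-ID tie-break are illustrative assumptions, not the API of any particular OT library.

```typescript
interface InsertOp {
  position: number; // index in the document where the text is inserted
  text: string;
  siteId: string; // unique per client; used only to break ties
}

function applyInsert(doc: string, op: InsertOp): string {
  return doc.slice(0, op.position) + op.text + doc.slice(op.position);
}

// Rewrite `op` so it can be applied after `applied`, which was made
// concurrently against the same document state.
function transformInsert(op: InsertOp, applied: InsertOp): InsertOp {
  const appliedLandsFirst =
    applied.position < op.position ||
    // Same position: the lower site ID goes first on every client.
    (applied.position === op.position && applied.siteId < op.siteId);

  return appliedLandsFirst
    ? { ...op, position: op.position + applied.text.length }
    : op;
}

// Usage: two clients insert at the same index of "abc" at the same time.
const baseDoc = "abc";
const opA: InsertOp = { position: 1, text: "X", siteId: "site-1" };
const opB: InsertOp = { position: 1, text: "Y", siteId: "site-2" };

// Client 1 applies A first, then the transformed B.
const docViaA = applyInsert(applyInsert(baseDoc, opA), transformInsert(opB, opA));
// Client 2 applies B first, then the transformed A.
const docViaB = applyInsert(applyInsert(baseDoc, opB), transformInsert(opA, opB));
```

Both clients end up with the same string even though they applied the operations in opposite orders, which is the convergence property the rest of the machinery exists to protect.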

The collaboration architecture decision framework:

  • Use SSE for real-time feeds, dashboards, and notification streams
  • Use WebRTC for video calls, voice communication, and peer-to-peer file sharing
  • Use Operational Transformation for text editing and document collaboration
  • Use Presence tracking for awareness in collaborative applications
  • Use CRDTs or Event Sourcing for complex state synchronization
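
CRDTs appear in the framework above without an example, so here is a minimal sketch of one of the simplest: a last-writer-wins register. Every replica merges with the same deterministic rule, so all replicas converge without coordination. The names LwwRegister and mergeLww are hypothetical, and the sketch assumes each node ID is unique and timestamps are comparable across nodes.

```typescript
interface LwwRegister<T> {
  value: T;
  timestamp: number; // logical or wall-clock time of the write
  nodeId: string; // breaks ties between writes with equal timestamps
}

// Merge is commutative, associative, and idempotent, which is what lets
// replicas exchange state in any order and still agree.
function mergeLww<T>(a: LwwRegister<T>, b: LwwRegister<T>): LwwRegister<T> {
  if (a.timestamp !== b.timestamp) {
    return a.timestamp > b.timestamp ? a : b;
  }
  return a.nodeId >= b.nodeId ? a : b;
}

// Usage: three replicas wrote the same field concurrently.
const fromA: LwwRegister<string> = { value: "red", timestamp: 10, nodeId: "A" };
const fromB: LwwRegister<string> = { value: "blue", timestamp: 12, nodeId: "B" };
const fromC: LwwRegister<string> = { value: "green", timestamp: 12, nodeId: "C" };

const mergedOneWay = mergeLww(mergeLww(fromA, fromB), fromC);
const mergedOtherWay = mergeLww(fromC, mergeLww(fromB, fromA));
```

The trade-off is worth stating plainly: the register converges, but it does so by discarding the losing write. That is fine for a status field and a poor fit for document text, where a sequence CRDT or OT is the better tool.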

Real-time collaboration best practices:

  • Design for concurrent editing with proper conflict resolution from day one
  • Implement intelligent presence that respects privacy while providing awareness
  • Handle network partitions gracefully with offline-first design
  • Optimize for mobile networks with adaptive quality and compression
  • Monitor collaboration health with metrics that predict problems
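
The last practice, monitoring collaboration health, can start very small. The sketch below summarizes a window of sync samples into average latency, p95 latency, and conflict rate, then maps them onto the healthy, degraded, and unhealthy states used by NodeHealth earlier. The thresholds and the names SyncSample, SyncHealthSummary, and summarizeSyncHealth are assumptions for illustration; real limits should come from your own traffic.

```typescript
interface SyncSample {
  latencyMs: number;
  hadConflicts: boolean;
}

interface SyncHealthSummary {
  averageLatencyMs: number;
  p95LatencyMs: number;
  conflictRate: number;
  status: "healthy" | "degraded" | "unhealthy";
}

// Assumed thresholds; tune them against real traffic.
const DEGRADED_LIMITS = { p95LatencyMs: 500, conflictRate: 0.2 };
const UNHEALTHY_LIMITS = { p95LatencyMs: 1000, conflictRate: 0.5 };

function summarizeSyncHealth(samples: SyncSample[]): SyncHealthSummary {
  if (samples.length === 0) {
    return { averageLatencyMs: 0, p95LatencyMs: 0, conflictRate: 0, status: "healthy" };
  }

  const latencies = samples.map((s) => s.latencyMs).sort((a, b) => a - b);
  const averageLatencyMs =
    latencies.reduce((sum, latency) => sum + latency, 0) / latencies.length;
  // Nearest-rank percentile: the smallest sample with at least 95% of
  // samples at or below it.
  const p95LatencyMs = latencies[Math.ceil(0.95 * latencies.length) - 1];
  const conflictRate =
    samples.filter((s) => s.hadConflicts).length / samples.length;

  let status: SyncHealthSummary["status"] = "healthy";
  if (
    p95LatencyMs > UNHEALTHY_LIMITS.p95LatencyMs ||
    conflictRate > UNHEALTHY_LIMITS.conflictRate
  ) {
    status = "unhealthy";
  } else if (
    p95LatencyMs > DEGRADED_LIMITS.p95LatencyMs ||
    conflictRate > DEGRADED_LIMITS.conflictRate
  ) {
    status = "degraded";
  }

  return { averageLatencyMs, p95LatencyMs, conflictRate, status };
}

// Usage: one slow sync drags the tail latency past the degraded limit.
const healthSummary = summarizeSyncHealth([
  { latencyMs: 100, hadConflicts: false },
  { latencyMs: 120, hadConflicts: true },
  { latencyMs: 80, hadConflicts: false },
  { latencyMs: 900, hadConflicts: false },
]);
```

Tracking the p95 alongside the average is the point here: the average of this window looks tolerable, while the tail is what users on a bad connection actually feel.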

The production reality checklist:

  • ✅ Conflict resolution handles all edge cases without data loss
  • ✅ Presence systems provide accurate awareness without privacy violations
  • ✅ Offline synchronization works reliably with automatic conflict resolution
  • ✅ WebRTC connections adapt to network quality automatically
  • ✅ Data synchronization maintains consistency across geographic regions
  • ✅ Performance monitoring catches collaboration issues before users notice
  • ✅ Collaboration features enhance productivity rather than creating confusion

What’s Next?

You’ve now mastered the complete real-time communication stack. Next, we’ll dive into Background Processing & Jobs, where we’ll build robust task queues, scheduled job systems, and distributed processing pipelines that can handle millions of background tasks reliably.

But first, implement these real-time collaboration patterns in your applications. The difference between systems that enable seamless teamwork and systems that get in the way lies in these architectural details.

Remember: real-time collaboration isn’t about making everything instant—it’s about making everything work together harmoniously, even when the unexpected happens.