File size: 4,955 Bytes
109a48b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6222009
1750ce2
109a48b
 
 
 
 
 
 
 
 
 
6222009
1750ce2
109a48b
 
6222009
109a48b
b9c1a6b
109a48b
 
b9c1a6b
109a48b
b9c1a6b
109a48b
b9c1a6b
 
109a48b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
408337e
109a48b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Global Progress Event Manager for Real-Time SSE Streaming

This module provides a singleton ProgressManager that captures all workflow progress
events and broadcasts them to connected SSE clients in real-time.
"""

import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
from collections import defaultdict


class ProgressManager:
    """
    Manages progress events for active analysis sessions.
    
    Features:
    - Emit events to multiple subscribers simultaneously
    - Store event history for late-joining clients
    - Automatic cleanup of dead connections
    - Thread-safe event broadcasting
    """
    
    def __init__(self):
        self._queues: Dict[str, List[asyncio.Queue]] = defaultdict(list)
        self._history: Dict[str, List[Dict]] = defaultdict(list)
        self._lock = asyncio.Lock()
    
    def emit(self, session_id: str, event: Dict[str, Any]):
        """
        Emit a progress event to all subscribers.
        
        Args:
            session_id: Session identifier
            event: Event data (must include 'type' and 'message')
        """
        print(f"[SSE] PROGRESS_MANAGER EMIT: session={session_id}, event_type={event.get('type')}, msg={event.get('message', '')[:50]}")
        
        # Add timestamp
        event['timestamp'] = datetime.now().isoformat()
        
        # Store in history
        self._history[session_id].append(event)
        
        # Limit history size to prevent memory leaks
        if len(self._history[session_id]) > 500:
            self._history[session_id] = self._history[session_id][-500:]
        
        print(f"[SSE] History stored, total events for {session_id}: {len(self._history[session_id])}")
        
        # Send to all active subscribers
        if session_id in self._queues:
            print(f"[SSE] Found {len(self._queues[session_id])} subscribers for {session_id}")
            dead_queues = []
            for i, queue in enumerate(self._queues[session_id]):
                try:
                    queue.put_nowait(event)
                    print(f"[SSE] Successfully queued event to subscriber {i+1}")
                except asyncio.QueueFull:
                    print(f"[SSE] ERROR: Queue full for subscriber {i+1}")
                    dead_queues.append(queue)
                except Exception as e:
                    print(f"[SSE] ERROR: Exception queuing event to subscriber {i+1}: {type(e).__name__}: {e}")
                    dead_queues.append(queue)
            
            # Remove dead queues
            for dead_queue in dead_queues:
                if dead_queue in self._queues[session_id]:
                    self._queues[session_id].remove(dead_queue)
    
    async def subscribe(self, session_id: str):
        """
        Subscribe to progress events for a session.
        
        Args:
            session_id: Session identifier
            
        Yields:
            Progress events as they occur
        """
        queue = asyncio.Queue(maxsize=100)
        self._queues[session_id].append(queue)
        
        try:
            while True:
                event = await queue.get()
                print(f"[SSE] YIELDING event to client: type={event.get('type')}, msg={event.get('message', '')[:50]}")
                yield event
        except asyncio.CancelledError:
            # Client disconnected
            pass
        finally:
            # Cleanup
            if session_id in self._queues and queue in self._queues[session_id]:
                self._queues[session_id].remove(queue)
    
    def get_history(self, session_id: str) -> List[Dict]:
        """
        Get all past events for a session.
        
        Args:
            session_id: Session identifier
            
        Returns:
            List of past events
        """
        return self._history.get(session_id, [])
    
    def clear(self, session_id: str):
        """
        Clear history and disconnect all subscribers for a session.
        
        Args:
            session_id: Session identifier
        """
        if session_id in self._history:
            del self._history[session_id]
        if session_id in self._queues:
            # Close all queues
            for queue in self._queues[session_id]:
                try:
                    queue.put_nowait({'type': 'session_cleared', 'message': 'Session ended'})
                except:
                    pass
            del self._queues[session_id]
    
    def get_active_sessions(self) -> List[str]:
        """Get list of sessions with active subscribers."""
        return [sid for sid, queues in self._queues.items() if len(queues) > 0]
    
    def get_subscriber_count(self, session_id: str) -> int:
        """Get number of active subscribers for a session."""
        return len(self._queues.get(session_id, []))


# Global singleton instance
progress_manager = ProgressManager()