Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| """ | |
| WebSocket API for Monitoring Services | |
| This module provides WebSocket endpoints for real-time monitoring data | |
| including health checks, pool management, and scheduler status. | |
| """ | |
| import asyncio | |
| from datetime import datetime | |
| from typing import Any, Dict | |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect | |
| import logging | |
| from backend.services.ws_service_manager import ws_manager, ServiceType | |
| from monitoring.health_checker import HealthChecker | |
| from monitoring.source_pool_manager import SourcePoolManager | |
| from monitoring.scheduler import TaskScheduler | |
| from config import Config | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # ============================================================================ | |
| # Monitoring Service Handlers | |
| # ============================================================================ | |
| class MonitoringStreamers: | |
| """Handles data streaming for all monitoring services""" | |
| def __init__(self): | |
| self.config = Config() | |
| self.health_checker = HealthChecker() | |
| try: | |
| self.pool_manager = SourcePoolManager() | |
| except: | |
| self.pool_manager = None | |
| logger.warning("SourcePoolManager not available") | |
| try: | |
| self.scheduler = TaskScheduler() | |
| except: | |
| self.scheduler = None | |
| logger.warning("TaskScheduler not available") | |
| # ======================================================================== | |
| # Health Checker Streaming | |
| # ======================================================================== | |
| async def stream_health_status(self): | |
| """Stream health check status for all providers""" | |
| try: | |
| health_data = await self.health_checker.check_all_providers() | |
| if health_data: | |
| return { | |
| "overall_health": health_data.get("overall_health", "unknown"), | |
| "healthy_count": health_data.get("healthy_count", 0), | |
| "unhealthy_count": health_data.get("unhealthy_count", 0), | |
| "total_providers": health_data.get("total_providers", 0), | |
| "providers": health_data.get("providers", {}), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming health status: {e}") | |
| return None | |
| async def stream_provider_health(self): | |
| """Stream individual provider health changes""" | |
| try: | |
| health_data = await self.health_checker.check_all_providers() | |
| if health_data and "providers" in health_data: | |
| # Filter for providers with issues | |
| issues = { | |
| name: status | |
| for name, status in health_data["providers"].items() | |
| if status.get("status") != "healthy" | |
| } | |
| if issues: | |
| return { | |
| "providers_with_issues": issues, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming provider health: {e}") | |
| return None | |
| async def stream_health_alerts(self): | |
| """Stream health alerts for critical issues""" | |
| try: | |
| health_data = await self.health_checker.check_all_providers() | |
| if health_data: | |
| critical_issues = [] | |
| for name, status in health_data.get("providers", {}).items(): | |
| if status.get("status") == "critical": | |
| critical_issues.append({ | |
| "provider": name, | |
| "status": status, | |
| "alert_level": "critical" | |
| }) | |
| elif status.get("status") == "unhealthy": | |
| critical_issues.append({ | |
| "provider": name, | |
| "status": status, | |
| "alert_level": "warning" | |
| }) | |
| if critical_issues: | |
| return { | |
| "alerts": critical_issues, | |
| "total_alerts": len(critical_issues), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming health alerts: {e}") | |
| return None | |
| # ======================================================================== | |
| # Pool Manager Streaming | |
| # ======================================================================== | |
| async def stream_pool_status(self): | |
| """Stream source pool management status""" | |
| if not self.pool_manager: | |
| return None | |
| try: | |
| pool_data = self.pool_manager.get_status() | |
| if pool_data: | |
| return { | |
| "pools": pool_data.get("pools", {}), | |
| "active_sources": pool_data.get("active_sources", []), | |
| "inactive_sources": pool_data.get("inactive_sources", []), | |
| "failover_count": pool_data.get("failover_count", 0), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming pool status: {e}") | |
| return None | |
| async def stream_failover_events(self): | |
| """Stream failover events""" | |
| if not self.pool_manager: | |
| return None | |
| try: | |
| events = self.pool_manager.get_recent_failovers() | |
| if events: | |
| return { | |
| "failover_events": events, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming failover events: {e}") | |
| return None | |
| async def stream_source_health(self): | |
| """Stream individual source health in pools""" | |
| if not self.pool_manager: | |
| return None | |
| try: | |
| health_data = self.pool_manager.get_source_health() | |
| if health_data: | |
| return { | |
| "source_health": health_data, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming source health: {e}") | |
| return None | |
| # ======================================================================== | |
| # Scheduler Streaming | |
| # ======================================================================== | |
| async def stream_scheduler_status(self): | |
| """Stream scheduler status""" | |
| if not self.scheduler: | |
| return None | |
| try: | |
| status_data = self.scheduler.get_status() | |
| if status_data: | |
| return { | |
| "running": status_data.get("running", False), | |
| "total_jobs": status_data.get("total_jobs", 0), | |
| "active_jobs": status_data.get("active_jobs", 0), | |
| "jobs": status_data.get("jobs", []), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming scheduler status: {e}") | |
| return None | |
| async def stream_job_executions(self): | |
| """Stream job execution events""" | |
| if not self.scheduler: | |
| return None | |
| try: | |
| executions = self.scheduler.get_recent_executions() | |
| if executions: | |
| return { | |
| "executions": executions, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming job executions: {e}") | |
| return None | |
| async def stream_job_failures(self): | |
| """Stream job failures""" | |
| if not self.scheduler: | |
| return None | |
| try: | |
| failures = self.scheduler.get_recent_failures() | |
| if failures: | |
| return { | |
| "failures": failures, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming job failures: {e}") | |
| return None | |
| # Global instance | |
| monitoring_streamers = MonitoringStreamers() | |
| # ============================================================================ | |
| # Background Streaming Tasks | |
| # ============================================================================ | |
| async def start_monitoring_streams(): | |
| """Start all monitoring stream tasks""" | |
| logger.info("Starting monitoring WebSocket streams") | |
| tasks = [ | |
| # Health Checker | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.HEALTH_CHECKER, | |
| monitoring_streamers.stream_health_status, | |
| interval=30.0 # 30 second updates | |
| )), | |
| # Pool Manager | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.POOL_MANAGER, | |
| monitoring_streamers.stream_pool_status, | |
| interval=20.0 # 20 second updates | |
| )), | |
| # Scheduler | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.SCHEDULER, | |
| monitoring_streamers.stream_scheduler_status, | |
| interval=15.0 # 15 second updates | |
| )), | |
| ] | |
| await asyncio.gather(*tasks, return_exceptions=True) | |
| # ============================================================================ | |
| # WebSocket Endpoints | |
| # ============================================================================ | |
| async def websocket_monitoring_endpoint(websocket: WebSocket): | |
| """ | |
| Unified WebSocket endpoint for all monitoring services | |
| Connection URL: ws://host:port/ws/monitoring | |
| After connecting, send subscription messages: | |
| { | |
| "action": "subscribe", | |
| "service": "health_checker" | "pool_manager" | "scheduler" | "all" | |
| } | |
| To unsubscribe: | |
| { | |
| "action": "unsubscribe", | |
| "service": "service_name" | |
| } | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Monitoring client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Monitoring WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_health(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for health monitoring | |
| Auto-subscribes to health_checker service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.HEALTH_CHECKER) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Health monitoring client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Health monitoring WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_pool_status(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for pool manager status | |
| Auto-subscribes to pool_manager service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.POOL_MANAGER) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Pool status client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Pool status WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_scheduler_status(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for scheduler status | |
| Auto-subscribes to scheduler service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.SCHEDULER) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Scheduler status client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Scheduler status WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |