Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| """ | |
| WebSocket API for Data Collection Services | |
| This module provides WebSocket endpoints for real-time data streaming | |
| from all data collection services. | |
| """ | |
| import asyncio | |
| from datetime import datetime | |
| from typing import Any, Dict, Optional | |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect | |
| import logging | |
| from backend.services.ws_service_manager import ws_manager, ServiceType | |
| from collectors.market_data import MarketDataCollector | |
| from collectors.explorers import ExplorerDataCollector | |
| from collectors.news import NewsCollector | |
| from collectors.sentiment import SentimentCollector | |
| from collectors.whale_tracking import WhaleTrackingCollector | |
| from collectors.rpc_nodes import RPCNodeCollector | |
| from collectors.onchain import OnChainCollector | |
| from config import Config | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| # ============================================================================ | |
| # Data Collection Service Handlers | |
| # ============================================================================ | |
| class DataCollectionStreamers: | |
| """Handles data streaming for all collection services""" | |
| def __init__(self): | |
| self.config = Config() | |
| self.market_data_collector = MarketDataCollector(self.config) | |
| self.explorer_collector = ExplorerDataCollector(self.config) | |
| self.news_collector = NewsCollector(self.config) | |
| self.sentiment_collector = SentimentCollector(self.config) | |
| self.whale_collector = WhaleTrackingCollector(self.config) | |
| self.rpc_collector = RPCNodeCollector(self.config) | |
| self.onchain_collector = OnChainCollector(self.config) | |
| # ======================================================================== | |
| # Market Data Streaming | |
| # ======================================================================== | |
| async def stream_market_data(self): | |
| """Stream real-time market data""" | |
| try: | |
| data = await self.market_data_collector.collect() | |
| if data: | |
| return { | |
| "prices": data.get("prices", {}), | |
| "volumes": data.get("volumes", {}), | |
| "market_caps": data.get("market_caps", {}), | |
| "price_changes": data.get("price_changes", {}), | |
| "source": data.get("source", "unknown"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming market data: {e}") | |
| return None | |
| async def stream_order_books(self): | |
| """Stream order book data""" | |
| try: | |
| # This would integrate with market_data_extended for order book data | |
| data = await self.market_data_collector.collect() | |
| if data and "order_book" in data: | |
| return { | |
| "bids": data["order_book"].get("bids", []), | |
| "asks": data["order_book"].get("asks", []), | |
| "spread": data["order_book"].get("spread"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming order books: {e}") | |
| return None | |
| # ======================================================================== | |
| # Explorer Data Streaming | |
| # ======================================================================== | |
| async def stream_explorer_data(self): | |
| """Stream blockchain explorer data""" | |
| try: | |
| data = await self.explorer_collector.collect() | |
| if data: | |
| return { | |
| "latest_block": data.get("latest_block"), | |
| "network_hashrate": data.get("network_hashrate"), | |
| "difficulty": data.get("difficulty"), | |
| "mempool_size": data.get("mempool_size"), | |
| "transactions_count": data.get("transactions_count"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming explorer data: {e}") | |
| return None | |
| async def stream_transactions(self): | |
| """Stream recent transactions""" | |
| try: | |
| data = await self.explorer_collector.collect() | |
| if data and "recent_transactions" in data: | |
| return { | |
| "transactions": data["recent_transactions"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming transactions: {e}") | |
| return None | |
| # ======================================================================== | |
| # News Streaming | |
| # ======================================================================== | |
| async def stream_news(self): | |
| """Stream news updates""" | |
| try: | |
| data = await self.news_collector.collect() | |
| if data and "articles" in data: | |
| return { | |
| "articles": data["articles"][:10], # Latest 10 articles | |
| "sources": data.get("sources", []), | |
| "categories": data.get("categories", []), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming news: {e}") | |
| return None | |
| async def stream_breaking_news(self): | |
| """Stream breaking news alerts""" | |
| try: | |
| data = await self.news_collector.collect() | |
| if data and "breaking" in data: | |
| return { | |
| "breaking_news": data["breaking"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming breaking news: {e}") | |
| return None | |
| # ======================================================================== | |
| # Sentiment Streaming | |
| # ======================================================================== | |
| async def stream_sentiment(self): | |
| """Stream sentiment analysis data""" | |
| try: | |
| data = await self.sentiment_collector.collect() | |
| if data: | |
| return { | |
| "overall_sentiment": data.get("overall_sentiment"), | |
| "sentiment_score": data.get("sentiment_score"), | |
| "social_volume": data.get("social_volume"), | |
| "trending_topics": data.get("trending_topics", []), | |
| "sentiment_by_source": data.get("by_source", {}), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming sentiment: {e}") | |
| return None | |
| async def stream_social_trends(self): | |
| """Stream social media trends""" | |
| try: | |
| data = await self.sentiment_collector.collect() | |
| if data and "social_trends" in data: | |
| return { | |
| "trends": data["social_trends"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming social trends: {e}") | |
| return None | |
| # ======================================================================== | |
| # Whale Tracking Streaming | |
| # ======================================================================== | |
| async def stream_whale_activity(self): | |
| """Stream whale transaction data""" | |
| try: | |
| data = await self.whale_collector.collect() | |
| if data: | |
| return { | |
| "large_transactions": data.get("large_transactions", []), | |
| "whale_wallets": data.get("whale_wallets", []), | |
| "total_volume": data.get("total_volume"), | |
| "alert_threshold": data.get("alert_threshold"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming whale activity: {e}") | |
| return None | |
| async def stream_whale_alerts(self): | |
| """Stream whale transaction alerts""" | |
| try: | |
| data = await self.whale_collector.collect() | |
| if data and "alerts" in data: | |
| return { | |
| "alerts": data["alerts"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming whale alerts: {e}") | |
| return None | |
| # ======================================================================== | |
| # RPC Node Streaming | |
| # ======================================================================== | |
| async def stream_rpc_status(self): | |
| """Stream RPC node status""" | |
| try: | |
| data = await self.rpc_collector.collect() | |
| if data: | |
| return { | |
| "nodes": data.get("nodes", []), | |
| "active_nodes": data.get("active_nodes"), | |
| "total_nodes": data.get("total_nodes"), | |
| "average_latency": data.get("average_latency"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming RPC status: {e}") | |
| return None | |
| async def stream_blockchain_events(self): | |
| """Stream blockchain events from RPC nodes""" | |
| try: | |
| data = await self.rpc_collector.collect() | |
| if data and "events" in data: | |
| return { | |
| "events": data["events"], | |
| "block_number": data.get("block_number"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming blockchain events: {e}") | |
| return None | |
| # ======================================================================== | |
| # On-Chain Analytics Streaming | |
| # ======================================================================== | |
| async def stream_onchain_metrics(self): | |
| """Stream on-chain analytics""" | |
| try: | |
| data = await self.onchain_collector.collect() | |
| if data: | |
| return { | |
| "active_addresses": data.get("active_addresses"), | |
| "transaction_count": data.get("transaction_count"), | |
| "total_fees": data.get("total_fees"), | |
| "gas_price": data.get("gas_price"), | |
| "network_utilization": data.get("network_utilization"), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming on-chain metrics: {e}") | |
| return None | |
| async def stream_contract_events(self): | |
| """Stream smart contract events""" | |
| try: | |
| data = await self.onchain_collector.collect() | |
| if data and "contract_events" in data: | |
| return { | |
| "events": data["contract_events"], | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Error streaming contract events: {e}") | |
| return None | |
| # Global instance | |
| data_streamers = DataCollectionStreamers() | |
| # ============================================================================ | |
| # Background Streaming Tasks | |
| # ============================================================================ | |
| async def start_data_collection_streams(): | |
| """Start all data collection stream tasks""" | |
| logger.info("Starting data collection WebSocket streams") | |
| tasks = [ | |
| # Market Data | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.MARKET_DATA, | |
| data_streamers.stream_market_data, | |
| interval=5.0 # 5 second updates | |
| )), | |
| # Explorer Data | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.EXPLORERS, | |
| data_streamers.stream_explorer_data, | |
| interval=10.0 # 10 second updates | |
| )), | |
| # News | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.NEWS, | |
| data_streamers.stream_news, | |
| interval=60.0 # 1 minute updates | |
| )), | |
| # Sentiment | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.SENTIMENT, | |
| data_streamers.stream_sentiment, | |
| interval=30.0 # 30 second updates | |
| )), | |
| # Whale Tracking | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.WHALE_TRACKING, | |
| data_streamers.stream_whale_activity, | |
| interval=15.0 # 15 second updates | |
| )), | |
| # RPC Nodes | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.RPC_NODES, | |
| data_streamers.stream_rpc_status, | |
| interval=20.0 # 20 second updates | |
| )), | |
| # On-Chain Analytics | |
| asyncio.create_task(ws_manager.start_service_stream( | |
| ServiceType.ONCHAIN, | |
| data_streamers.stream_onchain_metrics, | |
| interval=30.0 # 30 second updates | |
| )), | |
| ] | |
| await asyncio.gather(*tasks, return_exceptions=True) | |
| # ============================================================================ | |
| # WebSocket Endpoints | |
| # ============================================================================ | |
| async def websocket_data_endpoint(websocket: WebSocket): | |
| """ | |
| Unified WebSocket endpoint for all data collection services | |
| Connection URL: ws://host:port/ws/data | |
| After connecting, send subscription messages: | |
| { | |
| "action": "subscribe", | |
| "service": "market_data" | "explorers" | "news" | "sentiment" | | |
| "whale_tracking" | "rpc_nodes" | "onchain" | "all" | |
| } | |
| To unsubscribe: | |
| { | |
| "action": "unsubscribe", | |
| "service": "service_name" | |
| } | |
| To get status: | |
| { | |
| "action": "get_status" | |
| } | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| try: | |
| while True: | |
| # Receive and handle client messages | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"WebSocket error for client {connection.client_id}: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_market_data(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for market data | |
| Auto-subscribes to market_data service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.MARKET_DATA) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Market data client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Market data WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_whale_tracking(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for whale tracking | |
| Auto-subscribes to whale_tracking service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.WHALE_TRACKING) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Whale tracking client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Whale tracking WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_news(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for news | |
| Auto-subscribes to news service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.NEWS) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"News client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"News WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |
| async def websocket_sentiment(websocket: WebSocket): | |
| """ | |
| Dedicated WebSocket endpoint for sentiment analysis | |
| Auto-subscribes to sentiment service | |
| """ | |
| connection = await ws_manager.connect(websocket) | |
| connection.subscribe(ServiceType.SENTIMENT) | |
| try: | |
| while True: | |
| data = await websocket.receive_json() | |
| await ws_manager.handle_client_message(connection, data) | |
| except WebSocketDisconnect: | |
| logger.info(f"Sentiment client disconnected: {connection.client_id}") | |
| except Exception as e: | |
| logger.error(f"Sentiment WebSocket error: {e}") | |
| finally: | |
| await ws_manager.disconnect(connection.client_id) | |