Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| """ | |
| WebSocket Data Broadcaster | |
| Broadcasts real-time cryptocurrency data from database to connected clients | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| from database.db_manager import db_manager | |
| from backend.services.ws_service_manager import ws_manager, ServiceType | |
| from utils.logger import setup_logger | |
| logger = setup_logger("ws_data_broadcaster") | |
| class DataBroadcaster: | |
| """ | |
| Broadcasts cryptocurrency data updates to WebSocket clients | |
| """ | |
| def __init__(self): | |
| """Initialize the broadcaster""" | |
| self.last_broadcast = {} | |
| self.broadcast_interval = 5 # seconds for price updates | |
| self.is_running = False | |
| logger.info("DataBroadcaster initialized") | |
| async def start_broadcasting(self): | |
| """Start all broadcast tasks""" | |
| logger.info("Starting WebSocket data broadcaster...") | |
| self.is_running = True | |
| tasks = [ | |
| self.broadcast_market_data(), | |
| self.broadcast_news(), | |
| self.broadcast_sentiment(), | |
| self.broadcast_whales(), | |
| self.broadcast_gas_prices() | |
| ] | |
| try: | |
| await asyncio.gather(*tasks, return_exceptions=True) | |
| except Exception as e: | |
| logger.error(f"Error in broadcasting tasks: {e}", exc_info=True) | |
| finally: | |
| self.is_running = False | |
| async def stop_broadcasting(self): | |
| """Stop broadcasting""" | |
| logger.info("Stopping WebSocket data broadcaster...") | |
| self.is_running = False | |
| async def broadcast_market_data(self): | |
| """Broadcast market price updates""" | |
| logger.info("Starting market data broadcast...") | |
| while self.is_running: | |
| try: | |
| prices = db_manager.get_latest_prices(limit=50) | |
| if prices: | |
| # Format data for broadcast | |
| data = { | |
| "type": "market_data", | |
| "data": { | |
| "prices": {p.symbol: p.price_usd for p in prices}, | |
| "volumes": {p.symbol: p.volume_24h for p in prices if p.volume_24h}, | |
| "market_caps": {p.symbol: p.market_cap for p in prices if p.market_cap}, | |
| "price_changes": {p.symbol: p.price_change_24h for p in prices if p.price_change_24h} | |
| }, | |
| "count": len(prices), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Broadcast to subscribed clients | |
| await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data) | |
| logger.debug(f"Broadcasted {len(prices)} price updates") | |
| except Exception as e: | |
| logger.error(f"Error broadcasting market data: {e}", exc_info=True) | |
| await asyncio.sleep(self.broadcast_interval) | |
| async def broadcast_news(self): | |
| """Broadcast news updates""" | |
| logger.info("Starting news broadcast...") | |
| last_news_id = 0 | |
| while self.is_running: | |
| try: | |
| news = db_manager.get_latest_news(limit=10) | |
| if news and (not last_news_id or news[0].id != last_news_id): | |
| # New news available | |
| last_news_id = news[0].id | |
| data = { | |
| "type": "news", | |
| "data": { | |
| "articles": [ | |
| { | |
| "id": article.id, | |
| "title": article.title, | |
| "source": article.source, | |
| "url": article.url, | |
| "published_at": article.published_at.isoformat(), | |
| "sentiment": article.sentiment | |
| } | |
| for article in news[:5] # Only send 5 latest | |
| ] | |
| }, | |
| "count": len(news[:5]), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| await ws_manager.broadcast_to_service(ServiceType.NEWS, data) | |
| logger.info(f"Broadcasted {len(news[:5])} news articles") | |
| except Exception as e: | |
| logger.error(f"Error broadcasting news: {e}", exc_info=True) | |
| await asyncio.sleep(30) # Check every 30 seconds | |
| async def broadcast_sentiment(self): | |
| """Broadcast sentiment updates""" | |
| logger.info("Starting sentiment broadcast...") | |
| last_sentiment_value = None | |
| while self.is_running: | |
| try: | |
| sentiment = db_manager.get_latest_sentiment() | |
| if sentiment and sentiment.value != last_sentiment_value: | |
| last_sentiment_value = sentiment.value | |
| data = { | |
| "type": "sentiment", | |
| "data": { | |
| "fear_greed_index": sentiment.value, | |
| "classification": sentiment.classification, | |
| "metric_name": sentiment.metric_name, | |
| "source": sentiment.source, | |
| "timestamp": sentiment.timestamp.isoformat() | |
| }, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, data) | |
| logger.info(f"Broadcasted sentiment: {sentiment.value} ({sentiment.classification})") | |
| except Exception as e: | |
| logger.error(f"Error broadcasting sentiment: {e}", exc_info=True) | |
| await asyncio.sleep(60) # Check every minute | |
| async def broadcast_whales(self): | |
| """Broadcast whale transaction updates""" | |
| logger.info("Starting whale transaction broadcast...") | |
| last_whale_id = 0 | |
| while self.is_running: | |
| try: | |
| whales = db_manager.get_whale_transactions(limit=5) | |
| if whales and (not last_whale_id or whales[0].id != last_whale_id): | |
| last_whale_id = whales[0].id | |
| data = { | |
| "type": "whale_transaction", | |
| "data": { | |
| "transactions": [ | |
| { | |
| "id": tx.id, | |
| "blockchain": tx.blockchain, | |
| "amount_usd": tx.amount_usd, | |
| "from_address": tx.from_address[:20] + "...", | |
| "to_address": tx.to_address[:20] + "...", | |
| "timestamp": tx.timestamp.isoformat() | |
| } | |
| for tx in whales | |
| ] | |
| }, | |
| "count": len(whales), | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| await ws_manager.broadcast_to_service(ServiceType.WHALE_TRACKING, data) | |
| logger.info(f"Broadcasted {len(whales)} whale transactions") | |
| except Exception as e: | |
| logger.error(f"Error broadcasting whales: {e}", exc_info=True) | |
| await asyncio.sleep(15) # Check every 15 seconds | |
| async def broadcast_gas_prices(self): | |
| """Broadcast gas price updates""" | |
| logger.info("Starting gas price broadcast...") | |
| while self.is_running: | |
| try: | |
| gas_prices = db_manager.get_latest_gas_prices() | |
| if gas_prices: | |
| data = { | |
| "type": "gas_prices", | |
| "data": gas_prices, | |
| "timestamp": datetime.utcnow().isoformat() | |
| } | |
| # Broadcast to RPC_NODES service type (gas prices are blockchain-related) | |
| await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, data) | |
| logger.debug("Broadcasted gas prices") | |
| except Exception as e: | |
| logger.error(f"Error broadcasting gas prices: {e}", exc_info=True) | |
| await asyncio.sleep(30) # Every 30 seconds | |
| # Global broadcaster instance | |
| broadcaster = DataBroadcaster() | |