Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| """ | |
| Market Data Collectors | |
| Fetches cryptocurrency market data from CoinGecko, CoinMarketCap, and Binance | |
| """ | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Optional, Any | |
| from utils.api_client import get_client | |
| from utils.logger import setup_logger, log_api_request, log_error | |
| from config import config | |
| logger = setup_logger("market_data_collector") | |
def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
    """
    Return how old a data point is, in minutes, relative to the current UTC time

    Args:
        data_timestamp: Timestamp of the data; a naive value is interpreted as UTC

    Returns:
        Age in minutes (negative when the timestamp lies in the future),
        or None if no timestamp was supplied
    """
    if data_timestamp:
        reference = datetime.now(timezone.utc)
        # Attach UTC to naive timestamps so the subtraction below is aware-vs-aware.
        aware_timestamp = (
            data_timestamp
            if data_timestamp.tzinfo is not None
            else data_timestamp.replace(tzinfo=timezone.utc)
        )
        return (reference - aware_timestamp).total_seconds() / 60.0
    return None
async def get_coingecko_simple_price() -> Dict[str, Any]:
    """
    Fetch BTC, ETH, BNB prices from CoinGecko simple/price endpoint

    Never raises: every failure (missing provider config, unsuccessful
    response, unexpected exception) is logged and reported in the result.

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Successful results additionally carry
        data_timestamp and response_time_ms; failed results carry
        error_type, except when the provider is not configured.
    """
    provider = "CoinGecko"
    category = "market_data"
    endpoint = "/simple/price"

    def _failure(message: str, **extra: Any) -> Dict[str, Any]:
        # Common shape of every unsuccessful result returned below.
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": message,
            **extra,
        }

    logger.info(f"Fetching simple price from {provider}")
    try:
        client = get_client()
        provider_config = config.get_provider(provider)
        if not provider_config:
            error_msg = f"Provider {provider} not configured"
            log_error(logger, provider, "config_error", error_msg, endpoint)
            return _failure(error_msg)

        # Build request URL
        url = f"{provider_config.endpoint_url}{endpoint}"
        params = {
            "ids": "bitcoin,ethereum,binancecoin",
            "vs_currencies": "usd",
            "include_market_cap": "true",
            "include_24hr_vol": "true",
            "include_24hr_change": "true",
            "include_last_updated_at": "true",
        }

        # Make request
        # NOTE(review): integer division floors sub-second timeouts to 0;
        # confirm how the client interprets a timeout of 0.
        response = await client.get(
            url,
            params=params,
            timeout=provider_config.timeout_ms // 1000,
        )

        # Log request
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code"),
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return _failure(error_msg, error_type=response.get("error_type"))

        # Extract data
        data = response["data"]

        # Parse timestamps from response
        # CoinGecko returns last_updated_at as Unix timestamp; use the first
        # coin whose value converts cleanly.
        data_timestamp = None
        if isinstance(data, dict):
            for coin_data in data.values():
                if not (isinstance(coin_data, dict) and "last_updated_at" in coin_data):
                    continue
                try:
                    data_timestamp = datetime.fromtimestamp(
                        coin_data["last_updated_at"],
                        tz=timezone.utc,
                    )
                except (TypeError, ValueError, OverflowError, OSError):
                    # A malformed timestamp must not discard otherwise valid
                    # price data; try the next coin instead.
                    logger.warning(f"{provider} - {endpoint} - Ignoring unparseable last_updated_at")
                    continue
                break

        staleness = calculate_staleness_minutes(data_timestamp)
        coin_count = len(data) if isinstance(data, dict) else 0
        # Format staleness separately: combining adjacent f-strings with a
        # conditional expression would replace the whole message, and a
        # truthiness test would report a staleness of 0.0 as N/A.
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Retrieved {coin_count} coins, "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
        }
    except Exception as e:
        # Boundary handler: collectors report failures in-band rather than raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return _failure(error_msg, error_type="exception")
async def get_coinmarketcap_quotes() -> Dict[str, Any]:
    """
    Fetch BTC, ETH, BNB market data from CoinMarketCap quotes endpoint

    Never raises: every failure (missing provider config, missing API key,
    unsuccessful response, unexpected exception) is logged and reported in
    the result.

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Successful results additionally carry
        data_timestamp and response_time_ms; failed results carry
        error_type, except when the provider is not configured.
    """
    provider = "CoinMarketCap"
    category = "market_data"
    endpoint = "/cryptocurrency/quotes/latest"

    def _failure(message: str, **extra: Any) -> Dict[str, Any]:
        # Common shape of every unsuccessful result returned below.
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": message,
            **extra,
        }

    logger.info(f"Fetching quotes from {provider}")
    try:
        client = get_client()
        provider_config = config.get_provider(provider)
        if not provider_config:
            error_msg = f"Provider {provider} not configured"
            log_error(logger, provider, "config_error", error_msg, endpoint)
            return _failure(error_msg)

        # Check if API key is available
        if provider_config.requires_key and not provider_config.api_key:
            error_msg = f"API key required but not configured for {provider}"
            log_error(logger, provider, "auth_error", error_msg, endpoint)
            return _failure(error_msg, error_type="missing_api_key")

        # Build request
        url = f"{provider_config.endpoint_url}{endpoint}"
        headers = {
            "X-CMC_PRO_API_KEY": provider_config.api_key,
            "Accept": "application/json",
        }
        params = {
            "symbol": "BTC,ETH,BNB",
            "convert": "USD",
        }

        # Make request
        # NOTE(review): integer division floors sub-second timeouts to 0;
        # confirm how the client interprets a timeout of 0.
        response = await client.get(
            url,
            headers=headers,
            params=params,
            timeout=provider_config.timeout_ms // 1000,
        )

        # Log request
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code"),
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return _failure(error_msg, error_type=response.get("error_type"))

        # Extract data
        data = response["data"]
        coins = data.get("data") if isinstance(data, dict) else None

        # Parse timestamp from response: take quote.USD.last_updated of the
        # first coin that has a parseable value. Each level is type-checked
        # so a null or oddly shaped entry is skipped instead of failing the
        # whole fetch.
        data_timestamp = None
        if isinstance(coins, dict):
            for coin_data in coins.values():
                if not isinstance(coin_data, dict):
                    continue
                quotes = coin_data.get("quote")
                usd_quote = quotes.get("USD") if isinstance(quotes, dict) else None
                last_updated = usd_quote.get("last_updated") if isinstance(usd_quote, dict) else None
                if not isinstance(last_updated, str):
                    continue
                try:
                    # fromisoformat does not accept a trailing "Z" on older
                    # Python versions, so spell the UTC offset out.
                    data_timestamp = datetime.fromisoformat(
                        last_updated.replace("Z", "+00:00")
                    )
                except ValueError:
                    logger.warning(f"{provider} - {endpoint} - Ignoring unparseable last_updated")
                    continue
                break

        staleness = calculate_staleness_minutes(data_timestamp)
        coin_count = len(coins) if isinstance(coins, (dict, list)) else 0
        # Format staleness separately: combining adjacent f-strings with a
        # conditional expression would replace the whole message, and a
        # truthiness test would report a staleness of 0.0 as N/A.
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Retrieved {coin_count} coins, "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
        }
    except Exception as e:
        # Boundary handler: collectors report failures in-band rather than raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return _failure(error_msg, error_type="exception")
async def get_binance_ticker() -> Dict[str, Any]:
    """
    Fetch ticker data from Binance public API (24hr ticker)

    Requests BTCUSDT, ETHUSDT and BNBUSDT from a hard-coded base URL with a
    10 second timeout; no provider configuration is consulted.

    Never raises: every failure (unsuccessful response, unexpected
    exception) is logged and reported in the result.

    Returns:
        Dict with provider, category, data, timestamp, staleness_minutes,
        success and error. Successful results additionally carry
        data_timestamp and response_time_ms; failed results carry error_type.
    """
    provider = "Binance"
    category = "market_data"
    endpoint = "/api/v3/ticker/24hr"

    def _failure(message: str, **extra: Any) -> Dict[str, Any]:
        # Common shape of every unsuccessful result returned below.
        return {
            "provider": provider,
            "category": category,
            "data": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "staleness_minutes": None,
            "success": False,
            "error": message,
            **extra,
        }

    logger.info(f"Fetching 24hr ticker from {provider}")
    try:
        client = get_client()

        # Binance API base URL
        url = f"https://api.binance.com{endpoint}"
        params = {
            "symbols": '["BTCUSDT","ETHUSDT","BNBUSDT"]',
        }

        # Make request
        response = await client.get(url, params=params, timeout=10)

        # Log request
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code"),
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return _failure(error_msg, error_type=response.get("error_type"))

        # Extract data
        data = response["data"]

        # Parse timestamp from response
        # Binance returns closeTime as Unix timestamp in milliseconds
        data_timestamp = None
        if isinstance(data, list) and data:
            first_ticker = data[0]
            if isinstance(first_ticker, dict) and "closeTime" in first_ticker:
                try:
                    data_timestamp = datetime.fromtimestamp(
                        first_ticker["closeTime"] / 1000,
                        tz=timezone.utc,
                    )
                except (TypeError, ValueError, OverflowError, OSError):
                    # Best effort: a malformed closeTime only costs us the
                    # staleness figure, not the ticker data itself.
                    logger.warning(f"{provider} - {endpoint} - Ignoring unparseable closeTime")

        staleness = calculate_staleness_minutes(data_timestamp)
        ticker_count = len(data) if isinstance(data, list) else 0
        # Format staleness separately: combining adjacent f-strings with a
        # conditional expression would replace the whole message, and a
        # truthiness test would report a staleness of 0.0 as N/A.
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Retrieved {ticker_count} tickers, "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
        }
    except Exception as e:
        # Boundary handler: collectors report failures in-band rather than raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return _failure(error_msg, error_type="exception")
async def collect_market_data() -> List[Dict[str, Any]]:
    """
    Main function to collect market data from all sources

    Runs the CoinGecko, CoinMarketCap and Binance collectors concurrently.
    A collector that raises (or is cancelled) is converted into a failed
    result entry attributed to that collector's provider.

    Returns:
        List of results from all market data collectors, in the order
        CoinGecko, CoinMarketCap, Binance
    """
    logger.info("Starting market data collection from all sources")

    collectors = (
        ("CoinGecko", get_coingecko_simple_price),
        ("CoinMarketCap", get_coinmarketcap_quotes),
        ("Binance", get_binance_ticker),
    )

    # Run all collectors concurrently
    results = await asyncio.gather(
        *(collect() for _, collect in collectors),
        return_exceptions=True,
    )

    # Process results. gather() returns results in argument order, so each
    # one can be paired with the provider that produced it.
    processed_results = []
    for (provider, _), result in zip(collectors, results):
        # BaseException rather than Exception: with return_exceptions=True a
        # cancelled collector is returned as CancelledError, which does not
        # derive from Exception on Python 3.8+ and would otherwise be treated
        # as a result dict.
        if isinstance(result, BaseException):
            logger.error(f"{provider} collector failed with exception: {str(result)}")
            processed_results.append({
                "provider": provider,
                "category": "market_data",
                "data": None,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "staleness_minutes": None,
                "success": False,
                "error": str(result),
                "error_type": "exception",
            })
        else:
            processed_results.append(result)

    # Log summary
    successful = sum(1 for r in processed_results if r.get("success", False))
    logger.info(f"Market data collection complete: {successful}/{len(processed_results)} successful")
    return processed_results
class MarketDataCollector:
    """
    Market Data Collector class for WebSocket streaming interface

    Wraps the standalone market data collection functions and merges their
    results into one dictionary keyed by ticker symbol.
    """

    # CoinGecko keys its response by coin id while the other providers use
    # ticker symbols; translate known ids so one asset ends up under one key.
    # Unknown ids fall back to the uppercased id.
    _COINGECKO_SYMBOLS = {
        "bitcoin": "BTC",
        "ethereum": "ETH",
        "binancecoin": "BNB",
    }

    # (source field, aggregated bucket) pairs per provider.
    _COINGECKO_FIELDS = (
        ("usd", "prices"),
        ("usd_market_cap", "market_caps"),
        ("usd_24h_vol", "volumes"),
        ("usd_24h_change", "price_changes"),
    )
    _COINMARKETCAP_FIELDS = (
        ("price", "prices"),
        ("market_cap", "market_caps"),
        ("volume_24h", "volumes"),
        ("percent_change_24h", "price_changes"),
    )
    _BINANCE_FIELDS = (
        ("lastPrice", "prices"),
        ("volume", "volumes"),
        ("priceChangePercent", "price_changes"),
    )

    def __init__(self, config: Any = None):
        """
        Initialize the market data collector

        Args:
            config: Configuration object (optional, for compatibility)
        """
        self.config = config
        self.logger = logger

    async def collect(self) -> Dict[str, Any]:
        """
        Collect market data from all sources

        Returns:
            Dict with aggregated market data: prices, volumes, market_caps
            and price_changes keyed by symbol, the list of contributing
            sources, and a timestamp
        """
        results = await collect_market_data()
        return self._aggregate(results)

    @classmethod
    def _aggregate(cls, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge successful collector results into the streaming payload."""
        # Aggregate data for WebSocket streaming
        aggregated = {
            "prices": {},
            "volumes": {},
            "market_caps": {},
            "price_changes": {},
            "sources": [],
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

        for result in results:
            if not (result.get("success") and result.get("data")):
                continue
            provider = result.get("provider", "unknown")
            aggregated["sources"].append(provider)
            data = result["data"]

            # Results are merged in order, so for a symbol reported by several
            # providers the value from the last one wins.
            if provider == "CoinGecko" and isinstance(data, dict):
                cls._merge_coingecko(aggregated, data)
            elif provider == "CoinMarketCap" and isinstance(data, dict):
                cls._merge_coinmarketcap(aggregated, data)
            elif provider == "Binance" and isinstance(data, list):
                cls._merge_binance(aggregated, data)

        return aggregated

    @classmethod
    def _merge_coingecko(cls, aggregated: Dict[str, Any], data: Dict[str, Any]) -> None:
        """Copy CoinGecko simple/price fields into the aggregate."""
        for coin_id, coin_data in data.items():
            if not isinstance(coin_data, dict):
                continue
            symbol = cls._COINGECKO_SYMBOLS.get(coin_id, coin_id.upper())
            for field, bucket in cls._COINGECKO_FIELDS:
                if field in coin_data:
                    aggregated[bucket][symbol] = coin_data[field]

    @classmethod
    def _merge_coinmarketcap(cls, aggregated: Dict[str, Any], data: Dict[str, Any]) -> None:
        """Copy CoinMarketCap USD quote fields into the aggregate."""
        coins = data.get("data")
        if not isinstance(coins, dict):
            return
        for symbol, coin_data in coins.items():
            if not isinstance(coin_data, dict):
                continue
            quotes = coin_data.get("quote")
            usd_quote = quotes.get("USD") if isinstance(quotes, dict) else None
            # Skip null or oddly shaped quotes instead of aborting the merge.
            if not isinstance(usd_quote, dict):
                continue
            for field, bucket in cls._COINMARKETCAP_FIELDS:
                if field in usd_quote:
                    aggregated[bucket][symbol] = usd_quote[field]

    @classmethod
    def _merge_binance(cls, aggregated: Dict[str, Any], data: List[Any]) -> None:
        """Copy Binance 24hr ticker fields into the aggregate as floats."""
        for ticker in data:
            if not isinstance(ticker, dict):
                continue
            pair = ticker.get("symbol")
            if not isinstance(pair, str) or not pair:
                continue
            # Strip only the trailing quote asset, e.g. "BTCUSDT" -> "BTC".
            symbol = pair[:-len("USDT")] if pair.endswith("USDT") else pair
            # NOTE(review): "volume" is stored next to the other providers'
            # USD volume figures; confirm its unit against the Binance docs.
            for field, bucket in cls._BINANCE_FIELDS:
                if field not in ticker:
                    continue
                try:
                    aggregated[bucket][symbol] = float(ticker[field])
                except (TypeError, ValueError):
                    # One malformed number must not discard the rest of the
                    # aggregated data.
                    continue
# Example usage
if __name__ == "__main__":
    async def main():
        """Run every collector once and print a per-provider summary."""
        results = await collect_market_data()
        print("\n=== Market Data Collection Results ===")
        for result in results:
            # The key is present with value None when staleness is unknown,
            # so a .get() default would never apply; test for None instead.
            staleness = result.get("staleness_minutes")
            staleness_text = f"{staleness} minutes" if staleness is not None else "N/A"
            print(f"\nProvider: {result['provider']}")
            print(f"Success: {result['success']}")
            print(f"Staleness: {staleness_text}")
            if result["success"]:
                print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")

    asyncio.run(main())