#!/usr/bin/env python3
"""
Comprehensive Data Worker - Collect ALL Data from ALL Sources

Uses all resources from crypto_resources_unified_2025-11-11.json

This worker ensures ZERO data sources are left unused:
- 23 Market Data APIs
- 15 News APIs
- 12 Sentiment APIs
- 13 On-chain Analytics APIs
- 9 Whale Tracking APIs
- 18 Block Explorers
- 1 Community Sentiment API
- 24 RPC Nodes
- 7 HuggingFace Resources
- 13 Free HTTP Endpoints

ALL data is uploaded to HuggingFace Datasets
"""
import asyncio
import os
import time
from datetime import datetime
from typing import Any, Dict, List

import httpx

from database.cache_queries import get_cache_queries
from database.db_manager import db_manager
from utils.logger import setup_logger
from unified_resource_loader import get_loader

logger = setup_logger("comprehensive_worker")

# Get resource loader
resource_loader = get_loader()
cache = get_cache_queries(db_manager)

# HuggingFace Dataset Uploader (enabled only when a token is present)
HF_UPLOAD_ENABLED = bool(os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN"))
if HF_UPLOAD_ENABLED:
    try:
        from hf_dataset_uploader import get_dataset_uploader
        hf_uploader = get_dataset_uploader()
        logger.info("✅ HuggingFace Dataset upload ENABLED for comprehensive worker")
    except Exception as e:
        logger.warning(f"HuggingFace Dataset upload disabled: {e}")
        HF_UPLOAD_ENABLED = False
        hf_uploader = None
else:
    logger.info("ℹ️ HuggingFace Dataset upload DISABLED (no HF_TOKEN)")
    hf_uploader = None
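
# Example environment setup (illustrative; only HF_TOKEN / HF_API_TOKEN are
# read above, and the token value shown is a placeholder, not a real key):
#
#   export HF_TOKEN="hf_xxxxxxxxxxxxxxxx"   # enables dataset uploads
#
# With neither variable set, the worker still collects data from all
# sources but skips the HuggingFace upload step.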

# ============================================================================
# NEWS DATA WORKER
# ============================================================================

async def fetch_news_from_cryptopanic() -> List[Dict[str, Any]]:
    """Fetch news from CryptoPanic (FREE, no API key)"""
    try:
        url = "https://cryptopanic.com/api/v1/posts/"
        params = {"auth_token": "free", "public": "true", "kind": "news", "filter": "rising"}
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            news_items = []
            for post in data.get("results", [])[:15]:
                news_items.append({
                    "title": post.get("title", ""),
                    # CryptoPanic posts carry no separate description; reuse the title
                    "description": post.get("title", ""),
                    "url": post.get("url", ""),
                    "published_at": post.get("created_at", ""),
                    "source": "CryptoPanic",
                    "source_id": "cryptopanic",
                    "category": "news",
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })
            logger.info(f"✅ CryptoPanic: {len(news_items)} articles")
            return news_items
    except Exception as e:
        logger.debug(f"CryptoPanic error: {e}")
        return []

async def fetch_news_from_coinstats() -> List[Dict[str, Any]]:
    """Fetch news from CoinStats (FREE, no API key)"""
    try:
        url = "https://api.coin-stats.com/v2/news"
        params = {"limit": 20}
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            news_items = []
            for article in data.get("news", [])[:15]:
                news_items.append({
                    "title": article.get("title", ""),
                    "description": article.get("description", ""),
                    "url": article.get("link", ""),
                    "published_at": article.get("published", ""),
                    "source": "CoinStats",
                    "source_id": "coinstats",
                    "category": "news",
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })
            logger.info(f"✅ CoinStats: {len(news_items)} articles")
            return news_items
    except Exception as e:
        logger.debug(f"CoinStats error: {e}")
        return []
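
# Quick standalone check of a single fetcher (illustrative):
#
#   >>> import asyncio
#   >>> articles = asyncio.run(fetch_news_from_cryptopanic())
#   >>> articles[0]["source"]     # when any articles were returned
#   'CryptoPanic'
#
# Each fetcher returns an empty list on failure, so callers can extend()
# results without None checks.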

async def fetch_news_data() -> List[Dict[str, Any]]:
    """
    Fetch news from multiple free sources

    Sources:
    - CryptoPanic (FREE, no API key)
    - CoinStats (FREE, no API key)
    - Other news sources from registry
    """
    news_data = []

    # Fetch from reliable free sources first
    try:
        cryptopanic_news = await fetch_news_from_cryptopanic()
        news_data.extend(cryptopanic_news)
    except Exception as e:
        logger.debug(f"Error fetching CryptoPanic: {e}")

    try:
        coinstats_news = await fetch_news_from_coinstats()
        news_data.extend(coinstats_news)
    except Exception as e:
        logger.debug(f"Error fetching CoinStats: {e}")

    # Try additional sources from registry
    news_resources = resource_loader.get_resources_by_category("news")
    logger.info(f"📰 Fetching news from {len(news_resources)} additional sources...")
    for resource in news_resources:
        try:
            # Skip if requires auth and no key
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            # Build request based on resource
            url = resource.base_url
            headers = {}
            params = {}

            # Add auth if needed
            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["apiKey"] = resource.api_key

            # Special handling for different news APIs
            if "newsapi" in resource.id:
                # Skip NewsAPI if no valid key
                if not resource.api_key or resource.api_key.startswith("pub_"):
                    logger.debug(f"Skipping {resource.name} (invalid API key)")
                    continue
                url = f"{resource.base_url}/everything"
                params.update({
                    "q": "cryptocurrency OR bitcoin OR ethereum",
                    "language": "en",
                    "sortBy": "publishedAt",
                    "pageSize": 20
                })
            elif "cryptopanic" in resource.id:
                # Already handled above
                continue
            elif "cryptocontrol" in resource.id:
                url = f"{resource.base_url}/news"

            # Fetch data
            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                # Check if response is JSON
                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    # Might be an RSS feed or HTML - skip for now
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

                # Parse response based on source
                articles = []
                if "newsapi" in resource.id:
                    articles = data.get("articles", [])
                elif "cryptopanic" in resource.id:
                    articles = data.get("results", [])
                else:
                    articles = data if isinstance(data, list) else data.get("news", [])

                # Normalize articles
                for article in articles[:10]:  # Limit per source
                    try:
                        normalized = {
                            "title": article.get("title", article.get("name", "")),
                            "description": article.get("description", article.get("summary", "")),
                            "url": article.get("url", article.get("link", "")),
                            "published_at": article.get("publishedAt", article.get("published_at", article.get("created_at", ""))),
                            "source": resource.name,
                            "source_id": resource.id,
                            "category": "news",
                            "fetched_at": datetime.utcnow().isoformat() + "Z"
                        }
                        news_data.append(normalized)
                    except Exception as e:
                        logger.debug(f"Error parsing article: {e}")
                        continue
                logger.info(f"✅ {resource.name}: {len(articles[:10])} articles")
        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"📰 Total news articles collected: {len(news_data)}")
    return news_data

# ============================================================================
# SENTIMENT DATA WORKER
# ============================================================================

async def fetch_fear_greed_index() -> List[Dict[str, Any]]:
    """Fetch Fear & Greed Index from Alternative.me (FREE, no API key)"""
    try:
        url = "https://api.alternative.me/fng/"
        params = {"limit": "1"}
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            fng_list = data.get("data", [])
            if isinstance(fng_list, list) and len(fng_list) > 0:
                fng_data = fng_list[0]
                sentiment = {
                    "metric": "fear_greed_index",
                    "value": float(fng_data.get("value", 0)),
                    "classification": fng_data.get("value_classification", ""),
                    "source": "Alternative.me",
                    "source_id": "alternative-me-fng",
                    "timestamp": datetime.fromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                }
                logger.info(f"✅ Fear & Greed Index: {fng_data.get('value')} ({fng_data.get('value_classification')})")
                return [sentiment]
    except Exception as e:
        logger.debug(f"Fear & Greed Index error: {e}")
    # Reached on error or when the API returned an empty list
    return []
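
# Sample Alternative.me /fng/ response (shape as commonly documented; the
# values shown are illustrative, not real data):
#
#   {"name": "Fear and Greed Index",
#    "data": [{"value": "72", "value_classification": "Greed",
#              "timestamp": "1700000000", "time_until_update": "3600"}]}
#
# Note that "value" and "timestamp" arrive as strings, hence the
# float()/int() conversions above.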

async def fetch_sentiment_data() -> List[Dict[str, Any]]:
    """
    Fetch sentiment data from multiple sources

    Sources:
    - Alternative.me Fear & Greed Index (FREE, no API key)
    - LunarCrush (requires API key)
    - Santiment (requires API key)
    - And other sentiment sources from registry
    """
    sentiment_data = []

    # Fetch Fear & Greed Index first (most reliable free source)
    try:
        fng_data = await fetch_fear_greed_index()
        sentiment_data.extend(fng_data)
    except Exception as e:
        logger.debug(f"Error fetching Fear & Greed Index: {e}")

    sentiment_resources = resource_loader.get_resources_by_category("sentiment")
    logger.info(f"😊 Fetching sentiment from {len(sentiment_resources)} additional sources...")
    for resource in sentiment_resources:
        try:
            # Skip if requires auth and no key
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}

            # Add auth
            if resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"
            elif resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key

            # Special handling for different APIs
            if "alternative.me" in resource.id or "alternative-me" in resource.id:
                # Already handled above
                continue
            elif "lunarcrush" in resource.id:
                url = f"{resource.base_url}/assets"
                params.update({"symbol": "BTC,ETH,BNB", "data_points": 1})

            # Fetch data
            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()

                # Check content type
                content_type = response.headers.get("content-type", "")
                if "application/json" not in content_type and "text/json" not in content_type:
                    logger.debug(f"Non-JSON response from {resource.name}: {content_type}")
                    continue

                data = response.json()

                # Parse based on source
                # (The alternative.me branch below is unreachable because those
                # resources are skipped earlier in the loop; kept for completeness.)
                if "alternative.me" in resource.id or "alternative-me" in resource.id:
                    fng_list = data.get("data", [])
                    if isinstance(fng_list, list) and len(fng_list) > 0:
                        fng_data = fng_list[0]
                        sentiment_data.append({
                            "metric": "fear_greed_index",
                            "value": float(fng_data.get("value", 0)),
                            "classification": fng_data.get("value_classification", ""),
                            "source": resource.name,
                            "source_id": resource.id,
                            "timestamp": datetime.fromtimestamp(int(fng_data.get("timestamp", time.time()))).isoformat() + "Z",
                            "fetched_at": datetime.utcnow().isoformat() + "Z"
                        })
                        logger.info(f"✅ {resource.name}: FNG = {fng_data.get('value')} ({fng_data.get('value_classification')})")
                elif "lunarcrush" in resource.id:
                    assets = data.get("data", [])
                    for asset in assets:
                        sentiment_data.append({
                            "symbol": asset.get("symbol", ""),
                            "metric": "galaxy_score",
                            "value": float(asset.get("galaxy_score", 0)),
                            "alt_rank": asset.get("alt_rank"),
                            "social_volume": asset.get("social_volume"),
                            "source": resource.name,
                            "source_id": resource.id,
                            "timestamp": datetime.utcnow().isoformat() + "Z",
                            "fetched_at": datetime.utcnow().isoformat() + "Z"
                        })
                    logger.info(f"✅ {resource.name}: {len(assets)} assets")
        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error fetching from {resource.name}: {e}")

    logger.info(f"😊 Total sentiment data collected: {len(sentiment_data)}")
    return sentiment_data

# ============================================================================
# ON-CHAIN ANALYTICS WORKER
# ============================================================================

async def fetch_onchain_data() -> List[Dict[str, Any]]:
    """
    Fetch on-chain analytics from ALL on-chain APIs

    Sources:
    - Glassnode
    - IntoTheBlock
    - CryptoQuant
    - And all other on-chain sources (13 total)
    """
    onchain_data = []
    onchain_resources = resource_loader.get_resources_by_category("onchain_analytics")
    logger.info(f"⛓️ Fetching on-chain data from {len(onchain_resources)} sources...")
    for resource in onchain_resources:
        try:
            # Most on-chain APIs require auth - skip if no key
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            # For demo, we'll try to fetch basic metrics
            url = resource.base_url
            headers = {}
            params = {}
            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["Authorization"] = f"Bearer {resource.api_key}"

            # Try to fetch (many will fail without proper API keys)
            logger.debug(f"Attempting {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()

                # Store raw data
                onchain_data.append({
                    "source": resource.name,
                    "source_id": resource.id,
                    "data": data,
                    "fetched_at": datetime.utcnow().isoformat() + "Z"
                })
                logger.info(f"✅ {resource.name}: Data received")
        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"⛓️ Total on-chain data points: {len(onchain_data)}")
    return onchain_data

# ============================================================================
# WHALE TRACKING WORKER
# ============================================================================

async def fetch_whale_data() -> List[Dict[str, Any]]:
    """
    Fetch whale transactions from ALL whale tracking APIs

    Sources:
    - Whale Alert
    - Whale Watcher
    - And all other whale tracking sources (9 total)
    """
    whale_data = []
    whale_resources = resource_loader.get_resources_by_category("whale_tracking")
    logger.info(f"🐋 Fetching whale data from {len(whale_resources)} sources...")
    for resource in whale_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = resource.base_url
            headers = {}
            params = {}
            if resource.auth_type == "apiKeyQuery" and resource.api_key:
                params["api_key"] = resource.api_key
            elif resource.auth_type == "apiKeyHeader" and resource.api_key:
                headers["X-API-Key"] = resource.api_key

            # Special handling for Whale Alert
            if "whale-alert" in resource.id and resource.endpoints:
                url = f"{resource.base_url}/transactions"
                params["min_value"] = 500000  # Min $500k

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()
                transactions = data.get("transactions", []) if isinstance(data, dict) else data
                for tx in transactions[:20]:  # Limit per source
                    whale_data.append({
                        "source": resource.name,
                        "source_id": resource.id,
                        "transaction": tx,
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                logger.info(f"✅ {resource.name}: {len(transactions[:20])} transactions")
        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🐋 Total whale transactions: {len(whale_data)}")
    return whale_data
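
# Sample Whale Alert /transactions response (shape as commonly documented;
# field values are illustrative, not real data):
#
#   {"result": "success", "count": 1,
#    "transactions": [{"blockchain": "bitcoin", "symbol": "btc",
#                      "amount": 1200.0, "amount_usd": 51000000.0,
#                      "timestamp": 1700000000}]}
#
# The isinstance() guard above also tolerates sources that return a bare list
# of transactions instead of a wrapping object.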

# ============================================================================
# BLOCK EXPLORER DATA WORKER
# ============================================================================

async def fetch_block_explorer_data() -> List[Dict[str, Any]]:
    """
    Fetch blockchain data from ALL block explorers

    Sources:
    - Etherscan
    - BscScan
    - Polygonscan
    - And all other block explorers (18 total)
    """
    explorer_data = []
    explorer_resources = resource_loader.get_resources_by_category("block_explorers")
    logger.info(f"🔍 Fetching from {len(explorer_resources)} block explorers...")
    for resource in explorer_resources:
        try:
            if resource.auth_type != "none" and not resource.api_key:
                logger.debug(f"Skipping {resource.name} (no API key)")
                continue

            url = f"{resource.base_url}/api"
            params = {
                "module": "stats",
                "action": "ethprice",  # Get ETH/chain price
            }
            if resource.api_key:
                params["apikey"] = resource.api_key

            logger.debug(f"Fetching from {resource.name}...")
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()
                if data.get("status") == "1":
                    result = data.get("result", {})
                    explorer_data.append({
                        "chain": getattr(resource, "chain", "unknown"),
                        "source": resource.name,
                        "source_id": resource.id,
                        "price_usd": result.get("ethusd"),
                        "price_btc": result.get("ethbtc"),
                        "fetched_at": datetime.utcnow().isoformat() + "Z"
                    })
                    logger.info(f"✅ {resource.name}: Price data received")
        except httpx.HTTPError as e:
            logger.debug(f"HTTP error from {resource.name}: {e}")
        except Exception as e:
            logger.debug(f"Error from {resource.name}: {e}")

    logger.info(f"🔍 Total block explorer data: {len(explorer_data)}")
    return explorer_data
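
# Sample Etherscan-style "stats/ethprice" response (shape shared by most
# *scan explorers; values illustrative):
#
#   {"status": "1", "message": "OK",
#    "result": {"ethbtc": "0.052", "ethusd": "3120.45",
#               "ethbtc_timestamp": "1700000000", "ethusd_timestamp": "1700000000"}}
#
# "status" == "1" signals success, which is what the check above relies on.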

# ============================================================================
# SAVE AND UPLOAD FUNCTIONS
# ============================================================================
# Note: these functions persist nothing locally; the "Saving" log lines track
# volume, and the actual storage step is the HuggingFace upload.

async def save_and_upload_news(news_data: List[Dict[str, Any]]) -> bool:
    """Save news data and upload to HuggingFace"""
    if not news_data:
        return False
    logger.info(f"💾 Saving {len(news_data)} news articles...")
    # Upload to HuggingFace
    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(news_data)} news articles to HuggingFace...")
            success = await hf_uploader.upload_news_data(news_data, append=True)
            if success:
                logger.info("✅ Successfully uploaded news to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload news to HuggingFace")
                return False
        except Exception as e:
            logger.error(f"Error uploading news to HuggingFace: {e}")
            return False
    return True

async def save_and_upload_sentiment(sentiment_data: List[Dict[str, Any]]) -> bool:
    """Save sentiment data and upload to HuggingFace"""
    if not sentiment_data:
        return False
    logger.info(f"💾 Saving {len(sentiment_data)} sentiment records...")
    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(sentiment_data)} sentiment records to HuggingFace...")
            success = await hf_uploader.upload_sentiment_data(sentiment_data, append=True)
            if success:
                logger.info("✅ Successfully uploaded sentiment to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload sentiment to HuggingFace")
                return False
        except Exception as e:
            logger.error(f"Error uploading sentiment: {e}")
            return False
    return True

async def save_and_upload_onchain(onchain_data: List[Dict[str, Any]]) -> bool:
    """Save on-chain data and upload to HuggingFace"""
    if not onchain_data:
        return False
    logger.info(f"💾 Saving {len(onchain_data)} on-chain records...")
    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(onchain_data)} on-chain records to HuggingFace...")
            success = await hf_uploader.upload_onchain_data(onchain_data, append=True)
            if success:
                logger.info("✅ Successfully uploaded on-chain data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload on-chain data to HuggingFace")
                return False
        except Exception as e:
            logger.error(f"Error uploading on-chain data: {e}")
            return False
    return True

async def save_and_upload_whale(whale_data: List[Dict[str, Any]]) -> bool:
    """Save whale data and upload to HuggingFace"""
    if not whale_data:
        return False
    logger.info(f"💾 Saving {len(whale_data)} whale records...")
    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(whale_data)} whale records to HuggingFace...")
            success = await hf_uploader.upload_whale_data(whale_data, append=True)
            if success:
                logger.info("✅ Successfully uploaded whale data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload whale data to HuggingFace")
                return False
        except Exception as e:
            logger.error(f"Error uploading whale data: {e}")
            return False
    return True

async def save_and_upload_explorer(explorer_data: List[Dict[str, Any]]) -> bool:
    """Save explorer data and upload to HuggingFace"""
    if not explorer_data:
        return False
    logger.info(f"💾 Saving {len(explorer_data)} explorer records...")
    if HF_UPLOAD_ENABLED and hf_uploader:
        try:
            logger.info(f"📤 Uploading {len(explorer_data)} explorer records to HuggingFace...")
            success = await hf_uploader.upload_explorer_data(explorer_data, append=True)
            if success:
                logger.info("✅ Successfully uploaded explorer data to HuggingFace")
                return True
            else:
                logger.warning("⚠️ Failed to upload explorer data to HuggingFace")
                return False
        except Exception as e:
            logger.error(f"Error uploading explorer data: {e}")
            return False
    return True
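
# The five functions above differ only in the uploader method they call.
# A possible consolidation (a sketch, assuming every uploader method shares
# the (data, append=True) signature used above):
#
#   async def save_and_upload(kind: str, upload_fn, data: List[Dict[str, Any]]) -> bool:
#       if not data:
#           return False
#       logger.info(f"💾 Saving {len(data)} {kind} records...")
#       if HF_UPLOAD_ENABLED and hf_uploader:
#           try:
#               return bool(await upload_fn(data, append=True))
#           except Exception as e:
#               logger.error(f"Error uploading {kind}: {e}")
#               return False
#       return True
#
#   # Usage: await save_and_upload("news", hf_uploader.upload_news_data, news_data)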

# ============================================================================
# MAIN WORKER LOOP
# ============================================================================

async def comprehensive_worker_loop():
    """
    Main worker loop - Fetch ALL data from ALL sources

    Runs every 5 minutes to avoid rate limits
    """
    logger.info("🚀 Starting comprehensive data worker")
    logger.info(f"📊 Resource statistics: {resource_loader.get_stats()}")
    iteration = 0
    while True:
        try:
            iteration += 1
            start_time = time.time()
            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Starting comprehensive data collection")
            logger.info(f"{'='*80}")

            # Fetch from all sources in parallel; return_exceptions=True keeps
            # one failing category from cancelling the others
            results = await asyncio.gather(
                fetch_news_data(),
                fetch_sentiment_data(),
                fetch_onchain_data(),
                fetch_whale_data(),
                fetch_block_explorer_data(),
                return_exceptions=True
            )
            news_data, sentiment_data, onchain_data, whale_data, explorer_data = results

            # Save and upload ALL data types
            await asyncio.gather(
                save_and_upload_news(news_data if not isinstance(news_data, Exception) else []),
                save_and_upload_sentiment(sentiment_data if not isinstance(sentiment_data, Exception) else []),
                save_and_upload_onchain(onchain_data if not isinstance(onchain_data, Exception) else []),
                save_and_upload_whale(whale_data if not isinstance(whale_data, Exception) else []),
                save_and_upload_explorer(explorer_data if not isinstance(explorer_data, Exception) else []),
                return_exceptions=True
            )

            elapsed = time.time() - start_time
            total_records = sum([
                len(news_data) if not isinstance(news_data, Exception) else 0,
                len(sentiment_data) if not isinstance(sentiment_data, Exception) else 0,
                len(onchain_data) if not isinstance(onchain_data, Exception) else 0,
                len(whale_data) if not isinstance(whale_data, Exception) else 0,
                len(explorer_data) if not isinstance(explorer_data, Exception) else 0,
            ])
            logger.info(f"\n{'='*80}")
            logger.info(f"[Iteration {iteration}] Completed in {elapsed:.2f}s")
            logger.info(f"Total records collected: {total_records}")
            logger.info(f"{'='*80}\n")

            # Wait 5 minutes to avoid rate limits
            await asyncio.sleep(300)
        except Exception as e:
            logger.error(f"[Iteration {iteration}] Worker error: {e}", exc_info=True)
            await asyncio.sleep(300)

async def start_comprehensive_worker():
    """Start comprehensive data worker"""
    try:
        logger.info("Initializing comprehensive data worker...")
        # Schedule the worker loop; its first iteration performs the initial fetch
        logger.info("Scheduling comprehensive worker loop...")
        asyncio.create_task(comprehensive_worker_loop())
        logger.info("Comprehensive data worker started successfully")
    except Exception as e:
        logger.error(f"Failed to start comprehensive worker: {e}", exc_info=True)

# For testing
if __name__ == "__main__":
    async def test():
        """Test the worker"""
        logger.info("Testing comprehensive data worker...")
        # Test each category
        news = await fetch_news_data()
        logger.info(f"\n✅ News: {len(news)} articles")
        sentiment = await fetch_sentiment_data()
        logger.info(f"✅ Sentiment: {len(sentiment)} records")
        onchain = await fetch_onchain_data()
        logger.info(f"✅ On-chain: {len(onchain)} records")
        whale = await fetch_whale_data()
        logger.info(f"✅ Whale: {len(whale)} transactions")
        explorer = await fetch_block_explorer_data()
        logger.info(f"✅ Explorer: {len(explorer)} records")

    asyncio.run(test())