#!/usr/bin/env python3
"""Centralized access to Hugging Face models with ensemble sentiment."""

from __future__ import annotations

import logging
import os
import random
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Sequence

from config import HUGGINGFACE_MODELS, get_settings

# Module logger must exist before any import-time logging below.
logger = logging.getLogger(__name__)

# Import environment detector
try:
    from utils.environment_detector import (
        get_environment_detector,
        should_use_ai_models,
        get_device,
        is_huggingface_space,
    )
    ENV_DETECTOR_AVAILABLE = True
except ImportError:
    ENV_DETECTOR_AVAILABLE = False
    logger.warning("Environment detector not available")

# Only import transformers if we should use AI models
TRANSFORMERS_AVAILABLE = False
HF_HUB_AVAILABLE = False

if ENV_DETECTOR_AVAILABLE:
    env_detector = get_environment_detector()
    # Log environment info
    env_detector.log_environment()

    # Only import if we should use AI models
    if should_use_ai_models():
        try:
            from transformers import pipeline
            TRANSFORMERS_AVAILABLE = True
            logger.info("✅ Transformers imported successfully")
        except ImportError:
            logger.warning("⚠️ Transformers not installed - using fallback mode")
            TRANSFORMERS_AVAILABLE = False

        try:
            from huggingface_hub.errors import RepositoryNotFoundError
            HF_HUB_AVAILABLE = True
        except ImportError:
            HF_HUB_AVAILABLE = False
            RepositoryNotFoundError = Exception
    else:
        logger.info("ℹ️ AI models disabled - using fallback mode only")
        TRANSFORMERS_AVAILABLE = False
else:
    # Fall back to the old behavior if the environment detector is not available
    try:
        from transformers import pipeline
        TRANSFORMERS_AVAILABLE = True
    except ImportError:
        TRANSFORMERS_AVAILABLE = False

    try:
        from huggingface_hub.errors import RepositoryNotFoundError
        HF_HUB_AVAILABLE = True
    except ImportError:
        HF_HUB_AVAILABLE = False
        RepositoryNotFoundError = Exception

try:
    import requests
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False

settings = get_settings()

HF_TOKEN_ENV = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
_is_hf_space = is_huggingface_space() if ENV_DETECTOR_AVAILABLE else bool(os.getenv("SPACE_ID"))

# Determine HF_MODE based on the environment
if ENV_DETECTOR_AVAILABLE and not should_use_ai_models():
    HF_MODE = "off"  # Disable if the environment says so
else:
    _default_hf_mode = "public" if TRANSFORMERS_AVAILABLE else "off"
    HF_MODE = os.getenv("HF_MODE", _default_hf_mode).lower()

if HF_MODE not in ("off", "public", "auth"):
    logger.warning("Invalid HF_MODE, resetting to 'off'")
    HF_MODE = "off"

if HF_MODE == "auth" and not HF_TOKEN_ENV:
    HF_MODE = "off"
    logger.warning("HF_MODE='auth' but no HF_TOKEN found, resetting to 'off'")

# Linked models in the HF Space - these are pre-validated
LINKED_MODEL_IDS = {
    "cardiffnlp/twitter-roberta-base-sentiment-latest",
    "ProsusAI/finbert",
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
    "ElKulako/cryptobert",
    "kk08/CryptoBERT",
    "agarkovv/CryptoTrader-LM",
    "StephanAkkerman/FinTwitBERT-sentiment",
    "OpenC/crypto-gpt-o3-mini",
    "burakutf/finetuned-finbert-crypto",
    "mathugo/crypto_news_bert",
    "mayurjadhav/crypto-sentiment-model",
    "yiyanghkust/finbert-tone",
    "facebook/bart-large-cnn",
    "facebook/bart-large-mnli",
    "distilbert-base-uncased-finetuned-sst-2-english",
    "nlptown/bert-base-multilingual-uncased-sentiment",
    "finiteautomata/bertweet-base-sentiment-analysis",
}
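# Illustrative summary (added note, not executed) of how HF_MODE resolves above:
#   env detector says no AI models          -> "off"
#   HF_MODE env var unset, transformers OK  -> "public"
#   HF_MODE="auth" without HF_TOKEN         -> downgraded to "off"
#   any unrecognized value                  -> "off"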
# Extended Model Catalog - using VERIFIED public models only.
# These models are tested and confirmed working on the HuggingFace Hub.
CRYPTO_SENTIMENT_MODELS = [
    "kk08/CryptoBERT",  # Crypto-specific sentiment binary classification
    "ElKulako/cryptobert",  # Crypto social sentiment (Bullish/Neutral/Bearish)
    "mayurjadhav/crypto-sentiment-model",  # Crypto sentiment analysis
    "mathugo/crypto_news_bert",  # Crypto news sentiment
    "burakutf/finetuned-finbert-crypto",  # Finetuned FinBERT for crypto
    "cardiffnlp/twitter-roberta-base-sentiment-latest",  # Fallback
    "distilbert-base-uncased-finetuned-sst-2-english",  # General sentiment
]

SOCIAL_SENTIMENT_MODELS = [
    "ElKulako/cryptobert",  # Crypto social sentiment
    "cardiffnlp/twitter-roberta-base-sentiment-latest",  # Twitter sentiment
    "finiteautomata/bertweet-base-sentiment-analysis",  # BERTweet sentiment
    "nlptown/bert-base-multilingual-uncased-sentiment",  # Multilingual sentiment
    "distilbert-base-uncased-finetuned-sst-2-english",  # General sentiment
]

FINANCIAL_SENTIMENT_MODELS = [
    "StephanAkkerman/FinTwitBERT-sentiment",  # Financial tweet sentiment
    "ProsusAI/finbert",  # Financial sentiment
    "yiyanghkust/finbert-tone",  # Financial tone classification
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",  # Financial news
    "cardiffnlp/twitter-roberta-base-sentiment-latest",  # Fallback
]

NEWS_SENTIMENT_MODELS = [
    "StephanAkkerman/FinTwitBERT-sentiment",  # News sentiment
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",  # Financial news
    "ProsusAI/finbert",  # Financial news sentiment
    "cardiffnlp/twitter-roberta-base-sentiment-latest",  # Fallback
]

GENERATION_MODELS = [
    "OpenC/crypto-gpt-o3-mini",  # Crypto/DeFi text generation
    "gpt2",  # General text generation fallback
    "distilgpt2",  # Lightweight text generation
]

TRADING_SIGNAL_MODELS = [
    "agarkovv/CryptoTrader-LM",  # BTC/ETH trading signals (buy/sell/hold)
]

SUMMARIZATION_MODELS = [
    "FurkanGozukara/Crypto-Financial-News-Summarizer",  # Crypto/financial news summarization
    "facebook/bart-large-cnn",  # BART summarization
    "facebook/bart-large-mnli",  # BART zero-shot classification
    "google/pegasus-xsum",  # Pegasus summarization
]

ZERO_SHOT_MODELS = [
    "facebook/bart-large-mnli",  # Zero-shot classification
    "typeform/distilbert-base-uncased-mnli",  # DistilBERT NLI
]

CLASSIFICATION_MODELS = [
    "yiyanghkust/finbert-tone",  # Financial tone classification
    "distilbert-base-uncased-finetuned-sst-2-english",  # Sentiment classification
]


@dataclass(frozen=True)
class PipelineSpec:
    key: str
    task: str
    model_id: str
    requires_auth: bool = False
    category: str = "sentiment"


MODEL_SPECS: Dict[str, PipelineSpec] = {}

# Legacy models
for lk in ["sentiment_twitter", "sentiment_financial", "summarization", "crypto_sentiment"]:
    if lk in HUGGINGFACE_MODELS:
        MODEL_SPECS[lk] = PipelineSpec(
            key=lk,
            task="sentiment-analysis" if "sentiment" in lk else "summarization",
            model_id=HUGGINGFACE_MODELS[lk],
            category="legacy",
        )

# Crypto sentiment - add indexed keys for the catalog models
for i, mid in enumerate(CRYPTO_SENTIMENT_MODELS):
    key = f"crypto_sent_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-classification",
        model_id=mid,
        category="sentiment_crypto",
        requires_auth=("ElKulako" in mid),
    )

# Add specific named aliases for required models
MODEL_SPECS["crypto_sent_kk08"] = PipelineSpec(
    key="crypto_sent_kk08",
    task="sentiment-analysis",
    model_id="kk08/CryptoBERT",
    category="sentiment_crypto",
    requires_auth=False,
)

# Social
for i, mid in enumerate(SOCIAL_SENTIMENT_MODELS):
    key = f"social_sent_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-classification",
        model_id=mid,
        category="sentiment_social",
        requires_auth=("ElKulako" in mid),
    )
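# Example (illustrative, added note): after the loops above,
# MODEL_SPECS["crypto_sent_0"] points at "kk08/CryptoBERT" with task
# "text-classification" in category "sentiment_crypto", and
# MODEL_SPECS["social_sent_0"] at "ElKulako/cryptobert" with requires_auth=True.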
# Add specific named alias
MODEL_SPECS["crypto_sent_social"] = PipelineSpec(
    key="crypto_sent_social",
    task="text-classification",
    model_id="ElKulako/cryptobert",
    category="sentiment_social",
    requires_auth=True,
)

# Financial
for i, mid in enumerate(FINANCIAL_SENTIMENT_MODELS):
    key = f"financial_sent_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-classification",
        model_id=mid,
        category="sentiment_financial",
    )

# Add specific named alias
MODEL_SPECS["crypto_sent_fin"] = PipelineSpec(
    key="crypto_sent_fin",
    task="sentiment-analysis",
    model_id="StephanAkkerman/FinTwitBERT-sentiment",
    category="sentiment_financial",
    requires_auth=False,
)

# News
for i, mid in enumerate(NEWS_SENTIMENT_MODELS):
    key = f"news_sent_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-classification",
        model_id=mid,
        category="sentiment_news",
    )

# Generation models (for crypto/DeFi text generation)
for i, mid in enumerate(GENERATION_MODELS):
    key = f"crypto_gen_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-generation",
        model_id=mid,
        category="analysis_generation",
    )

# Add specific named alias
MODEL_SPECS["crypto_ai_analyst"] = PipelineSpec(
    key="crypto_ai_analyst",
    task="text-generation",
    model_id="OpenC/crypto-gpt-o3-mini",
    category="analysis_generation",
    requires_auth=False,
)

# Trading signal models
for i, mid in enumerate(TRADING_SIGNAL_MODELS):
    key = f"crypto_trade_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-generation",
        model_id=mid,
        category="trading_signal",
    )

# Add specific named alias
MODEL_SPECS["crypto_trading_lm"] = PipelineSpec(
    key="crypto_trading_lm",
    task="text-generation",
    model_id="agarkovv/CryptoTrader-LM",
    category="trading_signal",
    requires_auth=False,
)

# Summarization models
for i, mid in enumerate(SUMMARIZATION_MODELS):
    MODEL_SPECS[f"summarization_{i}"] = PipelineSpec(
        key=f"summarization_{i}",
        task="summarization",
        model_id=mid,
        category="summarization",
    )

# Add specific named alias for BART summarization
MODEL_SPECS["summarization_bart"] = PipelineSpec(
    key="summarization_bart",
    task="summarization",
    model_id="facebook/bart-large-cnn",
    category="summarization",
    requires_auth=False,
)

# Zero-shot classification models
for i, mid in enumerate(ZERO_SHOT_MODELS):
    key = f"zero_shot_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="zero-shot-classification",
        model_id=mid,
        category="zero_shot",
    )

# Add specific named alias
MODEL_SPECS["zero_shot_bart"] = PipelineSpec(
    key="zero_shot_bart",
    task="zero-shot-classification",
    model_id="facebook/bart-large-mnli",
    category="zero_shot",
    requires_auth=False,
)

# Classification models
for i, mid in enumerate(CLASSIFICATION_MODELS):
    key = f"classification_{i}"
    MODEL_SPECS[key] = PipelineSpec(
        key=key,
        task="text-classification",
        model_id=mid,
        category="classification",
    )

# Add specific named alias for FinBERT tone
MODEL_SPECS["classification_finbert_tone"] = PipelineSpec(
    key="classification_finbert_tone",
    task="text-classification",
    model_id="yiyanghkust/finbert-tone",
    category="classification",
    requires_auth=False,
)


class ModelNotAvailable(RuntimeError):
    pass


@dataclass
class ModelHealthEntry:
    """Health tracking entry for a model."""

    key: str
    name: str
    status: str = "unknown"  # "healthy", "degraded", "unavailable", "unknown"
    last_success: Optional[float] = None
    last_error: Optional[float] = None
    error_count: int = 0
    success_count: int = 0
    cooldown_until: Optional[float] = None
    last_error_message: Optional[str] = None
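# Illustrative state machine for ModelHealthEntry (added note; thresholds come
# from `settings`, so the exact values are configuration, not fixed here):
#   error_count  >= health_error_threshold          -> "unavailable" + cooldown
#   error_count  >= health_error_threshold // 2     -> "degraded"
#   success_count >= health_success_recovery_count  -> "healthy", cooldown cleared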
class ModelRegistry:
    def __init__(self):
        self._pipelines = {}
        self._lock = threading.Lock()
        self._initialized = False
        self._failed_models = {}  # Track failed models with reasons
        # Health tracking for self-healing
        self._health_registry = {}  # key -> health entry

    def _get_or_create_health_entry(self, key: str) -> ModelHealthEntry:
        """Get or create the health entry for a model."""
        if key not in self._health_registry:
            spec = MODEL_SPECS.get(key)
            self._health_registry[key] = ModelHealthEntry(
                key=key,
                name=spec.model_id if spec else key,
                status="unknown",
            )
        return self._health_registry[key]

    def _update_health_on_success(self, key: str):
        """Update the health registry after a successful model call."""
        entry = self._get_or_create_health_entry(key)
        entry.last_success = time.time()
        entry.success_count += 1
        # Reset the error count gradually on success
        if entry.error_count > 0:
            entry.error_count = max(0, entry.error_count - 1)
        # Recovery logic: enough successes mark the model healthy again
        if entry.success_count >= settings.health_success_recovery_count:
            entry.status = "healthy"
            entry.cooldown_until = None
            # Clear from failed models if present
            if key in self._failed_models:
                del self._failed_models[key]

    def _update_health_on_failure(self, key: str, error_msg: str):
        """Update the health registry after a failed model call."""
        entry = self._get_or_create_health_entry(key)
        entry.last_error = time.time()
        entry.error_count += 1
        entry.last_error_message = error_msg
        entry.success_count = 0  # Reset success count on failure
        # Determine status based on the error count
        if entry.error_count >= settings.health_error_threshold:
            entry.status = "unavailable"
            # Set the cooldown period
            entry.cooldown_until = time.time() + settings.health_cooldown_seconds
        elif entry.error_count >= (settings.health_error_threshold // 2):
            entry.status = "degraded"
        else:
            entry.status = "healthy"

    def _is_in_cooldown(self, key: str) -> bool:
        """Check whether a model is in its cooldown period."""
        if key not in self._health_registry:
            return False
        entry = self._health_registry[key]
        if entry.cooldown_until is None:
            return False
        return time.time() < entry.cooldown_until
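    # Note (illustrative, added for readability): cooldown gating means
    # get_pipeline() below refuses to retry an "unavailable" model until
    # cooldown_until passes; attempt_model_reinit() is the manual escape hatch.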
""" if key not in MODEL_SPECS: return {"status": "error", "message": f"Unknown model key: {key}"} entry = self._get_or_create_health_entry(key) # Check if enough time has passed since last error if entry.last_error: time_since_error = time.time() - entry.last_error if time_since_error < settings.health_reinit_cooldown_seconds: return { "status": "cooldown", "message": f"Model in cooldown, wait {int(settings.health_reinit_cooldown_seconds - time_since_error)}s", "cooldown_remaining": int(settings.health_reinit_cooldown_seconds - time_since_error) } # Try to reinitialize with self._lock: # Remove from failed models and pipelines to force reload if key in self._failed_models: del self._failed_models[key] if key in self._pipelines: del self._pipelines[key] # Reset health entry entry.error_count = 0 entry.status = "unknown" entry.cooldown_until = None try: # Attempt to load pipe = self.get_pipeline(key) return { "status": "success", "message": f"Model {key} successfully reinitialized", "model": MODEL_SPECS[key].model_id } except Exception as e: return { "status": "failed", "message": f"Reinitialization failed: {str(e)[:200]}", "error": str(e)[:200] } def get_model_health_registry(self) -> List[Dict[str, Any]]: """Get health registry for all models""" result = [] for key, entry in self._health_registry.items(): spec = MODEL_SPECS.get(key) result.append({ "key": entry.key, "name": entry.name, "model_id": spec.model_id if spec else entry.name, "category": spec.category if spec else "unknown", "status": entry.status, "last_success": entry.last_success, "last_error": entry.last_error, "error_count": entry.error_count, "success_count": entry.success_count, "cooldown_until": entry.cooldown_until, "in_cooldown": self._is_in_cooldown(key), "last_error_message": entry.last_error_message, "loaded": key in self._pipelines }) # Add models that exist in specs but not in health registry for key, spec in MODEL_SPECS.items(): if key not in self._health_registry: result.append({ "key": key, "name": spec.model_id, "model_id": spec.model_id, "category": spec.category, "status": "unknown", "last_success": None, "last_error": None, "error_count": 0, "success_count": 0, "cooldown_until": None, "in_cooldown": False, "last_error_message": None, "loaded": key in self._pipelines }) return result def _should_use_token(self, spec: PipelineSpec) -> Optional[str]: """Determine if and which token to use for model loading""" if HF_MODE == "off": return None # In public mode, try to use token if available (for better rate limits) if HF_MODE == "public": # Use token if available to avoid rate limiting return HF_TOKEN_ENV if HF_TOKEN_ENV else None # In auth mode, always use token if available if HF_MODE == "auth": if HF_TOKEN_ENV: return HF_TOKEN_ENV else: logger.warning(f"Model {spec.model_id} - auth mode but no token available") return None return None def get_pipeline(self, key: str): """Get pipeline for a model key, with robust error handling and health tracking""" if HF_MODE == "off": raise ModelNotAvailable("HF_MODE=off") if not TRANSFORMERS_AVAILABLE: raise ModelNotAvailable("transformers not installed") if key not in MODEL_SPECS: # Provide helpful error with available keys available_keys = list(MODEL_SPECS.keys())[:20] # Show first 20 similar_keys = [k for k in MODEL_SPECS.keys() if key.lower() in k.lower() or k.lower() in key.lower()][:5] error_msg = f"Unknown model key: '{key}'. " if similar_keys: error_msg += f"Did you mean: {', '.join(similar_keys)}? " error_msg += f"Available keys: {len(MODEL_SPECS)} total. 
" if len(available_keys) < len(MODEL_SPECS): error_msg += f"Sample: {', '.join(available_keys[:10])}..." else: error_msg += f"Keys: {', '.join(available_keys)}" raise ModelNotAvailable(error_msg) spec = MODEL_SPECS[key] # Check if model is in cooldown if self._is_in_cooldown(key): entry = self._health_registry[key] cooldown_remaining = int(entry.cooldown_until - time.time()) raise ModelNotAvailable(f"Model in cooldown for {cooldown_remaining}s: {entry.last_error_message or 'previous failures'}") # Return cached pipeline if available if key in self._pipelines: return self._pipelines[key] # Check if this model already failed if key in self._failed_models: raise ModelNotAvailable(f"Model failed previously: {self._failed_models[key]}") with self._lock: # Double-check after acquiring lock if key in self._pipelines: return self._pipelines[key] if key in self._failed_models: raise ModelNotAvailable(f"Model failed previously: {self._failed_models[key]}") # Determine token usage auth_token = self._should_use_token(spec) logger.info(f"Loading model: {spec.model_id} (mode={HF_MODE}, auth={'yes' if auth_token else 'no'})") # Log token status for debugging if spec.requires_auth and not auth_token: logger.warning(f"Model {spec.model_id} requires auth but no token provided") try: # Use token parameter instead of deprecated use_auth_token pipeline_kwargs = { "task": spec.task, "model": spec.model_id, } # Add device configuration (GPU detection) if ENV_DETECTOR_AVAILABLE: device = get_device() if device == "cuda": pipeline_kwargs["device"] = 0 # Use first GPU logger.info(f"Loading {spec.model_id} on GPU") else: pipeline_kwargs["device"] = -1 # Use CPU logger.info(f"Loading {spec.model_id} on CPU") else: # Fallback: try to detect GPU manually try: import torch if torch.cuda.is_available(): pipeline_kwargs["device"] = 0 logger.info(f"Loading {spec.model_id} on GPU (fallback detection)") else: pipeline_kwargs["device"] = -1 except: pipeline_kwargs["device"] = -1 # CPU fallback # Only add token if we have one and it's needed if auth_token: pipeline_kwargs["token"] = auth_token logger.debug(f"Using authentication token for {spec.model_id}") elif spec.requires_auth: # Try with HF_TOKEN_ENV if available even if not explicitly required if HF_TOKEN_ENV: pipeline_kwargs["token"] = HF_TOKEN_ENV logger.info(f"Using HF_TOKEN_ENV for {spec.model_id} (requires_auth=True)") else: logger.warning(f"No token available for model {spec.model_id} that requires auth") else: # Explicitly set to None to avoid using expired tokens pipeline_kwargs["token"] = None self._pipelines[key] = pipeline(**pipeline_kwargs) logger.info(f"✅ Successfully loaded model: {spec.model_id}") # Update health on successful load self._update_health_on_success(key) return self._pipelines[key] except RepositoryNotFoundError as e: error_msg = f"Repository not found: {spec.model_id} - Model may not exist on Hugging Face Hub" logger.warning(f"{error_msg} - {str(e)}") logger.info(f"💡 Tip: Verify model exists at https://huggingface.co/{spec.model_id}") self._failed_models[key] = error_msg raise ModelNotAvailable(error_msg) from e except OSError as e: # Handle "not a valid model identifier" errors error_str = str(e) if "not a local folder" in error_str and "not a valid model identifier" in error_str: error_msg = f"Model identifier invalid: {spec.model_id} - May not exist or requires authentication" logger.warning(f"{error_msg}") if spec.requires_auth and not auth_token and not HF_TOKEN_ENV: logger.info(f"💡 Tip: This model may require HF_TOKEN. 
            except RepositoryNotFoundError as e:
                error_msg = (
                    f"Repository not found: {spec.model_id} - "
                    "Model may not exist on Hugging Face Hub"
                )
                logger.warning(f"{error_msg} - {str(e)}")
                logger.info(f"💡 Tip: Verify model exists at https://huggingface.co/{spec.model_id}")
                self._failed_models[key] = error_msg
                raise ModelNotAvailable(error_msg) from e

            except OSError as e:
                # Handle "not a valid model identifier" errors
                error_str = str(e)
                if "not a local folder" in error_str and "not a valid model identifier" in error_str:
                    error_msg = (
                        f"Model identifier invalid: {spec.model_id} - "
                        "May not exist or requires authentication"
                    )
                    logger.warning(error_msg)
                    if spec.requires_auth and not auth_token and not HF_TOKEN_ENV:
                        logger.info("💡 Tip: This model may require HF_TOKEN. Set the HF_TOKEN environment variable.")
                    logger.info(f"💡 Tip: Check if model exists at https://huggingface.co/{spec.model_id}")
                else:
                    error_msg = f"OSError loading {spec.model_id}: {str(e)[:200]}"
                    logger.warning(error_msg)
                self._failed_models[key] = error_msg
                raise ModelNotAvailable(error_msg) from e

            except Exception as e:
                error_type = type(e).__name__
                error_msg = f"{error_type}: {str(e)[:100]}"

                # Check for HTTP errors (401, 403, 404)
                if REQUESTS_AVAILABLE and isinstance(e, requests.exceptions.HTTPError):
                    status_code = getattr(e.response, "status_code", None)
                    if status_code == 401:
                        error_msg = f"Authentication failed (401) for {spec.model_id}"
                    elif status_code == 403:
                        error_msg = f"Access forbidden (403) for {spec.model_id}"
                    elif status_code == 404:
                        error_msg = f"Model not found (404): {spec.model_id}"

                # Check for OSError raised by transformers
                if isinstance(e, OSError):
                    if "not a valid model identifier" in str(e):
                        if spec.model_id in LINKED_MODEL_IDS:
                            # Linked models in the HF Space are pre-validated, so
                            # log the mismatch but fall through to the shared
                            # failure path below.
                            logger.info(f"Linked model {spec.model_id} - trying without validation check")
                        else:
                            error_msg = f"Invalid model identifier: {spec.model_id}"
                    elif "401" in str(e) or "403" in str(e):
                        error_msg = f"Authentication required for {spec.model_id}"
                    else:
                        error_msg = f"OS Error loading {spec.model_id}: {str(e)[:100]}"

                logger.warning(f"Failed to load {spec.model_id}: {error_msg}")
                self._failed_models[key] = error_msg
                # Update health on failure
                self._update_health_on_failure(key, error_msg)
                raise ModelNotAvailable(error_msg) from e
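    # Failure taxonomy above (descriptive note, added for readability): missing
    # repos, invalid identifiers, HTTP 401/403/404, and generic load errors all
    # funnel into ModelNotAvailable after recording _failed_models and health.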
""" try: pipe = self.get_pipeline(key) result = pipe(text[:512], **kwargs) # Update health on successful call self._update_health_on_success(key) return { "status": "success", "data": result, "model_key": key, "model_id": MODEL_SPECS[key].model_id if key in MODEL_SPECS else key } except ModelNotAvailable as e: # Don't update health here, already updated in get_pipeline return { "status": "unavailable", "error": str(e), "model_key": key } except Exception as e: error_msg = f"{type(e).__name__}: {str(e)[:200]}" logger.warning(f"Model call failed for {key}: {error_msg}") # Update health on call failure self._update_health_on_failure(key, error_msg) return { "status": "error", "error": error_msg, "model_key": key } def get_registry_status(self) -> Dict[str, Any]: """Get detailed registry status with all models""" items = [] for key, spec in MODEL_SPECS.items(): loaded = key in self._pipelines error = self._failed_models.get(key) if key in self._failed_models else None items.append({ "key": key, "name": spec.model_id, "task": spec.task, "category": spec.category, "loaded": loaded, "error": error, "requires_auth": spec.requires_auth }) return { "models_total": len(MODEL_SPECS), "models_loaded": len(self._pipelines), "models_failed": len(self._failed_models), "items": items, "hf_mode": HF_MODE, "transformers_available": TRANSFORMERS_AVAILABLE, "initialized": self._initialized } def initialize_models(self, force_reload: bool = False, max_models: int = None): """Initialize models with fallback logic - tries primary models first Args: force_reload: If True, reinitialize even if already initialized max_models: Maximum number of models to load (None = load all available) """ if self._initialized and not force_reload: return { "status": "already_initialized", "mode": HF_MODE, "models_loaded": len(self._pipelines), "failed_count": len(self._failed_models), "total_specs": len(MODEL_SPECS) } # Reset if forcing reload if force_reload: logger.info("Force reload requested - resetting initialization state") self._initialized = False # Don't clear pipelines - keep already loaded models if HF_MODE == "off": logger.info("HF_MODE=off, using fallback-only mode") self._initialized = True return { "status": "fallback_only", "mode": HF_MODE, "models_loaded": 0, "error": "HF_MODE=off - using lexical fallback", "total_specs": len(MODEL_SPECS) } if not TRANSFORMERS_AVAILABLE: logger.warning("Transformers not available, using fallback-only mode") self._initialized = True return { "status": "fallback_only", "mode": HF_MODE, "models_loaded": 0, "error": "transformers library not installed - using lexical fallback", "total_specs": len(MODEL_SPECS) } logger.info(f"Starting model initialization (HF_MODE={HF_MODE}, TRANSFORMERS_AVAILABLE={TRANSFORMERS_AVAILABLE})") logger.info(f"Total models in catalog: {len(MODEL_SPECS)}") logger.info(f"HF_TOKEN available: {bool(HF_TOKEN_ENV)}") loaded, failed = [], [] # Try to load at least one model from each category with expanded fallback categories_to_try = { "crypto": ["crypto_sent_0", "crypto_sent_1", "crypto_sent_kk08", "crypto_sent_2"], "financial": ["financial_sent_0", "financial_sent_1", "crypto_sent_fin"], "social": ["social_sent_0", "social_sent_1", "crypto_sent_social"], "news": ["news_sent_0", "news_sent_1", "financial_sent_0"] # Financial models can analyze news } # If max_models is set, try to load more models from each category models_per_category = 1 if max_models is None else max(1, max_models // len(categories_to_try)) for category, keys in categories_to_try.items(): 
    def initialize_models(self, force_reload: bool = False, max_models: Optional[int] = None):
        """Initialize models with fallback logic - tries primary models first.

        Args:
            force_reload: If True, reinitialize even if already initialized
            max_models: Maximum number of models to load (None = load one per category)
        """
        if self._initialized and not force_reload:
            return {
                "status": "already_initialized",
                "mode": HF_MODE,
                "models_loaded": len(self._pipelines),
                "failed_count": len(self._failed_models),
                "total_specs": len(MODEL_SPECS),
            }

        # Reset if forcing a reload
        if force_reload:
            logger.info("Force reload requested - resetting initialization state")
            self._initialized = False
            # Don't clear pipelines - keep already loaded models

        if HF_MODE == "off":
            logger.info("HF_MODE=off, using fallback-only mode")
            self._initialized = True
            return {
                "status": "fallback_only",
                "mode": HF_MODE,
                "models_loaded": 0,
                "error": "HF_MODE=off - using lexical fallback",
                "total_specs": len(MODEL_SPECS),
            }

        if not TRANSFORMERS_AVAILABLE:
            logger.warning("Transformers not available, using fallback-only mode")
            self._initialized = True
            return {
                "status": "fallback_only",
                "mode": HF_MODE,
                "models_loaded": 0,
                "error": "transformers library not installed - using lexical fallback",
                "total_specs": len(MODEL_SPECS),
            }

        logger.info(
            f"Starting model initialization "
            f"(HF_MODE={HF_MODE}, TRANSFORMERS_AVAILABLE={TRANSFORMERS_AVAILABLE})"
        )
        logger.info(f"Total models in catalog: {len(MODEL_SPECS)}")
        logger.info(f"HF_TOKEN available: {bool(HF_TOKEN_ENV)}")

        loaded, failed = [], []

        # Try to load at least one model from each category, with an expanded fallback chain
        categories_to_try = {
            "crypto": ["crypto_sent_0", "crypto_sent_1", "crypto_sent_kk08", "crypto_sent_2"],
            "financial": ["financial_sent_0", "financial_sent_1", "crypto_sent_fin"],
            "social": ["social_sent_0", "social_sent_1", "crypto_sent_social"],
            "news": ["news_sent_0", "news_sent_1", "financial_sent_0"],  # Financial models can analyze news
        }

        # If max_models is set, try to load more models from each category
        models_per_category = 1 if max_models is None else max(1, max_models // len(categories_to_try))

        for category, keys in categories_to_try.items():
            category_loaded = False
            models_loaded_in_category = 0
            logger.info(f"[{category}] Attempting to load models from category...")

            for key in keys:
                if max_models and len(loaded) >= max_models:
                    logger.info(f"Reached max_models limit ({max_models}), stopping")
                    break
                if models_loaded_in_category >= models_per_category:
                    logger.debug(
                        f"[{category}] Already loaded {models_loaded_in_category} model(s), "
                        "moving to next category"
                    )
                    break

                if key not in MODEL_SPECS:
                    logger.debug(f"[{category}] Model key '{key}' not in MODEL_SPECS, trying alternatives...")
                    # Try to find an alternative key in the same category
                    base = category.split("_")[0]
                    alt_keys = [
                        k for k in MODEL_SPECS.keys()
                        if k.startswith(f"{base}_sent_") or MODEL_SPECS[k].category == f"sentiment_{base}"
                    ]
                    if alt_keys:
                        logger.debug(f"[{category}] Found {len(alt_keys)} alternative keys, adding to queue")
                        keys.extend(alt_keys[:2])  # Queue two alternatives for this loop
                    continue

                spec = MODEL_SPECS[key]
                logger.info(f"[{category}] Attempting to load model: {key} ({spec.model_id})")
                try:
                    self.get_pipeline(key)
                    loaded.append(key)
                    models_loaded_in_category += 1
                    category_loaded = True
                    logger.info(f"[{category}] ✅ Successfully loaded model: {key} ({spec.model_id})")
                    # With max_models unset, one model per category is enough
                    if max_models is None:
                        break
                except ModelNotAvailable as e:
                    error_msg = str(e)[:200]  # Allow longer error messages
                    logger.warning(f"[{category}] ⚠️ Model {key} not available: {error_msg}")
                    failed.append((key, error_msg))
                    continue  # Next key in the fallback chain
                except Exception as e:
                    error_msg = f"{type(e).__name__}: {str(e)[:200]}"
                    logger.error(
                        f"[{category}] ❌ Model {key} initialization error: {error_msg}",
                        exc_info=True,
                    )
                    failed.append((key, error_msg))
                    continue  # Next key in the fallback chain

            if category_loaded:
                logger.info(
                    f"[{category}] Category initialization complete: "
                    f"{models_loaded_in_category} model(s) loaded"
                )
            else:
                logger.warning(f"[{category}] ⚠️ No models loaded from this category")

        # Determine status - be lenient: fallback mode is still usable
        if len(loaded) > 0:
            status = "ok"
            logger.info(f"✅ Model initialization complete: {len(loaded)} model(s) loaded successfully")
        else:
            logger.warning("⚠️ No HF models loaded, using fallback-only mode")
            status = "fallback_only"

        self._initialized = True
        result = {
            "status": status,
            "mode": HF_MODE,
            "models_loaded": len(loaded),
            "models_failed": len(failed),
            "loaded": loaded[:20],
            "failed": failed[:20],
            "failed_count": len(self._failed_models),
            "total_available_keys": len(MODEL_SPECS),
            "available_keys_sample": list(MODEL_SPECS.keys())[:30],
            "transformers_available": TRANSFORMERS_AVAILABLE,
            "hf_token_available": bool(HF_TOKEN_ENV),
            "note": "Fallback lexical analysis available" if len(loaded) == 0 else None,
        }

        # Add an initialization error summary if any
        if len(failed) > 0:
            result["initialization_errors"] = {
                "total": len(failed),
                "summary": f"{len(failed)} model(s) failed to initialize",
                "details": failed[:10],  # First 10 errors for debugging
            }
        if len(loaded) == 0:
            result["error"] = (
                "No models could be initialized. "
                "Check model IDs, HF_TOKEN, or network connectivity."
            )
result["debugging_tips"] = [ "Verify HF_TOKEN is set in environment variables", "Check if models exist on Hugging Face Hub", "Verify network connectivity to huggingface.co", "Check transformers library is installed: pip install transformers", "Review logs for specific error messages" ] logger.info(f"Model initialization summary: {result['status']}, loaded={result['models_loaded']}, failed={result['models_failed']}, total_specs={result['total_available_keys']}") return result _registry = ModelRegistry() def initialize_models(force_reload: bool = False, max_models: int = None): """Initialize models with optional parameters Args: force_reload: If True, reinitialize even if already initialized max_models: Maximum number of models to load (None = load one per category) """ return _registry.initialize_models(force_reload=force_reload, max_models=max_models) def get_model_health_registry() -> List[Dict[str, Any]]: """Get health registry for all models""" return _registry.get_model_health_registry() def attempt_model_reinit(model_key: str) -> Dict[str, Any]: """Attempt to re-initialize a failed model""" return _registry.attempt_model_reinit(model_key) def call_model_safe(model_key: str, text: str, **kwargs) -> Dict[str, Any]: """Safely call a model with health tracking""" return _registry.call_model_safe(model_key, text, **kwargs) def ensemble_crypto_sentiment(text: str) -> Dict[str, Any]: """Ensemble crypto sentiment with fallback model selection""" if not TRANSFORMERS_AVAILABLE: logger.warning("Transformers not available, using fallback") return basic_sentiment_fallback(text) if HF_MODE == "off": logger.warning("HF_MODE=off, using fallback") return basic_sentiment_fallback(text) results, labels_count, total_conf = {}, {"bullish": 0, "bearish": 0, "neutral": 0}, 0.0 # Try models in order with expanded fallback chain # Primary candidates candidate_keys = ["crypto_sent_0", "crypto_sent_1", "crypto_sent_2"] # Fallback: try named aliases fallback_keys = ["crypto_sent_kk08", "crypto_sent_social"] # Last resort: try any crypto sentiment model all_crypto_keys = [k for k in MODEL_SPECS.keys() if k.startswith("crypto_sent_") or MODEL_SPECS[k].category == "sentiment_crypto"] # Combine all candidate keys all_candidates = candidate_keys + fallback_keys + [k for k in all_crypto_keys if k not in candidate_keys and k not in fallback_keys][:5] for key in all_candidates: if key not in MODEL_SPECS: continue try: pipe = _registry.get_pipeline(key) res = pipe(text[:512]) if isinstance(res, list) and res: res = res[0] label = res.get("label", "NEUTRAL").upper() score = res.get("score", 0.5) # Map labels to our standard format mapped = "bullish" if "POSITIVE" in label or "BULLISH" in label or "LABEL_2" in label else ( "bearish" if "NEGATIVE" in label or "BEARISH" in label or "LABEL_0" in label else "neutral" ) spec = MODEL_SPECS[key] results[spec.model_id] = {"label": mapped, "score": score} labels_count[mapped] += 1 total_conf += score # If we got at least one result, we can proceed if len(results) >= 1: break # Got at least one working model except ModelNotAvailable: continue # Try next model except Exception as e: logger.warning(f"Ensemble failed for {key}: {str(e)[:100]}") continue if not results: logger.warning("No HF models available, using fallback") return basic_sentiment_fallback(text) final = max(labels_count, key=labels_count.get) avg_conf = total_conf / len(results) return { "label": final, "confidence": avg_conf, "scores": results, "model_count": len(results), "available": True, "engine": "huggingface" } 
def analyze_crypto_sentiment(text: str):
    return ensemble_crypto_sentiment(text)


def analyze_financial_sentiment(text: str):
    """Analyze financial sentiment with fallback."""
    if not TRANSFORMERS_AVAILABLE:
        logger.warning("Transformers not available, using fallback")
        return basic_sentiment_fallback(text)
    if HF_MODE == "off":
        logger.warning("HF_MODE=off, using fallback")
        return basic_sentiment_fallback(text)

    # Try models in order with an expanded fallback chain
    primary_keys = ["financial_sent_0", "financial_sent_1"]
    fallback_keys = ["crypto_sent_fin"]
    # Last resort: any financial sentiment model
    all_financial_keys = [
        k for k in MODEL_SPECS.keys()
        if k.startswith("financial_sent_") or MODEL_SPECS[k].category == "sentiment_financial"
    ]
    all_candidates = primary_keys + fallback_keys + [
        k for k in all_financial_keys if k not in primary_keys and k not in fallback_keys
    ][:3]

    for key in all_candidates:
        if key not in MODEL_SPECS:
            continue
        try:
            pipe = _registry.get_pipeline(key)
            res = pipe(text[:512])
            if isinstance(res, list) and res:
                res = res[0]
            label = res.get("label", "neutral").upper()
            score = res.get("score", 0.5)
            # Map to the standard format
            mapped = (
                "bullish" if "POSITIVE" in label or "LABEL_2" in label
                else "bearish" if "NEGATIVE" in label or "LABEL_0" in label
                else "neutral"
            )
            return {
                "label": mapped,
                "score": score,
                "confidence": score,
                "available": True,
                "engine": "huggingface",
                "model": MODEL_SPECS[key].model_id,
            }
        except ModelNotAvailable:
            continue
        except Exception as e:
            logger.warning(f"Financial sentiment failed for {key}: {str(e)[:100]}")
            continue

    logger.warning("No HF financial models available, using fallback")
    return basic_sentiment_fallback(text)


def analyze_social_sentiment(text: str):
    """Analyze social sentiment with fallback."""
    if not TRANSFORMERS_AVAILABLE:
        logger.warning("Transformers not available, using fallback")
        return basic_sentiment_fallback(text)
    if HF_MODE == "off":
        logger.warning("HF_MODE=off, using fallback")
        return basic_sentiment_fallback(text)

    # Try models in order with an expanded fallback chain
    primary_keys = ["social_sent_0", "social_sent_1"]
    fallback_keys = ["crypto_sent_social"]
    # Last resort: any social sentiment model
    all_social_keys = [
        k for k in MODEL_SPECS.keys()
        if k.startswith("social_sent_") or MODEL_SPECS[k].category == "sentiment_social"
    ]
    all_candidates = primary_keys + fallback_keys + [
        k for k in all_social_keys if k not in primary_keys and k not in fallback_keys
    ][:3]

    for key in all_candidates:
        if key not in MODEL_SPECS:
            continue
        try:
            pipe = _registry.get_pipeline(key)
            res = pipe(text[:512])
            if isinstance(res, list) and res:
                res = res[0]
            label = res.get("label", "neutral").upper()
            score = res.get("score", 0.5)
            # Map to the standard format
            mapped = (
                "bullish" if "POSITIVE" in label or "LABEL_2" in label
                else "bearish" if "NEGATIVE" in label or "LABEL_0" in label
                else "neutral"
            )
            return {
                "label": mapped,
                "score": score,
                "confidence": score,
                "available": True,
                "engine": "huggingface",
                "model": MODEL_SPECS[key].model_id,
            }
        except ModelNotAvailable:
            continue
        except Exception as e:
            logger.warning(f"Social sentiment failed for {key}: {str(e)[:100]}")
            continue

    logger.warning("No HF social models available, using fallback")
    return basic_sentiment_fallback(text)


def analyze_market_text(text: str):
    return ensemble_crypto_sentiment(text)
p.get("price")] if not prices: return {"trend": "neutral", "strength": 0, "analysis": "No price data"} first, last = prices[0], prices[-1] change = ((last - first) / first * 100) if first > 0 else 0 if change > 5: trend, strength = "bullish", min(abs(change) / 10, 1.0) elif change < -5: trend, strength = "bearish", min(abs(change) / 10, 1.0) else: trend, strength = "neutral", abs(change) / 5 return {"trend": trend, "strength": strength, "change_pct": change, "support": min(prices), "resistance": max(prices), "analysis": f"Price moved {change:.2f}% showing {trend} trend"} def analyze_news_item(item: Dict[str, Any]): text = item.get("title", "") + " " + item.get("description", "") sent = ensemble_crypto_sentiment(text) return {**item, "sentiment": sent["label"], "sentiment_confidence": sent["confidence"], "sentiment_details": sent} def get_model_info(): return { "transformers_available": TRANSFORMERS_AVAILABLE, "hf_auth_configured": bool(settings.hf_token), "models_initialized": _registry._initialized, "models_loaded": len(_registry._pipelines), "model_catalog": { "crypto_sentiment": CRYPTO_SENTIMENT_MODELS, "social_sentiment": SOCIAL_SENTIMENT_MODELS, "financial_sentiment": FINANCIAL_SENTIMENT_MODELS, "news_sentiment": NEWS_SENTIMENT_MODELS, "generation": GENERATION_MODELS, "trading_signals": TRADING_SIGNAL_MODELS, "summarization": SUMMARIZATION_MODELS, "zero_shot": ZERO_SHOT_MODELS, "classification": CLASSIFICATION_MODELS }, "total_models": len(MODEL_SPECS), "total_categories": 9 } def basic_sentiment_fallback(text: str) -> Dict[str, Any]: """ Simple lexical-based sentiment fallback that doesn't require transformers. Returns sentiment based on keyword matching. """ text_lower = text.lower() # Define keyword lists bullish_words = ["bullish", "rally", "surge", "pump", "breakout", "skyrocket", "uptrend", "buy", "accumulation", "moon", "gain", "profit", "up", "high", "rise", "growth", "positive", "strong"] bearish_words = ["bearish", "dump", "crash", "selloff", "downtrend", "collapse", "sell", "capitulation", "panic", "fear", "drop", "loss", "down", "low", "fall", "decline", "negative", "weak"] # Count matches bullish_count = sum(1 for word in bullish_words if word in text_lower) bearish_count = sum(1 for word in bearish_words if word in text_lower) # Determine sentiment if bullish_count == 0 and bearish_count == 0: label = "neutral" confidence = 0.5 bullish_score = 0.0 bearish_score = 0.0 neutral_score = 1.0 elif bullish_count > bearish_count: label = "bullish" diff = bullish_count - bearish_count confidence = min(0.6 + (diff * 0.05), 0.9) bullish_score = confidence bearish_score = 0.0 neutral_score = 0.0 else: # bearish_count > bullish_count label = "bearish" diff = bearish_count - bullish_count confidence = min(0.6 + (diff * 0.05), 0.9) bearish_score = confidence bullish_score = 0.0 neutral_score = 0.0 return { "label": label, "confidence": confidence, "score": confidence, "scores": { "bullish": round(bullish_score, 3), "bearish": round(bearish_score, 3), "neutral": round(neutral_score, 3) }, "available": True, # Set to True so frontend renders it "engine": "fallback_lexical", "keyword_matches": { "bullish": bullish_count, "bearish": bearish_count } } def list_available_model_keys() -> Dict[str, Any]: """List all available model keys with their details""" return { "total_keys": len(MODEL_SPECS), "keys": list(MODEL_SPECS.keys()), "by_category": { category: [key for key, spec in MODEL_SPECS.items() if spec.category == category] for category in set(spec.category for spec in 
def list_available_model_keys() -> Dict[str, Any]:
    """List all available model keys with their details."""
    return {
        "total_keys": len(MODEL_SPECS),
        "keys": list(MODEL_SPECS.keys()),
        "by_category": {
            category: [key for key, spec in MODEL_SPECS.items() if spec.category == category]
            for category in set(spec.category for spec in MODEL_SPECS.values())
        },
        "details": {
            key: {
                "model_id": spec.model_id,
                "task": spec.task,
                "category": spec.category,
                "requires_auth": spec.requires_auth,
            }
            for key, spec in MODEL_SPECS.items()
        },
    }


def registry_status():
    """Get the registry status with detailed information."""
    status = {
        "ok": HF_MODE != "off" and TRANSFORMERS_AVAILABLE and len(_registry._pipelines) > 0,
        "initialized": _registry._initialized,
        "pipelines_loaded": len(_registry._pipelines),
        "pipelines_failed": len(_registry._failed_models),
        "available_models": list(_registry._pipelines.keys()),
        "failed_models": list(_registry._failed_models.keys())[:10],  # Limit for brevity
        "transformers_available": TRANSFORMERS_AVAILABLE,
        "hf_mode": HF_MODE,
        "total_specs": len(MODEL_SPECS),
        "all_model_keys": list(MODEL_SPECS.keys())[:50],  # Sample of all keys
    }
    if HF_MODE == "off":
        status["error"] = "HF_MODE=off"
    elif not TRANSFORMERS_AVAILABLE:
        status["error"] = "transformers not installed"
    elif len(_registry._pipelines) == 0 and _registry._initialized:
        status["error"] = "No models loaded successfully"
    return status


# ==================== GAP FILLING SERVICE ====================

class GapFillingService:
    """Uses AI models to fill missing data gaps.

    Combines interpolation, ML predictions, and external provider fallback.
    """

    def __init__(self, model_registry: Optional[ModelRegistry] = None):
        self.model_registry = model_registry or _registry
        self.gap_fill_attempts = {}  # Track gap filling attempts
    async def fill_missing_ohlc(
        self,
        symbol: str,
        existing_data: List[Dict[str, Any]],
        missing_timestamps: List[int],
    ) -> Dict[str, Any]:
        """Synthesize missing OHLC candles using interpolation + ML.

        Args:
            symbol: Trading pair symbol (e.g., "BTCUSDT")
            existing_data: List of existing OHLC data points
            missing_timestamps: List of timestamps with missing data

        Returns:
            Dictionary with filled data and metadata
        """
        try:
            if not existing_data or not missing_timestamps:
                return {
                    "status": "error",
                    "message": "Insufficient data for gap filling",
                    "filled_count": 0,
                    "fallback": True,
                }

            # Validate the data structure
            if not isinstance(existing_data, list) or not isinstance(missing_timestamps, list):
                return {
                    "status": "error",
                    "message": "Invalid data types for gap filling",
                    "filled_count": 0,
                    "fallback": True,
                }

            filled_data = []
            confidence_scores = []

            # Sort existing data by timestamp
            try:
                existing_data.sort(key=lambda x: x.get("timestamp", 0))
            except (TypeError, AttributeError) as e:
                logger.warning(f"Error sorting existing_data: {e}, using fallback")
                # Fallback: keep only the first and last points if sorting fails
                if len(existing_data) >= 2:
                    existing_data = [existing_data[0], existing_data[-1]]
                else:
                    return {
                        "status": "error",
                        "message": "Cannot sort existing data",
                        "filled_count": 0,
                        "fallback": True,
                    }

            for missing_ts in missing_timestamps:
                try:
                    # Find the surrounding data points
                    before = [d for d in existing_data if d.get("timestamp", 0) < missing_ts]
                    after = [d for d in existing_data if d.get("timestamp", 0) > missing_ts]

                    if before and after:
                        # Linear interpolation between the surrounding points
                        prev_point = before[-1]
                        next_point = after[0]

                        # Validate the point structure
                        if not all(k in prev_point for k in ["timestamp", "close"]) or \
                                not all(k in next_point for k in ["timestamp", "open", "close"]):
                            logger.warning(f"Invalid data point structure, skipping timestamp {missing_ts}")
                            continue

                        # Calculate the interpolation factor
                        time_diff = next_point["timestamp"] - prev_point["timestamp"]
                        position = (missing_ts - prev_point["timestamp"]) / time_diff if time_diff > 0 else 0.5

                        # Interpolate OHLC values with safe defaults
                        prev_close = prev_point.get("close", prev_point.get("price", 0))
                        next_open = next_point.get("open", next_point.get("close", prev_close))
                        next_close = next_point.get("close", next_open)

                        interpolated = {
                            "timestamp": missing_ts,
                            "open": prev_close * (1 - position) + next_open * position,
                            "high": max(prev_point.get("high", prev_close),
                                        next_point.get("high", next_close)) * (0.98 + position * 0.04),
                            "low": min(prev_point.get("low", prev_close),
                                       next_point.get("low", next_close)) * (1.02 - position * 0.04),
                            "close": prev_close * (1 - position) + next_close * position,
                            "volume": (prev_point.get("volume", 0) + next_point.get("volume", 0)) / 2,
                            "is_synthetic": True,
                            "method": "linear_interpolation",
                        }

                        # Confidence decays with the size of the gap being filled
                        confidence = 0.95 ** len(missing_timestamps)
                        confidence_scores.append(confidence)
                        interpolated["confidence"] = confidence
                        filled_data.append(interpolated)

                    elif before:
                        # Only data before the gap - carry the last known value forward
                        prev_point = before[-1]
                        filled_data.append({
                            "timestamp": missing_ts,
                            "open": prev_point.get("close", prev_point.get("price", 0)),
                            "high": prev_point.get("high", prev_point.get("close", 0)),
                            "low": prev_point.get("low", prev_point.get("close", 0)),
                            "close": prev_point.get("close", prev_point.get("price", 0)),
                            "volume": prev_point.get("volume", 0),
                            "is_synthetic": True,
                            "method": "last_known_value",
                            "confidence": 0.70,
                        })
                        confidence_scores.append(0.70)

                    elif after:
                        # Only data after the gap - carry the first known value backward
                        next_point = after[0]
                        filled_data.append({
                            "timestamp": missing_ts,
                            "open": next_point.get("open", next_point.get("price", 0)),
                            "high": next_point.get("high", next_point.get("open", 0)),
                            "low": next_point.get("low", next_point.get("open", 0)),
                            "close": next_point.get("open", next_point.get("price", 0)),
                            "volume": next_point.get("volume", 0),
                            "is_synthetic": True,
                            "method": "first_known_value",
                            "confidence": 0.70,
                        })
                        confidence_scores.append(0.70)

                except Exception as e:
                    logger.warning(f"Error filling timestamp {missing_ts}: {e}")
                    continue

            return {
                "status": "success",
                "symbol": symbol,
                "filled_count": len(filled_data),
                "filled_data": filled_data,
                "average_confidence": (
                    sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0
                ),
                "method": "interpolation",
                "metadata": {
                    "existing_points": len(existing_data),
                    "missing_points": len(missing_timestamps),
                    "fill_rate": len(filled_data) / len(missing_timestamps) if missing_timestamps else 0,
                },
            }

        except Exception as e:
            logger.error(f"Gap filling failed for {symbol}: {e}", exc_info=True)
            return {
                "status": "error",
                "message": f"Gap filling failed: {str(e)[:200]}",
                "filled_count": 0,
                "fallback": True,
                "error": str(e)[:200],
            }
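    # Worked example for the interpolation above (illustrative, added note):
    # with prev_close=100, next_open=110 and position=0.25 (the gap sits a
    # quarter of the way between the surrounding timestamps), the synthetic
    # open is 100 * 0.75 + 110 * 0.25 = 102.5.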
    async def estimate_orderbook_depth(
        self,
        symbol: str,
        mid_price: float,
        depth_levels: int = 10,
    ) -> Dict[str, Any]:
        """Generate an estimated order book when real data is unavailable.

        Uses statistical models + market patterns.
        """
        try:
            if mid_price <= 0:
                return {
                    "status": "error",
                    "error": "Invalid mid_price",
                    "fallback": True,
                }

            # Validate depth_levels
            if depth_levels <= 0 or depth_levels > 50:
                depth_levels = 10  # Default fallback

            # Generate a synthetic orderbook with a realistic spread
            spread_pct = 0.001  # 0.1% spread
            level_spacing = 0.0005  # 0.05% per level

            bids = []
            asks = []
            for i in range(depth_levels):
                try:
                    # Bids (buy orders) below the mid price
                    bid_price = mid_price * (1 - spread_pct / 2 - i * level_spacing)
                    bid_volume = 1.0 / (i + 1) * 10  # Volume decreases with depth

                    # Validate the calculated values
                    if bid_price <= 0 or not isinstance(bid_price, (int, float)):
                        continue

                    bids.append({
                        "price": round(bid_price, 8),
                        "volume": round(bid_volume, 4),
                        "is_synthetic": True,
                    })

                    # Asks (sell orders) above the mid price
                    ask_price = mid_price * (1 + spread_pct / 2 + i * level_spacing)
                    ask_volume = 1.0 / (i + 1) * 10

                    # Validate the calculated values
                    if ask_price <= 0 or not isinstance(ask_price, (int, float)):
                        continue

                    asks.append({
                        "price": round(ask_price, 8),
                        "volume": round(ask_volume, 4),
                        "is_synthetic": True,
                    })
                except Exception as e:
                    logger.warning(f"Error generating orderbook level {i}: {e}")
                    continue

            # Ensure we have at least some data
            if not bids or not asks:
                # Fallback: create a minimal orderbook
                bids = [{"price": round(mid_price * 0.999, 8), "volume": 1.0, "is_synthetic": True}]
                asks = [{"price": round(mid_price * 1.001, 8), "volume": 1.0, "is_synthetic": True}]

            return {
                "status": "success",
                "symbol": symbol,
                "mid_price": mid_price,
                "bids": bids,
                "asks": asks,
                "is_synthetic": True,
                "confidence": 0.65,  # Lower confidence for synthetic data
                "method": "statistical_estimation",
                "metadata": {
                    "spread_pct": spread_pct,
                    "depth_levels": depth_levels,
                    "total_bid_volume": sum(b["volume"] for b in bids),
                    "total_ask_volume": sum(a["volume"] for a in asks),
                },
            }

        except Exception as e:
            logger.error(f"Orderbook estimation failed for {symbol}: {e}", exc_info=True)
            return {
                "status": "error",
                "error": f"Orderbook estimation failed: {str(e)[:200]}",
                "symbol": symbol,
                "fallback": True,
            }
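    # Worked example for the synthetic book (illustrative, added note): with
    # mid_price=100, level 0 quotes bid 100 * (1 - 0.0005) = 99.95 and
    # ask 100 * (1 + 0.0005) = 100.05; each deeper level steps a further
    # 0.05% away and carries volume 10 / (i + 1).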
    async def synthesize_whale_data(
        self,
        chain: str,
        token: str,
        historical_pattern: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Infer whale movements from partial data.

        Uses on-chain analysis patterns.
        """
        try:
            # Validate inputs
            if not chain or not token:
                return {
                    "status": "error",
                    "error": "Invalid chain or token",
                    "fallback": True,
                }

            # Placeholder for whale data synthesis. In production, this would
            # use ML models trained on historical whale patterns.
            synthetic_movements = []

            # Generate synthetic whale movements based on typical patterns
            if historical_pattern:
                # Use historical patterns to generate realistic movements
                avg_movement = historical_pattern.get("avg_movement_size", 1000000)
                frequency = historical_pattern.get("frequency_per_day", 5)
                # Validate the values
                if not isinstance(avg_movement, (int, float)) or avg_movement <= 0:
                    avg_movement = 1000000
                if not isinstance(frequency, int) or frequency <= 0:
                    frequency = 5
            else:
                # Default patterns
                avg_movement = 1000000
                frequency = 5

            # Limit the frequency to prevent excessive data
            frequency = min(frequency, 10)

            for i in range(frequency):
                try:
                    movement = {
                        "timestamp": int(time.time()) - (i * 3600),
                        "from_address": f"0x{'0' * (40 - len(str(i)))}{i}",
                        "to_address": "0x" + "0" * 40,
                        "amount": avg_movement * (0.8 + random.random() * 0.4),
                        "token": token,
                        "chain": chain,
                        "is_synthetic": True,
                        "confidence": 0.55,
                    }
                    synthetic_movements.append(movement)
                except Exception as e:
                    logger.warning(f"Error generating whale movement {i}: {e}")
                    continue

            # Ensure we have at least some data
            if not synthetic_movements:
                # Fallback: create one minimal movement
                synthetic_movements = [{
                    "timestamp": int(time.time()),
                    "from_address": "0x" + "0" * 40,
                    "to_address": "0x" + "0" * 40,
                    "amount": avg_movement,
                    "token": token,
                    "chain": chain,
                    "is_synthetic": True,
                    "confidence": 0.50,
                }]

            return {
                "status": "success",
                "chain": chain,
                "token": token,
                "movements": synthetic_movements,
                "is_synthetic": True,
                "confidence": 0.55,
                "method": "pattern_based_synthesis",
                "metadata": {
                    "movement_count": len(synthetic_movements),
                    "total_volume": sum(m["amount"] for m in synthetic_movements),
                },
            }

        except Exception as e:
            logger.error(f"Whale data synthesis failed for {chain}/{token}: {e}", exc_info=True)
            return {
                "status": "error",
                "error": f"Whale data synthesis failed: {str(e)[:200]}",
                "chain": chain,
                "token": token,
                "fallback": True,
            }

    async def analyze_trading_signal(
        self,
        symbol: str,
        market_data: Dict[str, Any],
        sentiment_data: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Generate a trading signal using AI models.

        Combines price action, volume, and sentiment analysis.
        """
        # Use a trading signal model if available - try multiple models
        trading_model_keys = ["crypto_trading_lm", "crypto_trade_0"]

        for model_key in trading_model_keys:
            try:
                if model_key in MODEL_SPECS:
                    # Prepare the input text for the model
                    text = f"Analyze {symbol}: "
                    if market_data:
                        price = market_data.get("price", 0)
                        change = market_data.get("percent_change_24h", 0)
                        volume = market_data.get("volume_24h", 0)
                        text += f"Price ${price:.2f}, Change {change:+.2f}%, Volume ${volume:,.0f}"
                    if sentiment_data:
                        sentiment = sentiment_data.get("label", "neutral")
                        text += f", Sentiment: {sentiment}"

                    # Call the model
                    result = self.model_registry.call_model_safe(model_key, text)

                    if result["status"] == "success":
                        # Parse the model output
                        model_output = result.get("data", {})
                        return {
                            "status": "success",
                            "symbol": symbol,
                            "signal": "hold",  # Default
                            "confidence": 0.70,
                            "reasoning": model_output,
                            "is_ai_generated": True,
                            "model_used": model_key,
                        }
            except Exception as e:
                logger.warning(f"Error in trading signal analysis with {model_key}: {e}")
                continue  # Try the next model

        # Fall back to a rule-based signal
        signal = "hold"
        confidence = 0.60

        if market_data:
            change = market_data.get("percent_change_24h", 0)
            volume_change = market_data.get("volume_change_24h", 0)
            # Simple rules
            if change > 5 and volume_change > 20:
                signal = "buy"
                confidence = 0.75
            elif change < -5 and volume_change > 20:
                signal = "sell"
                confidence = 0.75

        return {
            "status": "success",
            "symbol": symbol,
            "signal": signal,
            "confidence": confidence,
            "reasoning": "Rule-based analysis",
            "is_ai_generated": False,
            "method": "fallback_rules",
        }


# Global gap filling service instance
_gap_filler = GapFillingService()


def get_gap_filler() -> GapFillingService:
    """Get the global gap filling service instance."""
    return _gap_filler
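if __name__ == "__main__":
    # Hedged smoke test (illustrative; not part of the original module).
    # Exercises only the lexical fallback and the async gap filler, so no
    # model downloads are needed (the config/env-detector imports still are).
    import asyncio

    logging.basicConfig(level=logging.INFO)
    print(basic_sentiment_fallback("Panic selling accelerates the crash"))

    candles = [
        {"timestamp": 0, "open": 100.0, "high": 101.0, "low": 99.0, "close": 100.0, "volume": 10.0},
        {"timestamp": 120, "open": 104.0, "high": 105.0, "low": 103.0, "close": 104.0, "volume": 12.0},
    ]
    filled = asyncio.run(get_gap_filler().fill_missing_ohlc("BTCUSDT", candles, [60]))
    print(filled["filled_count"], round(filled["average_confidence"], 3))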