#!/usr/bin/env python3
"""Centralized access to Hugging Face models with ensemble sentiment."""
from __future__ import annotations
import logging
import os
import random
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Sequence
from config import HUGGINGFACE_MODELS, get_settings
try:
from transformers import pipeline
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
try:
from huggingface_hub.errors import RepositoryNotFoundError
HF_HUB_AVAILABLE = True
except ImportError:
HF_HUB_AVAILABLE = False
RepositoryNotFoundError = Exception
try:
import requests
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
logger = logging.getLogger(__name__)
settings = get_settings()
HF_TOKEN_ENV = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACE_TOKEN")
_is_hf_space = bool(os.getenv("SPACE_ID"))
# Default to "public" so models are enabled out of the box
_default_hf_mode = "public"
HF_MODE = os.getenv("HF_MODE", _default_hf_mode).lower()
if HF_MODE not in ("off", "public", "auth"):
    logger.warning("Invalid HF_MODE %r, resetting to 'off'", HF_MODE)
    HF_MODE = "off"
if HF_MODE == "auth" and not HF_TOKEN_ENV:
HF_MODE = "off"
logger.warning("HF_MODE='auth' but no HF_TOKEN found, resetting to 'off'")
# Linked models in HF Space - these are pre-validated
LINKED_MODEL_IDS = {
"cardiffnlp/twitter-roberta-base-sentiment-latest",
"ProsusAI/finbert",
"mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
"ElKulako/cryptobert",
"kk08/CryptoBERT",
"agarkovv/CryptoTrader-LM",
"StephanAkkerman/FinTwitBERT-sentiment",
"OpenC/crypto-gpt-o3-mini",
"burakutf/finetuned-finbert-crypto",
"mathugo/crypto_news_bert",
"mayurjadhav/crypto-sentiment-model",
"yiyanghkust/finbert-tone",
"facebook/bart-large-cnn",
"facebook/bart-large-mnli",
"distilbert-base-uncased-finetuned-sst-2-english",
"nlptown/bert-base-multilingual-uncased-sentiment",
"finiteautomata/bertweet-base-sentiment-analysis",
}
# Extended Model Catalog - Using VERIFIED public models only
# These models are tested and confirmed working on HuggingFace Hub
CRYPTO_SENTIMENT_MODELS = [
"kk08/CryptoBERT", # Crypto-specific sentiment binary classification
"ElKulako/cryptobert", # Crypto social sentiment (Bullish/Neutral/Bearish)
"mayurjadhav/crypto-sentiment-model", # Crypto sentiment analysis
"mathugo/crypto_news_bert", # Crypto news sentiment
"burakutf/finetuned-finbert-crypto", # Finetuned FinBERT for crypto
"cardiffnlp/twitter-roberta-base-sentiment-latest", # Fallback
"distilbert-base-uncased-finetuned-sst-2-english", # General sentiment
]
SOCIAL_SENTIMENT_MODELS = [
"ElKulako/cryptobert", # Crypto social sentiment
"cardiffnlp/twitter-roberta-base-sentiment-latest", # Twitter sentiment
"finiteautomata/bertweet-base-sentiment-analysis", # BERTweet sentiment
"nlptown/bert-base-multilingual-uncased-sentiment", # Multilingual sentiment
"distilbert-base-uncased-finetuned-sst-2-english", # General sentiment
]
FINANCIAL_SENTIMENT_MODELS = [
"StephanAkkerman/FinTwitBERT-sentiment", # Financial tweet sentiment
"ProsusAI/finbert", # Financial sentiment
"yiyanghkust/finbert-tone", # Financial tone classification
"mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis", # Financial news
"cardiffnlp/twitter-roberta-base-sentiment-latest", # Fallback
]
NEWS_SENTIMENT_MODELS = [
"StephanAkkerman/FinTwitBERT-sentiment", # News sentiment
"mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis", # Financial news
"ProsusAI/finbert", # Financial news sentiment
"cardiffnlp/twitter-roberta-base-sentiment-latest", # Fallback
]
GENERATION_MODELS = [
"OpenC/crypto-gpt-o3-mini", # Crypto/DeFi text generation
"gpt2", # General text generation fallback
"distilgpt2", # Lightweight text generation
]
TRADING_SIGNAL_MODELS = [
"agarkovv/CryptoTrader-LM", # BTC/ETH trading signals (buy/sell/hold)
]
SUMMARIZATION_MODELS = [
"FurkanGozukara/Crypto-Financial-News-Summarizer", # Crypto/Financial news summarization
"facebook/bart-large-cnn", # BART summarization
"facebook/bart-large-mnli", # BART zero-shot classification
"google/pegasus-xsum", # Pegasus summarization
]
ZERO_SHOT_MODELS = [
"facebook/bart-large-mnli", # Zero-shot classification
"typeform/distilbert-base-uncased-mnli", # DistilBERT NLI
]
CLASSIFICATION_MODELS = [
"yiyanghkust/finbert-tone", # Financial tone classification
"distilbert-base-uncased-finetuned-sst-2-english", # Sentiment classification
]
@dataclass(frozen=True)
class PipelineSpec:
key: str
task: str
model_id: str
requires_auth: bool = False
category: str = "sentiment"
MODEL_SPECS: Dict[str, PipelineSpec] = {}
# Legacy models
for lk in ["sentiment_twitter", "sentiment_financial", "summarization", "crypto_sentiment"]:
if lk in HUGGINGFACE_MODELS:
MODEL_SPECS[lk] = PipelineSpec(
key=lk,
task="sentiment-analysis" if "sentiment" in lk else "summarization",
model_id=HUGGINGFACE_MODELS[lk],
category="legacy"
)
# Crypto sentiment - Add named keys for required models
for i, mid in enumerate(CRYPTO_SENTIMENT_MODELS):
key = f"crypto_sent_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-classification", model_id=mid,
category="sentiment_crypto", requires_auth=("ElKulako" in mid)
)
# Add specific named aliases for required models
MODEL_SPECS["crypto_sent_kk08"] = PipelineSpec(
key="crypto_sent_kk08", task="sentiment-analysis", model_id="kk08/CryptoBERT",
category="sentiment_crypto", requires_auth=False
)
# Social
for i, mid in enumerate(SOCIAL_SENTIMENT_MODELS):
key = f"social_sent_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-classification", model_id=mid,
category="sentiment_social", requires_auth=("ElKulako" in mid)
)
# Add specific named alias
MODEL_SPECS["crypto_sent_social"] = PipelineSpec(
key="crypto_sent_social", task="text-classification", model_id="ElKulako/cryptobert",
category="sentiment_social", requires_auth=True
)
# Financial
for i, mid in enumerate(FINANCIAL_SENTIMENT_MODELS):
key = f"financial_sent_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-classification", model_id=mid, category="sentiment_financial"
)
# Add specific named alias
MODEL_SPECS["crypto_sent_fin"] = PipelineSpec(
key="crypto_sent_fin", task="sentiment-analysis", model_id="StephanAkkerman/FinTwitBERT-sentiment",
category="sentiment_financial", requires_auth=False
)
# News
for i, mid in enumerate(NEWS_SENTIMENT_MODELS):
key = f"news_sent_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-classification", model_id=mid, category="sentiment_news"
)
# Generation models (for crypto/DeFi text generation)
for i, mid in enumerate(GENERATION_MODELS):
key = f"crypto_gen_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-generation", model_id=mid, category="analysis_generation"
)
# Add specific named alias
MODEL_SPECS["crypto_ai_analyst"] = PipelineSpec(
key="crypto_ai_analyst", task="text-generation", model_id="OpenC/crypto-gpt-o3-mini",
category="analysis_generation", requires_auth=False
)
# Trading signal models
for i, mid in enumerate(TRADING_SIGNAL_MODELS):
key = f"crypto_trade_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-generation", model_id=mid, category="trading_signal"
)
# Add specific named alias
MODEL_SPECS["crypto_trading_lm"] = PipelineSpec(
key="crypto_trading_lm", task="text-generation", model_id="agarkovv/CryptoTrader-LM",
category="trading_signal", requires_auth=False
)
# Summarization models
for i, mid in enumerate(SUMMARIZATION_MODELS):
MODEL_SPECS[f"summarization_{i}"] = PipelineSpec(
key=f"summarization_{i}", task="summarization", model_id=mid, category="summarization"
)
# Add specific named alias for BART summarization
MODEL_SPECS["summarization_bart"] = PipelineSpec(
key="summarization_bart", task="summarization", model_id="facebook/bart-large-cnn",
category="summarization", requires_auth=False
)
# Zero-shot classification models
for i, mid in enumerate(ZERO_SHOT_MODELS):
key = f"zero_shot_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="zero-shot-classification", model_id=mid, category="zero_shot"
)
# Add specific named alias
MODEL_SPECS["zero_shot_bart"] = PipelineSpec(
key="zero_shot_bart", task="zero-shot-classification", model_id="facebook/bart-large-mnli",
category="zero_shot", requires_auth=False
)
# Classification models
for i, mid in enumerate(CLASSIFICATION_MODELS):
key = f"classification_{i}"
MODEL_SPECS[key] = PipelineSpec(
key=key, task="text-classification", model_id=mid, category="classification"
)
# Add specific named alias for FinBERT tone
MODEL_SPECS["classification_finbert_tone"] = PipelineSpec(
key="classification_finbert_tone", task="text-classification", model_id="yiyanghkust/finbert-tone",
category="classification", requires_auth=False
)
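# Example of resolving a spec from the catalog above (a minimal sketch; the
# values follow the definitions in this file):
#   spec = MODEL_SPECS["crypto_sent_kk08"]
#   spec.model_id   # "kk08/CryptoBERT"
#   spec.task       # "sentiment-analysis"
#   spec.category   # "sentiment_crypto"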
class ModelNotAvailable(RuntimeError):
    """Raised when a requested model cannot be loaded or called."""
@dataclass
class ModelHealthEntry:
"""Health tracking entry for a model"""
key: str
name: str
status: str = "unknown" # "healthy", "degraded", "unavailable", "unknown"
last_success: Optional[float] = None
last_error: Optional[float] = None
error_count: int = 0
success_count: int = 0
cooldown_until: Optional[float] = None
last_error_message: Optional[str] = None
class ModelRegistry:
def __init__(self):
self._pipelines = {}
self._lock = threading.Lock()
self._initialized = False
self._failed_models = {} # Track failed models with reasons
# Health tracking for self-healing
self._health_registry = {} # key -> health entry
def _get_or_create_health_entry(self, key: str) -> ModelHealthEntry:
"""Get or create health entry for a model"""
if key not in self._health_registry:
spec = MODEL_SPECS.get(key)
self._health_registry[key] = ModelHealthEntry(
key=key,
name=spec.model_id if spec else key,
status="unknown"
)
return self._health_registry[key]
def _update_health_on_success(self, key: str):
"""Update health registry after successful model call"""
entry = self._get_or_create_health_entry(key)
entry.last_success = time.time()
entry.success_count += 1
        # Decay the error count gradually on each success
if entry.error_count > 0:
entry.error_count = max(0, entry.error_count - 1)
# Recovery logic: if we have enough successes, mark as healthy
if entry.success_count >= settings.health_success_recovery_count:
entry.status = "healthy"
entry.cooldown_until = None
# Clear from failed models if present
if key in self._failed_models:
del self._failed_models[key]
def _update_health_on_failure(self, key: str, error_msg: str):
"""Update health registry after failed model call"""
entry = self._get_or_create_health_entry(key)
entry.last_error = time.time()
entry.error_count += 1
entry.last_error_message = error_msg
entry.success_count = 0 # Reset success count on failure
# Determine status based on error count
if entry.error_count >= settings.health_error_threshold:
entry.status = "unavailable"
# Set cooldown period
entry.cooldown_until = time.time() + settings.health_cooldown_seconds
elif entry.error_count >= (settings.health_error_threshold // 2):
entry.status = "degraded"
else:
entry.status = "healthy"
def _is_in_cooldown(self, key: str) -> bool:
"""Check if model is in cooldown period"""
if key not in self._health_registry:
return False
entry = self._health_registry[key]
if entry.cooldown_until is None:
return False
return time.time() < entry.cooldown_until
def attempt_model_reinit(self, key: str) -> Dict[str, Any]:
"""
Attempt to re-initialize a failed model after cooldown.
Returns result dict with status and message.
"""
if key not in MODEL_SPECS:
return {"status": "error", "message": f"Unknown model key: {key}"}
entry = self._get_or_create_health_entry(key)
# Check if enough time has passed since last error
if entry.last_error:
time_since_error = time.time() - entry.last_error
if time_since_error < settings.health_reinit_cooldown_seconds:
return {
"status": "cooldown",
"message": f"Model in cooldown, wait {int(settings.health_reinit_cooldown_seconds - time_since_error)}s",
"cooldown_remaining": int(settings.health_reinit_cooldown_seconds - time_since_error)
}
        # Try to reinitialize: reset state under the lock, then load outside it,
        # because get_pipeline() acquires the same non-reentrant lock
        with self._lock:
            # Remove from failed models and pipelines to force reload
            if key in self._failed_models:
                del self._failed_models[key]
            if key in self._pipelines:
                del self._pipelines[key]
            # Reset health entry
            entry.error_count = 0
            entry.status = "unknown"
            entry.cooldown_until = None
        try:
            # Attempt to load
            self.get_pipeline(key)
return {
"status": "success",
"message": f"Model {key} successfully reinitialized",
"model": MODEL_SPECS[key].model_id
}
except Exception as e:
return {
"status": "failed",
"message": f"Reinitialization failed: {str(e)[:200]}",
"error": str(e)[:200]
}
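    # Usage sketch (hypothetical key; the result shape follows the method above):
    #   res = registry.attempt_model_reinit("crypto_sent_0")
    #   res["status"]  # "success", "failed", "cooldown", or "error"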
def get_model_health_registry(self) -> List[Dict[str, Any]]:
"""Get health registry for all models"""
result = []
for key, entry in self._health_registry.items():
spec = MODEL_SPECS.get(key)
result.append({
"key": entry.key,
"name": entry.name,
"model_id": spec.model_id if spec else entry.name,
"category": spec.category if spec else "unknown",
"status": entry.status,
"last_success": entry.last_success,
"last_error": entry.last_error,
"error_count": entry.error_count,
"success_count": entry.success_count,
"cooldown_until": entry.cooldown_until,
"in_cooldown": self._is_in_cooldown(key),
"last_error_message": entry.last_error_message,
"loaded": key in self._pipelines
})
# Add models that exist in specs but not in health registry
for key, spec in MODEL_SPECS.items():
if key not in self._health_registry:
result.append({
"key": key,
"name": spec.model_id,
"model_id": spec.model_id,
"category": spec.category,
"status": "unknown",
"last_success": None,
"last_error": None,
"error_count": 0,
"success_count": 0,
"cooldown_until": None,
"in_cooldown": False,
"last_error_message": None,
"loaded": key in self._pipelines
})
return result
def _should_use_token(self, spec: PipelineSpec) -> Optional[str]:
"""Determine if and which token to use for model loading"""
if HF_MODE == "off":
return None
# In public mode, try to use token if available (for better rate limits)
if HF_MODE == "public":
# Use token if available to avoid rate limiting
return HF_TOKEN_ENV if HF_TOKEN_ENV else None
# In auth mode, always use token if available
if HF_MODE == "auth":
if HF_TOKEN_ENV:
return HF_TOKEN_ENV
else:
logger.warning(f"Model {spec.model_id} - auth mode but no token available")
return None
return None
def get_pipeline(self, key: str):
"""Get pipeline for a model key, with robust error handling and health tracking"""
if HF_MODE == "off":
raise ModelNotAvailable("HF_MODE=off")
if not TRANSFORMERS_AVAILABLE:
raise ModelNotAvailable("transformers not installed")
if key not in MODEL_SPECS:
# Provide helpful error with available keys
available_keys = list(MODEL_SPECS.keys())[:20] # Show first 20
similar_keys = [k for k in MODEL_SPECS.keys() if key.lower() in k.lower() or k.lower() in key.lower()][:5]
error_msg = f"Unknown model key: '{key}'. "
if similar_keys:
error_msg += f"Did you mean: {', '.join(similar_keys)}? "
error_msg += f"Available keys: {len(MODEL_SPECS)} total. "
if len(available_keys) < len(MODEL_SPECS):
error_msg += f"Sample: {', '.join(available_keys[:10])}..."
else:
error_msg += f"Keys: {', '.join(available_keys)}"
raise ModelNotAvailable(error_msg)
spec = MODEL_SPECS[key]
# Check if model is in cooldown
if self._is_in_cooldown(key):
entry = self._health_registry[key]
cooldown_remaining = int(entry.cooldown_until - time.time())
raise ModelNotAvailable(f"Model in cooldown for {cooldown_remaining}s: {entry.last_error_message or 'previous failures'}")
# Return cached pipeline if available
if key in self._pipelines:
return self._pipelines[key]
# Check if this model already failed
if key in self._failed_models:
raise ModelNotAvailable(f"Model failed previously: {self._failed_models[key]}")
with self._lock:
# Double-check after acquiring lock
if key in self._pipelines:
return self._pipelines[key]
if key in self._failed_models:
raise ModelNotAvailable(f"Model failed previously: {self._failed_models[key]}")
# Determine token usage
auth_token = self._should_use_token(spec)
logger.info(f"Loading model: {spec.model_id} (mode={HF_MODE}, auth={'yes' if auth_token else 'no'})")
# Log token status for debugging
if spec.requires_auth and not auth_token:
logger.warning(f"Model {spec.model_id} requires auth but no token provided")
try:
# Use token parameter instead of deprecated use_auth_token
pipeline_kwargs = {
"task": spec.task,
"model": spec.model_id,
}
# Only add token if we have one and it's needed
if auth_token:
pipeline_kwargs["token"] = auth_token
logger.debug(f"Using authentication token for {spec.model_id}")
elif spec.requires_auth:
                    # Auth is required but no token was selected above; fall
                    # back to HF_TOKEN_ENV directly if it is set
if HF_TOKEN_ENV:
pipeline_kwargs["token"] = HF_TOKEN_ENV
logger.info(f"Using HF_TOKEN_ENV for {spec.model_id} (requires_auth=True)")
else:
logger.warning(f"No token available for model {spec.model_id} that requires auth")
else:
                    # token=False disables auth entirely; token=None would
                    # still fall back to any locally cached token
                    pipeline_kwargs["token"] = False
self._pipelines[key] = pipeline(**pipeline_kwargs)
logger.info(f"✅ Successfully loaded model: {spec.model_id}")
# Update health on successful load
self._update_health_on_success(key)
return self._pipelines[key]
except RepositoryNotFoundError as e:
error_msg = f"Repository not found: {spec.model_id} - Model may not exist on Hugging Face Hub"
logger.warning(f"{error_msg} - {str(e)}")
logger.info(f"💡 Tip: Verify model exists at https://huggingface.co/{spec.model_id}")
self._failed_models[key] = error_msg
raise ModelNotAvailable(error_msg) from e
except OSError as e:
# Handle "not a valid model identifier" errors
error_str = str(e)
if "not a local folder" in error_str and "not a valid model identifier" in error_str:
error_msg = f"Model identifier invalid: {spec.model_id} - May not exist or requires authentication"
logger.warning(f"{error_msg}")
if spec.requires_auth and not auth_token and not HF_TOKEN_ENV:
logger.info(f"💡 Tip: This model may require HF_TOKEN. Set HF_TOKEN environment variable.")
logger.info(f"💡 Tip: Check if model exists at https://huggingface.co/{spec.model_id}")
else:
error_msg = f"OSError loading {spec.model_id}: {str(e)[:200]}"
logger.warning(error_msg)
self._failed_models[key] = error_msg
raise ModelNotAvailable(error_msg) from e
except Exception as e:
error_type = type(e).__name__
error_msg = f"{error_type}: {str(e)[:100]}"
# Check for HTTP errors (401, 403, 404)
if REQUESTS_AVAILABLE and isinstance(e, requests.exceptions.HTTPError):
status_code = getattr(e.response, 'status_code', None)
if status_code == 401:
error_msg = f"Authentication failed (401) for {spec.model_id}"
elif status_code == 403:
error_msg = f"Access forbidden (403) for {spec.model_id}"
elif status_code == 404:
error_msg = f"Model not found (404): {spec.model_id}"
# Check for OSError from transformers
if isinstance(e, OSError):
if "not a valid model identifier" in str(e):
                    # Linked models in the HF Space are pre-validated, so treat
                    # this as transient: record the failure for health tracking
                    # but skip the permanent-failure cache so later calls retry
                    if spec.model_id in LINKED_MODEL_IDS:
                        logger.info(f"Linked model {spec.model_id} failed the identifier check - will retry on a later request")
                        self._update_health_on_failure(key, error_msg)
                        raise ModelNotAvailable(error_msg) from e
                    else:
                        error_msg = f"Invalid model identifier: {spec.model_id}"
elif "401" in str(e) or "403" in str(e):
error_msg = f"Authentication required for {spec.model_id}"
else:
error_msg = f"OS Error loading {spec.model_id}: {str(e)[:100]}"
logger.warning(f"Failed to load {spec.model_id}: {error_msg}")
self._failed_models[key] = error_msg
# Update health on failure
self._update_health_on_failure(key, error_msg)
raise ModelNotAvailable(error_msg) from e
def call_model_safe(self, key: str, text: str, **kwargs) -> Dict[str, Any]:
"""
Safely call a model with health tracking.
Returns result dict with status and data or error.
"""
try:
pipe = self.get_pipeline(key)
result = pipe(text[:512], **kwargs)
# Update health on successful call
self._update_health_on_success(key)
return {
"status": "success",
"data": result,
"model_key": key,
"model_id": MODEL_SPECS[key].model_id if key in MODEL_SPECS else key
}
except ModelNotAvailable as e:
# Don't update health here, already updated in get_pipeline
return {
"status": "unavailable",
"error": str(e),
"model_key": key
}
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)[:200]}"
logger.warning(f"Model call failed for {key}: {error_msg}")
# Update health on call failure
self._update_health_on_failure(key, error_msg)
return {
"status": "error",
"error": error_msg,
"model_key": key
}
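    # Usage sketch (key and text are illustrative; uses the module-level
    # _registry singleton defined near the end of this file):
    #   out = _registry.call_model_safe("crypto_sent_0", "BTC breaks out above resistance")
    #   if out["status"] == "success":
    #       print(out["data"])  # e.g. [{"label": "...", "score": 0.93}]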
def get_registry_status(self) -> Dict[str, Any]:
"""Get detailed registry status with all models"""
items = []
for key, spec in MODEL_SPECS.items():
loaded = key in self._pipelines
            error = self._failed_models.get(key)
items.append({
"key": key,
"name": spec.model_id,
"task": spec.task,
"category": spec.category,
"loaded": loaded,
"error": error,
"requires_auth": spec.requires_auth
})
return {
"models_total": len(MODEL_SPECS),
"models_loaded": len(self._pipelines),
"models_failed": len(self._failed_models),
"items": items,
"hf_mode": HF_MODE,
"transformers_available": TRANSFORMERS_AVAILABLE,
"initialized": self._initialized
}
    def initialize_models(self, force_reload: bool = False, max_models: Optional[int] = None):
"""Initialize models with fallback logic - tries primary models first
Args:
force_reload: If True, reinitialize even if already initialized
max_models: Maximum number of models to load (None = load all available)
"""
if self._initialized and not force_reload:
return {
"status": "already_initialized",
"mode": HF_MODE,
"models_loaded": len(self._pipelines),
"failed_count": len(self._failed_models),
"total_specs": len(MODEL_SPECS)
}
# Reset if forcing reload
if force_reload:
logger.info("Force reload requested - resetting initialization state")
self._initialized = False
# Don't clear pipelines - keep already loaded models
if HF_MODE == "off":
logger.info("HF_MODE=off, using fallback-only mode")
self._initialized = True
return {
"status": "fallback_only",
"mode": HF_MODE,
"models_loaded": 0,
"error": "HF_MODE=off - using lexical fallback",
"total_specs": len(MODEL_SPECS)
}
if not TRANSFORMERS_AVAILABLE:
logger.warning("Transformers not available, using fallback-only mode")
self._initialized = True
return {
"status": "fallback_only",
"mode": HF_MODE,
"models_loaded": 0,
"error": "transformers library not installed - using lexical fallback",
"total_specs": len(MODEL_SPECS)
}
logger.info(f"Starting model initialization (HF_MODE={HF_MODE}, TRANSFORMERS_AVAILABLE={TRANSFORMERS_AVAILABLE})")
logger.info(f"Total models in catalog: {len(MODEL_SPECS)}")
logger.info(f"HF_TOKEN available: {bool(HF_TOKEN_ENV)}")
loaded, failed = [], []
# Try to load at least one model from each category with expanded fallback
categories_to_try = {
"crypto": ["crypto_sent_0", "crypto_sent_1", "crypto_sent_kk08", "crypto_sent_2"],
"financial": ["financial_sent_0", "financial_sent_1", "crypto_sent_fin"],
"social": ["social_sent_0", "social_sent_1", "crypto_sent_social"],
"news": ["news_sent_0", "news_sent_1", "financial_sent_0"] # Financial models can analyze news
}
# If max_models is set, try to load more models from each category
models_per_category = 1 if max_models is None else max(1, max_models // len(categories_to_try))
for category, keys in categories_to_try.items():
category_loaded = False
models_loaded_in_category = 0
logger.info(f"[{category}] Attempting to load models from category...")
for key in keys:
if max_models and len(loaded) >= max_models:
logger.info(f"Reached max_models limit ({max_models}), stopping")
break
if models_loaded_in_category >= models_per_category:
logger.debug(f"[{category}] Already loaded {models_loaded_in_category} model(s), moving to next category")
break
if key not in MODEL_SPECS:
logger.debug(f"[{category}] Model key '{key}' not in MODEL_SPECS, trying alternatives...")
# Try to find alternative key in same category
alt_keys = [k for k in MODEL_SPECS.keys()
if (k.startswith(f"{category.split('_')[0]}_sent_") or
MODEL_SPECS[k].category == f"sentiment_{category.split('_')[0]}")]
if alt_keys:
logger.debug(f"[{category}] Found {len(alt_keys)} alternative keys, adding to queue")
keys.extend(alt_keys[:2]) # Add 2 alternatives
continue
spec = MODEL_SPECS[key]
logger.info(f"[{category}] Attempting to load model: {key} ({spec.model_id})")
                try:
                    # deliberately not assigned to a name like "pipeline",
                    # which would shadow the transformers.pipeline import
                    self.get_pipeline(key)
loaded.append(key)
models_loaded_in_category += 1
category_loaded = True
logger.info(f"[{category}] ✅ Successfully loaded model: {key} ({spec.model_id})")
# If we've loaded one from this category and max_models is None, move to next category
if max_models is None:
break
except ModelNotAvailable as e:
error_msg = str(e)[:200] # Allow longer error messages
logger.warning(f"[{category}] ⚠️ Model {key} not available: {error_msg}")
failed.append((key, error_msg))
# Continue to next key in fallback chain
continue
except Exception as e:
error_msg = f"{type(e).__name__}: {str(e)[:200]}"
logger.error(f"[{category}] ❌ Model {key} initialization error: {error_msg}", exc_info=True)
failed.append((key, error_msg))
# Continue to next key in fallback chain
continue
if category_loaded:
logger.info(f"[{category}] Category initialization complete: {models_loaded_in_category} model(s) loaded")
else:
logger.warning(f"[{category}] ⚠️ No models loaded from this category")
# Determine status - be more lenient
if len(loaded) > 0:
status = "ok"
logger.info(f"✅ Model initialization complete: {len(loaded)} model(s) loaded successfully")
else:
# No models loaded, but that's OK - we have fallback
logger.warning("⚠️ No HF models loaded, using fallback-only mode")
status = "fallback_only"
self._initialized = True
result = {
"status": status,
"mode": HF_MODE,
"models_loaded": len(loaded),
"models_failed": len(failed),
"loaded": loaded[:20], # Show more loaded models
"failed": failed[:20], # Show more failed models
"failed_count": len(self._failed_models),
"total_available_keys": len(MODEL_SPECS),
"available_keys_sample": list(MODEL_SPECS.keys())[:30],
"transformers_available": TRANSFORMERS_AVAILABLE,
"hf_token_available": bool(HF_TOKEN_ENV),
"note": "Fallback lexical analysis available" if len(loaded) == 0 else None
}
# Add initialization error summary if any
if len(failed) > 0:
result["initialization_errors"] = {
"total": len(failed),
"summary": f"{len(failed)} model(s) failed to initialize",
"details": failed[:10] # Show first 10 errors for debugging
}
if len(loaded) == 0:
result["error"] = "No models could be initialized. Check model IDs, HF_TOKEN, or network connectivity."
result["debugging_tips"] = [
"Verify HF_TOKEN is set in environment variables",
"Check if models exist on Hugging Face Hub",
"Verify network connectivity to huggingface.co",
"Check transformers library is installed: pip install transformers",
"Review logs for specific error messages"
]
logger.info(f"Model initialization summary: {result['status']}, loaded={result['models_loaded']}, failed={result['models_failed']}, total_specs={result['total_available_keys']}")
return result
_registry = ModelRegistry()
def initialize_models(force_reload: bool = False, max_models: Optional[int] = None):
"""Initialize models with optional parameters
Args:
force_reload: If True, reinitialize even if already initialized
max_models: Maximum number of models to load (None = load one per category)
"""
return _registry.initialize_models(force_reload=force_reload, max_models=max_models)
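# Usage sketch (return shape follows ModelRegistry.initialize_models above):
#   summary = initialize_models(max_models=4)
#   summary["status"]         # "ok" or "fallback_only"
#   summary["models_loaded"]  # number of pipelines actually loaded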
def get_model_health_registry() -> List[Dict[str, Any]]:
"""Get health registry for all models"""
return _registry.get_model_health_registry()
def attempt_model_reinit(model_key: str) -> Dict[str, Any]:
"""Attempt to re-initialize a failed model"""
return _registry.attempt_model_reinit(model_key)
def call_model_safe(model_key: str, text: str, **kwargs) -> Dict[str, Any]:
"""Safely call a model with health tracking"""
return _registry.call_model_safe(model_key, text, **kwargs)
def ensemble_crypto_sentiment(text: str) -> Dict[str, Any]:
"""Ensemble crypto sentiment with fallback model selection"""
if not TRANSFORMERS_AVAILABLE:
logger.warning("Transformers not available, using fallback")
return basic_sentiment_fallback(text)
if HF_MODE == "off":
logger.warning("HF_MODE=off, using fallback")
return basic_sentiment_fallback(text)
    results: Dict[str, Any] = {}
    labels_count = {"bullish": 0, "bearish": 0, "neutral": 0}
    total_conf = 0.0
# Try models in order with expanded fallback chain
# Primary candidates
candidate_keys = ["crypto_sent_0", "crypto_sent_1", "crypto_sent_2"]
# Fallback: try named aliases
fallback_keys = ["crypto_sent_kk08", "crypto_sent_social"]
# Last resort: try any crypto sentiment model
all_crypto_keys = [k for k in MODEL_SPECS.keys() if k.startswith("crypto_sent_") or MODEL_SPECS[k].category == "sentiment_crypto"]
# Combine all candidate keys
all_candidates = candidate_keys + fallback_keys + [k for k in all_crypto_keys if k not in candidate_keys and k not in fallback_keys][:5]
for key in all_candidates:
if key not in MODEL_SPECS:
continue
try:
pipe = _registry.get_pipeline(key)
res = pipe(text[:512])
if isinstance(res, list) and res:
res = res[0]
label = res.get("label", "NEUTRAL").upper()
score = res.get("score", 0.5)
            # Map labels to our standard format. Assumes the common 3-class
            # scheme (LABEL_0=negative, LABEL_2=positive); binary models that
            # emit LABEL_1 for positive will fall through to "neutral" here
            mapped = "bullish" if "POSITIVE" in label or "BULLISH" in label or "LABEL_2" in label else (
                "bearish" if "NEGATIVE" in label or "BEARISH" in label or "LABEL_0" in label else "neutral"
            )
spec = MODEL_SPECS[key]
results[spec.model_id] = {"label": mapped, "score": score}
labels_count[mapped] += 1
total_conf += score
# If we got at least one result, we can proceed
if len(results) >= 1:
break # Got at least one working model
except ModelNotAvailable:
continue # Try next model
except Exception as e:
logger.warning(f"Ensemble failed for {key}: {str(e)[:100]}")
continue
if not results:
logger.warning("No HF models available, using fallback")
return basic_sentiment_fallback(text)
final = max(labels_count, key=labels_count.get)
avg_conf = total_conf / len(results)
return {
"label": final,
"confidence": avg_conf,
"scores": results,
"model_count": len(results),
"available": True,
"engine": "huggingface"
}
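# Usage sketch (input text is illustrative; result shape defined above):
#   res = ensemble_crypto_sentiment("ETH rallies 8% on strong volume")
#   res["label"], res["confidence"], res["engine"]
#   # e.g. ("bullish", 0.91, "huggingface"), or the lexical fallback result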
def analyze_crypto_sentiment(text: str):
    return ensemble_crypto_sentiment(text)
def analyze_financial_sentiment(text: str):
"""Analyze financial sentiment with fallback"""
if not TRANSFORMERS_AVAILABLE:
logger.warning("Transformers not available, using fallback")
return basic_sentiment_fallback(text)
if HF_MODE == "off":
logger.warning("HF_MODE=off, using fallback")
return basic_sentiment_fallback(text)
# Try models in order with expanded fallback
primary_keys = ["financial_sent_0", "financial_sent_1"]
fallback_keys = ["crypto_sent_fin"]
# Try any financial sentiment model as last resort
all_financial_keys = [k for k in MODEL_SPECS.keys() if k.startswith("financial_sent_") or MODEL_SPECS[k].category == "sentiment_financial"]
all_candidates = primary_keys + fallback_keys + [k for k in all_financial_keys if k not in primary_keys and k not in fallback_keys][:3]
for key in all_candidates:
if key not in MODEL_SPECS:
continue
try:
pipe = _registry.get_pipeline(key)
res = pipe(text[:512])
if isinstance(res, list) and res:
res = res[0]
label = res.get("label", "neutral").upper()
score = res.get("score", 0.5)
# Map to standard format
mapped = "bullish" if "POSITIVE" in label or "LABEL_2" in label else (
"bearish" if "NEGATIVE" in label or "LABEL_0" in label else "neutral"
)
return {"label": mapped, "score": score, "confidence": score, "available": True, "engine": "huggingface", "model": MODEL_SPECS[key].model_id}
except ModelNotAvailable:
continue
except Exception as e:
logger.warning(f"Financial sentiment failed for {key}: {str(e)[:100]}")
continue
logger.warning("No HF financial models available, using fallback")
return basic_sentiment_fallback(text)
def analyze_social_sentiment(text: str):
"""Analyze social sentiment with fallback"""
if not TRANSFORMERS_AVAILABLE:
logger.warning("Transformers not available, using fallback")
return basic_sentiment_fallback(text)
if HF_MODE == "off":
logger.warning("HF_MODE=off, using fallback")
return basic_sentiment_fallback(text)
# Try models in order with expanded fallback
primary_keys = ["social_sent_0", "social_sent_1"]
fallback_keys = ["crypto_sent_social"]
# Try any social sentiment model as last resort
all_social_keys = [k for k in MODEL_SPECS.keys() if k.startswith("social_sent_") or MODEL_SPECS[k].category == "sentiment_social"]
all_candidates = primary_keys + fallback_keys + [k for k in all_social_keys if k not in primary_keys and k not in fallback_keys][:3]
for key in all_candidates:
if key not in MODEL_SPECS:
continue
try:
pipe = _registry.get_pipeline(key)
res = pipe(text[:512])
if isinstance(res, list) and res:
res = res[0]
label = res.get("label", "neutral").upper()
score = res.get("score", 0.5)
# Map to standard format
mapped = "bullish" if "POSITIVE" in label or "LABEL_2" in label else (
"bearish" if "NEGATIVE" in label or "LABEL_0" in label else "neutral"
)
return {"label": mapped, "score": score, "confidence": score, "available": True, "engine": "huggingface", "model": MODEL_SPECS[key].model_id}
except ModelNotAvailable:
continue
except Exception as e:
logger.warning(f"Social sentiment failed for {key}: {str(e)[:100]}")
continue
logger.warning("No HF social models available, using fallback")
return basic_sentiment_fallback(text)
def analyze_market_text(text: str):
    return ensemble_crypto_sentiment(text)
def analyze_chart_points(data: Sequence[Mapping[str, Any]], indicators: Optional[List[str]] = None):
    if not data:
        return {"trend": "neutral", "strength": 0, "analysis": "No data"}
    prices = [float(p.get("price", 0)) for p in data if p.get("price")]
    if not prices:
        return {"trend": "neutral", "strength": 0, "analysis": "No price data"}
    first, last = prices[0], prices[-1]
    change = ((last - first) / first * 100) if first > 0 else 0
    if change > 5:
        trend, strength = "bullish", min(abs(change) / 10, 1.0)
    elif change < -5:
        trend, strength = "bearish", min(abs(change) / 10, 1.0)
    else:
        trend, strength = "neutral", abs(change) / 5
    return {
        "trend": trend, "strength": strength, "change_pct": change,
        "support": min(prices), "resistance": max(prices),
        "analysis": f"Price moved {change:.2f}% showing {trend} trend",
    }
def analyze_news_item(item: Dict[str, Any]):
text = item.get("title", "") + " " + item.get("description", "")
sent = ensemble_crypto_sentiment(text)
return {**item, "sentiment": sent["label"], "sentiment_confidence": sent["confidence"], "sentiment_details": sent}
def get_model_info():
return {
"transformers_available": TRANSFORMERS_AVAILABLE,
"hf_auth_configured": bool(settings.hf_token),
"models_initialized": _registry._initialized,
"models_loaded": len(_registry._pipelines),
"model_catalog": {
"crypto_sentiment": CRYPTO_SENTIMENT_MODELS,
"social_sentiment": SOCIAL_SENTIMENT_MODELS,
"financial_sentiment": FINANCIAL_SENTIMENT_MODELS,
"news_sentiment": NEWS_SENTIMENT_MODELS,
"generation": GENERATION_MODELS,
"trading_signals": TRADING_SIGNAL_MODELS,
"summarization": SUMMARIZATION_MODELS,
"zero_shot": ZERO_SHOT_MODELS,
"classification": CLASSIFICATION_MODELS
},
"total_models": len(MODEL_SPECS),
"total_categories": 9
}
def basic_sentiment_fallback(text: str) -> Dict[str, Any]:
"""
Simple lexical-based sentiment fallback that doesn't require transformers.
Returns sentiment based on keyword matching.
"""
text_lower = text.lower()
# Define keyword lists
bullish_words = ["bullish", "rally", "surge", "pump", "breakout", "skyrocket",
"uptrend", "buy", "accumulation", "moon", "gain", "profit",
"up", "high", "rise", "growth", "positive", "strong"]
bearish_words = ["bearish", "dump", "crash", "selloff", "downtrend", "collapse",
"sell", "capitulation", "panic", "fear", "drop", "loss",
"down", "low", "fall", "decline", "negative", "weak"]
# Count matches
bullish_count = sum(1 for word in bullish_words if word in text_lower)
bearish_count = sum(1 for word in bearish_words if word in text_lower)
    # Determine sentiment; an exact tie (including zero matches) reads as
    # neutral rather than falling through to the bearish branch
    if bullish_count == bearish_count:
label = "neutral"
confidence = 0.5
bullish_score = 0.0
bearish_score = 0.0
neutral_score = 1.0
elif bullish_count > bearish_count:
label = "bullish"
diff = bullish_count - bearish_count
confidence = min(0.6 + (diff * 0.05), 0.9)
bullish_score = confidence
bearish_score = 0.0
neutral_score = 0.0
else: # bearish_count > bullish_count
label = "bearish"
diff = bearish_count - bullish_count
confidence = min(0.6 + (diff * 0.05), 0.9)
bearish_score = confidence
bullish_score = 0.0
neutral_score = 0.0
return {
"label": label,
"confidence": confidence,
"score": confidence,
"scores": {
"bullish": round(bullish_score, 3),
"bearish": round(bearish_score, 3),
"neutral": round(neutral_score, 3)
},
"available": True, # Set to True so frontend renders it
"engine": "fallback_lexical",
"keyword_matches": {
"bullish": bullish_count,
"bearish": bearish_count
}
}
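# Worked example (illustrative input): "panic selloff as BTC dumps" matches four
# bearish keywords ("panic", "selloff", "sell" as a substring, "dump"), so the
# result is label="bearish" with confidence min(0.6 + 4 * 0.05, 0.9) = 0.8.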
def list_available_model_keys() -> Dict[str, Any]:
"""List all available model keys with their details"""
return {
"total_keys": len(MODEL_SPECS),
"keys": list(MODEL_SPECS.keys()),
"by_category": {
category: [key for key, spec in MODEL_SPECS.items() if spec.category == category]
for category in set(spec.category for spec in MODEL_SPECS.values())
},
"details": {
key: {
"model_id": spec.model_id,
"task": spec.task,
"category": spec.category,
"requires_auth": spec.requires_auth
}
for key, spec in MODEL_SPECS.items()
}
}
def registry_status():
"""Get registry status with detailed information"""
status = {
"ok": HF_MODE != "off" and TRANSFORMERS_AVAILABLE and len(_registry._pipelines) > 0,
"initialized": _registry._initialized,
"pipelines_loaded": len(_registry._pipelines),
"pipelines_failed": len(_registry._failed_models),
"available_models": list(_registry._pipelines.keys()),
"failed_models": list(_registry._failed_models.keys())[:10], # Limit for brevity
"transformers_available": TRANSFORMERS_AVAILABLE,
"hf_mode": HF_MODE,
"total_specs": len(MODEL_SPECS),
"all_model_keys": list(MODEL_SPECS.keys())[:50] # Include sample of all keys
}
if HF_MODE == "off":
status["error"] = "HF_MODE=off"
elif not TRANSFORMERS_AVAILABLE:
status["error"] = "transformers not installed"
elif len(_registry._pipelines) == 0 and _registry._initialized:
status["error"] = "No models loaded successfully"
return status
# ==================== GAP FILLING SERVICE ====================
class GapFillingService:
"""
Uses AI models to fill missing data gaps
Combines interpolation, ML predictions, and external provider fallback
"""
def __init__(self, model_registry: Optional[ModelRegistry] = None):
self.model_registry = model_registry or _registry
self.gap_fill_attempts = {} # Track gap filling attempts
async def fill_missing_ohlc(
self,
symbol: str,
existing_data: List[Dict[str, Any]],
missing_timestamps: List[int]
) -> Dict[str, Any]:
"""
Synthesize missing OHLC candles using interpolation + ML
Args:
symbol: Trading pair symbol (e.g., "BTCUSDT")
existing_data: List of existing OHLC data points
missing_timestamps: List of timestamps with missing data
Returns:
Dictionary with filled data and metadata
"""
try:
if not existing_data or not missing_timestamps:
return {
"status": "error",
"message": "Insufficient data for gap filling",
"filled_count": 0,
"fallback": True
}
# Validate data structure
if not isinstance(existing_data, list) or not isinstance(missing_timestamps, list):
return {
"status": "error",
"message": "Invalid data types for gap filling",
"filled_count": 0,
"fallback": True
}
filled_data = []
confidence_scores = []
# Sort existing data by timestamp
try:
existing_data.sort(key=lambda x: x.get("timestamp", 0))
except (TypeError, AttributeError) as e:
logger.warning(f"Error sorting existing_data: {e}, using fallback")
# Fallback: use first and last if sorting fails
if len(existing_data) >= 2:
existing_data = [existing_data[0], existing_data[-1]]
else:
return {
"status": "error",
"message": "Cannot sort existing data",
"filled_count": 0,
"fallback": True
}
for missing_ts in missing_timestamps:
try:
# Find surrounding data points
before = [d for d in existing_data if d.get("timestamp", 0) < missing_ts]
after = [d for d in existing_data if d.get("timestamp", 0) > missing_ts]
if before and after:
# Linear interpolation between surrounding points
prev_point = before[-1]
next_point = after[0]
# Validate point structure
if not all(k in prev_point for k in ["timestamp", "close"]) or \
not all(k in next_point for k in ["timestamp", "open", "close"]):
logger.warning(f"Invalid data point structure, skipping timestamp {missing_ts}")
continue
# Calculate interpolation factor
time_diff = next_point["timestamp"] - prev_point["timestamp"]
position = (missing_ts - prev_point["timestamp"]) / time_diff if time_diff > 0 else 0.5
# Interpolate OHLC values with safe defaults
prev_close = prev_point.get("close", prev_point.get("price", 0))
next_open = next_point.get("open", next_point.get("close", prev_close))
next_close = next_point.get("close", next_open)
interpolated = {
"timestamp": missing_ts,
"open": prev_close * (1 - position) + next_open * position,
"high": max(prev_point.get("high", prev_close), next_point.get("high", next_close)) * (0.98 + position * 0.04),
"low": min(prev_point.get("low", prev_close), next_point.get("low", next_close)) * (1.02 - position * 0.04),
"close": prev_close * (1 - position) + next_close * position,
"volume": (prev_point.get("volume", 0) + next_point.get("volume", 0)) / 2,
"is_synthetic": True,
"method": "linear_interpolation"
}
                        # Confidence decays with the overall size of the gap
                        # being filled, not the distance to the nearest candle
                        confidence = 0.95 ** len(missing_timestamps)
confidence_scores.append(confidence)
interpolated["confidence"] = confidence
filled_data.append(interpolated)
elif before:
# Only before data - use last known value
prev_point = before[-1]
filled_data.append({
"timestamp": missing_ts,
"open": prev_point.get("close", prev_point.get("price", 0)),
"high": prev_point.get("high", prev_point.get("close", 0)),
"low": prev_point.get("low", prev_point.get("close", 0)),
"close": prev_point.get("close", prev_point.get("price", 0)),
"volume": prev_point.get("volume", 0),
"is_synthetic": True,
"method": "last_known_value",
"confidence": 0.70
})
confidence_scores.append(0.70)
elif after:
# Only after data - use first known value
next_point = after[0]
filled_data.append({
"timestamp": missing_ts,
"open": next_point.get("open", next_point.get("price", 0)),
"high": next_point.get("high", next_point.get("open", 0)),
"low": next_point.get("low", next_point.get("open", 0)),
"close": next_point.get("open", next_point.get("price", 0)),
"volume": next_point.get("volume", 0),
"is_synthetic": True,
"method": "first_known_value",
"confidence": 0.70
})
confidence_scores.append(0.70)
except Exception as e:
logger.warning(f"Error filling timestamp {missing_ts}: {e}")
continue
return {
"status": "success",
"symbol": symbol,
"filled_count": len(filled_data),
"filled_data": filled_data,
"average_confidence": sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0,
"method": "interpolation",
"metadata": {
"existing_points": len(existing_data),
"missing_points": len(missing_timestamps),
"fill_rate": len(filled_data) / len(missing_timestamps) if missing_timestamps else 0
}
}
except Exception as e:
logger.error(f"Gap filling failed for {symbol}: {e}", exc_info=True)
return {
"status": "error",
"message": f"Gap filling failed: {str(e)[:200]}",
"filled_count": 0,
"fallback": True,
"error": str(e)[:200]
}
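    # Worked example (assumed candles): with close=10.0 at t=100 and
    # open=close=12.0 at t=200, a gap at t=150 gives position=0.5 and an
    # interpolated open/close of 10.0 * 0.5 + 12.0 * 0.5 = 11.0.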
async def estimate_orderbook_depth(
self,
symbol: str,
mid_price: float,
depth_levels: int = 10
) -> Dict[str, Any]:
"""
Generate estimated order book when real data unavailable
Uses statistical models + market patterns
"""
try:
if mid_price <= 0:
return {
"status": "error",
"error": "Invalid mid_price",
"fallback": True
}
# Validate depth_levels
if depth_levels <= 0 or depth_levels > 50:
depth_levels = 10 # Default fallback
# Generate synthetic orderbook with realistic spread
spread_pct = 0.001 # 0.1% spread
level_spacing = 0.0005 # 0.05% per level
bids = []
asks = []
for i in range(depth_levels):
try:
# Bids (buy orders) below mid price
bid_price = mid_price * (1 - spread_pct / 2 - i * level_spacing)
bid_volume = 1.0 / (i + 1) * 10 # Decreasing volume with depth
# Validate calculated values
if bid_price <= 0 or not isinstance(bid_price, (int, float)):
continue
bids.append({
"price": round(bid_price, 8),
"volume": round(bid_volume, 4),
"is_synthetic": True
})
# Asks (sell orders) above mid price
ask_price = mid_price * (1 + spread_pct / 2 + i * level_spacing)
ask_volume = 1.0 / (i + 1) * 10
# Validate calculated values
if ask_price <= 0 or not isinstance(ask_price, (int, float)):
continue
asks.append({
"price": round(ask_price, 8),
"volume": round(ask_volume, 4),
"is_synthetic": True
})
except Exception as e:
logger.warning(f"Error generating orderbook level {i}: {e}")
continue
# Ensure we have at least some data
if not bids or not asks:
# Fallback: create minimal orderbook
bids = [{"price": round(mid_price * 0.999, 8), "volume": 1.0, "is_synthetic": True}]
asks = [{"price": round(mid_price * 1.001, 8), "volume": 1.0, "is_synthetic": True}]
return {
"status": "success",
"symbol": symbol,
"mid_price": mid_price,
"bids": bids,
"asks": asks,
"is_synthetic": True,
"confidence": 0.65, # Lower confidence for synthetic data
"method": "statistical_estimation",
"metadata": {
"spread_pct": spread_pct,
"depth_levels": depth_levels,
"total_bid_volume": sum(b["volume"] for b in bids),
"total_ask_volume": sum(a["volume"] for a in asks)
}
}
except Exception as e:
logger.error(f"Orderbook estimation failed for {symbol}: {e}", exc_info=True)
return {
"status": "error",
"error": f"Orderbook estimation failed: {str(e)[:200]}",
"symbol": symbol,
"fallback": True
}
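    # Example (illustrative): mid_price=100.0 yields a best bid near
    # 100 * (1 - 0.0005) = 99.95 and a best ask near 100 * (1 + 0.0005) = 100.05,
    # with per-level volume decaying as 10 / (i + 1).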
async def synthesize_whale_data(
self,
chain: str,
token: str,
historical_pattern: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Infer whale movements from partial data
Uses on-chain analysis patterns
"""
try:
# Validate inputs
if not chain or not token:
return {
"status": "error",
"error": "Invalid chain or token",
"fallback": True
}
# Placeholder for whale data synthesis
# In production, this would use ML models trained on historical whale patterns
synthetic_movements = []
# Generate synthetic whale movement based on typical patterns
if historical_pattern:
# Use historical patterns to generate realistic movements
avg_movement = historical_pattern.get("avg_movement_size", 1000000)
frequency = historical_pattern.get("frequency_per_day", 5)
# Validate values
if not isinstance(avg_movement, (int, float)) or avg_movement <= 0:
avg_movement = 1000000
if not isinstance(frequency, int) or frequency <= 0:
frequency = 5
else:
# Default patterns
avg_movement = 1000000
frequency = 5
# Limit frequency to prevent excessive data
frequency = min(frequency, 10)
for i in range(frequency):
try:
movement = {
"timestamp": int(time.time()) - (i * 3600),
"from_address": f"0x{'0'*(40-len(str(i)))}{i}",
"to_address": "0x" + "0" * 40,
"amount": avg_movement * (0.8 + random.random() * 0.4),
"token": token,
"chain": chain,
"is_synthetic": True,
"confidence": 0.55
}
synthetic_movements.append(movement)
except Exception as e:
logger.warning(f"Error generating whale movement {i}: {e}")
continue
# Ensure we have at least some data
if not synthetic_movements:
# Fallback: create one minimal movement
synthetic_movements = [{
"timestamp": int(time.time()),
"from_address": "0x" + "0" * 40,
"to_address": "0x" + "0" * 40,
"amount": avg_movement,
"token": token,
"chain": chain,
"is_synthetic": True,
"confidence": 0.50
}]
return {
"status": "success",
"chain": chain,
"token": token,
"movements": synthetic_movements,
"is_synthetic": True,
"confidence": 0.55,
"method": "pattern_based_synthesis",
"metadata": {
"movement_count": len(synthetic_movements),
"total_volume": sum(m["amount"] for m in synthetic_movements)
}
}
except Exception as e:
logger.error(f"Whale data synthesis failed for {chain}/{token}: {e}", exc_info=True)
return {
"status": "error",
"error": f"Whale data synthesis failed: {str(e)[:200]}",
"chain": chain,
"token": token,
"fallback": True
}
async def analyze_trading_signal(
self,
symbol: str,
market_data: Dict[str, Any],
sentiment_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Generate trading signal using AI models
Combines price action, volume, and sentiment analysis
"""
# Use trading signal model if available - try multiple models
trading_model_keys = ["crypto_trading_lm", "crypto_trade_0"]
for model_key in trading_model_keys:
try:
if model_key in MODEL_SPECS:
# Prepare input text for model
text = f"Analyze {symbol}: "
if market_data:
price = market_data.get("price", 0)
change = market_data.get("percent_change_24h", 0)
volume = market_data.get("volume_24h", 0)
text += f"Price ${price:.2f}, Change {change:+.2f}%, Volume ${volume:,.0f}"
if sentiment_data:
sentiment = sentiment_data.get("label", "neutral")
text += f", Sentiment: {sentiment}"
# Call model
result = self.model_registry.call_model_safe(model_key, text)
if result["status"] == "success":
                        # Attach the raw model output as reasoning; structured
                        # parsing is not implemented, so the signal stays "hold"
                        model_output = result.get("data", {})
                        return {
                            "status": "success",
                            "symbol": symbol,
                            "signal": "hold",  # default until output parsing is added
"confidence": 0.70,
"reasoning": model_output,
"is_ai_generated": True,
"model_used": model_key
}
except Exception as e:
logger.warning(f"Error in trading signal analysis with {model_key}: {e}")
continue # Try next model
# Fallback to rule-based signal
signal = "hold"
confidence = 0.60
if market_data:
change = market_data.get("percent_change_24h", 0)
volume_change = market_data.get("volume_change_24h", 0)
# Simple rules
if change > 5 and volume_change > 20:
signal = "buy"
confidence = 0.75
elif change < -5 and volume_change > 20:
signal = "sell"
confidence = 0.75
return {
"status": "success",
"symbol": symbol,
"signal": signal,
"confidence": confidence,
"reasoning": "Rule-based analysis",
"is_ai_generated": False,
"method": "fallback_rules"
}
# Global gap filling service instance
_gap_filler = GapFillingService()
def get_gap_filler() -> GapFillingService:
"""Get global gap filling service instance"""
return _gap_filler
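# Minimal self-test sketch (an assumption-light demo: it exercises only the
# lexical fallback and the interpolation-based gap filler, so it needs no
# network access, HF token, or downloaded models):
if __name__ == "__main__":
    import asyncio
    print(basic_sentiment_fallback("BTC rally and breakout, traders buy the uptrend"))
    candles = [
        {"timestamp": 100, "open": 10.0, "high": 10.5, "low": 9.5, "close": 10.0, "volume": 5.0},
        {"timestamp": 300, "open": 12.0, "high": 12.5, "low": 11.5, "close": 12.0, "volume": 7.0},
    ]
    filled = asyncio.run(get_gap_filler().fill_missing_ohlc("BTCUSDT", candles, [200]))
    print(filled["filled_count"], filled["filled_data"][0]["method"])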