#!/usr/bin/env python3
"""
Direct Model Loader Service - NO PIPELINES
Loads Hugging Face models directly using AutoTokenizer and the AutoModelFor* / BertForSequenceClassification classes
NO PIPELINE USAGE - Direct model inference only
"""

import logging
import os
from typing import Dict, Any, Optional, List
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

# Try to import torch (optional for HF Space deployment)
try:
    import torch
    import numpy as np
    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    logger.warning("⚠️ Torch not available. Direct model loading will be disabled.")
    torch = None
    np = None

# Try to import transformers
try:
    from transformers import (
        AutoTokenizer,
        AutoModelForSequenceClassification,
        AutoModelForCausalLM,
        BertTokenizer,
        BertForSequenceClassification
    )
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    logger.warning("⚠️ Transformers library not available. Install with: pip install transformers torch")


class DirectModelLoader:
    """
    Direct Model Loader - NO PIPELINES
    Loads models directly and performs inference without using Hugging Face pipelines
    """

    def __init__(self, cache_dir: Optional[str] = None):
        """
        Initialize Direct Model Loader

        Args:
            cache_dir: Directory to cache models (default: ~/.cache/huggingface)
        """
        if not TRANSFORMERS_AVAILABLE or not TORCH_AVAILABLE:
            logger.warning("⚠️ Direct Model Loader disabled: transformers or torch not available")
            self.enabled = False
        else:
            self.enabled = True

        self.cache_dir = cache_dir or os.path.expanduser("~/.cache/huggingface")
        self.models = {}
        self.tokenizers = {}
        self.device = "cuda" if (torch and torch.cuda.is_available()) else "cpu"

        logger.info("🚀 Direct Model Loader initialized")
        logger.info(f"   Device: {self.device}")
        logger.info(f"   Cache directory: {self.cache_dir}")

        # Model configurations - DIRECT LOADING ONLY
        # Ordered by preference (most reliable first)
        self.model_configs = {
            "cryptobert_kk08": {
                "model_id": "kk08/CryptoBERT",
                "model_class": "BertForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "CryptoBERT by KK08 for crypto sentiment",
                "loaded": False,
                "requires_auth": False,
                "priority": 1
            },
            "twitter_sentiment": {
                "model_id": "cardiffnlp/twitter-roberta-base-sentiment-latest",
                "model_class": "AutoModelForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "Twitter RoBERTa for sentiment analysis",
                "loaded": False,
                "requires_auth": False,
                "priority": 2
            },
            "finbert": {
                "model_id": "ProsusAI/finbert",
                "model_class": "AutoModelForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "FinBERT for financial sentiment",
                "loaded": False,
                "requires_auth": False,
                "priority": 3
            },
            "cryptobert_elkulako": {
                "model_id": "ElKulako/cryptobert",
                "model_class": "BertForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "CryptoBERT by ElKulako for crypto sentiment",
                "loaded": False,
                "requires_auth": True,
                "priority": 4
            }
        }

    def is_enabled(self) -> bool:
        """Check if direct model loader is enabled"""
        return getattr(self, 'enabled', False) and TRANSFORMERS_AVAILABLE and TORCH_AVAILABLE

    async def load_model(self, model_key: str) -> Dict[str, Any]:
        """
        Load a specific model directly (NO PIPELINE)

        Args:
            model_key: Key of the model to load

        Returns:
            Status dict with model info
        """
        if not self.is_enabled():
            return {
                "success": False,
                "error": "Direct model loader is disabled (transformers or torch not available)"
            }

        if model_key not in self.model_configs:
            raise ValueError(f"Unknown model: {model_key}")

        config = self.model_configs[model_key]

        # Check if already loaded
        if model_key in self.models and model_key in self.tokenizers:
            logger.info(f"✅ Model {model_key} already loaded")
            config["loaded"] = True
            return {
                "success": True,
                "model_key": model_key,
                "model_id": config["model_id"],
                "status": "already_loaded",
                "device": self.device
            }

        try:
            logger.info(f"📥 Loading model: {config['model_id']} (NO PIPELINE)")

            # Load tokenizer
            tokenizer = AutoTokenizer.from_pretrained(
                config["model_id"],
                cache_dir=self.cache_dir
            )

            # Load model based on class
            if config["model_class"] == "BertForSequenceClassification":
                model = BertForSequenceClassification.from_pretrained(
                    config["model_id"],
                    cache_dir=self.cache_dir
                )
            elif config["model_class"] == "AutoModelForSequenceClassification":
                model = AutoModelForSequenceClassification.from_pretrained(
                    config["model_id"],
                    cache_dir=self.cache_dir
                )
            elif config["model_class"] == "AutoModelForCausalLM":
                model = AutoModelForCausalLM.from_pretrained(
                    config["model_id"],
                    cache_dir=self.cache_dir
                )
            else:
                raise ValueError(f"Unknown model class: {config['model_class']}")

            # Move model to device
            model.to(self.device)
            model.eval()  # Set to evaluation mode

            # Store model and tokenizer
            self.models[model_key] = model
            self.tokenizers[model_key] = tokenizer
            config["loaded"] = True

            logger.info(f"✅ Model loaded successfully: {config['model_id']}")

            return {
                "success": True,
                "model_key": model_key,
                "model_id": config["model_id"],
                "status": "loaded",
                "device": self.device,
                "task": config["task"]
            }

        except Exception as e:
            logger.error(f"❌ Failed to load model {model_key}: {e}")
            # Re-raise so callers (e.g. load_all_models) can catch the error and fall back to other models
            raise Exception(f"Failed to load model {model_key}: {str(e)}")

    async def load_all_models(self) -> Dict[str, Any]:
        """
        Load all configured models

        Returns:
            Status dict with all models
        """
        results = []
        success_count = 0

        for model_key in self.model_configs.keys():
            try:
                result = await self.load_model(model_key)
                results.append(result)
                if result["success"]:
                    success_count += 1
            except Exception as e:
                logger.error(f"❌ Failed to load {model_key}: {e}")
                results.append({
                    "success": False,
                    "model_key": model_key,
                    "error": str(e)
                })

        return {
            "success": True,
            "total_models": len(self.model_configs),
            "loaded_models": success_count,
            "failed_models": len(self.model_configs) - success_count,
            "results": results,
            "timestamp": datetime.utcnow().isoformat()
        }

    async def predict_sentiment(
        self,
        text: str,
        model_key: str = "cryptobert_elkulako",
        max_length: int = 512
    ) -> Dict[str, Any]:
        """
        Predict sentiment directly (NO PIPELINE)

        Args:
            text: Input text
            model_key: Model to use
            max_length: Maximum sequence length

        Returns:
            Sentiment prediction
        """
        # Ensure model is loaded
        if model_key not in self.models:
            await self.load_model(model_key)

        try:
            model = self.models[model_key]
            tokenizer = self.tokenizers[model_key]

            # Tokenize input - NO PIPELINE
            inputs = tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=max_length
            )

            # Move inputs to device
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Forward pass - Direct inference
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits

            # Get predictions - Direct calculation
            probs = torch.softmax(logits, dim=1)
            predicted_class = torch.argmax(probs, dim=1).item()
            confidence = probs[0][predicted_class].item()

            # Map class to label (standard 3-class sentiment)
            label_map = {0: "negative", 1: "neutral", 2: "positive"}

            # Try to get actual labels from model config
            if hasattr(model.config, "id2label"):
                label = model.config.id2label.get(
                    predicted_class, label_map.get(predicted_class, "unknown")
                )
            else:
                label = label_map.get(predicted_class, "unknown")

            # Get all class probabilities
            all_probs = {
                label_map.get(i, f"class_{i}"): probs[0][i].item()
                for i in range(probs.shape[1])
            }

            logger.info(f"✅ Sentiment predicted: {label} (confidence: {confidence:.4f})")

            return {
                "success": True,
                "text": text[:100] + "..." if len(text) > 100 else text,
                "sentiment": label,
                "label": label,
                "score": confidence,
                "confidence": confidence,
                "all_scores": all_probs,
                "model": model_key,
                "model_id": self.model_configs[model_key]["model_id"],
                "inference_type": "direct_no_pipeline",
                "device": self.device,
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Sentiment prediction failed: {e}")
            raise Exception(f"Sentiment prediction failed: {str(e)}")
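
    # Illustrative result shape for a single prediction (the call and all values below
    # are hypothetical; actual label strings depend on the loaded model's id2label mapping):
    #   await direct_model_loader.predict_sentiment("Bitcoin rallies after the ETF approval")
    #   -> {"success": True, "sentiment": "positive", "confidence": 0.91,
    #       "all_scores": {"negative": 0.03, "neutral": 0.06, "positive": 0.91}, ...}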

    async def batch_predict_sentiment(
        self,
        texts: List[str],
        model_key: str = "cryptobert_elkulako",
        max_length: int = 512
    ) -> Dict[str, Any]:
        """
        Batch sentiment prediction (NO PIPELINE)

        Args:
            texts: List of input texts
            model_key: Model to use
            max_length: Maximum sequence length

        Returns:
            Batch predictions
        """
        # Ensure model is loaded
        if model_key not in self.models:
            await self.load_model(model_key)

        try:
            model = self.models[model_key]
            tokenizer = self.tokenizers[model_key]

            # Tokenize all inputs - NO PIPELINE
            inputs = tokenizer(
                texts,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=max_length
            )

            # Move inputs to device
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Forward pass - Direct inference
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits

            # Get predictions - Direct calculation
            probs = torch.softmax(logits, dim=1)
            predicted_classes = torch.argmax(probs, dim=1).cpu().numpy()
            confidences = probs.max(dim=1).values.cpu().numpy()

            # Map classes to labels
            label_map = {0: "negative", 1: "neutral", 2: "positive"}

            # Build results
            results = []
            for i, text in enumerate(texts):
                predicted_class = predicted_classes[i]
                confidence = confidences[i]

                if hasattr(model.config, "id2label"):
                    label = model.config.id2label.get(
                        predicted_class, label_map.get(predicted_class, "unknown")
                    )
                else:
                    label = label_map.get(predicted_class, "unknown")

                results.append({
                    "text": text[:100] + "..." if len(text) > 100 else text,
                    "sentiment": label,
                    "label": label,
                    "score": float(confidence),
                    "confidence": float(confidence)
                })

            logger.info(f"✅ Batch sentiment predicted for {len(texts)} texts")

            return {
                "success": True,
                "count": len(results),
                "results": results,
                "model": model_key,
                "model_id": self.model_configs[model_key]["model_id"],
                "inference_type": "direct_batch_no_pipeline",
                "device": self.device,
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Batch sentiment prediction failed: {e}")
            raise Exception(f"Batch sentiment prediction failed: {str(e)}")

    def get_loaded_models(self) -> Dict[str, Any]:
        """
        Get list of loaded models

        Returns:
            Dict with loaded models info
        """
        models_info = []

        for model_key, config in self.model_configs.items():
            models_info.append({
                "model_key": model_key,
                "model_id": config["model_id"],
                "task": config["task"],
                "description": config["description"],
                "loaded": model_key in self.models,
                "device": self.device if model_key in self.models else None
            })

        return {
            "success": True,
            "total_configured": len(self.model_configs),
            "total_loaded": len(self.models),
            "device": self.device,
            "models": models_info,
            "timestamp": datetime.utcnow().isoformat()
        }

    def unload_model(self, model_key: str) -> Dict[str, Any]:
        """
        Unload a specific model from memory

        Args:
            model_key: Key of the model to unload

        Returns:
            Status dict
        """
        if model_key not in self.models:
            return {
                "success": False,
                "model_key": model_key,
                "message": "Model not loaded"
            }

        try:
            # Remove model and tokenizer
            del self.models[model_key]
            del self.tokenizers[model_key]

            # Update config
            self.model_configs[model_key]["loaded"] = False

            # Clear CUDA cache if using GPU
            if self.device == "cuda":
                torch.cuda.empty_cache()

            logger.info(f"✅ Model unloaded: {model_key}")

            return {
                "success": True,
                "model_key": model_key,
                "message": "Model unloaded successfully"
            }

        except Exception as e:
            logger.error(f"❌ Failed to unload model {model_key}: {e}")
            return {
                "success": False,
                "model_key": model_key,
                "error": str(e)
            }


# Global instance
direct_model_loader = DirectModelLoader()

# Export
__all__ = ["DirectModelLoader", "direct_model_loader"]
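
# Minimal usage sketch (illustrative only, not part of the service API). It assumes torch
# and transformers are installed and that the chosen model can be downloaded into the cache
# directory on first use; "cryptobert_kk08" is picked here because it is the highest-priority
# public model in model_configs, and the example text is hypothetical.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        if not direct_model_loader.is_enabled():
            print("Direct model loader is disabled (install transformers and torch)")
            return
        # Load one public model, then run a single direct (non-pipeline) prediction
        await direct_model_loader.load_model("cryptobert_kk08")
        result = await direct_model_loader.predict_sentiment(
            "Bitcoin rallies after the ETF approval",
            model_key="cryptobert_kk08"
        )
        print(result["sentiment"], result["confidence"])

    asyncio.run(_demo())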