#!/usr/bin/env python3
"""
Direct Model Loader Service - NO PIPELINES
Loads Hugging Face models directly using AutoModel and AutoTokenizer
NO PIPELINE USAGE - Direct model inference only
"""
import logging
import os
from typing import Dict, Any, Optional, List
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# Try to import torch (optional for HF Space deployment)
try:
import torch
import numpy as np
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
logger.warning("⚠️ Torch not available. Direct model loading will be disabled.")
torch = None
np = None
# Try to import transformers
try:
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
AutoModelForCausalLM,
BertTokenizer,
BertForSequenceClassification
)
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
logger.warning("⚠️ Transformers library not available. Install with: pip install transformers torch")
class DirectModelLoader:
"""
Direct Model Loader - NO PIPELINES
Loads models directly and performs inference without using Hugging Face pipelines
"""
def __init__(self, cache_dir: Optional[str] = None):
"""
Initialize Direct Model Loader
Args:
cache_dir: Directory to cache models (default: ~/.cache/huggingface)
"""
if not TRANSFORMERS_AVAILABLE or not TORCH_AVAILABLE:
logger.warning("⚠️ Direct Model Loader disabled: transformers or torch not available")
self.enabled = False
else:
self.enabled = True
self.cache_dir = cache_dir or os.path.expanduser("~/.cache/huggingface")
self.models = {}
self.tokenizers = {}
self.device = "cuda" if (torch and torch.cuda.is_available()) else "cpu"
logger.info(f"🚀 Direct Model Loader initialized")
logger.info(f" Device: {self.device}")
logger.info(f" Cache directory: {self.cache_dir}")
# Model configurations - DIRECT LOADING ONLY
# Ordered by preference (most reliable first)
self.model_configs = {
"cryptobert_kk08": {
"model_id": "kk08/CryptoBERT",
"model_class": "BertForSequenceClassification",
"task": "sentiment-analysis",
"description": "CryptoBERT by KK08 for crypto sentiment",
"loaded": False,
"requires_auth": False,
"priority": 1
},
"twitter_sentiment": {
"model_id": "cardiffnlp/twitter-roberta-base-sentiment-latest",
"model_class": "AutoModelForSequenceClassification",
"task": "sentiment-analysis",
"description": "Twitter RoBERTa for sentiment analysis",
"loaded": False,
"requires_auth": False,
"priority": 2
},
"finbert": {
"model_id": "ProsusAI/finbert",
"model_class": "AutoModelForSequenceClassification",
"task": "sentiment-analysis",
"description": "FinBERT for financial sentiment",
"loaded": False,
"requires_auth": False,
"priority": 3
},
"cryptobert_elkulako": {
"model_id": "ElKulako/cryptobert",
"model_class": "BertForSequenceClassification",
"task": "sentiment-analysis",
"description": "CryptoBERT by ElKulako for crypto sentiment",
"loaded": False,
"requires_auth": True,
"priority": 4
}
}
def is_enabled(self) -> bool:
"""Check if direct model loader is enabled"""
return getattr(self, 'enabled', False) and TRANSFORMERS_AVAILABLE and TORCH_AVAILABLE
async def load_model(self, model_key: str) -> Dict[str, Any]:
"""
Load a specific model directly (NO PIPELINE)
Args:
model_key: Key of the model to load
Returns:
Status dict with model info
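        Example (illustrative):
            status = await direct_model_loader.load_model("cryptobert_kk08")
            print(status["status"], status.get("device"))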
"""
if not self.is_enabled():
return {
"success": False,
"error": "Direct model loader is disabled (transformers or torch not available)"
}
if model_key not in self.model_configs:
raise ValueError(f"Unknown model: {model_key}")
config = self.model_configs[model_key]
# Check if already loaded
if model_key in self.models and model_key in self.tokenizers:
logger.info(f"✅ Model {model_key} already loaded")
config["loaded"] = True
return {
"success": True,
"model_key": model_key,
"model_id": config["model_id"],
"status": "already_loaded",
"device": self.device
}
try:
logger.info(f"📥 Loading model: {config['model_id']} (NO PIPELINE)")
# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(
config["model_id"],
cache_dir=self.cache_dir
)
# Load model based on class
if config["model_class"] == "BertForSequenceClassification":
model = BertForSequenceClassification.from_pretrained(
config["model_id"],
cache_dir=self.cache_dir
)
elif config["model_class"] == "AutoModelForSequenceClassification":
model = AutoModelForSequenceClassification.from_pretrained(
config["model_id"],
cache_dir=self.cache_dir
)
elif config["model_class"] == "AutoModelForCausalLM":
model = AutoModelForCausalLM.from_pretrained(
config["model_id"],
cache_dir=self.cache_dir
)
else:
raise ValueError(f"Unknown model class: {config['model_class']}")
# Move model to device
model.to(self.device)
model.eval() # Set to evaluation mode
# Store model and tokenizer
self.models[model_key] = model
self.tokenizers[model_key] = tokenizer
config["loaded"] = True
logger.info(f"✅ Model loaded successfully: {config['model_id']}")
return {
"success": True,
"model_key": model_key,
"model_id": config["model_id"],
"status": "loaded",
"device": self.device,
"task": config["task"]
}
        except Exception as e:
            logger.error(f"❌ Failed to load model {model_key}: {e}")
            # Re-raise so callers (e.g. load_all_models) can fall back to other models
            raise RuntimeError(f"Failed to load model {model_key}: {e}") from e
async def load_all_models(self) -> Dict[str, Any]:
"""
Load all configured models
Returns:
Status dict with all models
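        Example (illustrative):
            summary = await direct_model_loader.load_all_models()
            print(summary["loaded_models"], "of", summary["total_models"], "loaded")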
"""
results = []
success_count = 0
for model_key in self.model_configs.keys():
try:
result = await self.load_model(model_key)
results.append(result)
if result["success"]:
success_count += 1
except Exception as e:
logger.error(f"❌ Failed to load {model_key}: {e}")
results.append({
"success": False,
"model_key": model_key,
"error": str(e)
})
return {
"success": True,
"total_models": len(self.model_configs),
"loaded_models": success_count,
"failed_models": len(self.model_configs) - success_count,
"results": results,
"timestamp": datetime.utcnow().isoformat()
}
async def predict_sentiment(
self,
text: str,
model_key: str = "cryptobert_elkulako",
max_length: int = 512
) -> Dict[str, Any]:
"""
Predict sentiment directly (NO PIPELINE)
Args:
text: Input text
model_key: Model to use
max_length: Maximum sequence length
Returns:
Sentiment prediction
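        Example (illustrative):
            result = await direct_model_loader.predict_sentiment("BTC is breaking out!")
            print(result["sentiment"], round(result["confidence"], 3))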
"""
        # Ensure model is loaded (surfaces a clear error if the loader is disabled)
        if model_key not in self.models:
            load_result = await self.load_model(model_key)
            if not load_result.get("success"):
                raise RuntimeError(load_result.get("error", f"Failed to load model {model_key}"))
try:
model = self.models[model_key]
tokenizer = self.tokenizers[model_key]
# Tokenize input - NO PIPELINE
inputs = tokenizer(
text,
return_tensors="pt",
truncation=True,
padding=True,
max_length=max_length
)
# Move inputs to device
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Forward pass - Direct inference
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
# Get predictions - Direct calculation
probs = torch.softmax(logits, dim=1)
predicted_class = torch.argmax(probs, dim=1).item()
confidence = probs[0][predicted_class].item()
            # Map class index to label: prefer the model's own id2label mapping,
            # falling back to the standard 3-class sentiment ordering
            label_map = {0: "negative", 1: "neutral", 2: "positive"}
            id2label = getattr(model.config, "id2label", None) or {}
            label = id2label.get(predicted_class, label_map.get(predicted_class, "unknown"))
            # Get all class probabilities
            all_probs = {
                id2label.get(i, label_map.get(i, f"class_{i}")): probs[0][i].item()
                for i in range(probs.shape[1])
            }
logger.info(f"✅ Sentiment predicted: {label} (confidence: {confidence:.4f})")
return {
"success": True,
"text": text[:100] + "..." if len(text) > 100 else text,
"sentiment": label,
"label": label,
"score": confidence,
"confidence": confidence,
"all_scores": all_probs,
"model": model_key,
"model_id": self.model_configs[model_key]["model_id"],
"inference_type": "direct_no_pipeline",
"device": self.device,
"timestamp": datetime.utcnow().isoformat()
}
        except Exception as e:
            logger.error(f"❌ Sentiment prediction failed: {e}")
            raise RuntimeError(f"Sentiment prediction failed: {e}") from e
async def batch_predict_sentiment(
self,
texts: List[str],
model_key: str = "cryptobert_elkulako",
max_length: int = 512
) -> Dict[str, Any]:
"""
Batch sentiment prediction (NO PIPELINE)
Args:
texts: List of input texts
model_key: Model to use
max_length: Maximum sequence length
Returns:
Batch predictions
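        Example (illustrative):
            batch = await direct_model_loader.batch_predict_sentiment(["BTC up 5%", "ETH down 3%"])
            for item in batch["results"]:
                print(item["label"], round(item["score"], 3))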
"""
        # Ensure model is loaded (surfaces a clear error if the loader is disabled)
        if model_key not in self.models:
            load_result = await self.load_model(model_key)
            if not load_result.get("success"):
                raise RuntimeError(load_result.get("error", f"Failed to load model {model_key}"))
try:
model = self.models[model_key]
tokenizer = self.tokenizers[model_key]
# Tokenize all inputs - NO PIPELINE
inputs = tokenizer(
texts,
return_tensors="pt",
truncation=True,
padding=True,
max_length=max_length
)
# Move inputs to device
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Forward pass - Direct inference
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
# Get predictions - Direct calculation
probs = torch.softmax(logits, dim=1)
predicted_classes = torch.argmax(probs, dim=1).cpu().numpy()
confidences = probs.max(dim=1).values.cpu().numpy()
            # Map class indices to labels, preferring the model's own id2label mapping
            label_map = {0: "negative", 1: "neutral", 2: "positive"}
            id2label = getattr(model.config, "id2label", None) or {}
            # Build results
            results = []
            for i, text in enumerate(texts):
                predicted_class = int(predicted_classes[i])
                confidence = confidences[i]
                label = id2label.get(predicted_class, label_map.get(predicted_class, "unknown"))
results.append({
"text": text[:100] + "..." if len(text) > 100 else text,
"sentiment": label,
"label": label,
"score": float(confidence),
"confidence": float(confidence)
})
logger.info(f"✅ Batch sentiment predicted for {len(texts)} texts")
return {
"success": True,
"count": len(results),
"results": results,
"model": model_key,
"model_id": self.model_configs[model_key]["model_id"],
"inference_type": "direct_batch_no_pipeline",
"device": self.device,
"timestamp": datetime.utcnow().isoformat()
}
        except Exception as e:
            logger.error(f"❌ Batch sentiment prediction failed: {e}")
            raise RuntimeError(f"Batch sentiment prediction failed: {e}") from e
def get_loaded_models(self) -> Dict[str, Any]:
"""
Get list of loaded models
Returns:
Dict with loaded models info
"""
models_info = []
for model_key, config in self.model_configs.items():
models_info.append({
"model_key": model_key,
"model_id": config["model_id"],
"task": config["task"],
"description": config["description"],
"loaded": model_key in self.models,
"device": self.device if model_key in self.models else None
})
return {
"success": True,
"total_configured": len(self.model_configs),
"total_loaded": len(self.models),
"device": self.device,
"models": models_info,
"timestamp": datetime.utcnow().isoformat()
}
def unload_model(self, model_key: str) -> Dict[str, Any]:
"""
Unload a specific model from memory
Args:
model_key: Key of the model to unload
Returns:
Status dict
"""
if model_key not in self.models:
return {
"success": False,
"model_key": model_key,
"message": "Model not loaded"
}
try:
# Remove model and tokenizer
del self.models[model_key]
del self.tokenizers[model_key]
# Update config
self.model_configs[model_key]["loaded"] = False
# Clear CUDA cache if using GPU
if self.device == "cuda":
torch.cuda.empty_cache()
logger.info(f"✅ Model unloaded: {model_key}")
return {
"success": True,
"model_key": model_key,
"message": "Model unloaded successfully"
}
except Exception as e:
logger.error(f"❌ Failed to unload model {model_key}: {e}")
return {
"success": False,
"model_key": model_key,
"error": str(e)
}
# Global instance
direct_model_loader = DirectModelLoader()
# Export
__all__ = ["DirectModelLoader", "direct_model_loader"]