"""
Direct Model Loader Service - NO PIPELINES

Loads Hugging Face models directly using AutoModel and AutoTokenizer.
NO PIPELINE USAGE - direct model inference only.
"""

import logging
import os
from typing import Dict, Any, Optional, List
from datetime import datetime

logger = logging.getLogger(__name__)

try:
    import torch
    import numpy as np

    TORCH_AVAILABLE = True
except ImportError:
    TORCH_AVAILABLE = False
    logger.warning("⚠️ Torch not available. Direct model loading will be disabled.")
    torch = None
    np = None

try:
    from transformers import (
        AutoTokenizer,
        AutoModelForSequenceClassification,
        AutoModelForCausalLM,
        BertForSequenceClassification
    )

    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False
    logger.warning("⚠️ Transformers library not available. Install with: pip install transformers torch")

class DirectModelLoader:
    """
    Direct Model Loader - NO PIPELINES.

    Loads models directly and performs inference without using Hugging Face pipelines.
    """

    def __init__(self, cache_dir: Optional[str] = None):
        """
        Initialize the Direct Model Loader.

        Args:
            cache_dir: Directory to cache models (default: ~/.cache/huggingface)
        """
        if not TRANSFORMERS_AVAILABLE or not TORCH_AVAILABLE:
            logger.warning("⚠️ Direct Model Loader disabled: transformers or torch not available")
            self.enabled = False
        else:
            self.enabled = True

        self.cache_dir = cache_dir or os.path.expanduser("~/.cache/huggingface")
        self.models = {}
        self.tokenizers = {}
        self.device = "cuda" if (torch and torch.cuda.is_available()) else "cpu"

        logger.info("🚀 Direct Model Loader initialized")
        logger.info(f"  Device: {self.device}")
        logger.info(f"  Cache directory: {self.cache_dir}")
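
        # Registry of candidate models. "priority" expresses preference order
        # (lower first); "requires_auth" flags repos that need a Hugging Face
        # access token.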
        self.model_configs = {
            "cryptobert_kk08": {
                "model_id": "kk08/CryptoBERT",
                "model_class": "BertForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "CryptoBERT by KK08 for crypto sentiment",
                "loaded": False,
                "requires_auth": False,
                "priority": 1
            },
            "twitter_sentiment": {
                "model_id": "cardiffnlp/twitter-roberta-base-sentiment-latest",
                "model_class": "AutoModelForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "Twitter RoBERTa for sentiment analysis",
                "loaded": False,
                "requires_auth": False,
                "priority": 2
            },
            "finbert": {
                "model_id": "ProsusAI/finbert",
                "model_class": "AutoModelForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "FinBERT for financial sentiment",
                "loaded": False,
                "requires_auth": False,
                "priority": 3
            },
            "cryptobert_elkulako": {
                "model_id": "ElKulako/cryptobert",
                "model_class": "BertForSequenceClassification",
                "task": "sentiment-analysis",
                "description": "CryptoBERT by ElKulako for crypto sentiment",
                "loaded": False,
                "requires_auth": True,
                "priority": 4
            }
        }

    def is_enabled(self) -> bool:
        """Check if the direct model loader is enabled."""
        return getattr(self, "enabled", False) and TRANSFORMERS_AVAILABLE and TORCH_AVAILABLE

    async def load_model(self, model_key: str) -> Dict[str, Any]:
        """
        Load a specific model directly (NO PIPELINE).

        Args:
            model_key: Key of the model to load

        Returns:
            Status dict with model info
        """
        if not self.is_enabled():
            return {
                "success": False,
                "error": "Direct model loader is disabled (transformers or torch not available)"
            }
        if model_key not in self.model_configs:
            raise ValueError(f"Unknown model: {model_key}")

        config = self.model_configs[model_key]

        # Reuse the cached model/tokenizer pair if it is already in memory.
        if model_key in self.models and model_key in self.tokenizers:
            logger.info(f"✅ Model {model_key} already loaded")
            config["loaded"] = True
            return {
                "success": True,
                "model_key": model_key,
                "model_id": config["model_id"],
                "status": "already_loaded",
                "device": self.device
            }

        try:
            logger.info(f"📥 Loading model: {config['model_id']} (NO PIPELINE)")

            # The tokenizer is always resolved via AutoTokenizer.
            tokenizer = AutoTokenizer.from_pretrained(
                config["model_id"],
                cache_dir=self.cache_dir
            )

            # Instantiate the model with the class named in the config.
            if config["model_class"] == "BertForSequenceClassification":
                model = BertForSequenceClassification.from_pretrained(
                    config["model_id"],
                    cache_dir=self.cache_dir
                )
            elif config["model_class"] == "AutoModelForSequenceClassification":
                model = AutoModelForSequenceClassification.from_pretrained(
                    config["model_id"],
                    cache_dir=self.cache_dir
                )
            elif config["model_class"] == "AutoModelForCausalLM":
                model = AutoModelForCausalLM.from_pretrained(
                    config["model_id"],
                    cache_dir=self.cache_dir
                )
            else:
                raise ValueError(f"Unknown model class: {config['model_class']}")

            # Move to the target device and switch to inference mode.
            model.to(self.device)
            model.eval()

            self.models[model_key] = model
            self.tokenizers[model_key] = tokenizer
            config["loaded"] = True

            logger.info(f"✅ Model loaded successfully: {config['model_id']}")

            return {
                "success": True,
                "model_key": model_key,
                "model_id": config["model_id"],
                "status": "loaded",
                "device": self.device,
                "task": config["task"]
            }

        except Exception as e:
            logger.error(f"❌ Failed to load model {model_key}: {e}")
            raise RuntimeError(f"Failed to load model {model_key}: {e}") from e

    async def load_all_models(self) -> Dict[str, Any]:
        """
        Load all configured models.

        Returns:
            Status dict with all models
        """
        results = []
        success_count = 0

        for model_key in self.model_configs.keys():
            try:
                result = await self.load_model(model_key)
                results.append(result)
                if result["success"]:
                    success_count += 1
            except Exception as e:
                logger.error(f"❌ Failed to load {model_key}: {e}")
                results.append({
                    "success": False,
                    "model_key": model_key,
                    "error": str(e)
                })

        return {
            "success": True,
            "total_models": len(self.model_configs),
            "loaded_models": success_count,
            "failed_models": len(self.model_configs) - success_count,
            "results": results,
            "timestamp": datetime.utcnow().isoformat()
        }

    async def predict_sentiment(
        self,
        text: str,
        model_key: str = "cryptobert_elkulako",
        max_length: int = 512
    ) -> Dict[str, Any]:
        """
        Predict sentiment directly (NO PIPELINE).

        Args:
            text: Input text
            model_key: Model to use
            max_length: Maximum sequence length

        Returns:
            Sentiment prediction
        """
        # Lazy-load the model on first use.
        if model_key not in self.models:
            await self.load_model(model_key)

        try:
            model = self.models[model_key]
            tokenizer = self.tokenizers[model_key]

            # Tokenize with truncation/padding so the input fits the model limit.
            inputs = tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=max_length
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Direct forward pass - no pipeline involved.
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits

            # Convert logits to probabilities and take the argmax class.
            probs = torch.softmax(logits, dim=1)
            predicted_class = torch.argmax(probs, dim=1).item()
            confidence = probs[0][predicted_class].item()

            # Prefer the model's own id2label mapping; fall back to a generic
            # 3-class scheme for checkpoints that do not define labels.
            label_map = {0: "negative", 1: "neutral", 2: "positive"}
            id2label = getattr(model.config, "id2label", None) or label_map
            label = id2label.get(predicted_class, label_map.get(predicted_class, "unknown"))

            # Per-class probabilities keyed by the same labels.
            all_probs = {
                str(id2label.get(i, f"class_{i}")): probs[0][i].item()
                for i in range(probs.shape[1])
            }

            logger.info(f"✅ Sentiment predicted: {label} (confidence: {confidence:.4f})")

            return {
                "success": True,
                "text": text[:100] + "..." if len(text) > 100 else text,
                "sentiment": label,
                "label": label,
                "score": confidence,
                "confidence": confidence,
                "all_scores": all_probs,
                "model": model_key,
                "model_id": self.model_configs[model_key]["model_id"],
                "inference_type": "direct_no_pipeline",
                "device": self.device,
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Sentiment prediction failed: {e}")
            raise RuntimeError(f"Sentiment prediction failed: {e}") from e

    async def batch_predict_sentiment(
        self,
        texts: List[str],
        model_key: str = "cryptobert_elkulako",
        max_length: int = 512
    ) -> Dict[str, Any]:
        """
        Batch sentiment prediction (NO PIPELINE).

        Args:
            texts: List of input texts
            model_key: Model to use
            max_length: Maximum sequence length

        Returns:
            Batch predictions
        """
        # Lazy-load the model on first use.
        if model_key not in self.models:
            await self.load_model(model_key)

        try:
            model = self.models[model_key]
            tokenizer = self.tokenizers[model_key]

            # Tokenize the whole batch at once; padding aligns sequence lengths.
            inputs = tokenizer(
                texts,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=max_length
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Single forward pass over the batch - no pipeline involved.
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits

            # Per-row argmax class and its probability.
            probs = torch.softmax(logits, dim=1)
            predicted_classes = torch.argmax(probs, dim=1).cpu().numpy()
            confidences = probs.max(dim=1).values.cpu().numpy()

            # Prefer the model's own id2label mapping; fall back to a generic
            # 3-class scheme for checkpoints that do not define labels.
            label_map = {0: "negative", 1: "neutral", 2: "positive"}
            id2label = getattr(model.config, "id2label", None) or label_map

            results = []
            for i, text in enumerate(texts):
                predicted_class = int(predicted_classes[i])
                confidence = float(confidences[i])
                label = id2label.get(predicted_class, label_map.get(predicted_class, "unknown"))

                results.append({
                    "text": text[:100] + "..." if len(text) > 100 else text,
                    "sentiment": label,
                    "label": label,
                    "score": confidence,
                    "confidence": confidence
                })

            logger.info(f"✅ Batch sentiment predicted for {len(texts)} texts")

            return {
                "success": True,
                "count": len(results),
                "results": results,
                "model": model_key,
                "model_id": self.model_configs[model_key]["model_id"],
                "inference_type": "direct_batch_no_pipeline",
                "device": self.device,
                "timestamp": datetime.utcnow().isoformat()
            }

        except Exception as e:
            logger.error(f"❌ Batch sentiment prediction failed: {e}")
            raise RuntimeError(f"Batch sentiment prediction failed: {e}") from e

    def get_loaded_models(self) -> Dict[str, Any]:
        """
        Get the list of loaded models.

        Returns:
            Dict with loaded models info
        """
        models_info = []
        for model_key, config in self.model_configs.items():
            models_info.append({
                "model_key": model_key,
                "model_id": config["model_id"],
                "task": config["task"],
                "description": config["description"],
                "loaded": model_key in self.models,
                "device": self.device if model_key in self.models else None
            })

        return {
            "success": True,
            "total_configured": len(self.model_configs),
            "total_loaded": len(self.models),
            "device": self.device,
            "models": models_info,
            "timestamp": datetime.utcnow().isoformat()
        }

    def unload_model(self, model_key: str) -> Dict[str, Any]:
        """
        Unload a specific model from memory.

        Args:
            model_key: Key of the model to unload

        Returns:
            Status dict
        """
        if model_key not in self.models:
            return {
                "success": False,
                "model_key": model_key,
                "message": "Model not loaded"
            }

        try:
            # Drop the references so Python can reclaim the memory.
            del self.models[model_key]
            del self.tokenizers[model_key]
            self.model_configs[model_key]["loaded"] = False

            # Release cached GPU memory back to the driver.
            if self.device == "cuda":
                torch.cuda.empty_cache()

            logger.info(f"✅ Model unloaded: {model_key}")

            return {
                "success": True,
                "model_key": model_key,
                "message": "Model unloaded successfully"
            }

        except Exception as e:
            logger.error(f"❌ Failed to unload model {model_key}: {e}")
            return {
                "success": False,
                "model_key": model_key,
                "error": str(e)
            }


# Shared module-level instance.
direct_model_loader = DirectModelLoader()

__all__ = ["DirectModelLoader", "direct_model_loader"]
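
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only; the input text is a made-up sample).
# Assumes transformers and torch are installed: loads the public
# kk08/CryptoBERT checkpoint (downloaded on first use) and runs one direct
# prediction without a pipeline.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        loader = DirectModelLoader()
        if not loader.is_enabled():
            print("transformers/torch not installed; demo skipped")
            return
        await loader.load_model("cryptobert_kk08")
        result = await loader.predict_sentiment(
            "Bitcoin breaks above resistance on strong volume",
            model_key="cryptobert_kk08"
        )
        print(result["sentiment"], f"{result['confidence']:.4f}")

    asyncio.run(_demo())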