Datasourceforcryptocurrency-5 / api /hf_data_hub_endpoints.py
Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
#!/usr/bin/env python3
"""
HuggingFace Data Hub API Endpoints
Serve data FROM HuggingFace Datasets to clients
This API ensures all data comes from HuggingFace Datasets:
External APIs → Workers → HuggingFace Datasets → THIS API → Clients
"""
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field

# Import authentication
from api.hf_auth import verify_hf_token
try:
from datasets import load_dataset
DATASETS_AVAILABLE = True
except ImportError:
DATASETS_AVAILABLE = False
from utils.logger import setup_logger
logger = setup_logger("hf_data_hub_api")
# Create router
router = APIRouter(prefix="/api/hub", tags=["data-hub"])
# Response models
class MarketDataResponse(BaseModel):
    """Market data response model"""

    # Required: asset symbol and its price.
    symbol: str = Field(...)
    price: float = Field(...)

    # Optional numeric metrics; each defaults to None when absent.
    market_cap: Optional[float] = Field(default=None)
    volume_24h: Optional[float] = Field(default=None)
    change_24h: Optional[float] = Field(default=None)
    high_24h: Optional[float] = Field(default=None)
    low_24h: Optional[float] = Field(default=None)

    # Required: provider name and two string-typed time fields.
    provider: str = Field(...)
    timestamp: str = Field(...)
    fetched_at: str = Field(...)
class OHLCDataResponse(BaseModel):
    """OHLC data response model"""

    # Required: which series the candle belongs to and its time field.
    symbol: str = Field(...)
    interval: str = Field(...)
    timestamp: str = Field(...)

    # Required: the candle's numeric values.
    open: float = Field(...)
    high: float = Field(...)
    low: float = Field(...)
    close: float = Field(...)
    volume: float = Field(...)

    # Required: provider name and fetch time (string-typed).
    provider: str = Field(...)
    fetched_at: str = Field(...)
class DataHubStatus(BaseModel):
    """Data hub status response"""

    # Required: overall state and a human-readable message.
    status: str = Field(...)
    message: str = Field(...)

    # Required: free-form per-dataset detail dictionaries.
    market_dataset: Dict[str, Any] = Field(...)
    ohlc_dataset: Dict[str, Any] = Field(...)

    # Required: string-typed time at which the status was produced.
    timestamp: str = Field(...)
# Configuration
# Token: HF_TOKEN wins; HF_API_TOKEN is the fallback when HF_TOKEN is unset or empty.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HF_API_TOKEN")
# Account/namespace that prefixes both dataset repository ids.
HF_USERNAME = os.environ.get("HF_USERNAME", "crypto-data-hub")
MARKET_DATASET = HF_USERNAME + "/crypto-market-data"
OHLC_DATASET = HF_USERNAME + "/crypto-ohlc-data"
def _load_market_dataset() -> Optional[Any]:
    """Load the market data dataset from HuggingFace.

    Requests the ``train`` split of ``MARKET_DATASET`` via ``load_dataset``,
    passing ``HF_TOKEN``. This helper never raises: every failure is logged
    and reported to the caller as ``None``.

    Returns:
        Whatever ``load_dataset`` returns, or ``None`` when the ``datasets``
        library is not installed or loading fails for any reason.
    """
    # Guard clause instead of raising ImportError only to catch it right
    # below; the logged message is the same as before.
    if not DATASETS_AVAILABLE:
        logger.error("Error loading market dataset: datasets library not available")
        return None

    try:
        logger.info(f"Loading market dataset from HuggingFace: {MARKET_DATASET}")
        return load_dataset(
            MARKET_DATASET,
            split="train",
            token=HF_TOKEN,
        )
    except Exception as e:
        # Deliberately best-effort: callers treat None as "unavailable".
        # exc_info keeps the traceback, matching the endpoint handlers.
        logger.error(f"Error loading market dataset: {e}", exc_info=True)
        return None
def _load_ohlc_dataset() -> Optional[Any]:
    """Load the OHLC dataset from HuggingFace.

    Requests the ``train`` split of ``OHLC_DATASET`` via ``load_dataset``,
    passing ``HF_TOKEN``. This helper never raises: every failure is logged
    and reported to the caller as ``None``.

    Returns:
        Whatever ``load_dataset`` returns, or ``None`` when the ``datasets``
        library is not installed or loading fails for any reason.
    """
    # Guard clause instead of raising ImportError only to catch it right
    # below; the logged message is the same as before.
    if not DATASETS_AVAILABLE:
        logger.error("Error loading OHLC dataset: datasets library not available")
        return None

    try:
        logger.info(f"Loading OHLC dataset from HuggingFace: {OHLC_DATASET}")
        return load_dataset(
            OHLC_DATASET,
            split="train",
            token=HF_TOKEN,
        )
    except Exception as e:
        # Deliberately best-effort: callers treat None as "unavailable".
        # exc_info keeps the traceback, matching the endpoint handlers.
        logger.error(f"Error loading OHLC dataset: {e}", exc_info=True)
        return None
async def _summarize_hub_dataset(loader, dataset_name: str) -> Dict[str, Any]:
    """Build the status entry for a single dataset.

    Args:
        loader: Zero-argument callable that returns the loaded dataset, or
            ``None`` when it could not be loaded.
        dataset_name: Repository id used to build the dataset URL.

    Returns:
        On success a dict with ``available``, ``records``, ``columns`` and
        ``url``. Otherwise ``available`` is False, ``records`` is 0 and
        ``error`` holds a message.
    """
    info: Dict[str, Any] = {"available": False, "records": 0, "error": None}
    try:
        # The loaders call `load_dataset` synchronously; run them in the
        # default executor so the event loop is not blocked meanwhile.
        dataset = await asyncio.get_running_loop().run_in_executor(None, loader)
        if dataset is None:
            # The loaders swallow their own exceptions and return None, so
            # without this the "error" field would never be filled in.
            info["error"] = "Could not load dataset"
        else:
            # Compare against None rather than truthiness: a loaded dataset
            # with zero rows should still count as available.
            info = {
                "available": True,
                "records": len(dataset),
                "columns": dataset.column_names,
                "url": f"https://huggingface.co/datasets/{dataset_name}",
            }
    except Exception as e:
        info["error"] = str(e)
    return info


@router.get(
    "/status",
    response_model=DataHubStatus,
    summary="Data Hub Status",
    description="Get status of HuggingFace Data Hub and available datasets"
)
async def get_hub_status():
    """
    Get Data Hub status and dataset information

    Returns, for the market and OHLC datasets:
    - whether the dataset could be loaded
    - its record count, column names and HuggingFace URL (when loaded)
    - an error message (when not loaded)

    The overall status is "healthy" when at least one dataset is available
    and "degraded" otherwise.

    This endpoint does NOT require authentication.

    Raises:
        HTTPException: 500 if building the status response fails.
    """
    try:
        market_info = await _summarize_hub_dataset(_load_market_dataset, MARKET_DATASET)
        ohlc_info = await _summarize_hub_dataset(_load_ohlc_dataset, OHLC_DATASET)

        any_available = market_info["available"] or ohlc_info["available"]

        return DataHubStatus(
            status="healthy" if any_available else "degraded",
            message="Data Hub operational" if any_available else "No datasets available",
            market_dataset=market_info,
            ohlc_dataset=ohlc_info,
            # Same "...Z" text as before, without the deprecated utcnow().
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        )
    except Exception as e:
        logger.error(f"Error getting hub status: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error getting hub status: {str(e)}")
@router.get(
    "/market",
    response_model=List[MarketDataResponse],
    summary="Get Market Data from HuggingFace",
    description="Fetch real-time cryptocurrency market data FROM HuggingFace Datasets"
)
async def get_market_data_from_hub(
    symbols: Optional[str] = Query(None, description="Comma-separated list of symbols (e.g., 'BTC,ETH')"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum number of records to return"),
    _: dict = Depends(verify_hf_token)
):
    """
    Get market data FROM HuggingFace Dataset

    Data Flow:
        HuggingFace Dataset → THIS API → Client

    Authentication: Required (the `verify_hf_token` dependency)

    Query Parameters:
    - symbols: Filter by specific symbols (comma-separated, matched upper-cased)
    - limit: Maximum records to return (1-1000)

    Returns:
        List of market data records, most recent first (ordered by
        `timestamp`, or by `fetched_at` when there is no `timestamp` column).
        Missing numeric values (NaN) are returned as null.

    Raises:
        HTTPException: 503 if the dataset could not be loaded, 404 if it
        holds no rows, 500 on any other failure.

    The data is read through `load_dataset` on every request.
    NOTE(review): `load_dataset` may serve files from its local cache;
    confirm before promising clients that no cache is involved.
    """
    try:
        # Load dataset from HuggingFace
        logger.info(f"Fetching market data FROM HuggingFace Dataset: {MARKET_DATASET}")

        # Loading and the pandas conversion are synchronous; run them in the
        # default executor so other requests are not stalled meanwhile.
        loop = asyncio.get_running_loop()
        dataset = await loop.run_in_executor(None, _load_market_dataset)

        # `is None`, not truthiness: an empty dataset must reach the 404
        # below instead of being reported as 503 "not available".
        if dataset is None:
            raise HTTPException(
                status_code=503,
                detail="Market dataset not available on HuggingFace"
            )

        # Convert to pandas for filtering
        df = await loop.run_in_executor(None, dataset.to_pandas)

        if df.empty:
            raise HTTPException(
                status_code=404,
                detail="No market data available in HuggingFace Dataset"
            )

        # Filter by symbols if provided
        if symbols:
            symbol_list = [s.strip().upper() for s in symbols.split(",")]
            df = df[df["symbol"].isin(symbol_list)]

        # Sort by timestamp descending (most recent first)
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", ascending=False)
        elif "fetched_at" in df.columns:
            df = df.sort_values("fetched_at", ascending=False)

        # Apply limit
        df = df.head(limit)

        # NaN is not valid JSON and would break response serialization, so
        # missing numeric values are sent as null. `value != value` is true
        # only for NaN.
        results = [
            {
                key: (None if isinstance(value, float) and value != value else value)
                for key, value in record.items()
            }
            for record in df.to_dict("records")
        ]

        logger.info(f"✅ Serving {len(results)} market records FROM HuggingFace Dataset")

        return results

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error fetching market data from HuggingFace: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching market data from HuggingFace: {str(e)}"
        )
@router.get(
    "/ohlc",
    response_model=List[OHLCDataResponse],
    summary="Get OHLC Data from HuggingFace",
    description="Fetch cryptocurrency candlestick data FROM HuggingFace Datasets"
)
async def get_ohlc_data_from_hub(
    symbol: str = Query(..., description="Trading pair symbol (e.g., 'BTCUSDT')"),
    interval: str = Query("1h", description="Candle interval (e.g., '1h', '4h', '1d')"),
    limit: int = Query(500, ge=1, le=5000, description="Maximum number of candles to return"),
    _: dict = Depends(verify_hf_token)
):
    """
    Get OHLC/candlestick data FROM HuggingFace Dataset

    Data Flow:
        HuggingFace Dataset → THIS API → Client

    Authentication: Required (the `verify_hf_token` dependency)

    Query Parameters:
    - symbol: Trading pair (e.g., 'BTCUSDT'); surrounding whitespace is
      ignored and the value is matched upper-cased
    - interval: Candle interval, matched exactly (e.g., '1h', '4h', '1d')
    - limit: Maximum candles to return (1-5000)

    Returns:
        List of OHLC candles for the symbol/interval, most recent first when
        the dataset has a `timestamp` column.

    Raises:
        HTTPException: 503 if the dataset could not be loaded, 404 if the
        dataset is empty or has no rows for the symbol/interval, 500 on any
        other failure.

    The data is read through `load_dataset` on every request.
    NOTE(review): `load_dataset` may serve files from its local cache;
    confirm before promising clients that no cache is involved.
    """
    try:
        # Load dataset from HuggingFace
        logger.info(f"Fetching OHLC data FROM HuggingFace Dataset: {OHLC_DATASET}")

        # Loading and the pandas conversion are synchronous; run them in the
        # default executor so other requests are not stalled meanwhile.
        loop = asyncio.get_running_loop()
        dataset = await loop.run_in_executor(None, _load_ohlc_dataset)

        # `is None`, not truthiness: an empty dataset must reach the 404
        # below instead of being reported as 503 "not available".
        if dataset is None:
            raise HTTPException(
                status_code=503,
                detail="OHLC dataset not available on HuggingFace"
            )

        # Convert to pandas for filtering
        df = await loop.run_in_executor(None, dataset.to_pandas)

        if df.empty:
            raise HTTPException(
                status_code=404,
                detail="No OHLC data available in HuggingFace Dataset"
            )

        # Filter by symbol and interval. The symbol is stripped as well as
        # upper-cased, matching how the market endpoint normalizes symbols.
        symbol_upper = symbol.strip().upper()
        df = df[(df["symbol"] == symbol_upper) & (df["interval"] == interval)]

        if df.empty:
            raise HTTPException(
                status_code=404,
                detail=f"No OHLC data for {symbol_upper} {interval} in HuggingFace Dataset"
            )

        # Sort by timestamp descending (most recent first)
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", ascending=False)

        # Apply limit
        df = df.head(limit)

        # Convert to response model
        results = df.to_dict("records")

        logger.info(f"✅ Serving {len(results)} OHLC candles FROM HuggingFace Dataset")

        return results

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error fetching OHLC data from HuggingFace: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching OHLC data from HuggingFace: {str(e)}"
        )
@router.get(
    "/dataset-info",
    summary="Get Dataset Information",
    description="Get detailed information about HuggingFace Datasets"
)
async def get_dataset_info(
    dataset_type: str = Query("market", description="Dataset type: 'market' or 'ohlc'")
):
    """
    Get detailed information about a specific HuggingFace Dataset

    Query Parameters:
    - dataset_type: 'market' or 'ohlc'

    Returns:
        A dict with:
        - name and url of the dataset
        - records: number of rows
        - columns and features (features rendered as a string)
        - size_mb: in-memory size of the pandas conversion, in MiB
        - sample_records: up to three rows (NaN values sent as null)
        - latest_timestamp / oldest_timestamp: max/min of the `timestamp`
          column, or of `fetched_at` when `timestamp` is absent

    Raises:
        HTTPException: 400 for an unknown dataset_type, 404 if the dataset
        could not be loaded, 500 on any other failure.

    This endpoint does NOT require authentication.
    """
    try:
        if dataset_type == "market":
            dataset_name = MARKET_DATASET
            loader = _load_market_dataset
        elif dataset_type == "ohlc":
            dataset_name = OHLC_DATASET
            loader = _load_ohlc_dataset
        else:
            raise HTTPException(
                status_code=400,
                detail="Invalid dataset_type. Must be 'market' or 'ohlc'"
            )

        # Loading and the pandas conversion are synchronous; run them in the
        # default executor so other requests are not stalled meanwhile.
        loop = asyncio.get_running_loop()
        dataset = await loop.run_in_executor(None, loader)

        # `is None`, not truthiness: an empty dataset exists and should be
        # described (with zero records) rather than reported as not found.
        if dataset is None:
            raise HTTPException(
                status_code=404,
                detail=f"Dataset not found: {dataset_name}"
            )

        # Get dataset info
        df = await loop.run_in_executor(None, dataset.to_pandas)

        # NaN is not valid JSON and would break response serialization, so
        # missing numeric values are sent as null. `value != value` is true
        # only for NaN.
        sample_records = [
            {
                key: (None if isinstance(value, float) and value != value else value)
                for key, value in record.items()
            }
            for record in df.head(3).to_dict("records")
        ]

        info = {
            "name": dataset_name,
            "url": f"https://huggingface.co/datasets/{dataset_name}",
            "records": len(dataset),
            "columns": dataset.column_names,
            "features": str(dataset.features),
            # Plain float so the value does not depend on numpy scalar
            # handling in the JSON encoder.
            "size_mb": float(df.memory_usage(deep=True).sum() / 1024 / 1024),
            "sample_records": sample_records,
        }

        # Add timestamp info if available
        if "timestamp" in df.columns:
            info["latest_timestamp"] = str(df["timestamp"].max())
            info["oldest_timestamp"] = str(df["timestamp"].min())
        elif "fetched_at" in df.columns:
            info["latest_timestamp"] = str(df["fetched_at"].max())
            info["oldest_timestamp"] = str(df["fetched_at"].min())

        return info

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting dataset info: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error getting dataset info: {str(e)}"
        )
async def _probe_hub_dataset(loader) -> Dict[str, Any]:
    """Build the health entry for a single dataset.

    Args:
        loader: Zero-argument callable that returns the loaded dataset, or
            ``None`` when it could not be loaded.

    Returns:
        ``{"available": True, "records": ..., "latest_update": ...}`` when
        the dataset loads, where ``latest_update`` is the stringified maximum
        of the ``fetched_at`` column (``None`` if that column is absent).
        Otherwise ``{"available": False, "error": ...}``.
    """
    try:
        # The loader and the pandas conversion are synchronous; run them in
        # the default executor so the event loop is not blocked meanwhile.
        loop = asyncio.get_running_loop()
        dataset = await loop.run_in_executor(None, loader)

        # `is None`, not truthiness: a loaded dataset with zero rows is
        # still reachable and should be reported as available.
        if dataset is None:
            return {"available": False, "error": "Could not load dataset"}

        df = await loop.run_in_executor(None, dataset.to_pandas)
        return {
            "available": True,
            "records": len(dataset),
            "latest_update": str(df["fetched_at"].max()) if "fetched_at" in df.columns else None,
        }
    except Exception as e:
        return {"available": False, "error": str(e)}


# Health check for Data Hub
@router.get(
    "/health",
    summary="Data Hub Health Check",
    description="Check if Data Hub is operational and datasets are accessible"
)
async def data_hub_health():
    """
    Health check for Data Hub

    Returns:
        A dict with:
        - status: "healthy" when both datasets load, "degraded" when at
          least one does not, "unhealthy" if the check itself fails
        - timestamp: time of the check (UTC, ISO format with trailing "Z")
        - datasets: per-dataset availability, record count and latest
          `fetched_at` value (or an error message)

    This handler reports failures in the response body instead of raising.

    This endpoint does NOT require authentication.
    """
    try:
        health = {
            "status": "healthy",
            # Same "...Z" text as before, without the deprecated utcnow().
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
            "datasets": {}
        }

        # Check the market dataset first, then the OHLC dataset.
        for key, loader in (("market", _load_market_dataset), ("ohlc", _load_ohlc_dataset)):
            entry = await _probe_hub_dataset(loader)
            health["datasets"][key] = entry
            if not entry["available"]:
                health["status"] = "degraded"

        return health

    except Exception as e:
        logger.error(f"Error in health check: {e}", exc_info=True)
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        }