File size: 15,241 Bytes
8b7b267
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
#!/usr/bin/env python3
"""
HuggingFace Data Hub API Endpoints
Serve data FROM HuggingFace Datasets to clients

This API ensures all data comes from HuggingFace Datasets:
    External APIs → Workers → HuggingFace Datasets → THIS API → Clients
"""

import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, HTTPException, Query, Depends
from pydantic import BaseModel, Field

# Import authentication
from api.hf_auth import verify_hf_token

try:
    from datasets import load_dataset
    DATASETS_AVAILABLE = True
except ImportError:
    DATASETS_AVAILABLE = False

from utils.logger import setup_logger

# Module logger, created by the project's utils.logger.setup_logger helper
# (its handler/format configuration is not visible from this file).
logger = setup_logger("hf_data_hub_api")

# Create router: every endpoint in this module is mounted under /api/hub and
# tagged "data-hub".
router = APIRouter(prefix="/api/hub", tags=["data-hub"])


# Response models
class MarketDataResponse(BaseModel):
    """
    One market-data record returned by ``GET /api/hub/market``.

    The endpoint builds these from rows of the market HuggingFace dataset
    (``df.to_dict("records")``), so field names are expected to match that
    dataset's column names.
    """
    # Asset symbol; the endpoint's filter upper-cases requested symbols, so
    # stored values are presumably upper-case (e.g. "BTC") — confirm.
    symbol: str
    # Current price. NOTE(review): the quote currency is not visible here
    # (presumably USD) — confirm against the worker that writes the dataset.
    price: float
    # Optional metrics; None when the row has no value.
    market_cap: Optional[float] = None
    volume_24h: Optional[float] = None
    # NOTE(review): whether this is an absolute or a percentage change is not
    # visible from this file — confirm.
    change_24h: Optional[float] = None
    high_24h: Optional[float] = None
    low_24h: Optional[float] = None
    # Name of the upstream source that produced the record (assumed from the
    # field name — confirm).
    provider: str
    # Timestamps are typed as plain strings; presumably ISO-8601 so that the
    # endpoint's descending sort orders them chronologically — confirm.
    timestamp: str
    fetched_at: str


class OHLCDataResponse(BaseModel):
    """
    One OHLC candle returned by ``GET /api/hub/ohlc``.

    The endpoint builds these from rows of the OHLC HuggingFace dataset
    (``df.to_dict("records")``), so field names are expected to match that
    dataset's column names.
    """
    # Trading pair, e.g. "BTCUSDT" (matched upper-case by the endpoint).
    symbol: str
    # Candle interval string such as "1h", "4h" or "1d" (matched exactly).
    interval: str
    # Candle time as a string; presumably ISO-8601 — confirm.
    timestamp: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    # Name of the upstream source of the candle (assumed from the field name).
    provider: str
    # When the record was collected, as a string.
    fetched_at: str


class DataHubStatus(BaseModel):
    """Response body of ``GET /api/hub/status``."""
    # "healthy" when at least one dataset loaded, otherwise "degraded".
    status: str
    # Human-readable summary matching ``status``.
    message: str
    # Per-dataset summaries. Keys are "available"/"records"/"error" when the
    # dataset is unavailable, or "available"/"records"/"columns"/"url" when
    # it loaded.
    market_dataset: Dict[str, Any]
    ohlc_dataset: Dict[str, Any]
    # UTC time of the check in ISO-8601 with a trailing "Z".
    timestamp: str


# Configuration
# Read-only HF access token; HF_TOKEN takes precedence over HF_API_TOKEN.
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HF_API_TOKEN")
# Namespace owning the datasets. Use `or` (not a getenv default) so that an
# empty HF_USERNAME also falls back instead of producing "/crypto-...".
HF_USERNAME = os.getenv("HF_USERNAME") or "crypto-data-hub"
MARKET_DATASET = f"{HF_USERNAME}/crypto-market-data"
OHLC_DATASET = f"{HF_USERNAME}/crypto-ohlc-data"


def _load_hf_dataset(dataset_name: str, label: str):
    """
    Load the "train" split of a HuggingFace dataset.

    Args:
        dataset_name: Repository id, e.g. MARKET_DATASET or OHLC_DATASET.
        label: Human-readable name used in log messages ("market", "OHLC").

    Returns:
        The loaded dataset, or None when the ``datasets`` library is not
        installed or loading fails. Failures are logged rather than raised,
        so callers must check the result for None.
    """
    if not DATASETS_AVAILABLE:
        logger.error(f"Error loading {label} dataset: datasets library not available")
        return None

    logger.info(f"Loading {label} dataset from HuggingFace: {dataset_name}")
    try:
        return load_dataset(dataset_name, split="train", token=HF_TOKEN)
    except Exception as e:
        # Best-effort by design: endpoints translate None into 503/404 or a
        # "degraded" status, so log the full traceback instead of raising.
        logger.error(f"Error loading {label} dataset: {e}", exc_info=True)
        return None


def _load_market_dataset():
    """Load market data dataset (MARKET_DATASET) from HuggingFace; None on failure."""
    return _load_hf_dataset(MARKET_DATASET, "market")


def _load_ohlc_dataset():
    """Load OHLC dataset (OHLC_DATASET) from HuggingFace; None on failure."""
    return _load_hf_dataset(OHLC_DATASET, "OHLC")


@router.get(
    "/status",
    response_model=DataHubStatus,
    summary="Data Hub Status",
    description="Get status of HuggingFace Data Hub and available datasets"
)
async def get_hub_status():
    """
    Get Data Hub status and dataset information

    Returns information about the HuggingFace Datasets backing this API:
    - Market data dataset: availability, record count, columns, dataset URL
    - OHLC dataset: availability, record count, columns, dataset URL

    The overall status is "healthy" when at least one dataset loads and
    "degraded" otherwise.

    This endpoint does NOT require authentication.

    Raises:
        HTTPException(500): on an unexpected error while building the response.
    """
    try:
        loop = asyncio.get_running_loop()

        async def _describe(loader, dataset_name):
            # Summarise one dataset. Never raises; problems go into "error".
            info = {"available": False, "records": 0, "error": None}
            try:
                # load_dataset does blocking network/disk I/O; run it in the
                # default thread pool so the event loop keeps serving requests.
                dataset = await loop.run_in_executor(None, loader)
                # Compare with None explicitly: a loaded Dataset with zero rows
                # is falsy (it defines __len__) but is still available.
                if dataset is None:
                    info["error"] = "Could not load dataset"
                else:
                    info = {
                        "available": True,
                        "records": len(dataset),
                        "columns": dataset.column_names,
                        "url": f"https://huggingface.co/datasets/{dataset_name}"
                    }
            except Exception as e:
                info["error"] = str(e)
            return info

        market_info = await _describe(_load_market_dataset, MARKET_DATASET)
        ohlc_info = await _describe(_load_ohlc_dataset, OHLC_DATASET)

        any_available = market_info["available"] or ohlc_info["available"]
        return DataHubStatus(
            status="healthy" if any_available else "degraded",
            message="Data Hub operational" if any_available else "No datasets available",
            market_dataset=market_info,
            ohlc_dataset=ohlc_info,
            timestamp=datetime.utcnow().isoformat() + "Z"
        )

    except Exception as e:
        logger.error(f"Error getting hub status: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Error getting hub status: {str(e)}")


@router.get(
    "/market",
    response_model=List[MarketDataResponse],
    summary="Get Market Data from HuggingFace",
    description="Fetch real-time cryptocurrency market data FROM HuggingFace Datasets"
)
async def get_market_data_from_hub(
    symbols: Optional[str] = Query(None, description="Comma-separated list of symbols (e.g., 'BTC,ETH')"),
    limit: int = Query(100, ge=1, le=1000, description="Maximum number of records to return"),
    _: dict = Depends(verify_hf_token)
):
    """
    Get market data FROM HuggingFace Dataset

    Data Flow:
        HuggingFace Dataset → THIS API → Client

    Authentication: Required (enforced by the verify_hf_token dependency)

    Query Parameters:
        - symbols: Filter by specific symbols (comma-separated, matched
          upper-case; blank entries are ignored)
        - limit: Maximum records to return (1-1000)

    Returns:
        Up to ``limit`` market records, most recent first (sorted by
        ``timestamp``, or by ``fetched_at`` when there is no ``timestamp``
        column). Missing values are returned as null.

    Raises:
        HTTPException(503): the market dataset could not be loaded.
        HTTPException(404): the dataset loaded but contains no rows.
        HTTPException(500): any other error while reading or filtering.

    This endpoint ensures data is served FROM HuggingFace Datasets,
    NOT from local cache or external APIs.
    """
    try:
        logger.info(f"Fetching market data FROM HuggingFace Dataset: {MARKET_DATASET}")
        # load_dataset does blocking network/disk I/O; run it in the default
        # thread pool so this coroutine does not stall the event loop.
        dataset = await asyncio.get_running_loop().run_in_executor(None, _load_market_dataset)

        # Compare with None explicitly: a Dataset with zero rows is falsy and
        # must reach the 404 branch below instead of being reported as a 503.
        if dataset is None:
            raise HTTPException(
                status_code=503,
                detail="Market dataset not available on HuggingFace"
            )

        df = dataset.to_pandas()

        if df.empty:
            raise HTTPException(
                status_code=404,
                detail="No market data available in HuggingFace Dataset"
            )

        if symbols:
            # Drop blank entries (e.g. from "BTC,,ETH" or a trailing comma); if
            # nothing remains, behave as if no filter had been supplied.
            symbol_list = [s.strip().upper() for s in symbols.split(",") if s.strip()]
            if symbol_list:
                df = df[df["symbol"].isin(symbol_list)]

        # Sort by timestamp descending (most recent first)
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", ascending=False)
        elif "fetched_at" in df.columns:
            df = df.sort_values("fetched_at", ascending=False)

        df = df.head(limit)

        # pandas stores missing values as NaN, which Starlette's JSONResponse
        # refuses to encode (allow_nan=False); map them to None -> null.
        results = df.astype(object).where(df.notna(), None).to_dict("records")

        logger.info(f"✅ Serving {len(results)} market records FROM HuggingFace Dataset")

        return results

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error fetching market data from HuggingFace: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching market data from HuggingFace: {str(e)}"
        )


@router.get(
    "/ohlc",
    response_model=List[OHLCDataResponse],
    summary="Get OHLC Data from HuggingFace",
    description="Fetch cryptocurrency candlestick data FROM HuggingFace Datasets"
)
async def get_ohlc_data_from_hub(
    symbol: str = Query(..., description="Trading pair symbol (e.g., 'BTCUSDT')"),
    interval: str = Query("1h", description="Candle interval (e.g., '1h', '4h', '1d')"),
    limit: int = Query(500, ge=1, le=5000, description="Maximum number of candles to return"),
    _: dict = Depends(verify_hf_token)
):
    """
    Get OHLC/candlestick data FROM HuggingFace Dataset

    Data Flow:
        HuggingFace Dataset → THIS API → Client

    Authentication: Required (enforced by the verify_hf_token dependency)

    Query Parameters:
        - symbol: Trading pair (e.g., 'BTCUSDT'); matched upper-case
        - interval: Candle interval ('1h', '4h', '1d'); matched exactly
        - limit: Maximum candles to return (1-5000)

    Returns:
        Up to ``limit`` candles for the pair/interval, most recent first
        (when a ``timestamp`` column exists). Missing values are returned
        as null.

    Raises:
        HTTPException(503): the OHLC dataset could not be loaded.
        HTTPException(404): the dataset is empty or has no candles for the
            requested symbol/interval.
        HTTPException(500): any other error while reading or filtering.

    This endpoint ensures data is served FROM HuggingFace Datasets,
    NOT from local cache or external APIs.
    """
    try:
        logger.info(f"Fetching OHLC data FROM HuggingFace Dataset: {OHLC_DATASET}")
        # load_dataset does blocking network/disk I/O; run it in the default
        # thread pool so this coroutine does not stall the event loop.
        dataset = await asyncio.get_running_loop().run_in_executor(None, _load_ohlc_dataset)

        # Compare with None explicitly: a Dataset with zero rows is falsy and
        # must reach the 404 branch below instead of being reported as a 503.
        if dataset is None:
            raise HTTPException(
                status_code=503,
                detail="OHLC dataset not available on HuggingFace"
            )

        df = dataset.to_pandas()

        if df.empty:
            raise HTTPException(
                status_code=404,
                detail="No OHLC data available in HuggingFace Dataset"
            )

        # Filter by symbol and interval
        symbol_upper = symbol.upper()
        df = df[(df["symbol"] == symbol_upper) & (df["interval"] == interval)]

        if df.empty:
            raise HTTPException(
                status_code=404,
                detail=f"No OHLC data for {symbol_upper} {interval} in HuggingFace Dataset"
            )

        # Sort by timestamp descending (most recent first)
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", ascending=False)

        df = df.head(limit)

        # pandas stores missing values as NaN, which Starlette's JSONResponse
        # refuses to encode (allow_nan=False); map them to None -> null.
        results = df.astype(object).where(df.notna(), None).to_dict("records")

        logger.info(f"✅ Serving {len(results)} OHLC candles FROM HuggingFace Dataset")

        return results

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error fetching OHLC data from HuggingFace: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error fetching OHLC data from HuggingFace: {str(e)}"
        )


@router.get(
    "/dataset-info",
    summary="Get Dataset Information",
    description="Get detailed information about HuggingFace Datasets"
)
async def get_dataset_info(
    dataset_type: str = Query("market", description="Dataset type: 'market' or 'ohlc'")
):
    """
    Get detailed information about a specific HuggingFace Dataset

    Query Parameters:
        - dataset_type: 'market' or 'ohlc'

    Returns:
        A dict with:
        - name / url: dataset repository id and its HuggingFace URL
        - records: number of rows
        - columns / features: column names and the stringified feature types
        - size_mb: in-memory size of the pandas conversion (not on-disk size)
        - sample_records: up to 3 rows, missing values as null
        - latest_timestamp / oldest_timestamp: range of the ``timestamp``
          column (or ``fetched_at`` if absent); omitted for an empty dataset

    Raises:
        HTTPException(400): unknown dataset_type.
        HTTPException(404): the dataset could not be loaded.
        HTTPException(500): any other error while inspecting the dataset.

    This endpoint does NOT require authentication.
    """
    try:
        if dataset_type == "market":
            dataset_name, loader = MARKET_DATASET, _load_market_dataset
        elif dataset_type == "ohlc":
            dataset_name, loader = OHLC_DATASET, _load_ohlc_dataset
        else:
            raise HTTPException(
                status_code=400,
                detail="Invalid dataset_type. Must be 'market' or 'ohlc'"
            )

        # load_dataset does blocking network/disk I/O; keep it off the event loop.
        dataset = await asyncio.get_running_loop().run_in_executor(None, loader)

        # Compare with None explicitly: an existing but empty Dataset is falsy
        # and should still be described, not reported as "not found".
        if dataset is None:
            raise HTTPException(
                status_code=404,
                detail=f"Dataset not found: {dataset_name}"
            )

        df = dataset.to_pandas()

        # NaN in sample rows would make the JSON response fail to encode.
        sample = df.head(3)
        sample_records = sample.astype(object).where(sample.notna(), None).to_dict("records")

        info = {
            "name": dataset_name,
            "url": f"https://huggingface.co/datasets/{dataset_name}",
            "records": len(dataset),
            "columns": dataset.column_names,
            "features": str(dataset.features),
            "size_mb": df.memory_usage(deep=True).sum() / 1024 / 1024,
            "sample_records": sample_records
        }

        # Timestamp range; skipped for an empty frame, where min/max are NaN.
        if not df.empty:
            time_column = next(
                (col for col in ("timestamp", "fetched_at") if col in df.columns),
                None
            )
            if time_column is not None:
                info["latest_timestamp"] = str(df[time_column].max())
                info["oldest_timestamp"] = str(df[time_column].min())

        return info

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting dataset info: {e}", exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error getting dataset info: {str(e)}"
        )


# Health check for Data Hub
@router.get(
    "/health",
    summary="Data Hub Health Check",
    description="Check if Data Hub is operational and datasets are accessible"
)
async def data_hub_health():
    """
    Health check for Data Hub

    Returns:
        A dict with:
        - status: "healthy" if both datasets load, "degraded" if either does
          not, "unhealthy" on an unexpected error
        - timestamp: UTC ISO-8601 time of the check with a trailing "Z"
        - datasets: per-dataset availability, record count and the latest
          ``fetched_at`` value (None if the column is missing or empty)

    Never raises; unexpected errors are reported in the response body.

    This endpoint does NOT require authentication.
    """
    try:
        health = {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "datasets": {}
        }

        loop = asyncio.get_running_loop()
        checks = (("market", _load_market_dataset), ("ohlc", _load_ohlc_dataset))

        for key, loader in checks:
            try:
                # load_dataset does blocking network/disk I/O; keep it off the
                # event loop so health probes don't stall other requests.
                dataset = await loop.run_in_executor(None, loader)
                # Compare with None explicitly: a Dataset with zero rows is
                # falsy but was loaded successfully.
                if dataset is not None:
                    df = dataset.to_pandas()
                    has_fetched_at = "fetched_at" in df.columns and not df.empty
                    health["datasets"][key] = {
                        "available": True,
                        "records": len(dataset),
                        "latest_update": str(df["fetched_at"].max()) if has_fetched_at else None
                    }
                else:
                    health["datasets"][key] = {"available": False, "error": "Could not load dataset"}
                    health["status"] = "degraded"
            except Exception as e:
                health["datasets"][key] = {"available": False, "error": str(e)}
                health["status"] = "degraded"

        return health

    except Exception as e:
        logger.error(f"Error in health check: {e}", exc_info=True)
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat() + "Z"
        }