File size: 19,085 Bytes
8b7b267
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
#!/usr/bin/env python3
"""
Market API Router - Implements cryptocurrency market endpoints
Handles GET /api/market/price, GET /api/market/ohlc, POST /api/sentiment/analyze, and WebSocket /ws
"""

from fastapi import APIRouter, HTTPException, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from datetime import datetime
import logging
import json
import asyncio
import time

# Import services
from backend.services.coingecko_client import coingecko_client
from backend.services.binance_client import BinanceClient
from backend.services.ai_service_unified import UnifiedAIService
from backend.services.market_data_aggregator import market_data_aggregator
from backend.services.sentiment_aggregator import sentiment_aggregator
from backend.services.hf_dataset_aggregator import hf_dataset_aggregator

# Module-level logger, named after this module so records can be filtered per file.
logger = logging.getLogger(__name__)

# Router that collects the HTTP and WebSocket endpoints declared below;
# every route registered on it is tagged "Market API".
router = APIRouter(tags=["Market API"])

# WebSocket connection manager
class WebSocketManager:
    """Track WebSocket clients, their symbol subscriptions and price-stream tasks.

    All state lives in three dictionaries:

    * ``active_connections`` - client_id -> accepted WebSocket
    * ``subscriptions``      - client_id -> list of upper-cased symbols
    * ``price_streams``      - stream key -> background ``asyncio.Task``

    The ``/ws`` endpoint in this module registers one task per subscription
    under the composite key ``f"{client_id}_{symbol}"``; ``disconnect`` cleans
    up those keys as well as a bare ``client_id`` key.
    """

    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.subscriptions: Dict[str, List[str]] = {}  # client_id -> [symbols]
        self.price_streams: Dict[str, asyncio.Task] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        """Accept ``websocket`` and register it under ``client_id`` with no subscriptions."""
        await websocket.accept()
        self.active_connections[client_id] = websocket
        self.subscriptions[client_id] = []
        logger.info(f"WebSocket client {client_id} connected")

    async def disconnect(self, client_id: str):
        """Forget ``client_id`` and cancel every price-stream task that belongs to it.

        Safe to call repeatedly: a second call for the same client finds
        nothing to remove and stays silent.
        """
        had_connection = self.active_connections.pop(client_id, None) is not None
        symbols = self.subscriptions.pop(client_id, None)

        # Tasks may be stored under the bare client id or, as the /ws endpoint
        # does, under "<client_id>_<SYMBOL>" for each subscribed symbol.
        stream_keys = [client_id]
        stream_keys.extend(f"{client_id}_{symbol}" for symbol in symbols or [])

        cancelled_any = False
        for key in stream_keys:
            task = self.price_streams.pop(key, None)
            if task is not None:
                task.cancel()
                cancelled_any = True

        if had_connection or symbols is not None or cancelled_any:
            logger.info(f"WebSocket client {client_id} disconnected")

    async def subscribe(self, client_id: str, symbol: str):
        """Add the upper-cased ``symbol`` to the client's subscription list (no duplicates)."""
        symbol_upper = symbol.upper()
        client_symbols = self.subscriptions.setdefault(client_id, [])
        if symbol_upper not in client_symbols:
            client_symbols.append(symbol_upper)
            logger.info(f"Client {client_id} subscribed to {symbol_upper}")

    async def send_message(self, client_id: str, message: Dict[str, Any]):
        """Send ``message`` as JSON to one client; drop the client if sending fails."""
        websocket = self.active_connections.get(client_id)
        if websocket is None:
            return
        try:
            await websocket.send_json(message)
        except Exception as e:
            logger.error(f"Error sending message to {client_id}: {e}")
            await self.disconnect(client_id)

    async def broadcast_to_subscribers(self, symbol: str, data: Dict[str, Any]):
        """Send ``data`` to every client currently subscribed to ``symbol``."""
        symbol_upper = symbol.upper()
        # Iterate over a snapshot: a failed send disconnects the client, which
        # removes its entry from ``self.subscriptions`` while we are looping.
        for client_id, symbols in list(self.subscriptions.items()):
            if symbol_upper in symbols:
                await self.send_message(client_id, data)

# Global WebSocket manager instance.
# Shared by the /ws endpoint and stream_price_updates() below, so connection
# and subscription state is per-process.
ws_manager = WebSocketManager()

# Binance client instance.
# Used for OHLC candles (get_ohlcv) and as the ticker fallback in the price stream.
binance_client = BinanceClient()

# AI service instance.
# Used by POST /api/sentiment/analyze.
ai_service = UnifiedAIService()


# ============================================================================
# GET /api/market/price
# ============================================================================

@router.get("/api/market/price")
async def get_market_price(
    symbol: str = Query(..., description="Cryptocurrency symbol (e.g., BTC, ETH)")
):
    """
    Return the current market price of a cryptocurrency.

    The upper-cased symbol is handed to ``market_data_aggregator.get_price``;
    provider selection and fallback are the aggregator's responsibility.

    Returns:
        A dict with ``symbol``, ``price``, ``source`` and ``timestamp``.
        Missing fields default to the requested symbol, ``0``, ``"unknown"``
        and the current time. The timestamp is integer-divided by 1000
        (presumably milliseconds to seconds - confirm against the aggregator).

    Raises:
        HTTPException: any ``HTTPException`` from the aggregator is passed
            through unchanged; every other failure becomes a 502.
    """
    try:
        ticker = symbol.upper()

        # Delegate the lookup; the aggregator decides which provider answers.
        quote = await market_data_aggregator.get_price(ticker)

        now_ms = int(time.time() * 1000)
        payload = {}
        payload["symbol"] = quote.get("symbol", ticker)
        payload["price"] = quote.get("price", 0)
        payload["source"] = quote.get("source", "unknown")
        payload["timestamp"] = quote.get("timestamp", now_ms) // 1000
        return payload

    except HTTPException:
        # Already a well-formed HTTP error; do not rewrap it.
        raise
    except Exception as exc:
        logger.error(f"Error fetching price for {symbol}: {exc}")
        raise HTTPException(
            status_code=502,
            detail=f"Error fetching price data: {str(exc)}"
        )


# ============================================================================
# GET /api/market/ohlc
# ============================================================================

@router.get("/api/market/ohlc")
async def get_market_ohlc(
    symbol: str = Query(..., description="Cryptocurrency symbol (e.g., BTC, ETH)"),
    timeframe: str = Query("1h", description="Timeframe (1h, 4h, 1d)")
):
    """
    Return up to 100 OHLC (Open, High, Low, Close) candles for a symbol.

    Providers are tried in order and the first non-empty answer wins:
    1. ``binance_client.get_ohlcv``
    2. ``hf_dataset_aggregator.get_ohlcv``

    A provider that raises is logged as a warning and skipped.

    Returns:
        A dict with ``symbol``, ``timeframe``, ``ohlc`` (list of candles with
        ``open``/``high``/``low``/``close``/``timestamp``) and ``source``
        (``"binance"`` or ``"huggingface"``).

    Raises:
        HTTPException: 400 for a timeframe outside
            1m/5m/15m/30m/1h/4h/1d/1w, 404 when no provider returned data,
            502 for any other unexpected failure.
    """
    try:
        ticker = symbol.upper()

        # Reject unsupported timeframes before touching any provider.
        allowed = ["1m", "5m", "15m", "30m", "1h", "4h", "1d", "1w"]
        if timeframe not in allowed:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid timeframe '{timeframe}'. Valid timeframes: {', '.join(allowed)}"
            )

        # (log label, value reported as "source", provider object) in priority order.
        providers = (
            ("Binance", "binance", binance_client),
            ("HuggingFace Datasets", "huggingface", hf_dataset_aggregator),
        )

        for label, source_key, provider in providers:
            try:
                candles = await provider.get_ohlcv(ticker, timeframe, limit=100)

                if candles and len(candles) > 0:
                    # Keep only the OHLC fields; volume and extras are dropped.
                    ohlc_rows = [
                        {
                            "open": candle.get("open", 0),
                            "high": candle.get("high", 0),
                            "low": candle.get("low", 0),
                            "close": candle.get("close", 0),
                            "timestamp": candle.get("timestamp", int(time.time()))
                        }
                        for candle in candles
                    ]

                    logger.info(f"✅ {label}: Fetched OHLC for {ticker}/{timeframe}")
                    return {
                        "symbol": ticker,
                        "timeframe": timeframe,
                        "ohlc": ohlc_rows,
                        "source": source_key
                    }
            except Exception as exc:
                # Best effort: fall through to the next provider.
                logger.warning(f"⚠️ {label} failed for {ticker}/{timeframe}: {exc}")

        # Every provider was empty or failed.
        raise HTTPException(
            status_code=404,
            detail=f"No OHLC data found for symbol '{symbol}' with timeframe '{timeframe}' from any source (Binance, HuggingFace)"
        )

    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error fetching OHLC data: {exc}")
        raise HTTPException(
            status_code=502,
            detail=f"Error fetching OHLC data: {str(exc)}"
        )


# ============================================================================
# POST /api/sentiment/analyze
# ============================================================================

class SentimentAnalyzeRequest(BaseModel):
    """Request body for the sentiment analysis endpoint."""
    # Required field; validation rejects strings shorter than one character.
    text: str = Field(
        ...,
        min_length=1,
        description="Text to analyze for sentiment",
    )


@router.post("/api/sentiment/analyze")
async def analyze_sentiment(request: SentimentAnalyzeRequest):
    """
    Analyze the sentiment of a given text (Bullish, Bearish, Neutral).

    The text is first sent to ``ai_service.analyze_sentiment``. If that call
    (or the interpretation of its result) fails, a keyword count over fixed
    positive/negative word lists is used instead.

    Returns:
        A dict with:
        - ``sentiment``: "Bullish", "Bearish" or "Neutral"
        - ``score``: value on a 0..1 scale where above 0.5 is bullish,
          below 0.5 is bearish and 0.5 is neutral
        - ``confidence``: the model's confidence (0.6 for the keyword fallback)

    Raises:
        HTTPException: 400 if the text is empty or whitespace only,
            502 for any unexpected failure outside the model call.
    """
    try:
        if not request.text or not request.text.strip():
            raise HTTPException(
                status_code=400,
                detail="Text parameter is required and cannot be empty"
            )

        # Use AI service for sentiment analysis
        try:
            result = await ai_service.analyze_sentiment(
                text=request.text,
                category="crypto",
                use_ensemble=True
            )

            label = result.get("label", "neutral").lower()
            confidence = result.get("confidence", 0.5)

            # Map the model label onto the score scale. Bullish scores stay
            # above 0.5 and bearish scores below it; when the model is not
            # confident (<= 0.5) a mild fixed score is used instead.
            if "bullish" in label or "positive" in label:
                sentiment = "Bullish"
                score = confidence if confidence > 0.5 else 0.6
            elif "bearish" in label or "negative" in label:
                sentiment = "Bearish"
                # Mirror of the bullish branch: a confident bearish call
                # maps to a low score.
                score = 1 - confidence if confidence > 0.5 else 0.4
            else:
                sentiment = "Neutral"
                score = 0.5

            return {
                "sentiment": sentiment,
                "score": score,
                "confidence": confidence
            }

        except Exception as e:
            logger.error(f"Error analyzing sentiment: {e}")
            # Fallback to simple keyword-based analysis (substring matches,
            # each keyword counted at most once).
            text_lower = request.text.lower()
            positive_words = ['bullish', 'buy', 'moon', 'pump', 'up', 'gain', 'profit', 'good', 'great', 'strong']
            negative_words = ['bearish', 'sell', 'dump', 'down', 'loss', 'crash', 'bad', 'fear', 'weak', 'drop']

            pos_count = sum(1 for word in positive_words if word in text_lower)
            neg_count = sum(1 for word in negative_words if word in text_lower)

            if pos_count > neg_count:
                sentiment = "Bullish"
            elif neg_count > pos_count:
                sentiment = "Bearish"
            else:
                sentiment = "Neutral"

            return {
                "sentiment": sentiment,
                "score": 0.65 if sentiment == "Bullish" else (0.35 if sentiment == "Bearish" else 0.5),
                "confidence": 0.6
            }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in sentiment analysis: {e}")
        raise HTTPException(
            status_code=502,
            detail=f"Error analyzing sentiment: {str(e)}"
        ) from e


# ============================================================================
# WebSocket /ws
# ============================================================================

async def _fetch_stream_price(symbol_upper: str):
    """Return the latest price for ``symbol_upper``, or 0 if no provider can supply one.

    CoinGecko is asked first. Binance (``<SYMBOL>USDT`` ticker) is used when
    CoinGecko returns nothing *or* raises.
    """
    try:
        market_data = await coingecko_client.get_market_prices(symbols=[symbol_upper], limit=1)
        if market_data and len(market_data) > 0:
            return market_data[0].get("price", 0)
    except Exception as e:
        logger.warning(f"Error fetching price for {symbol_upper}: {e}")

    # Fallback to Binance
    try:
        ticker = await binance_client.get_ticker(f"{symbol_upper}USDT")
        return float(ticker.get("lastPrice", 0)) if ticker else 0
    except Exception as e:
        logger.warning(f"Error fetching price for {symbol_upper}: {e}")
        return 0


async def stream_price_updates(client_id: str, symbol: str):
    """Push a price update for ``symbol`` to one client every 5 seconds.

    Runs until ``client_id`` is no longer in ``ws_manager.active_connections``
    or the task is cancelled. Each update is a dict with ``symbol``,
    ``price`` and ``timestamp`` (Unix seconds); ``price`` is 0 when no
    provider could supply a value.
    """
    symbol_upper = symbol.upper()

    while client_id in ws_manager.active_connections:
        try:
            price = await _fetch_stream_price(symbol_upper)

            # Send update to client
            await ws_manager.send_message(client_id, {
                "symbol": symbol_upper,
                "price": price,
                "timestamp": int(time.time())
            })

            # Wait 5 seconds before next update
            await asyncio.sleep(5)

        except asyncio.CancelledError:
            # Propagate cancellation so the task is reported as cancelled.
            raise
        except Exception as e:
            logger.error(f"Error in price stream for {symbol_upper}: {e}")
            await asyncio.sleep(5)


@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    WebSocket endpoint for real-time cryptocurrency data updates.

    Connection:
    - Clients connect to receive real-time data
    - Send subscription messages to subscribe to specific symbols
    - A "heartbeat" frame is sent whenever no message arrives for 30 seconds

    Subscription Message:
    {
        "type": "subscribe",
        "symbol": "BTC"
    }

    Unsubscribe Message:
    {
        "type": "unsubscribe",
        "symbol": "BTC"
    }

    Ping Message:
    {
        "type": "ping"
    }

    Malformed input (invalid JSON, a JSON value that is not an object, an
    unknown type, a missing symbol on subscribe) is answered with an
    "error" frame and the connection stays open.
    """
    client_id = f"client_{int(time.time() * 1000)}_{id(websocket)}"

    async def send_error(text: str) -> None:
        """Send an "error" frame carrying ``text`` to this client."""
        await websocket.send_json({
            "type": "error",
            "error": text,
            "timestamp": int(time.time())
        })

    def stop_stream(symbol: str) -> None:
        """Cancel and forget this client's price-stream task for ``symbol``, if any."""
        task = ws_manager.price_streams.pop(f"{client_id}_{symbol}", None)
        if task is not None:
            task.cancel()

    try:
        await ws_manager.connect(websocket, client_id)

        # Send welcome message
        await websocket.send_json({
            "type": "connected",
            "client_id": client_id,
            "message": "Connected to cryptocurrency data WebSocket",
            "timestamp": int(time.time())
        })

        # Handle incoming messages
        while True:
            try:
                # Receive message with timeout
                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
            except asyncio.TimeoutError:
                # Idle client: send heartbeat and keep waiting
                await websocket.send_json({
                    "type": "heartbeat",
                    "timestamp": int(time.time()),
                    "status": "alive"
                })
                continue

            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                await send_error("Invalid JSON format")
                continue

            # Valid JSON can still be a list, string or number; only objects
            # carry the "type"/"symbol" fields this protocol needs.
            if not isinstance(message, dict):
                await send_error("Invalid message format: expected a JSON object")
                continue

            raw_type = message.get("type", "")
            msg_type = raw_type.lower() if isinstance(raw_type, str) else str(raw_type)

            # A non-string symbol is treated the same as a missing one.
            raw_symbol = message.get("symbol", "")
            symbol = raw_symbol.upper() if isinstance(raw_symbol, str) else ""

            if msg_type == "subscribe":
                if not symbol:
                    await send_error("Symbol is required for subscription")
                    continue

                await ws_manager.subscribe(client_id, symbol)

                # Start price streaming task if not already running
                task_key = f"{client_id}_{symbol}"
                existing = ws_manager.price_streams.get(task_key)
                if existing is None or existing.done():
                    ws_manager.price_streams[task_key] = asyncio.create_task(
                        stream_price_updates(client_id, symbol)
                    )

                await websocket.send_json({
                    "type": "subscribed",
                    "symbol": symbol,
                    "message": f"Subscribed to {symbol} updates",
                    "timestamp": int(time.time())
                })

            elif msg_type == "unsubscribe":
                if symbol in ws_manager.subscriptions.get(client_id, []):
                    ws_manager.subscriptions[client_id].remove(symbol)
                    stop_stream(symbol)

                await websocket.send_json({
                    "type": "unsubscribed",
                    "symbol": symbol,
                    "message": f"Unsubscribed from {symbol} updates",
                    "timestamp": int(time.time())
                })

            elif msg_type == "ping":
                await websocket.send_json({
                    "type": "pong",
                    "timestamp": int(time.time())
                })

            else:
                await send_error(f"Unknown message type: {msg_type}")

    except WebSocketDisconnect:
        logger.info(f"WebSocket client {client_id} disconnected normally")

    except Exception as e:
        logger.error(f"WebSocket error for {client_id}: {e}", exc_info=True)
        try:
            await send_error(f"Server error: {str(e)}")
        except Exception as send_exc:
            # The socket is most likely already closed; nothing more to tell the client.
            logger.debug(f"Could not report error to {client_id}: {send_exc}")

    finally:
        # Single cleanup point: stop every stream started for this client,
        # then drop its connection and subscription state.
        for symbol in list(ws_manager.subscriptions.get(client_id, [])):
            stop_stream(symbol)
        await ws_manager.disconnect(client_id)