File size: 8,359 Bytes
8b7b267 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
"""
OHLCV Service with Multi-Provider Fallback
Automatically switches between Binance, CoinGecko, and other providers
"""
import logging
from typing import Dict, List, Any, Optional
from fastapi import HTTPException
from .api_fallback_manager import get_fallback_manager
logger = logging.getLogger(__name__)
class OHLCVService:
"""Service for fetching OHLCV data with automatic fallback"""
def __init__(self):
self.manager = get_fallback_manager("OHLCV")
self._setup_providers()
def _setup_providers(self):
"""Setup OHLCV providers in priority order"""
# Priority 1: Binance (fastest, most reliable - but may have regional restrictions)
self.manager.add_provider(
name="Binance",
priority=1,
fetch_function=self._fetch_binance,
cooldown_seconds=180,
max_failures=3
)
# Priority 2: CoinGecko (reliable alternative, no geo-restrictions)
self.manager.add_provider(
name="CoinGecko",
priority=2,
fetch_function=self._fetch_coingecko,
cooldown_seconds=60,
max_failures=3
)
# Priority 3: HuggingFace Space (fallback)
self.manager.add_provider(
name="HuggingFace",
priority=3,
fetch_function=self._fetch_huggingface,
cooldown_seconds=300,
max_failures=5
)
# Priority 4: Mock/Demo data (always available)
self.manager.add_provider(
name="Demo",
priority=999,
fetch_function=self._fetch_demo,
cooldown_seconds=0,
max_failures=999 # Never fails
)
logger.info("✅ OHLCV Service initialized with 4 providers (Binance, CoinGecko, HuggingFace, Demo)")
async def _fetch_binance(self, symbol: str, timeframe: str, limit: int = 100) -> Dict:
"""Fetch from Binance API"""
try:
from backend.services.binance_client import BinanceClient
client = BinanceClient()
candles = await client.get_ohlcv(symbol, timeframe=timeframe, limit=limit)
return {
"symbol": symbol.upper(),
"timeframe": timeframe,
"interval": timeframe,
"limit": limit,
"count": len(candles),
"ohlcv": candles,
"source": "binance"
}
except HTTPException as e:
if e.status_code == 451:
logger.warning(f"⚠️ Binance access restricted (HTTP 451). Falling back to CoinGecko.")
else:
logger.error(f"Binance fetch failed: {e.detail}")
raise
except Exception as e:
logger.error(f"Binance fetch failed: {e}")
raise
async def _fetch_coingecko(self, symbol: str, timeframe: str, limit: int = 100) -> Dict:
"""Fetch from CoinGecko API"""
try:
from backend.services.coingecko_client import CoinGeckoClient
client = CoinGeckoClient()
# CoinGecko uses days, not limit
days = self._timeframe_to_days(timeframe, limit)
data = await client.get_ohlcv(symbol, days=days)
return {
"symbol": symbol.upper(),
"timeframe": timeframe,
"interval": timeframe,
"limit": limit,
"count": len(data.get("prices", [])),
"ohlcv": self._format_coingecko_data(data),
"source": "coingecko"
}
except Exception as e:
logger.error(f"CoinGecko fetch failed: {e}")
raise
def _timeframe_to_days(self, timeframe: str, limit: int) -> int:
"""Convert timeframe and limit to days for CoinGecko"""
# Map timeframes to approximate days
timeframe_hours = {
"1m": 1/60, "5m": 5/60, "15m": 15/60, "30m": 0.5,
"1h": 1, "4h": 4, "1d": 24, "1w": 168
}
hours = timeframe_hours.get(timeframe, 1)
days = max(1, int((hours * limit) / 24))
return min(days, 365) # CoinGecko max 365 days
def _format_coingecko_data(self, data: Dict) -> List[Dict]:
"""Format CoinGecko data to standard OHLCV format"""
candles = []
prices = data.get("prices", [])
for price_point in prices:
timestamp, price = price_point
candles.append({
"timestamp": int(timestamp),
"open": price,
"high": price * 1.01, # Approximate
"low": price * 0.99, # Approximate
"close": price,
"volume": 0 # CoinGecko doesn't provide volume in this endpoint
})
return candles
async def _fetch_huggingface(self, symbol: str, timeframe: str, limit: int = 100) -> Dict:
"""Fetch from HuggingFace Space"""
import httpx
import os
base_url = os.getenv("HF_SPACE_BASE_URL", "https://really-amin-datasourceforcryptocurrency.hf.space")
token = os.getenv("HF_API_TOKEN", "").strip()
headers = {"Authorization": f"Bearer {token}"} if token else {}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{base_url}/api/ohlcv/{symbol}",
params={"interval": timeframe, "limit": limit},
headers=headers,
timeout=15.0
)
response.raise_for_status()
return response.json()
async def _fetch_demo(self, symbol: str, timeframe: str, limit: int = 100) -> Dict:
"""Fetch demo/fallback data"""
import time
import random
# Generate realistic demo candles
base_price = 50000 if symbol.upper() == "BTC" else 3000
candles = []
for i in range(limit):
timestamp = int(time.time()) - (i * 3600) # 1 hour intervals
open_price = base_price + random.uniform(-1000, 1000)
close_price = open_price + random.uniform(-500, 500)
high_price = max(open_price, close_price) + random.uniform(0, 300)
low_price = min(open_price, close_price) - random.uniform(0, 300)
volume = random.uniform(1000, 10000)
candles.append({
"t": timestamp * 1000,
"o": round(open_price, 2),
"h": round(high_price, 2),
"l": round(low_price, 2),
"c": round(close_price, 2),
"v": round(volume, 2)
})
return {
"symbol": symbol.upper(),
"timeframe": timeframe,
"interval": timeframe,
"limit": limit,
"count": len(candles),
"ohlcv": candles[::-1], # Reverse to oldest first
"source": "demo",
"warning": "Using demo data - live data unavailable"
}
async def get_ohlcv(
self,
symbol: str,
timeframe: str = "1h",
limit: int = 100
) -> Dict[str, Any]:
"""
Get OHLCV data with automatic fallback
Args:
symbol: Trading symbol (e.g., "BTC", "ETH")
timeframe: Timeframe (e.g., "1h", "4h", "1d")
limit: Number of candles
Returns:
Dict with OHLCV data and metadata
"""
result = await self.manager.fetch_with_fallback(
symbol=symbol,
timeframe=timeframe,
limit=limit
)
if not result["success"]:
logger.error(f"All OHLCV providers failed for {symbol}")
return result
def get_status(self) -> Dict[str, Any]:
"""Get status of all OHLCV providers"""
return self.manager.get_status()
# Global singleton
_ohlcv_service: Optional[OHLCVService] = None
def get_ohlcv_service() -> OHLCVService:
"""Get or create the OHLCV service singleton"""
global _ohlcv_service
if _ohlcv_service is None:
_ohlcv_service = OHLCVService()
return _ohlcv_service
|