patches/provider_rotation.patch: "Replace mock data with real (#115)", commit 8ff9278 by nimazasinich
diff --git a/api/ws_data_broadcaster.py b/api/ws_data_broadcaster.py
index a4ee37a..1b9888e 100644
--- a/api/ws_data_broadcaster.py
+++ b/api/ws_data_broadcaster.py
@@ -1,23 +1,18 @@
-"""
-WebSocket Data Broadcaster
-Broadcasts real-time cryptocurrency data from database to connected clients
-"""
-
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any
-from database.db_manager import db_manager
+from backend.orchestration.provider_manager import provider_manager
from backend.services.ws_service_manager import ws_manager, ServiceType
from utils.logger import setup_logger
logger = setup_logger("ws_data_broadcaster")
-
class DataBroadcaster:
"""
Broadcasts cryptocurrency data updates to WebSocket clients
+ using the Provider Orchestrator for data fetching.
"""
def __init__(self):
@@ -37,7 +32,6 @@ class DataBroadcaster:
self.broadcast_market_data(),
self.broadcast_news(),
self.broadcast_sentiment(),
- self.broadcast_whales(),
self.broadcast_gas_prices()
]
@@ -59,25 +53,49 @@ class DataBroadcaster:
while self.is_running:
try:
- prices = db_manager.get_latest_prices(limit=50)
-
- if prices:
+                # Use the orchestrator to fetch market data; the short cache TTL
+                # below throttles upstream calls while this loop broadcasts more often.
+ response = await provider_manager.fetch_data(
+ "market",
+ params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"},
+ use_cache=True,
+ ttl=10 # Short TTL for live prices if provider allows
+ )
+
+ if response["success"] and response["data"]:
+ coins = response["data"]
+
# Format data for broadcast
+ prices = {}
+ price_changes = {}
+ volumes = {}
+ market_caps = {}
+
+ for coin in coins:
+ symbol = coin.get("symbol", "").upper()
+ prices[symbol] = coin.get("current_price")
+ price_changes[symbol] = coin.get("price_change_percentage_24h")
+ volumes[symbol] = coin.get("total_volume")
+ market_caps[symbol] = coin.get("market_cap")
+
data = {
"type": "market_data",
"data": {
- "prices": {p.symbol: p.price_usd for p in prices},
- "volumes": {p.symbol: p.volume_24h for p in prices if p.volume_24h},
- "market_caps": {p.symbol: p.market_cap for p in prices if p.market_cap},
- "price_changes": {p.symbol: p.price_change_24h for p in prices if p.price_change_24h}
+ "prices": prices,
+ "volumes": volumes,
+ "market_caps": market_caps,
+ "price_changes": price_changes
},
- "count": len(prices),
- "timestamp": datetime.utcnow().isoformat()
+ "count": len(coins),
+ "timestamp": datetime.utcnow().isoformat(),
+ "source": response["source"]
}
+                    # Optional optimization: diff against the previous payload and skip the broadcast if nothing changed
+
# Broadcast to subscribed clients
await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data)
- logger.debug(f"Broadcasted {len(prices)} price updates")
+ logger.debug(f"Broadcasted {len(coins)} price updates from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting market data: {e}", exc_info=True)
@@ -87,113 +105,98 @@ class DataBroadcaster:
async def broadcast_news(self):
"""Broadcast news updates"""
logger.info("Starting news broadcast...")
- last_news_id = 0
-
+
while self.is_running:
try:
- news = db_manager.get_latest_news(limit=10)
-
- if news and (not last_news_id or news[0].id != last_news_id):
- # New news available
- last_news_id = news[0].id
-
- data = {
- "type": "news",
- "data": {
- "articles": [
- {
- "id": article.id,
- "title": article.title,
- "source": article.source,
- "url": article.url,
- "published_at": article.published_at.isoformat(),
- "sentiment": article.sentiment
- }
- for article in news[:5] # Only send 5 latest
- ]
- },
- "count": len(news[:5]),
- "timestamp": datetime.utcnow().isoformat()
- }
-
- await ws_manager.broadcast_to_service(ServiceType.NEWS, data)
- logger.info(f"Broadcasted {len(news[:5])} news articles")
+ response = await provider_manager.fetch_data(
+ "news",
+ params={"filter": "hot"},
+ use_cache=True,
+ ttl=300
+ )
+
+ if response["success"] and response["data"]:
+ # Transform/Normalize
+ data = response["data"]
+ articles = []
+
+ if "results" in data: # CryptoPanic
+ for post in data.get('results', [])[:5]:
+ articles.append({
+ "id": str(post.get('id')),
+ "title": post.get('title', ''),
+ "source": post.get('source', {}).get('title', 'Unknown'),
+ "url": post.get('url', ''),
+ "published_at": post.get('published_at', datetime.now().isoformat())
+ })
+ elif "articles" in data: # NewsAPI
+ for post in data.get('articles', [])[:5]:
+ articles.append({
+ "id": str(hash(post.get('url', ''))),
+ "title": post.get('title', ''),
+ "source": post.get('source', {}).get('name', 'Unknown'),
+ "url": post.get('url', ''),
+ "published_at": post.get('publishedAt', datetime.now().isoformat())
+ })
+
+ if articles:
+ payload = {
+ "type": "news",
+ "data": {"articles": articles},
+ "count": len(articles),
+ "timestamp": datetime.utcnow().isoformat(),
+ "source": response["source"]
+ }
+
+ await ws_manager.broadcast_to_service(ServiceType.NEWS, payload)
+ logger.info(f"Broadcasted {len(articles)} news articles from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting news: {e}", exc_info=True)
- await asyncio.sleep(30) # Check every 30 seconds
+ await asyncio.sleep(60)
async def broadcast_sentiment(self):
"""Broadcast sentiment updates"""
logger.info("Starting sentiment broadcast...")
- last_sentiment_value = None
while self.is_running:
try:
- sentiment = db_manager.get_latest_sentiment()
-
- if sentiment and sentiment.value != last_sentiment_value:
- last_sentiment_value = sentiment.value
-
- data = {
+ response = await provider_manager.fetch_data(
+ "sentiment",
+ params={"limit": 1},
+ use_cache=True,
+ ttl=3600
+ )
+
+ if response["success"] and response["data"]:
+ data = response["data"]
+ fng_value = 50
+ classification = "Neutral"
+
+ if data.get('data'):
+ item = data['data'][0]
+ fng_value = int(item.get('value', 50))
+ classification = item.get('value_classification', 'Neutral')
+
+ payload = {
"type": "sentiment",
"data": {
- "fear_greed_index": sentiment.value,
- "classification": sentiment.classification,
- "metric_name": sentiment.metric_name,
- "source": sentiment.source,
- "timestamp": sentiment.timestamp.isoformat()
+ "fear_greed_index": fng_value,
+ "classification": classification,
+ "timestamp": datetime.utcnow().isoformat()
},
- "timestamp": datetime.utcnow().isoformat()
+ "timestamp": datetime.utcnow().isoformat(),
+ "source": response["source"]
}
- await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, data)
- logger.info(f"Broadcasted sentiment: {sentiment.value} ({sentiment.classification})")
+ await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, payload)
+ logger.info(f"Broadcasted sentiment: {fng_value} from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting sentiment: {e}", exc_info=True)
- await asyncio.sleep(60) # Check every minute
-
- async def broadcast_whales(self):
- """Broadcast whale transaction updates"""
- logger.info("Starting whale transaction broadcast...")
- last_whale_id = 0
-
- while self.is_running:
- try:
- whales = db_manager.get_whale_transactions(limit=5)
-
- if whales and (not last_whale_id or whales[0].id != last_whale_id):
- last_whale_id = whales[0].id
-
- data = {
- "type": "whale_transaction",
- "data": {
- "transactions": [
- {
- "id": tx.id,
- "blockchain": tx.blockchain,
- "amount_usd": tx.amount_usd,
- "from_address": tx.from_address[:20] + "...",
- "to_address": tx.to_address[:20] + "...",
- "timestamp": tx.timestamp.isoformat()
- }
- for tx in whales
- ]
- },
- "count": len(whales),
- "timestamp": datetime.utcnow().isoformat()
- }
-
- await ws_manager.broadcast_to_service(ServiceType.WHALE_TRACKING, data)
- logger.info(f"Broadcasted {len(whales)} whale transactions")
-
- except Exception as e:
- logger.error(f"Error broadcasting whales: {e}", exc_info=True)
-
- await asyncio.sleep(15) # Check every 15 seconds
+ await asyncio.sleep(60)
async def broadcast_gas_prices(self):
"""Broadcast gas price updates"""
@@ -201,23 +204,37 @@ class DataBroadcaster:
while self.is_running:
try:
- gas_prices = db_manager.get_latest_gas_prices()
-
- if gas_prices:
- data = {
- "type": "gas_prices",
- "data": gas_prices,
- "timestamp": datetime.utcnow().isoformat()
- }
-
- # Broadcast to RPC_NODES service type (gas prices are blockchain-related)
- await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, data)
- logger.debug("Broadcasted gas prices")
+ response = await provider_manager.fetch_data(
+ "onchain",
+ params={},
+ use_cache=True,
+ ttl=15
+ )
+
+ if response["success"] and response["data"]:
+ data = response["data"]
+ result = data.get("result", {})
+
+ if result:
+ payload = {
+ "type": "gas_prices",
+ "data": {
+ "fast": result.get("FastGasPrice"),
+ "standard": result.get("ProposeGasPrice"),
+ "slow": result.get("SafeGasPrice")
+ },
+ "timestamp": datetime.utcnow().isoformat(),
+ "source": response["source"]
+ }
+
+ # Broadcast to RPC_NODES service type (gas prices are blockchain-related)
+ await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, payload)
+ logger.debug(f"Broadcasted gas prices from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting gas prices: {e}", exc_info=True)
- await asyncio.sleep(30) # Every 30 seconds
+ await asyncio.sleep(30)
# Global broadcaster instance
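
Note: the broadcaster above consumes provider_manager.fetch_data() as a plain dict envelope with success, data, source, timestamp and, optionally, latency_ms and error keys. The orchestrator module itself is not touched by this patch, so the sketch below only records that assumed shape; FetchResponse and _empty_response are illustrative names, not the real API.

from datetime import datetime
from typing import Any, Optional, TypedDict

class FetchResponse(TypedDict, total=False):
    success: bool                 # True when some provider returned usable data
    data: Any                     # raw provider payload (list for CoinGecko markets, dict otherwise)
    source: str                   # name of the provider that served the request, e.g. "coingecko_free"
    timestamp: str                # ISO-8601 time the payload was fetched or read from cache
    latency_ms: Optional[float]   # upstream round-trip time; may be absent on cache hits
    error: Optional[str]          # populated only when success is False

def _empty_response(error: str) -> FetchResponse:
    """Envelope returned when every provider in a category fails (assumed behaviour)."""
    return FetchResponse(
        success=False,
        data=None,
        source="none",
        timestamp=datetime.utcnow().isoformat(),
        error=error,
    )
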
diff --git a/backend/live_data/providers.py b/backend/live_data/providers.py
index 7452f30..3b54472 100644
--- a/backend/live_data/providers.py
+++ b/backend/live_data/providers.py
@@ -4,125 +4,264 @@ import os
import asyncio
from typing import Dict, List, Optional, Any
from datetime import datetime
+from backend.orchestration.provider_manager import provider_manager, ProviderConfig
logger = logging.getLogger(__name__)
-class BaseProvider:
- def __init__(self, name: str, base_url: str):
- self.name = name
- self.base_url = base_url
- self.session = None
-
- async def _get_session(self):
- if self.session is None or self.session.closed:
- self.session = aiohttp.ClientSession()
- return self.session
-
- async def close(self):
- if self.session and not self.session.closed:
- await self.session.close()
-
- async def _get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Any:
- try:
- session = await self._get_session()
- url = f"{self.base_url}{endpoint}"
- async with session.get(url, params=params, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
- response.raise_for_status()
- return await response.json()
- except Exception as e:
- logger.error(f"Error fetching from {self.name}: {e}")
- raise
-
-class CoinGeckoProvider(BaseProvider):
- def __init__(self):
- super().__init__("CoinGecko", "https://api.coingecko.com/api/v3")
- self.api_key = os.getenv("COINGECKO_API_KEY")
-
- async def get_market_data(self, vs_currency: str = "usd", ids: str = "bitcoin,ethereum") -> List[Dict]:
- params = {
- "vs_currency": vs_currency,
- "ids": ids,
- "order": "market_cap_desc",
- "per_page": 100,
- "page": 1,
- "sparkline": "false",
- "price_change_percentage": "24h"
- }
- if self.api_key:
- params["x_cg_demo_api_key"] = self.api_key
-
- return await self._get("/coins/markets", params=params)
-
- async def get_coin_price(self, coin_id: str, vs_currencies: str = "usd") -> Dict:
- params = {"ids": coin_id, "vs_currencies": vs_currencies}
- return await self._get("/simple/price", params=params)
-
-class BinanceProvider(BaseProvider):
- def __init__(self):
- super().__init__("Binance", "https://api.binance.com/api/v3")
-
- async def get_ticker_price(self, symbol: str) -> Dict:
- # Symbol example: BTCUSDT
- return await self._get("/ticker/price", params={"symbol": symbol.upper()})
-
- async def get_klines(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[List]:
- params = {
- "symbol": symbol.upper(),
- "interval": interval,
- "limit": limit
- }
- return await self._get("/klines", params=params)
-
-class CryptoPanicProvider(BaseProvider):
- def __init__(self):
- super().__init__("CryptoPanic", "https://cryptopanic.com/api/v1")
- self.api_key = os.getenv("CRYPTOPANIC_API_KEY")
-
- async def get_news(self, filter_type: str = "hot") -> Dict:
- if not self.api_key:
- logger.warning("CryptoPanic API key not set")
- # Fallback to public RSS feed logic elsewhere or return empty
- return {"results": []}
-
- params = {
- "auth_token": self.api_key,
- "filter": filter_type,
- "public": "true"
- }
- return await self._get("/posts/", params=params)
-
-class AlternativeMeProvider(BaseProvider):
- def __init__(self):
- super().__init__("Alternative.me", "https://api.alternative.me")
-
- async def get_fear_and_greed(self, limit: int = 1) -> Dict:
- return await self._get("/fng/", params={"limit": limit})
-
-# Singleton instances
-coingecko_provider = CoinGeckoProvider()
-binance_provider = BinanceProvider()
-cryptopanic_provider = CryptoPanicProvider()
-alternative_me_provider = AlternativeMeProvider()
-
-async def get_all_providers_status():
- results = {}
- # Simple check
- try:
- await coingecko_provider.get_coin_price("bitcoin")
- results["coingecko"] = "online"
- except:
- results["coingecko"] = "offline"
-
- try:
- await binance_provider.get_ticker_price("BTCUSDT")
- results["binance"] = "online"
- except:
- results["binance"] = "offline"
-
- try:
- await alternative_me_provider.get_fear_and_greed()
- results["alternative_me"] = "online"
- except:
- results["alternative_me"] = "offline"
+# ==============================================================================
+# FETCH IMPLEMENTATIONS
+# ==============================================================================
+
+async def fetch_coingecko_market(config: ProviderConfig, **kwargs) -> Any:
+ ids = kwargs.get("ids", "bitcoin,ethereum")
+ vs_currency = kwargs.get("vs_currency", "usd")
+
+ url = f"{config.base_url}/coins/markets"
+ params = {
+ "vs_currency": vs_currency,
+ "ids": ids,
+ "order": "market_cap_desc",
+ "per_page": 100,
+ "page": 1,
+ "sparkline": "false",
+ "price_change_percentage": "24h"
+ }
+
+ # Pro API key support
+ if config.api_key:
+ params["x_cg_pro_api_key"] = config.api_key
- return results
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ if response.status == 429:
+ raise Exception("Rate limit exceeded (429)")
+ response.raise_for_status()
+ return await response.json()
+
+async def fetch_coingecko_price(config: ProviderConfig, **kwargs) -> Any:
+ coin_id = kwargs.get("coin_id", "bitcoin")
+ vs_currencies = kwargs.get("vs_currencies", "usd")
+
+ url = f"{config.base_url}/simple/price"
+ params = {"ids": coin_id, "vs_currencies": vs_currencies}
+
+ if config.api_key:
+ params["x_cg_pro_api_key"] = config.api_key
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ response.raise_for_status()
+ return await response.json()
+
+async def fetch_binance_ticker(config: ProviderConfig, **kwargs) -> Any:
+ symbol = kwargs.get("symbol", "BTCUSDT").upper()
+ url = f"{config.base_url}/ticker/price"
+ params = {"symbol": symbol}
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ if response.status == 451:
+ raise Exception("Geo-blocked (451)")
+ response.raise_for_status()
+ data = await response.json()
+            # Loosely normalize the shape toward CoinGecko so generic callers can consume it
+ return {"price": float(data.get("price", 0)), "symbol": data.get("symbol")}
+
+async def fetch_binance_klines(config: ProviderConfig, **kwargs) -> Any:
+ symbol = kwargs.get("symbol", "BTCUSDT").upper()
+ interval = kwargs.get("interval", "1h")
+ limit = kwargs.get("limit", 100)
+
+ url = f"{config.base_url}/klines"
+ params = {
+ "symbol": symbol,
+ "interval": interval,
+ "limit": limit
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ if response.status == 451:
+ raise Exception("Geo-blocked (451)")
+ response.raise_for_status()
+ return await response.json()
+
+async def fetch_cryptopanic_news(config: ProviderConfig, **kwargs) -> Any:
+ filter_type = kwargs.get("filter", "hot")
+ url = f"{config.base_url}/posts/"
+
+ params = {
+ "auth_token": config.api_key,
+ "filter": filter_type,
+ "public": "true"
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ response.raise_for_status()
+ return await response.json()
+
+async def fetch_newsapi(config: ProviderConfig, **kwargs) -> Any:
+ query = kwargs.get("query", "crypto")
+ url = f"{config.base_url}/everything"
+
+ params = {
+ "q": query,
+ "apiKey": config.api_key,
+ "sortBy": "publishedAt",
+ "language": "en"
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ response.raise_for_status()
+ return await response.json()
+
+async def fetch_alternative_me_fng(config: ProviderConfig, **kwargs) -> Any:
+ limit = kwargs.get("limit", 1)
+ url = f"{config.base_url}/fng/"
+ params = {"limit": limit}
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ response.raise_for_status()
+ return await response.json()
+
+async def fetch_etherscan_gas(config: ProviderConfig, **kwargs) -> Any:
+ url = config.base_url
+ params = {
+ "module": "gastracker",
+ "action": "gasoracle",
+ "apikey": config.api_key
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params, timeout=config.timeout) as response:
+ response.raise_for_status()
+ return await response.json()
+
+# ==============================================================================
+# REGISTRATION
+# ==============================================================================
+
+def initialize_providers():
+ # Market Data Providers
+ provider_manager.register_provider(
+ "market",
+ ProviderConfig(
+ name="coingecko_free",
+ category="market",
+ base_url="https://api.coingecko.com/api/v3",
+ rate_limit_per_min=30, # Conservative for free tier
+ weight=100
+ ),
+ fetch_coingecko_market
+ )
+
+ provider_manager.register_provider(
+ "market_pro",
+ ProviderConfig(
+ name="coingecko_pro",
+ category="market",
+ base_url="https://pro-api.coingecko.com/api/v3", # Assuming Pro URL
+ api_key=os.getenv("COINGECKO_PRO_API_KEY", "04cf4b5b-9868-465c-8ba0-9f2e78c92eb1"),
+ rate_limit_per_min=500,
+ weight=200
+ ),
+ fetch_coingecko_market
+ )
+
+ provider_manager.register_provider(
+ "market",
+ ProviderConfig(
+ name="binance",
+ category="market",
+ base_url="https://api.binance.com/api/v3",
+ rate_limit_per_min=1200,
+ weight=90
+ ),
+        fetch_binance_ticker # Note: returns a single ticker rather than a market list, so callers must handle both shapes
+ )
+
+ # OHLC Providers
+ provider_manager.register_provider(
+ "ohlc",
+ ProviderConfig(
+ name="binance_ohlc",
+ category="ohlc",
+ base_url="https://api.binance.com/api/v3",
+ rate_limit_per_min=1200,
+ weight=100
+ ),
+ fetch_binance_klines
+ )
+
+ # News Providers
+ provider_manager.register_provider(
+ "news",
+ ProviderConfig(
+ name="cryptopanic",
+ category="news",
+ base_url="https://cryptopanic.com/api/v1",
+ api_key=os.getenv("CRYPTOPANIC_API_KEY", "7832690f05026639556837583758"), # Placeholder if env not set
+ rate_limit_per_min=60,
+ weight=100
+ ),
+ fetch_cryptopanic_news
+ )
+
+ provider_manager.register_provider(
+ "news",
+ ProviderConfig(
+ name="newsapi",
+ category="news",
+ base_url="https://newsapi.org/v2",
+ api_key=os.getenv("NEWS_API_KEY", "968a5e25552b4cb5ba3280361d8444ab"),
+ rate_limit_per_min=100,
+ weight=90
+ ),
+ fetch_newsapi
+ )
+
+ # Sentiment
+ provider_manager.register_provider(
+ "sentiment",
+ ProviderConfig(
+ name="alternative_me",
+ category="sentiment",
+ base_url="https://api.alternative.me",
+ rate_limit_per_min=60,
+ weight=100
+ ),
+ fetch_alternative_me_fng
+ )
+
+ # OnChain / RPC
+ provider_manager.register_provider(
+ "onchain",
+ ProviderConfig(
+ name="etherscan",
+ category="onchain",
+ base_url="https://api.etherscan.io/api",
+ api_key=os.getenv("ETHERSCAN_API_KEY", "SZHYFZK2RR8H9TIMJBVW54V4H81K2Z2KR2"),
+ rate_limit_per_min=5, # Free tier limit
+ weight=100
+ ),
+ fetch_etherscan_gas
+ )
+
+ provider_manager.register_provider(
+ "onchain",
+ ProviderConfig(
+ name="etherscan_backup",
+ category="onchain",
+ base_url="https://api.etherscan.io/api",
+ api_key=os.getenv("ETHERSCAN_API_KEY_2", "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45"),
+ rate_limit_per_min=5,
+ weight=90
+ ),
+ fetch_etherscan_gas
+ )
+
+# Auto-initialize
+initialize_providers()
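
The registrations above assume the orchestrator tries same-category providers in descending weight order, skips entries that have exhausted rate_limit_per_min, and falls through on errors such as the 429/451 exceptions raised by the fetch functions. The real logic lives in backend/orchestration/provider_manager.py, which this patch does not show; the following is a minimal sketch of that rotation under those assumptions, with requests_this_minute and mark_failure as hypothetical bookkeeping names.

import time
from typing import Any, Callable, Dict, List, Tuple

async def fetch_with_rotation(
    providers: List[Tuple["ProviderConfig", Callable[..., Any]]],
    **params: Any,
) -> Dict[str, Any]:
    # Highest weight first: e.g. coingecko_pro (200) before coingecko_free (100) before binance (90).
    for config, fetch_fn in sorted(providers, key=lambda p: p[0].weight, reverse=True):
        if config.requests_this_minute >= config.rate_limit_per_min:
            continue  # provider already spent its per-minute budget, try the next one
        try:
            started = time.monotonic()
            data = await fetch_fn(config, **params)
            return {
                "success": True,
                "data": data,
                "source": config.name,
                "latency_ms": (time.monotonic() - started) * 1000,
            }
        except Exception as exc:  # 429, 451, timeouts, ... fall through to the next provider
            config.mark_failure(exc)
    return {"success": False, "data": None, "source": "none", "error": "all providers failed"}
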
diff --git a/backend/routers/hf_space_api.py b/backend/routers/hf_space_api.py
index 7683868..41ed9e9 100644
--- a/backend/routers/hf_space_api.py
+++ b/backend/routers/hf_space_api.py
@@ -1,7 +1,7 @@
"""
HF Space Complete API Router
Implements all required endpoints for Hugging Face Space deployment
-using REAL data providers.
+using REAL data providers managed by the Orchestrator.
"""
from fastapi import APIRouter, HTTPException, Query, Body, Depends
from fastapi.responses import JSONResponse
@@ -14,14 +14,8 @@ import json
import os
from pathlib import Path
-# Import Real Data Providers
-from backend.live_data.providers import (
- coingecko_provider,
- binance_provider,
- cryptopanic_provider,
- alternative_me_provider
-)
-from backend.cache.cache_manager import cache_manager
+# Import Orchestrator
+from backend.orchestration.provider_manager import provider_manager
logger = logging.getLogger(__name__)
@@ -36,6 +30,7 @@ class MetaInfo(BaseModel):
cache_ttl_seconds: int = Field(default=30, description="Cache TTL in seconds")
generated_at: str = Field(default_factory=lambda: datetime.now().isoformat())
source: str = Field(default="live", description="Data source")
+ latency_ms: Optional[float] = None
class MarketItem(BaseModel):
"""Market ticker item"""
@@ -94,39 +89,42 @@ class GasResponse(BaseModel):
async def get_market_snapshot():
"""
Get current market snapshot with prices, changes, and volumes.
- Uses CoinGecko API.
+ Uses Provider Orchestrator (CoinGecko, Binance, etc.)
"""
- cache_key = "market_snapshot"
- cached = await cache_manager.get(cache_key)
- if cached:
- return cached
-
- try:
- data = await coingecko_provider.get_market_data(ids="bitcoin,ethereum,tron,solana,binancecoin,ripple")
+ response = await provider_manager.fetch_data(
+ "market",
+ params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"},
+ use_cache=True,
+ ttl=60
+ )
+
+ if not response["success"]:
+ raise HTTPException(status_code=503, detail=response["error"])
- items = []
+ data = response["data"]
+ items = []
+
+    # Fetch functions are expected to normalize provider-specific formats;
+    # a list payload is assumed to follow the CoinGecko /coins/markets shape.
+ if isinstance(data, list):
for coin in data:
items.append(MarketItem(
symbol=coin.get('symbol', '').upper(),
price=coin.get('current_price', 0),
change_24h=coin.get('price_change_percentage_24h', 0),
volume_24h=coin.get('total_volume', 0),
- source="coingecko"
+ source=response["source"]
))
-
- response = MarketResponse(
- last_updated=datetime.now().isoformat(),
- items=items,
- meta=MetaInfo(cache_ttl_seconds=60, source="coingecko")
+
+ return MarketResponse(
+ last_updated=response["timestamp"],
+ items=items,
+ meta=MetaInfo(
+ cache_ttl_seconds=60,
+ source=response["source"],
+ latency_ms=response.get("latency_ms")
)
-
- await cache_manager.set(cache_key, response, ttl=60)
- return response
-
- except Exception as e:
- logger.error(f"Error in get_market_snapshot: {e}")
- # Return empty list or cached stale data if available, but NEVER fake data
- raise HTTPException(status_code=503, detail="Market data unavailable")
+ )
@router.get("/api/market/ohlc")
async def get_ohlc(
@@ -134,55 +132,61 @@ async def get_ohlc(
interval: int = Query(60, description="Interval in minutes"),
limit: int = Query(100, description="Number of candles")
):
- """Get OHLC candlestick data from Binance"""
- cache_key = f"ohlc_{symbol}_{interval}_{limit}"
- cached = await cache_manager.get(cache_key)
- if cached:
- return cached
+ """Get OHLC candlestick data via Orchestrator"""
+
+    # Map the interval given in minutes to the string form used by kline
+    # providers such as Binance ("15m", "1h", "4h", "1d").
+ interval_str = "1h"
+ if interval < 60:
+ interval_str = f"{interval}m"
+ elif interval == 60:
+ interval_str = "1h"
+ elif interval == 240:
+ interval_str = "4h"
+ elif interval == 1440:
+ interval_str = "1d"
- try:
- # Map minutes to Binance intervals
- binance_interval = "1h"
- if interval == 1: binance_interval = "1m"
- elif interval == 5: binance_interval = "5m"
- elif interval == 15: binance_interval = "15m"
- elif interval == 60: binance_interval = "1h"
- elif interval == 240: binance_interval = "4h"
- elif interval == 1440: binance_interval = "1d"
+ response = await provider_manager.fetch_data(
+ "ohlc",
+ params={
+ "symbol": symbol,
+ "interval": interval_str,
+ "limit": limit
+ },
+ use_cache=True,
+ ttl=60
+ )
- # Binance symbol needs to be e.g., BTCUSDT
- formatted_symbol = symbol.upper()
- if not formatted_symbol.endswith("USDT") and not formatted_symbol.endswith("USD"):
- formatted_symbol += "USDT"
-
- klines = await binance_provider.get_klines(formatted_symbol, interval=binance_interval, limit=limit)
-
- ohlc_data = []
+ if not response["success"]:
+ raise HTTPException(status_code=503, detail=response["error"])
+
+ # Transform Binance Klines to standard OHLC
+ # [time, open, high, low, close, volume, ...]
+ klines = response["data"]
+ ohlc_data = []
+
+ if isinstance(klines, list):
for k in klines:
- # Binance kline: [open_time, open, high, low, close, volume, ...]
- ohlc_data.append({
- "ts": int(k[0] / 1000),
- "open": float(k[1]),
- "high": float(k[2]),
- "low": float(k[3]),
- "close": float(k[4]),
- "volume": float(k[5])
- })
-
- response = {
- "symbol": symbol,
- "interval": interval,
- "data": ohlc_data,
- "meta": MetaInfo(cache_ttl_seconds=60, source="binance").dict()
- }
-
- await cache_manager.set(cache_key, response, ttl=60)
- return response
+ if isinstance(k, list) and len(k) >= 6:
+ ohlc_data.append({
+ "ts": int(k[0] / 1000),
+ "open": float(k[1]),
+ "high": float(k[2]),
+ "low": float(k[3]),
+ "close": float(k[4]),
+ "volume": float(k[5])
+ })
- except Exception as e:
- logger.error(f"Error in get_ohlc: {e}")
- # Try fallbacks? For now, fail gracefully.
- raise HTTPException(status_code=503, detail="OHLC data unavailable")
+ return {
+ "symbol": symbol,
+ "interval": interval,
+ "data": ohlc_data,
+ "meta": MetaInfo(
+ cache_ttl_seconds=60,
+ source=response["source"],
+ latency_ms=response.get("latency_ms")
+ ).dict()
+ }
# ============================================================================
# News & Sentiment Endpoints
@@ -193,19 +197,24 @@ async def get_news(
limit: int = Query(20, description="Number of articles"),
source: Optional[str] = Query(None, description="Filter by source")
):
- """Get cryptocurrency news from CryptoPanic"""
- cache_key = f"news_{limit}_{source}"
- cached = await cache_manager.get(cache_key)
- if cached:
- return cached
+ """Get cryptocurrency news via Orchestrator"""
+
+ response = await provider_manager.fetch_data(
+ "news",
+        params={"filter": "hot", "query": "crypto"}, # "filter" feeds CryptoPanic, "query" feeds NewsAPI
+ use_cache=True,
+ ttl=300
+ )
+
+ if not response["success"]:
+ return NewsResponse(articles=[], meta=MetaInfo(source="error"))
- try:
- data = await cryptopanic_provider.get_news()
-
- articles = []
- results = data.get('results', [])[:limit]
-
- for post in results:
+ data = response["data"]
+ articles = []
+
+ # Normalize CryptoPanic / NewsAPI formats
+ if "results" in data: # CryptoPanic
+ for post in data.get('results', [])[:limit]:
articles.append(NewsArticle(
id=str(post.get('id')),
title=post.get('title', ''),
@@ -214,49 +223,60 @@ async def get_news(
summary=post.get('slug', ''),
published_at=post.get('published_at', datetime.now().isoformat())
))
-
- response = NewsResponse(
- articles=articles,
- meta=MetaInfo(cache_ttl_seconds=300, source="cryptopanic")
+ elif "articles" in data: # NewsAPI
+ for post in data.get('articles', [])[:limit]:
+ articles.append(NewsArticle(
+ id=str(hash(post.get('url', ''))),
+ title=post.get('title', ''),
+ url=post.get('url', ''),
+ source=post.get('source', {}).get('name', 'Unknown'),
+ summary=post.get('description', ''),
+ published_at=post.get('publishedAt', datetime.now().isoformat())
+ ))
+
+ return NewsResponse(
+ articles=articles,
+ meta=MetaInfo(
+ cache_ttl_seconds=300,
+ source=response["source"],
+ latency_ms=response.get("latency_ms")
)
-
- await cache_manager.set(cache_key, response, ttl=300)
- return response
-
- except Exception as e:
- logger.error(f"Error in get_news: {e}")
- return NewsResponse(articles=[], meta=MetaInfo(source="error"))
+ )
@router.get("/api/sentiment/global")
async def get_global_sentiment():
- """Get global market sentiment (Fear & Greed Index)"""
- cache_key = "sentiment_global"
- cached = await cache_manager.get(cache_key)
- if cached:
- return cached
-
- try:
- data = await alternative_me_provider.get_fear_and_greed()
- fng_value = 50
- classification = "Neutral"
+ """Get global market sentiment via Orchestrator"""
+
+ response = await provider_manager.fetch_data(
+ "sentiment",
+ params={"limit": 1},
+ use_cache=True,
+ ttl=3600
+ )
+
+ if not response["success"]:
+ raise HTTPException(status_code=503, detail=response["error"])
- if data.get('data'):
- item = data['data'][0]
- fng_value = int(item.get('value', 50))
- classification = item.get('value_classification', 'Neutral')
-
- result = {
- "score": fng_value,
- "label": classification,
- "meta": MetaInfo(cache_ttl_seconds=3600, source="alternative.me").dict()
- }
+ data = response["data"]
+ fng_value = 50
+ classification = "Neutral"
+
+ # Alternative.me format
+ if data.get('data'):
+ item = data['data'][0]
+ fng_value = int(item.get('value', 50))
+ classification = item.get('value_classification', 'Neutral')
- await cache_manager.set(cache_key, result, ttl=3600)
- return result
- except Exception as e:
- logger.error(f"Error in get_global_sentiment: {e}")
- raise HTTPException(status_code=503, detail="Sentiment data unavailable")
+ return {
+ "score": fng_value,
+ "label": classification,
+ "meta": MetaInfo(
+ cache_ttl_seconds=3600,
+ source=response["source"],
+ latency_ms=response.get("latency_ms")
+ ).dict()
+ }
# ============================================================================
# Blockchain Endpoints
@@ -264,14 +284,56 @@ async def get_global_sentiment():
@router.get("/api/crypto/blockchain/gas", response_model=GasResponse)
async def get_gas_prices(chain: str = Query("ethereum", description="Blockchain network")):
- """Get gas prices - Placeholder for real implementation"""
- # TODO: Implement Etherscan or similar provider
- # For now, return empty/null to indicate no data rather than fake data
+ """Get gas prices via Orchestrator"""
+
+ if chain.lower() != "ethereum":
+        # Only Ethereum gas tracking is wired up; other chains return an empty response for now
+ return GasResponse(
+ chain=chain,
+ gas_prices=None,
+ timestamp=datetime.now().isoformat(),
+ meta=MetaInfo(source="unavailable")
+ )
+
+ response = await provider_manager.fetch_data(
+ "onchain",
+ params={},
+ use_cache=True,
+ ttl=15
+ )
+
+ if not response["success"]:
+ return GasResponse(
+ chain=chain,
+ gas_prices=None,
+ timestamp=datetime.now().isoformat(),
+ meta=MetaInfo(source="unavailable")
+ )
+
+ data = response["data"]
+ result = data.get("result", {})
+
+ gas_price = None
+ if result:
+ # Etherscan returns data in result
+ try:
+ gas_price = GasPrice(
+ fast=float(result.get("FastGasPrice", 0)),
+ standard=float(result.get("ProposeGasPrice", 0)),
+ slow=float(result.get("SafeGasPrice", 0))
+ )
+        except (AttributeError, TypeError, ValueError):
+            # Malformed gas oracle payload: leave gas_price as None rather than failing the endpoint
+            pass
+
return GasResponse(
chain=chain,
- gas_prices=None,
+ gas_prices=gas_price,
timestamp=datetime.now().isoformat(),
- meta=MetaInfo(source="unavailable")
+ meta=MetaInfo(
+ cache_ttl_seconds=15,
+ source=response["source"],
+ latency_ms=response.get("latency_ms")
+ )
)
# ============================================================================
@@ -281,14 +343,12 @@ async def get_gas_prices(chain: str = Query("ethereum", description="Blockchain
@router.get("/api/status")
async def get_system_status():
"""Get overall system status"""
- from backend.live_data.providers import get_all_providers_status
-
- provider_status = await get_all_providers_status()
+ stats = provider_manager.get_stats()
return {
'status': 'operational',
'timestamp': datetime.now().isoformat(),
- 'providers': provider_status,
- 'version': '1.0.0',
+ 'providers': stats,
+ 'version': '2.0.0',
'meta': MetaInfo(source="system").dict()
}
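
Once deployed, the rotation can be observed through the endpoints that now carry orchestrator metadata. A small manual check, assuming a local server and the stats shape returned by provider_manager.get_stats():

import asyncio
import httpx

async def main() -> None:
    base = "http://localhost:8000"  # assumption: adjust to the deployed Space URL
    async with httpx.AsyncClient(timeout=15) as client:
        sentiment = await client.get(f"{base}/api/sentiment/global")
        print(sentiment.json()["meta"]["source"])  # which provider served the Fear & Greed value

        status = await client.get(f"{base}/api/status")
        print(status.json()["providers"])          # per-provider stats from provider_manager.get_stats()

asyncio.run(main())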