Datasourceforcryptocurrency-5 / api /ws_data_broadcaster.py
nimazasinich
Replace mock data with real (#115)
8ff9278
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any
from backend.orchestration.provider_manager import provider_manager
from backend.services.ws_service_manager import ws_manager, ServiceType
from utils.logger import setup_logger
logger = setup_logger("ws_data_broadcaster")
class DataBroadcaster:
"""
Broadcasts cryptocurrency data updates to WebSocket clients
using the Provider Orchestrator for data fetching.
"""
def __init__(self):
"""Initialize the broadcaster"""
self.last_broadcast = {}
self.broadcast_interval = 5 # seconds for price updates
self.is_running = False
logger.info("DataBroadcaster initialized")
async def start_broadcasting(self):
"""Start all broadcast tasks"""
logger.info("Starting WebSocket data broadcaster...")
self.is_running = True
tasks = [
self.broadcast_market_data(),
self.broadcast_news(),
self.broadcast_sentiment(),
self.broadcast_gas_prices()
]
try:
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logger.error(f"Error in broadcasting tasks: {e}", exc_info=True)
finally:
self.is_running = False
async def stop_broadcasting(self):
"""Stop broadcasting"""
logger.info("Stopping WebSocket data broadcaster...")
self.is_running = False
async def broadcast_market_data(self):
"""Broadcast market price updates"""
logger.info("Starting market data broadcast...")
while self.is_running:
try:
# Use Orchestrator to fetch market data
# Using 30s TTL to prevent provider spam, but broadcast often
response = await provider_manager.fetch_data(
"market",
params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"},
use_cache=True,
ttl=10 # Short TTL for live prices if provider allows
)
if response["success"] and response["data"]:
coins = response["data"]
# Format data for broadcast
prices = {}
price_changes = {}
volumes = {}
market_caps = {}
for coin in coins:
symbol = coin.get("symbol", "").upper()
prices[symbol] = coin.get("current_price")
price_changes[symbol] = coin.get("price_change_percentage_24h")
volumes[symbol] = coin.get("total_volume")
market_caps[symbol] = coin.get("market_cap")
data = {
"type": "market_data",
"data": {
"prices": prices,
"volumes": volumes,
"market_caps": market_caps,
"price_changes": price_changes
},
"count": len(coins),
"timestamp": datetime.utcnow().isoformat(),
"source": response["source"]
}
# Diff check could be here (optimization)
# Broadcast to subscribed clients
await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data)
logger.debug(f"Broadcasted {len(coins)} price updates from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting market data: {e}", exc_info=True)
await asyncio.sleep(self.broadcast_interval)
async def broadcast_news(self):
"""Broadcast news updates"""
logger.info("Starting news broadcast...")
while self.is_running:
try:
response = await provider_manager.fetch_data(
"news",
params={"filter": "hot"},
use_cache=True,
ttl=300
)
if response["success"] and response["data"]:
# Transform/Normalize
data = response["data"]
articles = []
if "results" in data: # CryptoPanic
for post in data.get('results', [])[:5]:
articles.append({
"id": str(post.get('id')),
"title": post.get('title', ''),
"source": post.get('source', {}).get('title', 'Unknown'),
"url": post.get('url', ''),
"published_at": post.get('published_at', datetime.now().isoformat())
})
elif "articles" in data: # NewsAPI
for post in data.get('articles', [])[:5]:
articles.append({
"id": str(hash(post.get('url', ''))),
"title": post.get('title', ''),
"source": post.get('source', {}).get('name', 'Unknown'),
"url": post.get('url', ''),
"published_at": post.get('publishedAt', datetime.now().isoformat())
})
if articles:
payload = {
"type": "news",
"data": {"articles": articles},
"count": len(articles),
"timestamp": datetime.utcnow().isoformat(),
"source": response["source"]
}
await ws_manager.broadcast_to_service(ServiceType.NEWS, payload)
logger.info(f"Broadcasted {len(articles)} news articles from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting news: {e}", exc_info=True)
await asyncio.sleep(60)
async def broadcast_sentiment(self):
"""Broadcast sentiment updates"""
logger.info("Starting sentiment broadcast...")
while self.is_running:
try:
response = await provider_manager.fetch_data(
"sentiment",
params={"limit": 1},
use_cache=True,
ttl=3600
)
if response["success"] and response["data"]:
data = response["data"]
fng_value = 50
classification = "Neutral"
if data.get('data'):
item = data['data'][0]
fng_value = int(item.get('value', 50))
classification = item.get('value_classification', 'Neutral')
payload = {
"type": "sentiment",
"data": {
"fear_greed_index": fng_value,
"classification": classification,
"timestamp": datetime.utcnow().isoformat()
},
"timestamp": datetime.utcnow().isoformat(),
"source": response["source"]
}
await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, payload)
logger.info(f"Broadcasted sentiment: {fng_value} from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting sentiment: {e}", exc_info=True)
await asyncio.sleep(60)
async def broadcast_gas_prices(self):
"""Broadcast gas price updates"""
logger.info("Starting gas price broadcast...")
while self.is_running:
try:
response = await provider_manager.fetch_data(
"onchain",
params={},
use_cache=True,
ttl=15
)
if response["success"] and response["data"]:
data = response["data"]
result = data.get("result", {})
if result:
payload = {
"type": "gas_prices",
"data": {
"fast": result.get("FastGasPrice"),
"standard": result.get("ProposeGasPrice"),
"slow": result.get("SafeGasPrice")
},
"timestamp": datetime.utcnow().isoformat(),
"source": response["source"]
}
# Broadcast to RPC_NODES service type (gas prices are blockchain-related)
await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, payload)
logger.debug(f"Broadcasted gas prices from {response['source']}")
except Exception as e:
logger.error(f"Error broadcasting gas prices: {e}", exc_info=True)
await asyncio.sleep(30)
# Global broadcaster instance
broadcaster = DataBroadcaster()