|
|
import asyncio |
|
|
import logging |
|
|
from datetime import datetime |
|
|
from typing import Dict, Any |
|
|
|
|
|
from backend.orchestration.provider_manager import provider_manager |
|
|
from backend.services.ws_service_manager import ws_manager, ServiceType |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
logger = setup_logger("ws_data_broadcaster") |
|
|
|
|
|
class DataBroadcaster: |
|
|
""" |
|
|
Broadcasts cryptocurrency data updates to WebSocket clients |
|
|
using the Provider Orchestrator for data fetching. |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize the broadcaster""" |
|
|
self.last_broadcast = {} |
|
|
self.broadcast_interval = 5 |
|
|
self.is_running = False |
|
|
logger.info("DataBroadcaster initialized") |
|
|
|
|
|
async def start_broadcasting(self): |
|
|
"""Start all broadcast tasks""" |
|
|
logger.info("Starting WebSocket data broadcaster...") |
|
|
|
|
|
self.is_running = True |
|
|
|
|
|
tasks = [ |
|
|
self.broadcast_market_data(), |
|
|
self.broadcast_news(), |
|
|
self.broadcast_sentiment(), |
|
|
self.broadcast_gas_prices() |
|
|
] |
|
|
|
|
|
try: |
|
|
await asyncio.gather(*tasks, return_exceptions=True) |
|
|
except Exception as e: |
|
|
logger.error(f"Error in broadcasting tasks: {e}", exc_info=True) |
|
|
finally: |
|
|
self.is_running = False |
|
|
|
|
|
async def stop_broadcasting(self): |
|
|
"""Stop broadcasting""" |
|
|
logger.info("Stopping WebSocket data broadcaster...") |
|
|
self.is_running = False |
|
|
|
|
|
async def broadcast_market_data(self): |
|
|
"""Broadcast market price updates""" |
|
|
logger.info("Starting market data broadcast...") |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
|
|
|
|
|
|
response = await provider_manager.fetch_data( |
|
|
"market", |
|
|
params={"ids": "bitcoin,ethereum,tron,solana,binancecoin,ripple", "vs_currency": "usd"}, |
|
|
use_cache=True, |
|
|
ttl=10 |
|
|
) |
|
|
|
|
|
if response["success"] and response["data"]: |
|
|
coins = response["data"] |
|
|
|
|
|
|
|
|
prices = {} |
|
|
price_changes = {} |
|
|
volumes = {} |
|
|
market_caps = {} |
|
|
|
|
|
for coin in coins: |
|
|
symbol = coin.get("symbol", "").upper() |
|
|
prices[symbol] = coin.get("current_price") |
|
|
price_changes[symbol] = coin.get("price_change_percentage_24h") |
|
|
volumes[symbol] = coin.get("total_volume") |
|
|
market_caps[symbol] = coin.get("market_cap") |
|
|
|
|
|
data = { |
|
|
"type": "market_data", |
|
|
"data": { |
|
|
"prices": prices, |
|
|
"volumes": volumes, |
|
|
"market_caps": market_caps, |
|
|
"price_changes": price_changes |
|
|
}, |
|
|
"count": len(coins), |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"source": response["source"] |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.MARKET_DATA, data) |
|
|
logger.debug(f"Broadcasted {len(coins)} price updates from {response['source']}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting market data: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(self.broadcast_interval) |
|
|
|
|
|
async def broadcast_news(self): |
|
|
"""Broadcast news updates""" |
|
|
logger.info("Starting news broadcast...") |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
response = await provider_manager.fetch_data( |
|
|
"news", |
|
|
params={"filter": "hot"}, |
|
|
use_cache=True, |
|
|
ttl=300 |
|
|
) |
|
|
|
|
|
if response["success"] and response["data"]: |
|
|
|
|
|
data = response["data"] |
|
|
articles = [] |
|
|
|
|
|
if "results" in data: |
|
|
for post in data.get('results', [])[:5]: |
|
|
articles.append({ |
|
|
"id": str(post.get('id')), |
|
|
"title": post.get('title', ''), |
|
|
"source": post.get('source', {}).get('title', 'Unknown'), |
|
|
"url": post.get('url', ''), |
|
|
"published_at": post.get('published_at', datetime.now().isoformat()) |
|
|
}) |
|
|
elif "articles" in data: |
|
|
for post in data.get('articles', [])[:5]: |
|
|
articles.append({ |
|
|
"id": str(hash(post.get('url', ''))), |
|
|
"title": post.get('title', ''), |
|
|
"source": post.get('source', {}).get('name', 'Unknown'), |
|
|
"url": post.get('url', ''), |
|
|
"published_at": post.get('publishedAt', datetime.now().isoformat()) |
|
|
}) |
|
|
|
|
|
if articles: |
|
|
payload = { |
|
|
"type": "news", |
|
|
"data": {"articles": articles}, |
|
|
"count": len(articles), |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"source": response["source"] |
|
|
} |
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.NEWS, payload) |
|
|
logger.info(f"Broadcasted {len(articles)} news articles from {response['source']}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting news: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(60) |
|
|
|
|
|
async def broadcast_sentiment(self): |
|
|
"""Broadcast sentiment updates""" |
|
|
logger.info("Starting sentiment broadcast...") |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
response = await provider_manager.fetch_data( |
|
|
"sentiment", |
|
|
params={"limit": 1}, |
|
|
use_cache=True, |
|
|
ttl=3600 |
|
|
) |
|
|
|
|
|
if response["success"] and response["data"]: |
|
|
data = response["data"] |
|
|
fng_value = 50 |
|
|
classification = "Neutral" |
|
|
|
|
|
if data.get('data'): |
|
|
item = data['data'][0] |
|
|
fng_value = int(item.get('value', 50)) |
|
|
classification = item.get('value_classification', 'Neutral') |
|
|
|
|
|
payload = { |
|
|
"type": "sentiment", |
|
|
"data": { |
|
|
"fear_greed_index": fng_value, |
|
|
"classification": classification, |
|
|
"timestamp": datetime.utcnow().isoformat() |
|
|
}, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"source": response["source"] |
|
|
} |
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.SENTIMENT, payload) |
|
|
logger.info(f"Broadcasted sentiment: {fng_value} from {response['source']}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting sentiment: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(60) |
|
|
|
|
|
async def broadcast_gas_prices(self): |
|
|
"""Broadcast gas price updates""" |
|
|
logger.info("Starting gas price broadcast...") |
|
|
|
|
|
while self.is_running: |
|
|
try: |
|
|
response = await provider_manager.fetch_data( |
|
|
"onchain", |
|
|
params={}, |
|
|
use_cache=True, |
|
|
ttl=15 |
|
|
) |
|
|
|
|
|
if response["success"] and response["data"]: |
|
|
data = response["data"] |
|
|
result = data.get("result", {}) |
|
|
|
|
|
if result: |
|
|
payload = { |
|
|
"type": "gas_prices", |
|
|
"data": { |
|
|
"fast": result.get("FastGasPrice"), |
|
|
"standard": result.get("ProposeGasPrice"), |
|
|
"slow": result.get("SafeGasPrice") |
|
|
}, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"source": response["source"] |
|
|
} |
|
|
|
|
|
|
|
|
await ws_manager.broadcast_to_service(ServiceType.RPC_NODES, payload) |
|
|
logger.debug(f"Broadcasted gas prices from {response['source']}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error broadcasting gas prices: {e}", exc_info=True) |
|
|
|
|
|
await asyncio.sleep(30) |
|
|
|
|
|
|
|
|
|
|
|
broadcaster = DataBroadcaster() |
|
|
|