Datasourceforcryptocurrency-5 / api /realtime_monitoring.py
nimazasinich
News source and monitoring update (#113)
3fea549
"""
Real-Time Monitoring Service with WebSocket Push Updates
This module provides real-time monitoring capabilities:
- Push updates for market data
- Real-time news alerts
- Sentiment changes
- Data collection status
- System health monitoring
All data is pushed via WebSocket when changes occur,
not just on a fixed interval.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable, Set
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import json
logger = logging.getLogger(__name__)
router = APIRouter()
# ===== CONNECTION MANAGER =====
class RealTimeConnectionManager:
"""
Manages WebSocket connections for real-time updates
Supports multiple channels for different data types
"""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.subscriptions: Dict[str, Set[str]] = {} # client_id -> set of channels
self.channel_subscribers: Dict[str, Set[str]] = {} # channel -> set of client_ids
self._client_counter = 0
async def connect(self, websocket: WebSocket) -> str:
"""Accept connection and return client ID"""
await websocket.accept()
self._client_counter += 1
client_id = f"client_{self._client_counter}_{datetime.utcnow().timestamp()}"
self.active_connections[client_id] = websocket
self.subscriptions[client_id] = set()
logger.info(f"Real-time client connected: {client_id}")
return client_id
def disconnect(self, client_id: str):
"""Remove client and clean up subscriptions"""
if client_id in self.active_connections:
del self.active_connections[client_id]
if client_id in self.subscriptions:
# Remove from all channel subscriber lists
for channel in self.subscriptions[client_id]:
if channel in self.channel_subscribers:
self.channel_subscribers[channel].discard(client_id)
del self.subscriptions[client_id]
logger.info(f"Real-time client disconnected: {client_id}")
def subscribe(self, client_id: str, channel: str):
"""Subscribe client to a channel"""
if client_id not in self.subscriptions:
self.subscriptions[client_id] = set()
self.subscriptions[client_id].add(channel)
if channel not in self.channel_subscribers:
self.channel_subscribers[channel] = set()
self.channel_subscribers[channel].add(client_id)
logger.debug(f"Client {client_id} subscribed to {channel}")
def unsubscribe(self, client_id: str, channel: str):
"""Unsubscribe client from a channel"""
if client_id in self.subscriptions:
self.subscriptions[client_id].discard(channel)
if channel in self.channel_subscribers:
self.channel_subscribers[channel].discard(client_id)
async def broadcast_to_channel(self, channel: str, data: Dict[str, Any]):
"""Broadcast message to all subscribers of a channel"""
if channel not in self.channel_subscribers:
return
message = {
"channel": channel,
"data": data,
"timestamp": datetime.utcnow().isoformat()
}
disconnected = []
for client_id in self.channel_subscribers[channel]:
try:
websocket = self.active_connections.get(client_id)
if websocket:
await websocket.send_json(message)
except Exception as e:
logger.warning(f"Failed to send to {client_id}: {e}")
disconnected.append(client_id)
# Clean up disconnected clients
for client_id in disconnected:
self.disconnect(client_id)
async def send_to_client(self, client_id: str, data: Dict[str, Any]):
"""Send message to specific client"""
websocket = self.active_connections.get(client_id)
if websocket:
try:
await websocket.send_json(data)
except Exception as e:
logger.warning(f"Failed to send to {client_id}: {e}")
self.disconnect(client_id)
def get_stats(self) -> Dict[str, Any]:
"""Get connection statistics"""
return {
"total_connections": len(self.active_connections),
"channels": {
channel: len(subscribers)
for channel, subscribers in self.channel_subscribers.items()
},
"timestamp": datetime.utcnow().isoformat()
}
# Global connection manager
connection_manager = RealTimeConnectionManager()
# ===== AVAILABLE CHANNELS =====
class Channels:
"""Available WebSocket channels"""
MARKET_DATA = "market_data"
PRICE_UPDATES = "price_updates"
NEWS = "news"
SENTIMENT = "sentiment"
WHALE_ALERTS = "whale_alerts"
COLLECTION_STATUS = "collection_status"
SYSTEM_HEALTH = "system_health"
ALL = "all"
# ===== REAL-TIME PUBLISHER =====
class RealTimePublisher:
"""
Publishes data to WebSocket channels in real-time
Used by data collectors to push updates
"""
def __init__(self, manager: RealTimeConnectionManager):
self.manager = manager
self.last_data: Dict[str, Any] = {} # Cache last data per channel
async def publish_market_data(self, data: List[Dict[str, Any]]):
"""Publish market data update"""
# Only publish if data has changed significantly
if self._has_significant_change(Channels.MARKET_DATA, data):
await self.manager.broadcast_to_channel(Channels.MARKET_DATA, {
"type": "market_update",
"coins": data,
"count": len(data)
})
self.last_data[Channels.MARKET_DATA] = data
async def publish_price_update(self, symbol: str, price: float, change_24h: float = None):
"""Publish single price update"""
await self.manager.broadcast_to_channel(Channels.PRICE_UPDATES, {
"type": "price_update",
"symbol": symbol,
"price": price,
"change_24h": change_24h
})
async def publish_news(self, articles: List[Dict[str, Any]]):
"""Publish news articles"""
await self.manager.broadcast_to_channel(Channels.NEWS, {
"type": "news_update",
"articles": articles,
"count": len(articles)
})
async def publish_sentiment(self, sentiment_data: Dict[str, Any]):
"""Publish sentiment update"""
await self.manager.broadcast_to_channel(Channels.SENTIMENT, {
"type": "sentiment_update",
"data": sentiment_data
})
async def publish_whale_alert(self, transaction: Dict[str, Any]):
"""Publish whale transaction alert"""
await self.manager.broadcast_to_channel(Channels.WHALE_ALERTS, {
"type": "whale_alert",
"transaction": transaction
})
async def publish_collection_status(self, collector_name: str, status: Dict[str, Any]):
"""Publish data collection status"""
await self.manager.broadcast_to_channel(Channels.COLLECTION_STATUS, {
"type": "collection_status",
"collector": collector_name,
"status": status
})
async def publish_system_health(self, health_data: Dict[str, Any]):
"""Publish system health update"""
await self.manager.broadcast_to_channel(Channels.SYSTEM_HEALTH, {
"type": "health_update",
"data": health_data
})
def _has_significant_change(self, channel: str, new_data: Any) -> bool:
"""Check if data has changed significantly (to avoid spam)"""
if channel not in self.last_data:
return True
# For market data, check if any price changed more than 0.1%
if channel == Channels.MARKET_DATA:
old_prices = {d.get("symbol"): d.get("price", 0) for d in self.last_data.get(channel, [])}
for item in new_data:
symbol = item.get("symbol")
new_price = item.get("price", 0)
old_price = old_prices.get(symbol, 0)
if old_price > 0 and abs((new_price - old_price) / old_price) > 0.001:
return True
return False
return True
# Global publisher
publisher = RealTimePublisher(connection_manager)
def get_realtime_publisher() -> RealTimePublisher:
"""Get global publisher instance"""
return publisher
# ===== WEBSOCKET ENDPOINTS =====
@router.websocket("/ws/realtime")
async def websocket_realtime(websocket: WebSocket):
"""
Main real-time WebSocket endpoint
After connecting, send subscription messages:
{
"action": "subscribe",
"channels": ["market_data", "news", "sentiment"]
}
Or subscribe to all:
{
"action": "subscribe",
"channels": ["all"]
}
To unsubscribe:
{
"action": "unsubscribe",
"channels": ["news"]
}
"""
client_id = await connection_manager.connect(websocket)
try:
# Send welcome message with available channels
await websocket.send_json({
"type": "connected",
"client_id": client_id,
"available_channels": [
Channels.MARKET_DATA,
Channels.PRICE_UPDATES,
Channels.NEWS,
Channels.SENTIMENT,
Channels.WHALE_ALERTS,
Channels.COLLECTION_STATUS,
Channels.SYSTEM_HEALTH,
Channels.ALL
],
"timestamp": datetime.utcnow().isoformat()
})
while True:
data = await websocket.receive_json()
action = data.get("action")
channels = data.get("channels", [])
if action == "subscribe":
if Channels.ALL in channels:
# Subscribe to all channels
for channel in [Channels.MARKET_DATA, Channels.PRICE_UPDATES,
Channels.NEWS, Channels.SENTIMENT,
Channels.WHALE_ALERTS, Channels.COLLECTION_STATUS,
Channels.SYSTEM_HEALTH]:
connection_manager.subscribe(client_id, channel)
else:
for channel in channels:
connection_manager.subscribe(client_id, channel)
await websocket.send_json({
"type": "subscribed",
"channels": list(connection_manager.subscriptions.get(client_id, set())),
"timestamp": datetime.utcnow().isoformat()
})
elif action == "unsubscribe":
for channel in channels:
connection_manager.unsubscribe(client_id, channel)
await websocket.send_json({
"type": "unsubscribed",
"channels": channels,
"remaining": list(connection_manager.subscriptions.get(client_id, set())),
"timestamp": datetime.utcnow().isoformat()
})
elif action == "get_stats":
await websocket.send_json({
"type": "stats",
"data": connection_manager.get_stats()
})
elif action == "ping":
await websocket.send_json({
"type": "pong",
"timestamp": datetime.utcnow().isoformat()
})
except WebSocketDisconnect:
logger.info(f"Client {client_id} disconnected")
except Exception as e:
logger.error(f"WebSocket error for {client_id}: {e}")
finally:
connection_manager.disconnect(client_id)
@router.websocket("/ws/prices")
async def websocket_prices(websocket: WebSocket):
"""Dedicated WebSocket for price updates only"""
client_id = await connection_manager.connect(websocket)
connection_manager.subscribe(client_id, Channels.PRICE_UPDATES)
connection_manager.subscribe(client_id, Channels.MARKET_DATA)
try:
await websocket.send_json({
"type": "connected",
"channels": [Channels.PRICE_UPDATES, Channels.MARKET_DATA],
"timestamp": datetime.utcnow().isoformat()
})
while True:
data = await websocket.receive_json()
if data.get("action") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"Price WebSocket error: {e}")
finally:
connection_manager.disconnect(client_id)
@router.websocket("/ws/alerts")
async def websocket_alerts(websocket: WebSocket):
"""Dedicated WebSocket for alerts (whale, sentiment changes)"""
client_id = await connection_manager.connect(websocket)
connection_manager.subscribe(client_id, Channels.WHALE_ALERTS)
connection_manager.subscribe(client_id, Channels.SENTIMENT)
try:
await websocket.send_json({
"type": "connected",
"channels": [Channels.WHALE_ALERTS, Channels.SENTIMENT],
"timestamp": datetime.utcnow().isoformat()
})
while True:
data = await websocket.receive_json()
if data.get("action") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"Alerts WebSocket error: {e}")
finally:
connection_manager.disconnect(client_id)
# ===== BACKGROUND TASKS =====
async def start_realtime_monitoring():
"""Start real-time monitoring background tasks"""
logger.info("Starting real-time monitoring services...")
# Import data collection worker
try:
from workers.data_collection_worker import get_data_collection_worker, get_realtime_fetcher
worker = get_data_collection_worker()
fetcher = get_realtime_fetcher()
# Start periodic health check broadcasts
asyncio.create_task(_broadcast_health_status())
# Start periodic market data broadcasts
asyncio.create_task(_broadcast_market_updates(fetcher))
logger.info("Real-time monitoring services started")
except Exception as e:
logger.error(f"Failed to start real-time monitoring: {e}")
async def _broadcast_health_status():
"""Periodically broadcast system health"""
while True:
try:
health_data = {
"status": "healthy",
"connections": connection_manager.get_stats(),
"timestamp": datetime.utcnow().isoformat()
}
await publisher.publish_system_health(health_data)
except Exception as e:
logger.error(f"Health broadcast error: {e}")
await asyncio.sleep(30) # Every 30 seconds
async def _broadcast_market_updates(fetcher):
"""Periodically broadcast market updates"""
while True:
try:
# Only broadcast if there are subscribers
if connection_manager.channel_subscribers.get(Channels.MARKET_DATA):
# Fetch latest data
price_result = await fetcher.fetch_price("BTC")
if price_result.get("success"):
await publisher.publish_price_update(
"BTC",
price_result.get("price"),
None
)
except Exception as e:
logger.error(f"Market broadcast error: {e}")
await asyncio.sleep(60) # Every minute
# ===== HTTP ENDPOINTS FOR STATS =====
@router.get("/api/realtime/stats")
async def get_realtime_stats():
"""Get real-time connection statistics"""
return {
"success": True,
"data": connection_manager.get_stats()
}
@router.get("/api/realtime/channels")
async def get_available_channels():
"""Get available real-time channels"""
return {
"success": True,
"channels": [
{"id": Channels.MARKET_DATA, "name": "Market Data", "description": "Real-time market prices and stats"},
{"id": Channels.PRICE_UPDATES, "name": "Price Updates", "description": "Individual price changes"},
{"id": Channels.NEWS, "name": "News", "description": "Latest crypto news articles"},
{"id": Channels.SENTIMENT, "name": "Sentiment", "description": "Market sentiment updates"},
{"id": Channels.WHALE_ALERTS, "name": "Whale Alerts", "description": "Large transaction alerts"},
{"id": Channels.COLLECTION_STATUS, "name": "Collection Status", "description": "Data collection progress"},
{"id": Channels.SYSTEM_HEALTH, "name": "System Health", "description": "System health monitoring"}
]
}