| """ | |
| Database Manager Module | |
| Provides comprehensive database operations for the crypto API monitoring system | |
| """ | |
| import os | |
| from contextlib import contextmanager | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Dict, Any, Tuple | |
| from pathlib import Path | |
| from sqlalchemy import create_engine, func, and_, or_, desc, text | |
| from sqlalchemy.orm import sessionmaker, Session | |
| from sqlalchemy.exc import SQLAlchemyError, IntegrityError | |
| from database.models import ( | |
| Base, | |
| Provider, | |
| ConnectionAttempt, | |
| DataCollection, | |
| RateLimitUsage, | |
| ScheduleConfig, | |
| ScheduleCompliance, | |
| FailureLog, | |
| Alert, | |
| SystemMetrics, | |
| ConnectionStatus, | |
| ProviderCategory, | |
| # Crypto data models | |
| MarketPrice, | |
| NewsArticle, | |
| WhaleTransaction, | |
| SentimentMetric, | |
| GasPrice, | |
| BlockchainStat | |
| ) | |
| from database.data_access import DataAccessMixin | |
| from utils.logger import setup_logger | |
| # Initialize logger | |
| logger = setup_logger("db_manager", level="INFO") | |
| class DatabaseManager(DataAccessMixin): | |
| """ | |
| Comprehensive database manager for API monitoring system | |
| Handles all database operations with proper error handling and logging | |
| """ | |
| def __init__(self, db_path: str = "data/api_monitor.db"): | |
| """ | |
| Initialize database manager | |
| Args: | |
| db_path: Path to SQLite database file | |
| """ | |
| self.db_path = db_path | |
| self._ensure_data_directory() | |
| # Create SQLAlchemy engine | |
| db_url = f"sqlite:///{self.db_path}" | |
| self.engine = create_engine( | |
| db_url, | |
| echo=False, # Set to True for SQL debugging | |
| connect_args={"check_same_thread": False} # SQLite specific | |
| ) | |
| # Create session factory | |
| self.SessionLocal = sessionmaker( | |
| autocommit=False, | |
| autoflush=False, | |
| bind=self.engine, | |
| expire_on_commit=False # Allow access to attributes after commit | |
| ) | |
| logger.info(f"Database manager initialized with database: {self.db_path}") | |
| def _ensure_data_directory(self): | |
| """Ensure the data directory exists""" | |
| data_dir = Path(self.db_path).parent | |
| data_dir.mkdir(parents=True, exist_ok=True) | |
| def get_session(self) -> Session: | |
| """ | |
| Context manager for database sessions | |
| Automatically handles commit/rollback and cleanup | |
| Yields: | |
| SQLAlchemy session | |
| Example: | |
| with db_manager.get_session() as session: | |
| provider = session.query(Provider).first() | |
| """ | |
| session = self.SessionLocal() | |
| try: | |
| yield session | |
| session.commit() | |
| except Exception as e: | |
| session.rollback() | |
| logger.error(f"Session error: {str(e)}", exc_info=True) | |
| raise | |
| finally: | |
| session.close() | |
| def init_database(self) -> bool: | |
| """ | |
| Initialize database by creating all tables | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| Base.metadata.create_all(bind=self.engine) | |
| logger.info("Database tables created successfully") | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to initialize database: {str(e)}", exc_info=True) | |
| return False | |
| def drop_all_tables(self) -> bool: | |
| """ | |
| Drop all tables (use with caution!) | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| Base.metadata.drop_all(bind=self.engine) | |
| logger.warning("All database tables dropped") | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to drop tables: {str(e)}", exc_info=True) | |
| return False | |
| # ============================================================================ | |
| # Provider CRUD Operations | |
| # ============================================================================ | |
| def create_provider( | |
| self, | |
| name: str, | |
| category: str, | |
| endpoint_url: str, | |
| requires_key: bool = False, | |
| api_key_masked: Optional[str] = None, | |
| rate_limit_type: Optional[str] = None, | |
| rate_limit_value: Optional[int] = None, | |
| timeout_ms: int = 10000, | |
| priority_tier: int = 3 | |
| ) -> Optional[Provider]: | |
| """ | |
| Create a new provider | |
| Args: | |
| name: Provider name | |
| category: Provider category | |
| endpoint_url: API endpoint URL | |
| requires_key: Whether API key is required | |
| api_key_masked: Masked API key for display | |
| rate_limit_type: Rate limit type (per_minute, per_hour, per_day) | |
| rate_limit_value: Rate limit value | |
| timeout_ms: Timeout in milliseconds | |
| priority_tier: Priority tier (1-4, 1 is highest) | |
| Returns: | |
| Created Provider object or None if failed | |
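
        Example (illustrative; the provider values below are hypothetical):
            provider = db_manager.create_provider(
                name="CoinGecko",
                category="market_data",
                endpoint_url="https://api.coingecko.com/api/v3",
                rate_limit_type="per_minute",
                rate_limit_value=50,
                priority_tier=1
            )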
| """ | |
| try: | |
| with self.get_session() as session: | |
| provider = Provider( | |
| name=name, | |
| category=category, | |
| endpoint_url=endpoint_url, | |
| requires_key=requires_key, | |
| api_key_masked=api_key_masked, | |
| rate_limit_type=rate_limit_type, | |
| rate_limit_value=rate_limit_value, | |
| timeout_ms=timeout_ms, | |
| priority_tier=priority_tier | |
| ) | |
| session.add(provider) | |
| session.commit() | |
| session.refresh(provider) | |
| logger.info(f"Created provider: {name}") | |
| return provider | |
| except IntegrityError: | |
| logger.error(f"Provider already exists: {name}") | |
| return None | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to create provider {name}: {str(e)}", exc_info=True) | |
| return None | |
| def get_provider(self, provider_id: Optional[int] = None, name: Optional[str] = None) -> Optional[Provider]: | |
| """ | |
| Get a provider by ID or name | |
| Args: | |
| provider_id: Provider ID | |
| name: Provider name | |
| Returns: | |
| Provider object or None if not found | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| if provider_id: | |
| provider = session.query(Provider).filter(Provider.id == provider_id).first() | |
| elif name: | |
| provider = session.query(Provider).filter(Provider.name == name).first() | |
| else: | |
| logger.warning("Either provider_id or name must be provided") | |
| return None | |
| if provider: | |
| session.refresh(provider) | |
| return provider | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get provider: {str(e)}", exc_info=True) | |
| return None | |
| def get_all_providers(self, category: Optional[str] = None, enabled_only: bool = False) -> List[Provider]: | |
| """ | |
| Get all providers with optional filtering | |
| Args: | |
| category: Filter by category | |
| enabled_only: Only return enabled providers (based on schedule_config) | |
| Returns: | |
| List of Provider objects | |
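
        Example (illustrative; assumes providers were registered earlier):
            market_providers = db_manager.get_all_providers(
                category="market_data",
                enabled_only=True
            )
            for p in market_providers:
                print(p.name, p.priority_tier)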
| """ | |
| try: | |
| with self.get_session() as session: | |
| query = session.query(Provider) | |
| if category: | |
| query = query.filter(Provider.category == category) | |
| if enabled_only: | |
| query = query.join(ScheduleConfig).filter(ScheduleConfig.enabled == True) | |
| providers = query.order_by(Provider.priority_tier, Provider.name).all() | |
| # Refresh all providers to ensure data is loaded | |
| for provider in providers: | |
| session.refresh(provider) | |
| return providers | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get providers: {str(e)}", exc_info=True) | |
| return [] | |
| def update_provider(self, provider_id: int, **kwargs) -> bool: | |
| """ | |
| Update a provider's attributes | |
| Args: | |
| provider_id: Provider ID | |
| **kwargs: Attributes to update | |
| Returns: | |
| True if successful, False otherwise | |
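
        Example (illustrative; assumes a provider with ID 1 exists):
            db_manager.update_provider(1, timeout_ms=15000, priority_tier=2)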
| """ | |
| try: | |
| with self.get_session() as session: | |
| provider = session.query(Provider).filter(Provider.id == provider_id).first() | |
| if not provider: | |
| logger.warning(f"Provider not found: {provider_id}") | |
| return False | |
| for key, value in kwargs.items(): | |
| if hasattr(provider, key): | |
| setattr(provider, key, value) | |
| provider.updated_at = datetime.utcnow() | |
| session.commit() | |
| logger.info(f"Updated provider: {provider.name}") | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to update provider {provider_id}: {str(e)}", exc_info=True) | |
| return False | |
| def delete_provider(self, provider_id: int) -> bool: | |
| """ | |
| Delete a provider and all related records | |
| Args: | |
| provider_id: Provider ID | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| provider = session.query(Provider).filter(Provider.id == provider_id).first() | |
| if not provider: | |
| logger.warning(f"Provider not found: {provider_id}") | |
| return False | |
| provider_name = provider.name | |
| session.delete(provider) | |
| session.commit() | |
| logger.info(f"Deleted provider: {provider_name}") | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to delete provider {provider_id}: {str(e)}", exc_info=True) | |
| return False | |
| # ============================================================================ | |
| # Connection Attempt Operations | |
| # ============================================================================ | |
| def save_connection_attempt( | |
| self, | |
| provider_id: int, | |
| endpoint: str, | |
| status: str, | |
| response_time_ms: Optional[int] = None, | |
| http_status_code: Optional[int] = None, | |
| error_type: Optional[str] = None, | |
| error_message: Optional[str] = None, | |
| retry_count: int = 0, | |
| retry_result: Optional[str] = None | |
| ) -> Optional[ConnectionAttempt]: | |
| """ | |
| Save a connection attempt log | |
| Args: | |
| provider_id: Provider ID | |
| endpoint: API endpoint | |
| status: Connection status | |
| response_time_ms: Response time in milliseconds | |
| http_status_code: HTTP status code | |
| error_type: Error type if failed | |
| error_message: Error message if failed | |
| retry_count: Number of retries | |
| retry_result: Result of retry attempt | |
| Returns: | |
| Created ConnectionAttempt object or None if failed | |
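
        Example (illustrative; endpoint and timing values are hypothetical):
            db_manager.save_connection_attempt(
                provider_id=1,
                endpoint="/api/v3/ping",
                status="success",
                response_time_ms=142,
                http_status_code=200
            )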
| """ | |
| try: | |
| with self.get_session() as session: | |
| attempt = ConnectionAttempt( | |
| provider_id=provider_id, | |
| endpoint=endpoint, | |
| status=status, | |
| response_time_ms=response_time_ms, | |
| http_status_code=http_status_code, | |
| error_type=error_type, | |
| error_message=error_message, | |
| retry_count=retry_count, | |
| retry_result=retry_result | |
| ) | |
| session.add(attempt) | |
| session.commit() | |
| session.refresh(attempt) | |
| return attempt | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to save connection attempt: {str(e)}", exc_info=True) | |
| return None | |
| def get_connection_attempts( | |
| self, | |
| provider_id: Optional[int] = None, | |
| status: Optional[str] = None, | |
| hours: int = 24, | |
| limit: int = 1000 | |
| ) -> List[ConnectionAttempt]: | |
| """ | |
| Get connection attempts with filtering | |
| Args: | |
| provider_id: Filter by provider ID | |
| status: Filter by status | |
| hours: Get attempts from last N hours | |
| limit: Maximum number of records to return | |
| Returns: | |
| List of ConnectionAttempt objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| query = session.query(ConnectionAttempt).filter( | |
| ConnectionAttempt.timestamp >= cutoff_time | |
| ) | |
| if provider_id: | |
| query = query.filter(ConnectionAttempt.provider_id == provider_id) | |
| if status: | |
| query = query.filter(ConnectionAttempt.status == status) | |
| attempts = query.order_by(desc(ConnectionAttempt.timestamp)).limit(limit).all() | |
| for attempt in attempts: | |
| session.refresh(attempt) | |
| return attempts | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get connection attempts: {str(e)}", exc_info=True) | |
| return [] | |
| # ============================================================================ | |
| # Data Collection Operations | |
| # ============================================================================ | |
| def save_data_collection( | |
| self, | |
| provider_id: int, | |
| category: str, | |
| scheduled_time: datetime, | |
| actual_fetch_time: datetime, | |
| data_timestamp: Optional[datetime] = None, | |
| staleness_minutes: Optional[float] = None, | |
| record_count: int = 0, | |
| payload_size_bytes: int = 0, | |
| data_quality_score: float = 1.0, | |
| on_schedule: bool = True, | |
| skip_reason: Optional[str] = None | |
| ) -> Optional[DataCollection]: | |
| """ | |
| Save a data collection record | |
| Args: | |
| provider_id: Provider ID | |
| category: Data category | |
| scheduled_time: Scheduled collection time | |
| actual_fetch_time: Actual fetch time | |
| data_timestamp: Timestamp from API response | |
| staleness_minutes: Data staleness in minutes | |
| record_count: Number of records collected | |
| payload_size_bytes: Payload size in bytes | |
| data_quality_score: Data quality score (0-1) | |
| on_schedule: Whether collection was on schedule | |
| skip_reason: Reason if skipped | |
| Returns: | |
| Created DataCollection object or None if failed | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| collection = DataCollection( | |
| provider_id=provider_id, | |
| category=category, | |
| scheduled_time=scheduled_time, | |
| actual_fetch_time=actual_fetch_time, | |
| data_timestamp=data_timestamp, | |
| staleness_minutes=staleness_minutes, | |
| record_count=record_count, | |
| payload_size_bytes=payload_size_bytes, | |
| data_quality_score=data_quality_score, | |
| on_schedule=on_schedule, | |
| skip_reason=skip_reason | |
| ) | |
| session.add(collection) | |
| session.commit() | |
| session.refresh(collection) | |
| return collection | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to save data collection: {str(e)}", exc_info=True) | |
| return None | |
| def get_data_collections( | |
| self, | |
| provider_id: Optional[int] = None, | |
| category: Optional[str] = None, | |
| hours: int = 24, | |
| limit: int = 1000 | |
| ) -> List[DataCollection]: | |
| """ | |
| Get data collections with filtering | |
| Args: | |
| provider_id: Filter by provider ID | |
| category: Filter by category | |
| hours: Get collections from last N hours | |
| limit: Maximum number of records to return | |
| Returns: | |
| List of DataCollection objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| query = session.query(DataCollection).filter( | |
| DataCollection.actual_fetch_time >= cutoff_time | |
| ) | |
| if provider_id: | |
| query = query.filter(DataCollection.provider_id == provider_id) | |
| if category: | |
| query = query.filter(DataCollection.category == category) | |
| collections = query.order_by(desc(DataCollection.actual_fetch_time)).limit(limit).all() | |
| for collection in collections: | |
| session.refresh(collection) | |
| return collections | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get data collections: {str(e)}", exc_info=True) | |
| return [] | |
| # ============================================================================ | |
| # Rate Limit Usage Operations | |
| # ============================================================================ | |
| def save_rate_limit_usage( | |
| self, | |
| provider_id: int, | |
| limit_type: str, | |
| limit_value: int, | |
| current_usage: int, | |
| reset_time: datetime | |
| ) -> Optional[RateLimitUsage]: | |
| """ | |
| Save rate limit usage record | |
| Args: | |
| provider_id: Provider ID | |
| limit_type: Limit type (per_minute, per_hour, per_day) | |
| limit_value: Rate limit value | |
| current_usage: Current usage count | |
| reset_time: When the limit resets | |
| Returns: | |
| Created RateLimitUsage object or None if failed | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| percentage = (current_usage / limit_value * 100) if limit_value > 0 else 0 | |
| usage = RateLimitUsage( | |
| provider_id=provider_id, | |
| limit_type=limit_type, | |
| limit_value=limit_value, | |
| current_usage=current_usage, | |
| percentage=percentage, | |
| reset_time=reset_time | |
| ) | |
| session.add(usage) | |
| session.commit() | |
| session.refresh(usage) | |
| return usage | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to save rate limit usage: {str(e)}", exc_info=True) | |
| return None | |
| def get_rate_limit_usage( | |
| self, | |
| provider_id: Optional[int] = None, | |
| hours: int = 24, | |
| high_usage_only: bool = False, | |
| threshold: float = 80.0 | |
| ) -> List[RateLimitUsage]: | |
| """ | |
| Get rate limit usage records | |
| Args: | |
| provider_id: Filter by provider ID | |
| hours: Get usage from last N hours | |
| high_usage_only: Only return high usage records | |
| threshold: Percentage threshold for high usage | |
| Returns: | |
| List of RateLimitUsage objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| query = session.query(RateLimitUsage).filter( | |
| RateLimitUsage.timestamp >= cutoff_time | |
| ) | |
| if provider_id: | |
| query = query.filter(RateLimitUsage.provider_id == provider_id) | |
| if high_usage_only: | |
| query = query.filter(RateLimitUsage.percentage >= threshold) | |
| usage_records = query.order_by(desc(RateLimitUsage.timestamp)).all() | |
| for record in usage_records: | |
| session.refresh(record) | |
| return usage_records | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get rate limit usage: {str(e)}", exc_info=True) | |
| return [] | |
| # ============================================================================ | |
| # Schedule Configuration Operations | |
| # ============================================================================ | |
| def create_schedule_config( | |
| self, | |
| provider_id: int, | |
| schedule_interval: str, | |
| enabled: bool = True, | |
| next_run: Optional[datetime] = None | |
| ) -> Optional[ScheduleConfig]: | |
| """ | |
| Create schedule configuration for a provider | |
| Args: | |
| provider_id: Provider ID | |
| schedule_interval: Schedule interval (e.g., "every_1_min") | |
| enabled: Whether schedule is enabled | |
| next_run: Next scheduled run time | |
| Returns: | |
| Created ScheduleConfig object or None if failed | |
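
        Example (illustrative; the interval string depends on the scheduler in use):
            config = db_manager.create_schedule_config(
                provider_id=1,
                schedule_interval="every_1_min",
                enabled=True
            )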
| """ | |
| try: | |
| with self.get_session() as session: | |
| config = ScheduleConfig( | |
| provider_id=provider_id, | |
| schedule_interval=schedule_interval, | |
| enabled=enabled, | |
| next_run=next_run | |
| ) | |
| session.add(config) | |
| session.commit() | |
| session.refresh(config) | |
| logger.info(f"Created schedule config for provider {provider_id}") | |
| return config | |
| except IntegrityError: | |
| logger.error(f"Schedule config already exists for provider {provider_id}") | |
| return None | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to create schedule config: {str(e)}", exc_info=True) | |
| return None | |
| def get_schedule_config(self, provider_id: int) -> Optional[ScheduleConfig]: | |
| """ | |
| Get schedule configuration for a provider | |
| Args: | |
| provider_id: Provider ID | |
| Returns: | |
| ScheduleConfig object or None if not found | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| config = session.query(ScheduleConfig).filter( | |
| ScheduleConfig.provider_id == provider_id | |
| ).first() | |
| if config: | |
| session.refresh(config) | |
| return config | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get schedule config: {str(e)}", exc_info=True) | |
| return None | |
| def update_schedule_config(self, provider_id: int, **kwargs) -> bool: | |
| """ | |
| Update schedule configuration | |
| Args: | |
| provider_id: Provider ID | |
| **kwargs: Attributes to update | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| config = session.query(ScheduleConfig).filter( | |
| ScheduleConfig.provider_id == provider_id | |
| ).first() | |
| if not config: | |
| logger.warning(f"Schedule config not found for provider {provider_id}") | |
| return False | |
| for key, value in kwargs.items(): | |
| if hasattr(config, key): | |
| setattr(config, key, value) | |
| session.commit() | |
| logger.info(f"Updated schedule config for provider {provider_id}") | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to update schedule config: {str(e)}", exc_info=True) | |
| return False | |
| def get_all_schedule_configs(self, enabled_only: bool = True) -> List[ScheduleConfig]: | |
| """ | |
| Get all schedule configurations | |
| Args: | |
| enabled_only: Only return enabled schedules | |
| Returns: | |
| List of ScheduleConfig objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| query = session.query(ScheduleConfig) | |
| if enabled_only: | |
| query = query.filter(ScheduleConfig.enabled == True) | |
| configs = query.all() | |
| for config in configs: | |
| session.refresh(config) | |
| return configs | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get schedule configs: {str(e)}", exc_info=True) | |
| return [] | |
| # ============================================================================ | |
| # Schedule Compliance Operations | |
| # ============================================================================ | |
| def save_schedule_compliance( | |
| self, | |
| provider_id: int, | |
| expected_time: datetime, | |
| actual_time: Optional[datetime] = None, | |
| delay_seconds: Optional[int] = None, | |
| on_time: bool = True, | |
| skip_reason: Optional[str] = None | |
| ) -> Optional[ScheduleCompliance]: | |
| """ | |
| Save schedule compliance record | |
| Args: | |
| provider_id: Provider ID | |
| expected_time: Expected execution time | |
| actual_time: Actual execution time | |
| delay_seconds: Delay in seconds | |
| on_time: Whether execution was on time | |
| skip_reason: Reason if skipped | |
| Returns: | |
| Created ScheduleCompliance object or None if failed | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| compliance = ScheduleCompliance( | |
| provider_id=provider_id, | |
| expected_time=expected_time, | |
| actual_time=actual_time, | |
| delay_seconds=delay_seconds, | |
| on_time=on_time, | |
| skip_reason=skip_reason | |
| ) | |
| session.add(compliance) | |
| session.commit() | |
| session.refresh(compliance) | |
| return compliance | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to save schedule compliance: {str(e)}", exc_info=True) | |
| return None | |
| def get_schedule_compliance( | |
| self, | |
| provider_id: Optional[int] = None, | |
| hours: int = 24, | |
| late_only: bool = False | |
| ) -> List[ScheduleCompliance]: | |
| """ | |
| Get schedule compliance records | |
| Args: | |
| provider_id: Filter by provider ID | |
| hours: Get records from last N hours | |
| late_only: Only return late executions | |
| Returns: | |
| List of ScheduleCompliance objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| query = session.query(ScheduleCompliance).filter( | |
| ScheduleCompliance.timestamp >= cutoff_time | |
| ) | |
| if provider_id: | |
| query = query.filter(ScheduleCompliance.provider_id == provider_id) | |
| if late_only: | |
| query = query.filter(ScheduleCompliance.on_time == False) | |
| compliance_records = query.order_by(desc(ScheduleCompliance.timestamp)).all() | |
| for record in compliance_records: | |
| session.refresh(record) | |
| return compliance_records | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get schedule compliance: {str(e)}", exc_info=True) | |
| return [] | |
| # ============================================================================ | |
| # Failure Log Operations | |
| # ============================================================================ | |
| def save_failure_log( | |
| self, | |
| provider_id: int, | |
| endpoint: str, | |
| error_type: str, | |
| error_message: Optional[str] = None, | |
| http_status: Optional[int] = None, | |
| retry_attempted: bool = False, | |
| retry_result: Optional[str] = None, | |
| remediation_applied: Optional[str] = None | |
| ) -> Optional[FailureLog]: | |
| """ | |
| Save failure log record | |
| Args: | |
| provider_id: Provider ID | |
| endpoint: API endpoint | |
| error_type: Type of error | |
| error_message: Error message | |
| http_status: HTTP status code | |
| retry_attempted: Whether retry was attempted | |
| retry_result: Result of retry | |
| remediation_applied: Remediation action taken | |
| Returns: | |
| Created FailureLog object or None if failed | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| failure = FailureLog( | |
| provider_id=provider_id, | |
| endpoint=endpoint, | |
| error_type=error_type, | |
| error_message=error_message, | |
| http_status=http_status, | |
| retry_attempted=retry_attempted, | |
| retry_result=retry_result, | |
| remediation_applied=remediation_applied | |
| ) | |
| session.add(failure) | |
| session.commit() | |
| session.refresh(failure) | |
| return failure | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to save failure log: {str(e)}", exc_info=True) | |
| return None | |
| def get_failure_logs( | |
| self, | |
| provider_id: Optional[int] = None, | |
| error_type: Optional[str] = None, | |
| hours: int = 24, | |
| limit: int = 1000 | |
| ) -> List[FailureLog]: | |
| """ | |
| Get failure logs with filtering | |
| Args: | |
| provider_id: Filter by provider ID | |
| error_type: Filter by error type | |
| hours: Get logs from last N hours | |
| limit: Maximum number of records to return | |
| Returns: | |
| List of FailureLog objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| query = session.query(FailureLog).filter( | |
| FailureLog.timestamp >= cutoff_time | |
| ) | |
| if provider_id: | |
| query = query.filter(FailureLog.provider_id == provider_id) | |
| if error_type: | |
| query = query.filter(FailureLog.error_type == error_type) | |
| failures = query.order_by(desc(FailureLog.timestamp)).limit(limit).all() | |
| for failure in failures: | |
| session.refresh(failure) | |
| return failures | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get failure logs: {str(e)}", exc_info=True) | |
| return [] | |
| # ============================================================================ | |
| # Alert Operations | |
| # ============================================================================ | |
| def create_alert( | |
| self, | |
| provider_id: int, | |
| alert_type: str, | |
| message: str, | |
| severity: str = "medium" | |
| ) -> Optional[Alert]: | |
| """ | |
| Create an alert | |
| Args: | |
| provider_id: Provider ID | |
| alert_type: Type of alert | |
| message: Alert message | |
| severity: Alert severity (low, medium, high, critical) | |
| Returns: | |
| Created Alert object or None if failed | |
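
        Example (illustrative alert values):
            alert = db_manager.create_alert(
                provider_id=1,
                alert_type="rate_limit",
                message="Rate limit usage above 80%",
                severity="high"
            )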
| """ | |
| try: | |
| with self.get_session() as session: | |
| alert = Alert( | |
| provider_id=provider_id, | |
| alert_type=alert_type, | |
| message=message, | |
| severity=severity | |
| ) | |
| session.add(alert) | |
| session.commit() | |
| session.refresh(alert) | |
| logger.warning(f"Alert created: {alert_type} - {message}") | |
| return alert | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to create alert: {str(e)}", exc_info=True) | |
| return None | |
| def get_alerts( | |
| self, | |
| provider_id: Optional[int] = None, | |
| alert_type: Optional[str] = None, | |
| severity: Optional[str] = None, | |
| acknowledged: Optional[bool] = None, | |
| hours: int = 24 | |
| ) -> List[Alert]: | |
| """ | |
| Get alerts with filtering | |
| Args: | |
| provider_id: Filter by provider ID | |
| alert_type: Filter by alert type | |
| severity: Filter by severity | |
| acknowledged: Filter by acknowledgment status | |
| hours: Get alerts from last N hours | |
| Returns: | |
| List of Alert objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| query = session.query(Alert).filter( | |
| Alert.timestamp >= cutoff_time | |
| ) | |
| if provider_id: | |
| query = query.filter(Alert.provider_id == provider_id) | |
| if alert_type: | |
| query = query.filter(Alert.alert_type == alert_type) | |
| if severity: | |
| query = query.filter(Alert.severity == severity) | |
| if acknowledged is not None: | |
| query = query.filter(Alert.acknowledged == acknowledged) | |
| alerts = query.order_by(desc(Alert.timestamp)).all() | |
| for alert in alerts: | |
| session.refresh(alert) | |
| return alerts | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get alerts: {str(e)}", exc_info=True) | |
| return [] | |
| def acknowledge_alert(self, alert_id: int) -> bool: | |
| """ | |
| Acknowledge an alert | |
| Args: | |
| alert_id: Alert ID | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| alert = session.query(Alert).filter(Alert.id == alert_id).first() | |
| if not alert: | |
| logger.warning(f"Alert not found: {alert_id}") | |
| return False | |
| alert.acknowledged = True | |
| alert.acknowledged_at = datetime.utcnow() | |
| session.commit() | |
| logger.info(f"Alert acknowledged: {alert_id}") | |
| return True | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to acknowledge alert: {str(e)}", exc_info=True) | |
| return False | |
| # ============================================================================ | |
| # System Metrics Operations | |
| # ============================================================================ | |
| def save_system_metrics( | |
| self, | |
| total_providers: int, | |
| online_count: int, | |
| degraded_count: int, | |
| offline_count: int, | |
| avg_response_time_ms: float, | |
| total_requests_hour: int, | |
| total_failures_hour: int, | |
| system_health: str = "healthy" | |
| ) -> Optional[SystemMetrics]: | |
| """ | |
| Save system metrics snapshot | |
| Args: | |
| total_providers: Total number of providers | |
| online_count: Number of online providers | |
| degraded_count: Number of degraded providers | |
| offline_count: Number of offline providers | |
| avg_response_time_ms: Average response time | |
| total_requests_hour: Total requests in last hour | |
| total_failures_hour: Total failures in last hour | |
| system_health: Overall system health | |
| Returns: | |
| Created SystemMetrics object or None if failed | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| metrics = SystemMetrics( | |
| total_providers=total_providers, | |
| online_count=online_count, | |
| degraded_count=degraded_count, | |
| offline_count=offline_count, | |
| avg_response_time_ms=avg_response_time_ms, | |
| total_requests_hour=total_requests_hour, | |
| total_failures_hour=total_failures_hour, | |
| system_health=system_health | |
| ) | |
| session.add(metrics) | |
| session.commit() | |
| session.refresh(metrics) | |
| return metrics | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to save system metrics: {str(e)}", exc_info=True) | |
| return None | |
| def get_system_metrics(self, hours: int = 24, limit: int = 1000) -> List[SystemMetrics]: | |
| """ | |
| Get system metrics history | |
| Args: | |
| hours: Get metrics from last N hours | |
| limit: Maximum number of records to return | |
| Returns: | |
| List of SystemMetrics objects | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| metrics = session.query(SystemMetrics).filter( | |
| SystemMetrics.timestamp >= cutoff_time | |
| ).order_by(desc(SystemMetrics.timestamp)).limit(limit).all() | |
| for metric in metrics: | |
| session.refresh(metric) | |
| return metrics | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get system metrics: {str(e)}", exc_info=True) | |
| return [] | |
| def get_latest_system_metrics(self) -> Optional[SystemMetrics]: | |
| """ | |
| Get the most recent system metrics | |
| Returns: | |
| Latest SystemMetrics object or None | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| metrics = session.query(SystemMetrics).order_by( | |
| desc(SystemMetrics.timestamp) | |
| ).first() | |
| if metrics: | |
| session.refresh(metrics) | |
| return metrics | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get latest system metrics: {str(e)}", exc_info=True) | |
| return None | |
| # ============================================================================ | |
| # Advanced Analytics Methods | |
| # ============================================================================ | |
| def get_provider_stats(self, provider_id: int, hours: int = 24) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive statistics for a provider | |
| Args: | |
| provider_id: Provider ID | |
| hours: Time window in hours | |
| Returns: | |
| Dictionary with provider statistics | |
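
        Example (illustrative; key names match the dictionary built below):
            stats = db_manager.get_provider_stats(provider_id=1, hours=24)
            print(stats['connection_stats']['success_rate'])
            print(stats['data_collection_stats']['total_records'])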
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| # Get provider info | |
| provider = session.query(Provider).filter(Provider.id == provider_id).first() | |
| if not provider: | |
| return {} | |
| # Connection attempt stats | |
| connection_stats = session.query( | |
| func.count(ConnectionAttempt.id).label('total_attempts'), | |
| func.sum(func.case((ConnectionAttempt.status == 'success', 1), else_=0)).label('successful'), | |
| func.sum(func.case((ConnectionAttempt.status == 'failed', 1), else_=0)).label('failed'), | |
| func.sum(func.case((ConnectionAttempt.status == 'timeout', 1), else_=0)).label('timeout'), | |
| func.sum(func.case((ConnectionAttempt.status == 'rate_limited', 1), else_=0)).label('rate_limited'), | |
| func.avg(ConnectionAttempt.response_time_ms).label('avg_response_time') | |
| ).filter( | |
| ConnectionAttempt.provider_id == provider_id, | |
| ConnectionAttempt.timestamp >= cutoff_time | |
| ).first() | |
| # Data collection stats | |
| collection_stats = session.query( | |
| func.count(DataCollection.id).label('total_collections'), | |
| func.sum(DataCollection.record_count).label('total_records'), | |
| func.sum(DataCollection.payload_size_bytes).label('total_bytes'), | |
| func.avg(DataCollection.data_quality_score).label('avg_quality'), | |
| func.avg(DataCollection.staleness_minutes).label('avg_staleness') | |
| ).filter( | |
| DataCollection.provider_id == provider_id, | |
| DataCollection.actual_fetch_time >= cutoff_time | |
| ).first() | |
| # Failure stats | |
| failure_count = session.query(func.count(FailureLog.id)).filter( | |
| FailureLog.provider_id == provider_id, | |
| FailureLog.timestamp >= cutoff_time | |
| ).scalar() | |
| # Calculate success rate | |
| total_attempts = connection_stats.total_attempts or 0 | |
| successful = connection_stats.successful or 0 | |
| success_rate = (successful / total_attempts * 100) if total_attempts > 0 else 0 | |
| return { | |
| 'provider_name': provider.name, | |
| 'provider_id': provider_id, | |
| 'time_window_hours': hours, | |
| 'connection_stats': { | |
| 'total_attempts': total_attempts, | |
| 'successful': successful, | |
| 'failed': connection_stats.failed or 0, | |
| 'timeout': connection_stats.timeout or 0, | |
| 'rate_limited': connection_stats.rate_limited or 0, | |
| 'success_rate': round(success_rate, 2), | |
| 'avg_response_time_ms': round(connection_stats.avg_response_time or 0, 2) | |
| }, | |
| 'data_collection_stats': { | |
| 'total_collections': collection_stats.total_collections or 0, | |
| 'total_records': collection_stats.total_records or 0, | |
| 'total_bytes': collection_stats.total_bytes or 0, | |
| 'avg_quality_score': round(collection_stats.avg_quality or 0, 2), | |
| 'avg_staleness_minutes': round(collection_stats.avg_staleness or 0, 2) | |
| }, | |
| 'failure_count': failure_count or 0 | |
| } | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get provider stats: {str(e)}", exc_info=True) | |
| return {} | |
| def get_failure_analysis(self, hours: int = 24) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive failure analysis across all providers | |
| Args: | |
| hours: Time window in hours | |
| Returns: | |
| Dictionary with failure analysis | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| # Failures by error type | |
| error_type_stats = session.query( | |
| FailureLog.error_type, | |
| func.count(FailureLog.id).label('count') | |
| ).filter( | |
| FailureLog.timestamp >= cutoff_time | |
| ).group_by(FailureLog.error_type).all() | |
| # Failures by provider | |
| provider_stats = session.query( | |
| Provider.name, | |
| func.count(FailureLog.id).label('count') | |
| ).join( | |
| FailureLog, Provider.id == FailureLog.provider_id | |
| ).filter( | |
| FailureLog.timestamp >= cutoff_time | |
| ).group_by(Provider.name).order_by(desc('count')).limit(10).all() | |
| # Retry statistics | |
| retry_stats = session.query( | |
| func.sum(func.case((FailureLog.retry_attempted == True, 1), else_=0)).label('total_retries'), | |
| func.sum(func.case((FailureLog.retry_result == 'success', 1), else_=0)).label('successful_retries') | |
| ).filter( | |
| FailureLog.timestamp >= cutoff_time | |
| ).first() | |
| total_retries = retry_stats.total_retries or 0 | |
| successful_retries = retry_stats.successful_retries or 0 | |
| retry_success_rate = (successful_retries / total_retries * 100) if total_retries > 0 else 0 | |
| return { | |
| 'time_window_hours': hours, | |
| 'failures_by_error_type': [ | |
| {'error_type': stat.error_type, 'count': stat.count} | |
| for stat in error_type_stats | |
| ], | |
| 'top_failing_providers': [ | |
| {'provider': stat.name, 'failure_count': stat.count} | |
| for stat in provider_stats | |
| ], | |
| 'retry_statistics': { | |
| 'total_retries': total_retries, | |
| 'successful_retries': successful_retries, | |
| 'retry_success_rate': round(retry_success_rate, 2) | |
| } | |
| } | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get failure analysis: {str(e)}", exc_info=True) | |
| return {} | |
| def get_recent_logs( | |
| self, | |
| log_type: str, | |
| provider_id: Optional[int] = None, | |
| hours: int = 1, | |
| limit: int = 100 | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Get recent logs of specified type with filtering | |
| Args: | |
| log_type: Type of logs (connection, failure, collection, rate_limit) | |
| provider_id: Filter by provider ID | |
| hours: Get logs from last N hours | |
| limit: Maximum number of records | |
| Returns: | |
| List of log dictionaries | |
| """ | |
| try: | |
| cutoff_time = datetime.utcnow() - timedelta(hours=hours) | |
| if log_type == 'connection': | |
| attempts = self.get_connection_attempts(provider_id=provider_id, hours=hours, limit=limit) | |
| return [ | |
| { | |
| 'id': a.id, | |
| 'timestamp': a.timestamp.isoformat(), | |
| 'provider_id': a.provider_id, | |
| 'endpoint': a.endpoint, | |
| 'status': a.status, | |
| 'response_time_ms': a.response_time_ms, | |
| 'http_status_code': a.http_status_code, | |
| 'error_type': a.error_type, | |
| 'error_message': a.error_message | |
| } | |
| for a in attempts | |
| ] | |
| elif log_type == 'failure': | |
| failures = self.get_failure_logs(provider_id=provider_id, hours=hours, limit=limit) | |
| return [ | |
| { | |
| 'id': f.id, | |
| 'timestamp': f.timestamp.isoformat(), | |
| 'provider_id': f.provider_id, | |
| 'endpoint': f.endpoint, | |
| 'error_type': f.error_type, | |
| 'error_message': f.error_message, | |
| 'http_status': f.http_status, | |
| 'retry_attempted': f.retry_attempted, | |
| 'retry_result': f.retry_result | |
| } | |
| for f in failures | |
| ] | |
| elif log_type == 'collection': | |
| collections = self.get_data_collections(provider_id=provider_id, hours=hours, limit=limit) | |
| return [ | |
| { | |
| 'id': c.id, | |
| 'provider_id': c.provider_id, | |
| 'category': c.category, | |
| 'scheduled_time': c.scheduled_time.isoformat(), | |
| 'actual_fetch_time': c.actual_fetch_time.isoformat(), | |
| 'record_count': c.record_count, | |
| 'payload_size_bytes': c.payload_size_bytes, | |
| 'data_quality_score': c.data_quality_score, | |
| 'on_schedule': c.on_schedule | |
| } | |
| for c in collections | |
| ] | |
| elif log_type == 'rate_limit': | |
| usage = self.get_rate_limit_usage(provider_id=provider_id, hours=hours) | |
| return [ | |
| { | |
| 'id': u.id, | |
| 'timestamp': u.timestamp.isoformat(), | |
| 'provider_id': u.provider_id, | |
| 'limit_type': u.limit_type, | |
| 'limit_value': u.limit_value, | |
| 'current_usage': u.current_usage, | |
| 'percentage': u.percentage, | |
| 'reset_time': u.reset_time.isoformat() | |
| } | |
| for u in usage[:limit] | |
| ] | |
| else: | |
| logger.warning(f"Unknown log type: {log_type}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"Failed to get recent logs: {str(e)}", exc_info=True) | |
| return [] | |
| def cleanup_old_data(self, days: int = 30) -> Dict[str, int]: | |
| """ | |
| Remove old records from the database to manage storage | |
| Args: | |
| days: Remove records older than N days | |
| Returns: | |
| Dictionary with count of deleted records per table | |
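
        Example (illustrative; counts depend entirely on stored data):
            deleted = db_manager.cleanup_old_data(days=30)
            # e.g. {'connection_attempts': 1200, 'data_collections': 300, ...}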
| """ | |
| try: | |
| with self.get_session() as session: | |
| cutoff_time = datetime.utcnow() - timedelta(days=days) | |
| deleted_counts = {} | |
| # Clean connection attempts | |
| deleted = session.query(ConnectionAttempt).filter( | |
| ConnectionAttempt.timestamp < cutoff_time | |
| ).delete() | |
| deleted_counts['connection_attempts'] = deleted | |
| # Clean data collections | |
| deleted = session.query(DataCollection).filter( | |
| DataCollection.actual_fetch_time < cutoff_time | |
| ).delete() | |
| deleted_counts['data_collections'] = deleted | |
| # Clean rate limit usage | |
| deleted = session.query(RateLimitUsage).filter( | |
| RateLimitUsage.timestamp < cutoff_time | |
| ).delete() | |
| deleted_counts['rate_limit_usage'] = deleted | |
| # Clean schedule compliance | |
| deleted = session.query(ScheduleCompliance).filter( | |
| ScheduleCompliance.timestamp < cutoff_time | |
| ).delete() | |
| deleted_counts['schedule_compliance'] = deleted | |
| # Clean failure logs | |
| deleted = session.query(FailureLog).filter( | |
| FailureLog.timestamp < cutoff_time | |
| ).delete() | |
| deleted_counts['failure_logs'] = deleted | |
| # Clean acknowledged alerts | |
| deleted = session.query(Alert).filter( | |
| and_( | |
| Alert.timestamp < cutoff_time, | |
| Alert.acknowledged == True | |
| ) | |
| ).delete() | |
| deleted_counts['alerts'] = deleted | |
| # Clean system metrics | |
| deleted = session.query(SystemMetrics).filter( | |
| SystemMetrics.timestamp < cutoff_time | |
| ).delete() | |
| deleted_counts['system_metrics'] = deleted | |
| session.commit() | |
| total_deleted = sum(deleted_counts.values()) | |
| logger.info(f"Cleaned up {total_deleted} old records (older than {days} days)") | |
| return deleted_counts | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to cleanup old data: {str(e)}", exc_info=True) | |
| return {} | |
| def get_database_stats(self) -> Dict[str, Any]: | |
| """ | |
| Get database statistics | |
| Returns: | |
| Dictionary with database statistics | |
| """ | |
| try: | |
| with self.get_session() as session: | |
| stats = { | |
| 'providers': session.query(func.count(Provider.id)).scalar(), | |
| 'connection_attempts': session.query(func.count(ConnectionAttempt.id)).scalar(), | |
| 'data_collections': session.query(func.count(DataCollection.id)).scalar(), | |
| 'rate_limit_usage': session.query(func.count(RateLimitUsage.id)).scalar(), | |
| 'schedule_configs': session.query(func.count(ScheduleConfig.id)).scalar(), | |
| 'schedule_compliance': session.query(func.count(ScheduleCompliance.id)).scalar(), | |
| 'failure_logs': session.query(func.count(FailureLog.id)).scalar(), | |
| 'alerts': session.query(func.count(Alert.id)).scalar(), | |
| 'system_metrics': session.query(func.count(SystemMetrics.id)).scalar(), | |
| } | |
| # Get database file size if it exists | |
| if os.path.exists(self.db_path): | |
| stats['database_size_mb'] = round(os.path.getsize(self.db_path) / (1024 * 1024), 2) | |
| else: | |
| stats['database_size_mb'] = 0 | |
| return stats | |
| except SQLAlchemyError as e: | |
| logger.error(f"Failed to get database stats: {str(e)}", exc_info=True) | |
| return {} | |
| def health_check(self) -> Dict[str, Any]: | |
| """ | |
| Perform database health check | |
| Returns: | |
| Dictionary with health check results | |
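
        Example (illustrative):
            health = db_manager.health_check()
            if health['status'] != 'healthy':
                logger.warning(f"Database unhealthy: {health.get('error')}")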
| """ | |
| try: | |
| with self.get_session() as session: | |
| # Test connection with a simple query | |
| result = session.execute(text("SELECT 1")).scalar() | |
| # Get stats | |
| stats = self.get_database_stats() | |
| return { | |
| 'status': 'healthy' if result == 1 else 'unhealthy', | |
| 'database_path': self.db_path, | |
| 'database_exists': os.path.exists(self.db_path), | |
| 'stats': stats, | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Health check failed: {str(e)}", exc_info=True) | |
| return { | |
| 'status': 'unhealthy', | |
| 'error': str(e), | |
| 'timestamp': datetime.utcnow().isoformat() | |
| } | |
| # ============================================================================ | |
| # Global Database Manager Instance | |
| # ============================================================================ | |
| # Create a global instance (can be reconfigured as needed) | |
| db_manager = DatabaseManager() | |
| # ============================================================================ | |
| # Convenience Functions | |
| # ============================================================================ | |
| def init_db(db_path: str = "data/api_monitor.db") -> DatabaseManager: | |
| """ | |
| Initialize database and return manager instance | |
| Args: | |
| db_path: Path to database file | |
| Returns: | |
| DatabaseManager instance | |
| """ | |
| manager = DatabaseManager(db_path=db_path) | |
| manager.init_database() | |
| logger.info("Database initialized successfully") | |
| return manager | |
| if __name__ == "__main__": | |
| # Example usage and testing | |
| print("Database Manager Module") | |
| print("=" * 80) | |
| # Initialize database | |
| manager = init_db() | |
| # Run health check | |
| health = manager.health_check() | |
| print(f"\nHealth Check: {health['status']}") | |
| print(f"Database Stats: {health.get('stats', {})}") | |
| # Get database statistics | |
| stats = manager.get_database_stats() | |
| print(f"\nDatabase Statistics:") | |
| for table, count in stats.items(): | |
| if table != 'database_size_mb': | |
| print(f" {table}: {count}") | |
| print(f" Database Size: {stats.get('database_size_mb', 0)} MB") | |