Your Name
feat: UI improvements and error suppression - Enhanced dashboard and market pages with improved header buttons, logo, and currency symbol display - Stopped animated ticker - Removed pie chart legends - Added error suppressor for external service errors (SSE, Permissions-Policy warnings) - Improved header button prominence and icon appearance - Enhanced logo with glow effects and better design - Fixed currency symbol visibility in market tables
8b7b267
| """ | |
| Authentication and Authorization System | |
| Implements JWT-based authentication for production deployments | |
| """ | |
| import os | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any | |
| import hashlib | |
| import logging | |
| from functools import wraps | |
# PyJWT is an optional dependency: the module must stay importable without it,
# and JWT_AVAILABLE lets the rest of the file degrade gracefully.
try:
    import jwt
    JWT_AVAILABLE = True
except ImportError:
    JWT_AVAILABLE = False
    # Use a named module logger rather than the module-level logging.warning():
    # the latter implicitly calls logging.basicConfig() when the root logger
    # has no handlers, silently altering the host application's logging setup
    # as a side effect of importing this module.
    logging.getLogger(__name__).warning(
        "PyJWT not installed. Authentication disabled. Install with: pip install PyJWT"
    )
logger = logging.getLogger(__name__)

# Configuration (all values are read once, at import time, from the environment)

# Whether authentication is enforced at all; only the literal "true"
# (case-insensitive) enables it.
ENABLE_AUTH = os.getenv('ENABLE_AUTH', 'false').lower() == 'true'

# Signing key for JWTs. An empty SECRET_KEY is treated as "not set": signing
# with an empty key would make tokens trivially forgeable.
_configured_secret = os.getenv('SECRET_KEY')
SECRET_KEY = _configured_secret or secrets.token_urlsafe(32)
if not _configured_secret and ENABLE_AUTH:
    # The fallback key is random per process, so tokens stop validating after
    # a restart and are not shared between worker processes.
    logger.warning(
        "SECRET_KEY is not set; using a random per-process key. "
        "Issued tokens will not survive restarts or work across workers."
    )

ALGORITHM = "HS256"

# Lifetime of issued access tokens, in minutes (raises ValueError at import
# if the environment value is not an integer).
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES', '60'))
class AuthManager:
    """
    Authentication manager for API endpoints and dashboard access.

    Supports JWT tokens and basic API key authentication. All state lives in
    two in-memory dictionaries populated from environment variables when the
    instance is constructed.
    """

    def __init__(self):
        self.users_db: Dict[str, str] = {}  # username -> hashed_password
        self.api_keys_db: Dict[str, Dict[str, Any]] = {}  # api_key -> metadata
        self._load_credentials()

    def _load_credentials(self):
        """
        Load credentials from environment variables.

        Reads ADMIN_USERNAME (default ``admin``) and ADMIN_PASSWORD for the
        admin account, and API_KEYS as a comma-separated list of API keys.
        The admin user is only registered when ADMIN_PASSWORD is set.
        """
        # Load default admin user
        admin_user = os.getenv('ADMIN_USERNAME', 'admin')
        admin_pass = os.getenv('ADMIN_PASSWORD')
        if admin_pass:
            self.users_db[admin_user] = self._hash_password(admin_pass)
            logger.info("Loaded admin user: %s", admin_user)

        # Load API keys from environment
        api_keys_str = os.getenv('API_KEYS', '')
        if api_keys_str:
            for raw_key in api_keys_str.split(','):
                key = raw_key.strip()
                if not key:
                    # Tolerate stray commas / whitespace-only entries.
                    continue
                self.api_keys_db[key] = {
                    'created_at': datetime.utcnow(),
                    'name': 'env_key',
                    'active': True,
                }
            logger.info("Loaded %d API keys", len(self.api_keys_db))

    @staticmethod
    def _hash_password(password: str) -> str:
        """
        Hash password using SHA-256.

        Declared as a static method: the function takes no ``self``, and
        without the decorator every ``self._hash_password(x)`` call raised
        ``TypeError`` (two positional arguments passed to a one-argument
        function).

        Args:
            password: Plain text password

        Returns:
            Hex-encoded SHA-256 digest of the UTF-8 encoded password
        """
        # NOTE(security): unsalted, single-round SHA-256 is cheap to brute
        # force. Hashes here are only held in memory, but consider
        # hashlib.scrypt / pbkdf2_hmac if they are ever persisted.
        return hashlib.sha256(password.encode()).hexdigest()

    def verify_password(self, username: str, password: str) -> bool:
        """
        Verify username and password.

        Args:
            username: Username
            password: Plain text password

        Returns:
            True if valid, False otherwise
        """
        stored_hash = self.users_db.get(username)
        if stored_hash is None:
            return False
        candidate_hash = self._hash_password(password)
        # Constant-time comparison of the two hex digests.
        return secrets.compare_digest(stored_hash, candidate_hash)

    def create_access_token(
        self,
        username: str,
        expires_delta: Optional[timedelta] = None
    ) -> str:
        """
        Create JWT access token.

        Args:
            username: Username, stored in the ``sub`` claim
            expires_delta: Token expiration time; defaults to
                ACCESS_TOKEN_EXPIRE_MINUTES minutes

        Returns:
            JWT token string

        Raises:
            RuntimeError: If PyJWT is not installed
        """
        if not JWT_AVAILABLE:
            raise RuntimeError("PyJWT not installed")

        if expires_delta is None:
            expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

        # Take a single timestamp so 'iat' and 'exp' are mutually consistent.
        issued_at = datetime.utcnow()
        payload = {
            'sub': username,
            'exp': issued_at + expires_delta,
            'iat': issued_at,
        }
        token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
        # PyJWT < 2.0 returns bytes, >= 2.0 returns str; honour the annotation.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    def verify_token(self, token: str) -> Optional[str]:
        """
        Verify JWT token and extract username.

        Args:
            token: JWT token string

        Returns:
            Username (the ``sub`` claim) if valid, None otherwise
        """
        if not JWT_AVAILABLE:
            return None

        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # PyJWT's base class for decode/validation failures. The former
            # `jwt.JWTError` does not exist in PyJWT, so any malformed token
            # surfaced as AttributeError instead of being rejected.
            logger.warning("Invalid token: %s", e)
            return None
        return payload.get('sub')

    def verify_api_key(self, api_key: str) -> bool:
        """
        Verify API key.

        Args:
            api_key: API key string

        Returns:
            True if valid and active, False otherwise
        """
        key_data = self.api_keys_db.get(api_key)
        if key_data is None:
            return False
        return key_data.get('active', False)

    def create_api_key(self, name: str) -> str:
        """
        Create new API key.

        Args:
            name: Descriptive name for the key

        Returns:
            Generated API key
        """
        api_key = secrets.token_urlsafe(32)
        self.api_keys_db[api_key] = {
            'created_at': datetime.utcnow(),
            'name': name,
            'active': True,
            'usage_count': 0,
        }
        logger.info("Created API key: %s", name)
        return api_key

    def revoke_api_key(self, api_key: str) -> bool:
        """
        Revoke API key.

        The key's record is kept and only flagged inactive.

        Args:
            api_key: API key to revoke

        Returns:
            True if revoked, False if not found
        """
        key_data = self.api_keys_db.get(api_key)
        if key_data is None:
            return False
        key_data['active'] = False
        logger.info("Revoked API key: %s", key_data['name'])
        return True

    def track_usage(self, api_key: str):
        """Track API key usage by incrementing its ``usage_count``."""
        key_data = self.api_keys_db.get(api_key)
        if key_data is not None:
            # Keys loaded from the environment start without a counter.
            key_data['usage_count'] = key_data.get('usage_count', 0) + 1
# Module-wide singleton shared by the helper functions below; constructing it
# loads credentials from the environment as an import-time side effect.
auth_manager: AuthManager = AuthManager()
| # ==================== DECORATORS ==================== | |
def require_auth(func):
    """
    Decorator to require authentication for async endpoints.

    Intended to check for a JWT token in the Authorization header or an API
    key in the X-API-Key header. The header extraction is framework specific
    and is NOT implemented here: with ENABLE_AUTH on, the wrapped endpoint
    fails closed by raising NotImplementedError.

    Args:
        func: Async callable to protect

    Returns:
        Async wrapper that preserves ``func``'s name, docstring and signature

    Raises:
        NotImplementedError: When called with ENABLE_AUTH enabled
    """
    # wraps() keeps __name__/__doc__ and sets __wrapped__, which routing and
    # introspection tools rely on to see the real endpoint signature.
    @wraps(func)
    async def wrapper(*args, **kwargs):
        if not ENABLE_AUTH:
            # Authentication disabled, allow all requests
            return await func(*args, **kwargs)

        # Try to get token from request
        # This is a placeholder - actual implementation depends on framework (FastAPI, Flask, etc.)
        # For FastAPI:
        # from fastapi import Header, HTTPException
        # authorization: Optional[str] = Header(None)
        # api_key: Optional[str] = Header(None, alias="X-API-Key")
        # For now, this is a template
        raise NotImplementedError("Integrate with your web framework")

    return wrapper
def require_api_key(func):
    """
    Decorator to require API key authentication for async endpoints.

    Template only: with ENABLE_AUTH on, the wrapped endpoint fails closed by
    raising NotImplementedError until it is integrated with a web framework.

    Args:
        func: Async callable to protect

    Returns:
        Async wrapper that preserves ``func``'s name, docstring and signature

    Raises:
        NotImplementedError: When called with ENABLE_AUTH enabled
    """
    # wraps() keeps the endpoint's metadata visible through the wrapper.
    @wraps(func)
    async def wrapper(*args, **kwargs):
        if not ENABLE_AUTH:
            return await func(*args, **kwargs)

        # Template for API key verification
        raise NotImplementedError("Integrate with your web framework")

    return wrapper
| # ==================== HELPER FUNCTIONS ==================== | |
def authenticate_user(username: str, password: str) -> Optional[str]:
    """
    Check a username/password pair and issue a JWT on success.

    Args:
        username: Username
        password: Password

    Returns:
        JWT token when the credentials match; None when they do not or when
        authentication is switched off via ENABLE_AUTH
    """
    if ENABLE_AUTH:
        credentials_ok = auth_manager.verify_password(username, password)
        return auth_manager.create_access_token(username) if credentials_ok else None

    # No tokens are issued while authentication is disabled.
    logger.warning("Authentication disabled")
    return None
def verify_request_auth(
    authorization: Optional[str] = None,
    api_key: Optional[str] = None
) -> bool:
    """
    Verify request authentication.

    The API key is tried first; if it is missing or invalid, the bearer
    token is tried. With ENABLE_AUTH off, every request is accepted.

    Args:
        authorization: Authorization header (Bearer token)
        api_key: X-API-Key header

    Returns:
        True if authenticated, False otherwise
    """
    if not ENABLE_AUTH:
        return True

    # Check API key first
    if api_key and auth_manager.verify_api_key(api_key):
        auth_manager.track_usage(api_key)
        return True

    # Check JWT token
    if authorization:
        # partition() never raises on a bare scheme, and strip() tolerates
        # extra spaces; split(' ')[1] produced an empty token for both
        # "Bearer " and "Bearer  <token>".
        scheme, _, credentials = authorization.partition(' ')
        token = credentials.strip()
        # HTTP authentication scheme names are case-insensitive.
        if scheme.lower() == 'bearer' and token:
            return bool(auth_manager.verify_token(token))

    return False