"""
API utilities for the Fake News Detector application.

This module provides utilities for handling API calls, rate limiting,
error handling, and exponential backoff for retrying failed requests.
"""

import time
import functools
import random
import logging
import requests
from datetime import datetime, timedelta
from collections import deque

from config import RATE_LIMITS, ERROR_BACKOFF

logger = logging.getLogger("misinformation_detector")
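
# For reference, the config module is expected to expose structures shaped like
# the illustrative values below (the API name "newsapi" and the numbers are
# hypothetical); only the keys shown here are actually read by this module.
#
# RATE_LIMITS = {"newsapi": {"requests": 100, "period": 3600}}
# ERROR_BACKOFF = {"max_retries": 3, "initial_backoff": 1.0, "backoff_factor": 2.0}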


class RateLimiter:
    """
    Rate limiter for API calls with support for different APIs.

    This class implements a sliding-window rate limiter that tracks the
    timestamps of recent requests per API, with support for different rate
    limits for different APIs. It also provides exponential backoff for
    error handling.
    """

    def __init__(self):
        """Initialize the rate limiter with configuration from settings."""
        # Store rate limits for different APIs
        self.limits = {}

        # Initialize limits from config
        for api_name, limit_info in RATE_LIMITS.items():
            self.limits[api_name] = {
                "requests": limit_info["requests"],
                "period": limit_info["period"],
                "timestamps": deque()
            }

        # Error backoff settings
        self.max_retries = ERROR_BACKOFF["max_retries"]
        self.initial_backoff = ERROR_BACKOFF["initial_backoff"]
        self.backoff_factor = ERROR_BACKOFF["backoff_factor"]

    def check_and_update(self, api_name):
        """
        Check whether a request is allowed and update timestamps.

        Args:
            api_name (str): Name of the API to check

        Returns:
            tuple: (allowed, wait_time)
                - allowed (bool): Whether the request is allowed
                - wait_time (float): Time to wait if not allowed
        """
        if api_name not in self.limits:
            return True, 0  # Unknown API, allow by default

        now = datetime.now()
        limit_info = self.limits[api_name]

        # Remove timestamps older than the period
        cutoff = now - timedelta(seconds=limit_info["period"])
        while limit_info["timestamps"] and limit_info["timestamps"][0] < cutoff:
            limit_info["timestamps"].popleft()

        # Check if we're at the rate limit
        if len(limit_info["timestamps"]) >= limit_info["requests"]:
            # Calculate wait time until the oldest timestamp expires
            wait_time = (
                limit_info["timestamps"][0]
                + timedelta(seconds=limit_info["period"])
                - now
            ).total_seconds()
            return False, max(0, wait_time)

        # Add current timestamp and allow request
        limit_info["timestamps"].append(now)
        return True, 0
    def wait_if_needed(self, api_name):
        """
        Wait if the rate limit is reached.

        Args:
            api_name (str): Name of the API to check

        Returns:
            bool: True if waited, False otherwise
        """
        allowed, wait_time = self.check_and_update(api_name)

        if not allowed:
            logger.info(f"Rate limit reached for {api_name}. Waiting {wait_time:.2f} seconds...")
            time.sleep(wait_time + 0.1)  # Add a small buffer
            return True

        return False

    def get_backoff_time(self, attempt):
        """
        Calculate exponential backoff time with jitter.

        Args:
            attempt (int): Current attempt number (0-based)

        Returns:
            float: Backoff time in seconds
        """
        backoff = self.initial_backoff * (self.backoff_factor ** attempt)
        # Add jitter to prevent thundering herd problem
        jitter = random.uniform(0, 0.1 * backoff)
        return backoff + jitter
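

# Worked example of get_backoff_time (assumed values; the real ones come from
# ERROR_BACKOFF in config): with initial_backoff=1.0 and backoff_factor=2.0,
# successive attempts wait roughly 1s, 2s, 4s, ..., each plus up to 10% random
# jitter so concurrent clients do not retry in lockstep.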

# Create rate limiter instance
rate_limiter = RateLimiter()
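
# Illustrative sketch (not part of the original module): callers can use the
# shared limiter directly instead of the decorator below. The API name
# "example_api" is hypothetical; it is only rate limited if RATE_LIMITS has a
# matching entry (unknown names are allowed through by default).
#
# rate_limiter.wait_if_needed("example_api")                     # sleeps if the window is full
# allowed, wait = rate_limiter.check_and_update("example_api")   # non-blocking check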


# API Error Handler decorator
def api_error_handler(api_name):
    """
    Decorator for API calls with error handling and rate limiting.

    This decorator handles rate limiting, retries with exponential
    backoff, and error handling for API calls.

    Args:
        api_name (str): Name of the API being called

    Returns:
        callable: Decorated function
    """
    def decorator(func):
        @functools.wraps(func)  # Preserve the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            try:
                # Apply rate limiting - make sure rate_limiter exists and has the method
                if hasattr(rate_limiter, 'wait_if_needed'):
                    rate_limiter.wait_if_needed(api_name)

                # Track retries
                for attempt in range(rate_limiter.max_retries):
                    try:
                        return func(*args, **kwargs)

                    except requests.exceptions.HTTPError as e:
                        # e.response can be None, so guard before reading the status code
                        status_code = e.response.status_code if e.response is not None else 0

                        # Handle specific HTTP errors
                        if status_code == 429:  # Too Many Requests
                            logger.warning(f"{api_name} rate limit exceeded (429). Attempt {attempt+1}/{rate_limiter.max_retries}")
                            # Get Retry-After header or use exponential backoff
                            retry_after = e.response.headers.get('Retry-After')
                            if retry_after and retry_after.isdigit():
                                wait_time = int(retry_after)
                            else:
                                wait_time = rate_limiter.get_backoff_time(attempt)
                            logger.info(f"Waiting {wait_time:.2f} seconds before retry...")
                            time.sleep(wait_time)
                        elif status_code >= 500:  # Server errors
                            logger.warning(f"{api_name} server error ({status_code}). Attempt {attempt+1}/{rate_limiter.max_retries}")
                            time.sleep(rate_limiter.get_backoff_time(attempt))
                        elif status_code == 403:  # Forbidden - likely API key issue
                            logger.error(f"{api_name} access forbidden (403). Check API key.")
                            return None  # Don't retry on auth errors
                        elif status_code == 404:  # Not Found
                            logger.warning(f"{api_name} resource not found (404).")
                            return None  # Don't retry on resource not found
                        else:
                            logger.error(f"{api_name} HTTP error: {e}")
                            if attempt < rate_limiter.max_retries - 1:
                                wait_time = rate_limiter.get_backoff_time(attempt)
                                logger.info(f"Waiting {wait_time:.2f} seconds before retry...")
                                time.sleep(wait_time)
                            else:
                                return None

                    except requests.exceptions.ConnectionError as e:
                        logger.error(f"{api_name} connection error: {e}")
                        if attempt < rate_limiter.max_retries - 1:
                            wait_time = rate_limiter.get_backoff_time(attempt)
                            logger.info(f"Waiting {wait_time:.2f} seconds before retry...")
                            time.sleep(wait_time)
                        else:
                            return None

                    except requests.exceptions.Timeout as e:
                        logger.error(f"{api_name} timeout error: {e}")
                        if attempt < rate_limiter.max_retries - 1:
                            wait_time = rate_limiter.get_backoff_time(attempt)
                            logger.info(f"Waiting {wait_time:.2f} seconds before retry...")
                            time.sleep(wait_time)
                        else:
                            return None

                    except Exception as e:
                        logger.error(f"{api_name} unexpected error: {str(e)}")
                        if attempt < rate_limiter.max_retries - 1:
                            wait_time = rate_limiter.get_backoff_time(attempt)
                            logger.info(f"Waiting {wait_time:.2f} seconds before retry...")
                            time.sleep(wait_time)
                        else:
                            return None

                # If we've exhausted all retries
                logger.error(f"{api_name} call failed after {rate_limiter.max_retries} attempts")
                return None

            except Exception as e:
                # Catch any unexpected errors in the decorator itself
                logger.error(f"{api_name} decorator error: {str(e)}")
                return None

        return wrapper
    return decorator
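

# Illustrative usage sketch (an assumption, not part of the original module):
# "example_api" and the URL below are hypothetical placeholders. Calling
# raise_for_status() is what surfaces HTTP errors to the decorator's handlers.
#
# @api_error_handler("example_api")
# def fetch_example(query):
#     response = requests.get("https://api.example.com/search",
#                             params={"q": query}, timeout=10)
#     response.raise_for_status()
#     return safe_json_parse(response, "example_api")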


def safe_json_parse(response, api_name):
    """
    Safely parse JSON response with error handling.

    Args:
        response (requests.Response): Response object to parse
        api_name (str): Name of the API for logging

    Returns:
        dict: Parsed JSON or empty dict on error
    """
    try:
        return response.json()
    except ValueError as e:
        logger.error(f"Error parsing {api_name} JSON response: {e}")
        logger.debug(f"Response content: {response.text[:500]}...")
        return {}
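

if __name__ == "__main__":
    # Minimal smoke-test sketch added for illustration (not part of the original
    # module). It decorates a function that always raises a connection error, so
    # running this file directly shows the retry/backoff logging and the final
    # None result without touching any real API. "demo_api" is a hypothetical
    # name with no RATE_LIMITS entry, so it is allowed through by default.
    logging.basicConfig(level=logging.INFO)

    @api_error_handler("demo_api")
    def always_fails():
        raise requests.exceptions.ConnectionError("simulated network failure")

    print("Result after retries:", always_fails())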