Spaces:
Running
Running
| import aiohttp | |
| import certifi | |
| import os | |
| import ssl | |
| from typing import Dict, Any | |
| SERPER_API_KEY = str.strip(os.getenv("SERPER_API_KEY", "")) | |
| AIOHTTP_TIMEOUT = int(os.getenv("AIOHTTP_TIMEOUT", "15")) | |
| if not SERPER_API_KEY: | |
| raise ValueError("SERPER_API_KEY environment variable is not set.") | |
| async def google(q: str, results: int = 5) -> Dict[str, Any]: | |
| url = "https://google.serper.dev/search" | |
| return await fetch_json(url, { | |
| "q": q, | |
| "num": results, | |
| "page": 1, | |
| }) | |
| async def fetch_json(url: str, payload: dict) -> Dict[str, Any]: | |
| headers = { | |
| 'X-API-KEY': SERPER_API_KEY, | |
| 'Content-Type': 'application/json' | |
| } | |
| ssl_context = ssl.create_default_context(cafile=certifi.where()) | |
| connector = aiohttp.TCPConnector(ssl=ssl_context) | |
| timeout = aiohttp.ClientTimeout(total=AIOHTTP_TIMEOUT) | |
| async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: | |
| async with session.post(url, headers=headers, json=payload) as response: | |
| response.raise_for_status() | |
| return await response.json() | |