import os
import uuid

import chromadb

from chunker import llama_index_sentence_splitter
from scrape import get_url_content
from web_search import google

client = chromadb.Client()
COLLECTION_NAME = os.getenv("RAG_INDEX", "default_index").strip() or "default_index"
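# NOTE: chromadb.Client() is an in-memory client, so the index is rebuilt on
# every process restart. If persistence is needed, chromadb.PersistentClient(
# path=...) can be swapped in; the rest of this module only relies on the
# collection API.
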
async def search(query: str, top_k: int = 5) -> list:
    """
    Search the ChromaDB collection for documents similar to the query.

    Arguments:
        query (str): The search query.
        top_k (int): The number of top results to return.

    Returns:
        list: A list of dictionaries containing the search results, including
        documents and metadata.
    """
    print("Searching ChromaDB collection for documents similar to the query.")
    if not query:
        raise ValueError("Query cannot be empty.")
    # Pull in fresh web results and index them before querying the collection.
    web_search = await google(q=f"{query} -filetype:pdf", results=2)
    _index_links([result["link"] for result in web_search["organic"]])
    results = _search_k(query, top_k)
    print(f"Found {len(results['documents'])} documents matching the query.")
    return [
        {
            "content": doc,
            "distance": distance,
            "metadata": metadata,
        }
        for doc, metadata, distance in zip(
            results["documents"], results["metadatas"], results["distances"]
        )
    ]

def _index_links(links: list) -> int:
    """
    Index a list of URLs by adding their content to the ChromaDB collection.

    Arguments:
        links (list): A list of URLs to index.

    Returns:
        int: The total number of chunks added to the collection.
    """
    from concurrent.futures import ThreadPoolExecutor

    print("Indexing multiple URLs:", links)
    with ThreadPoolExecutor() as executor:
        # Index the URLs concurrently; result() re-raises any worker exception.
        futures = [executor.submit(_index_url, link) for link in links]
        total_chunks = sum(future.result() for future in futures)
    print(f"Total chunks indexed from {len(links)} URLs: {total_chunks}")
    return total_chunks

def _url_exists(url: str) -> bool:
    """
    Check if a URL is already indexed in the ChromaDB collection.

    Arguments:
        url (str): The URL to check.

    Returns:
        bool: True if the URL is indexed, False otherwise.
    """
    print("Checking if URL exists in the collection:", url)
    collection = _get_collection()
    # Check if a document with the given source URL exists in the collection.
    exists = len(collection.get(
        where={"source": url},
        limit=1,
        include=["documents"],
    ).get("documents", [])) > 0
    print(f"URL {url} exists: {exists}")
    return exists

def _index_url(url: str) -> int:
    """
    Index a URL by adding its content to the ChromaDB collection.

    Arguments:
        url (str): The URL to index.

    Returns:
        int: The total number of chunks added to the collection.
    """
    print("Indexing URL", url)
    if _url_exists(url):
        print(f"URL {url} is already indexed. Skipping indexing.")
        return 0
    document = get_url_content(url)
    if not document:
        print("No content found at the provided URL.")
        return 0
    total_chunks = _add_document_to_collection(document, url)
    print(f"Indexed {total_chunks} chunks from URL: {url}")
    return total_chunks

def _get_collection() -> "chromadb.Collection":
    """
    Get or create the collection on the ChromaDB client.

    Returns:
        The collection object.
    """
    collection = client.get_or_create_collection(COLLECTION_NAME)
    print(f"Using collection: {COLLECTION_NAME} with {collection.count()} indexed chunks")
    return collection

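# NOTE: no embedding_function is passed above, so Chroma falls back to its
# default embedder for both upserts and queries (a bundled all-MiniLM-L6-v2
# ONNX model in recent chromadb releases); pass embedding_function to
# get_or_create_collection to override this.
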
def _add_document_to_collection(document: str, source: str) -> int:
    """
    Add a document to the ChromaDB collection.

    Arguments:
        document (str): The content of the document to be added.
        source (str): The source URI of the document.

    Returns:
        int: The number of chunks upserted into the collection.
    """
    collection = _get_collection()
    document_chunks = llama_index_sentence_splitter(
        documents=[document],
        document_ids=[source],
    )
    if not document_chunks:
        print("No document chunks were created. Please check the input document.")
        return 0
    collection.upsert(
        ids=[uuid.uuid4().hex for _ in document_chunks],
        documents=[chunk["content"] for chunk in document_chunks],
        metadatas=[
            {"source": source, "chunk_id": i}
            for i in range(len(document_chunks))
        ],
    )
    return len(document_chunks)

def _search_k(query: str, k: int = 5) -> dict:
    """
    Search the ChromaDB collection for the top k documents matching the query.

    Arguments:
        query (str): The search query.
        k (int): The number of top results to return.

    Returns:
        dict: A dictionary containing the search results, including documents
        and metadata.
    """
    collection = _get_collection()
    results = collection.query(
        query_texts=[query],
        n_results=k,
        include=["documents", "metadatas", "distances"],
    )
    if not results or not results.get("documents"):
        print("No results found for the query.")
        return {
            "documents": [],
            "metadatas": [],
            "distances": [],
        }
    # Chroma returns one result list per query text; unwrap the single query.
    return {
        "documents": results["documents"][0],
        "metadatas": results["metadatas"][0],
        "distances": results["distances"][0],
    }

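# Minimal usage sketch: exercise search() from the command line. This assumes
# the chunker, scrape, and web_search helper modules are importable and that
# whatever credentials the google() helper needs are configured in the
# environment; the query string below is just an illustrative placeholder.
if __name__ == "__main__":
    import asyncio

    hits = asyncio.run(search("retrieval augmented generation", top_k=3))
    for hit in hits:
        print(f"{hit['distance']:.4f}  {hit['metadata']['source']}")
        print(hit["content"][:200])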