Spaces:
Runtime error
Runtime error
| import os | |
| #DSPY | |
| import dspy | |
| from dspy import Prediction | |
| from dspy.evaluate import Evaluate | |
| from dspy import Prediction | |
| from dspy.teleprompt import BootstrapFewShot | |
| from dspy.teleprompt import BootstrapFewShotWithRandomSearch | |
| # Data handling | |
| # import pandas as pd | |
| # Calculations and formatting | |
| import re | |
| from decimal import Decimal | |
| # UI | |
| import gradio as gr | |
| from gradio_pdf import PDF | |
| # PDF handling | |
| import pdfplumber | |
# Directory that main() passes to get_PDF_examples() to collect the demo's example PDFs.
pdf_examples_dir = './pdfexamples/'

# Alternative client configuration kept for reference.
# model = dspy.LM(
#     model='gpt-3.5-turbo',
#     api_key=os.getenv('OPENAI_PROJECT_KEY'),
#     max_tokens=2000,
#     temperature=0.01)

# Language-model client. The API key is read from the OPENAI_PROJECT_KEY
# environment variable (os.getenv yields None when it is not set).
# NOTE(review): newer DSPy releases are understood to replace `dspy.OpenAI`
# with `dspy.LM` (see the commented-out variant above) - confirm against the
# DSPy version this app is pinned to.
model = dspy.OpenAI(
    model='gpt-3.5-turbo-0125',
    api_key=os.getenv('OPENAI_PROJECT_KEY'),
    max_tokens=2000,
    temperature=0.01)
# Register the client in DSPy's global settings; this runs at import time.
dspy.settings.configure(lm=model)
| # Utils | |
def parse_CSV_string(csv_string):
    """
    Split a comma-separated string into a list of trimmed items.

    Order and duplicates are kept as they appear in the input.
    Args:
        csv_string: comma-separated values (string)
    Returns:
        list of items with surrounding whitespace removed (list of strings);
        an empty input string yields ['']
    """
    return [item.strip() for item in csv_string.split(',')]
def parse_CSV_string_to_unique(csv_string):
    """
    Parse a comma-separated string into a list of unique, normalized items.

    Items are trimmed and lower-cased before de-duplication. The first
    occurrence of each item decides its position, so the result is the same
    on every run (a plain set would not guarantee that). Blank items, e.g.
    from a trailing or doubled comma, are dropped.
    Args:
        csv_string: comma-separated values (string); may be empty or None
    Returns:
        list of unique lower-cased items in first-seen order (list of strings)
    """
    if not csv_string:
        return []
    normalized_items = (item.strip().lower() for item in csv_string.split(','))
    # dict.fromkeys de-duplicates while preserving insertion order
    return list(dict.fromkeys(item for item in normalized_items if item))
def parse_list_of_CSV_strings(list_of_csv_strings):
    """
    Parse every CSV string of a list with parse_CSV_string_to_unique.

    Args:
        list_of_csv_strings: list of comma-separated strings (e.g. invoice numbers)
    Returns:
        list of lists, one parsed list per input string, in input order
    """
    return [parse_CSV_string_to_unique(entry) for entry in list_of_csv_strings]
def parse_column_names(s):
    """
    Parse a comma-separated list of column names from a string.
    Removes the 'Column Header Names:' prefix (matched case-insensitively)
    before splitting the string.
    Args:
        s: raw response from the model, comma-separated list of column names (string)
    Returns:
        list of column names (list of strings)
    """
    prefix = 'Column Header Names:'
    # Strip first so the prefix test and the slice look at the same text;
    # slicing the unstripped string would cut at the wrong offset whenever
    # the response starts with whitespace.
    s = s.strip()
    if s[:len(prefix)].lower() == prefix.lower():
        s = s[len(prefix):]
    return [name.strip() for name in s.split(',')]
def remove_duplicate_lists(lists):
    """
    Drop lists that contain the same items as an earlier list.

    Two lists count as duplicates when their sorted contents are equal, so
    item order inside a list does not matter. The first occurrence is kept
    unchanged and the relative order of the kept lists is preserved.
    Args:
        lists:
            a list of lists of strings
    Returns:
        a list of lists of strings, where each list is unique
    """
    fingerprints = set()
    kept = []
    for candidate in lists:
        # Order-independent, hashable key for the list contents
        fingerprint = tuple(sorted(candidate))
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        kept.append(candidate)
    return kept
def parse_invoice_number(s):
    """
    Extract a leading invoice number from a string.

    The number is the first whitespace-free token (after optional leading
    whitespace) made of at least one character followed by six digits; any
    characters of the token after those six digits are not included.
    Args:
        s: text that may start with an invoice number (string)
    Returns:
        the matched invoice number, or the input string unchanged if nothing matches
    """
    match = re.search(r'^\s*?([\S\d]+\d{6})', s)
    if match is None:
        return s
    return match.group(1)
def standardize_number(s):
    """
    Normalize the separators of a textual number.

    The last comma or period in the string is treated as the decimal
    separator and rewritten as a period; every comma, period and whitespace
    character before it is removed. Text after the separator is kept as is.
    Note that a single grouping separator is therefore read as a decimal
    point ('1,000' becomes '1.000').
    Args:
        s: textual number (string)
    Returns:
        the string with a single '.' as separator, or with all whitespace
        removed when it contains no comma or period
    """
    split_at = max(s.rfind(','), s.rfind('.'))
    if split_at == -1:
        # No separator at all: only stray whitespace can be removed
        return re.sub(r'[.,\s]', '', s)
    integer_part = re.sub(r'[.,\s]', '', s[:split_at])
    fraction_part = s[split_at + 1:]
    return integer_part + '.' + fraction_part
def remove_chars_after_last_digit(s):
    """
    Cut off everything that follows the last digit of a string.

    Args:
        s: text to trim (string)
    Returns:
        the string up to and including its last digit; unchanged when it
        contains no digit
    """
    # Run of non-digits that directly follows a digit and reaches the end
    trailing_non_digits = r'(?<=\d)[^\d]*$'
    return re.sub(trailing_non_digits, '', s)
def clean_text(s):
    """
    Strip the text that precedes a number-like token.

    The pattern consumes a run of characters that are neither digits nor a
    minus sign, plus one optional whitespace character, and then captures
    the following whitespace-free token that contains a digit. Each match is
    replaced by the captured token alone, so a leading minus sign is kept.
    Args:
        s: text that may contain a number (string)
    Returns:
        the text with those leading runs removed; unchanged when it contains no digit
    """
    leading_text_then_number = r'[^\d-]*\s?(\S*\d\S*)'
    return re.sub(leading_text_then_number, r'\1', s)
def format_text_decimal(text_decimal):
    """
    Bring a textual amount into a canonical form for comparison.

    The text is trimmed and lower-cased, then passed through
    standardize_number, remove_chars_after_last_digit and clean_text, in
    that order.
    Args:
        text_decimal: textual amount (string); may be empty or None
    Returns:
        the formatted text, or '' for an empty/None input
    """
    if not text_decimal:
        return ''
    normalized = text_decimal.strip().lower()
    normalized = standardize_number(normalized)
    normalized = remove_chars_after_last_digit(normalized)
    return clean_text(normalized)
| # PDF handling | |
def extract_text_using_pdfplumber(file_path):
    """
    Extract the text of all pages of a PDF file.

    Pages are joined with a newline so the last line of one page is not
    fused with the first line of the next. A page for which extract_text()
    returns nothing contributes an empty string.
    Args:
        file_path: path of the PDF file, as accepted by pdfplumber.open
    Returns:
        the extracted text of the whole document (string)
    """
    # TODO: add check for text vs image PDF
    with pdfplumber.open(file_path) as pdf:
        page_texts = [
            # Remove duplicate characters from the page before extracting;
            # `or ''` guards against a None result for a page without text
            page.dedupe_chars(tolerance=1).extract_text() or ''
            for page in pdf.pages
        ]
    return '\n'.join(page_texts)
def get_PDF_examples(directory):
    """
    Collect the example PDF files of a directory.

    Files are returned in sorted filename order so the examples appear in the
    same order on every run. The '.pdf' extension is matched
    case-insensitively. A missing directory yields no examples instead of
    raising.
    Args:
        directory: directory to scan (string)
    Returns:
        list of [file path, '', ''] rows, one per PDF file; the two empty
        strings are placeholders next to the path
    """
    try:
        filenames = sorted(os.listdir(directory))
    except FileNotFoundError:
        # No examples directory: the app should still be able to start
        return []
    return [
        [os.path.join(directory, filename), '', '']
        for filename in filenames
        if filename.lower().endswith('.pdf')
    ]
| # Signatures and Models | |
class FindInvoiceNumberColumns(dspy.Signature):
    """Given an input remittance letter, return a list of column header names that may contain invoice numbers."""
    # NOTE(review): DSPy is understood to use the docstring above and the
    # `desc` texts below as prompt material - treat wording changes as
    # behavior changes.
    # Input: full text of the remittance letter.
    content = dspy.InputField(desc="remittance letter", format=lambda s:s) # s:s so it doesn't skip the new lines
    # Output: one comma-separated string of candidate column header names.
    column_header_names = dspy.OutputField(desc="comma-separated list of column header names that may contain "
                                                "invoice numbers")
class InvoiceColumnHeaders(dspy.Module):
    """
    Predict the column headers containing invoice numbers from the remittance letter.
    Attributes:
        response_parser: a function that takes a string and returns a list of strings.
        potential_invoice_column_headers: dspy.Predict over FindInvoiceNumberColumns.
    """

    def __init__(self, response_parser=parse_CSV_string):
        super().__init__()
        self.response_parser = response_parser
        self.potential_invoice_column_headers = dspy.Predict(FindInvoiceNumberColumns)

    def forward(self, file_content):
        """
        Predict the invoice-number column headers of a remittance letter.
        Args:
            file_content: text of the remittance letter (string)
        Returns:
            a Prediction whose `column_header_names` is the list of unique
            parsed header names, in the order the model returned them
        """
        prediction = self.potential_invoice_column_headers(content=file_content)
        parsed_headers = self.response_parser(prediction.column_header_names)
        # De-duplicate with dict.fromkeys rather than set() so the header
        # order is the same on every run.
        unique_headers = list(dict.fromkeys(parsed_headers))
        # Create a new Prediction object with the unique headers
        return Prediction(column_header_names=unique_headers)
class FindInvoiceList(dspy.Signature):
    """Given an input remittance letter and a column header name output a comma-separated list of all invoice numbers that belong to that column."""
    # The docstring is deliberately one literal: two adjacent triple-quoted
    # strings on separate lines are separate statements, so only the first
    # would become the class docstring and the end of the sentence would be lost.
    # Input: full text of the remittance letter.
    content = dspy.InputField(desc="remittance letter", format=lambda s:s) # s:s so it doesn't skip the new lines
    # Input: the column header whose invoice numbers are requested.
    invoice_column_header = dspy.InputField(desc="invoice column header name")
    # Output: one comma-separated string of invoice numbers.
    candidate_invoice_numbers = dspy.OutputField(desc="comma-separated list of invoice numbers")
class InvoiceList(dspy.Module):
    """
    Collect candidate invoice-number lists from a remittance letter.

    The column headers that may hold invoice numbers are predicted first;
    the invoice numbers are then requested once per header.
    Attributes:
        response_parser: A function that takes a string and returns a list of invoice numbers.
    Returns:
        A Prediction object with the following fields:
            candidate_invoice_numbers: A list of lists of invoice numbers.
    """

    def __init__(self, response_parser=parse_CSV_string_to_unique):
        super().__init__()
        self.response_parser = response_parser
        # here we could load a compiled program also
        self.find_invoice_headers = InvoiceColumnHeaders(response_parser=parse_column_names)
        self.find_invoice_numbers = dspy.Predict(FindInvoiceList)

    def forward(self, file_content):
        headers = self.find_invoice_headers(file_content=file_content).column_header_names
        # One parsed invoice list per predicted header, in header order
        per_header_invoices = [
            self.response_parser(
                self.find_invoice_numbers(
                    content=file_content, invoice_column_header=header
                ).candidate_invoice_numbers
            )
            for header in headers
        ]
        # Different headers can yield the same invoices; keep each list once
        return Prediction(candidate_invoice_numbers=remove_duplicate_lists(per_header_invoices))
class FindTotalAmountColumns(dspy.Signature):
    """Given an input remittance letter, return a list of column header names that may contain the total payment amount."""
    # NOTE(review): DSPy is understood to use the docstring above and the
    # `desc` texts below as prompt material - treat wording changes as
    # behavior changes.
    # Input: full text of the remittance letter.
    content = dspy.InputField(desc="remittance letter", format=lambda s:s) # s:s so it doesn't skip the new lines
    # Output: one comma-separated string of candidate column header names.
    total_column_header_names = dspy.OutputField(desc="comma-separated list of column header names that may contain "
                                                      "the remittance letter total payment amount")
class TotalAmountColumnHeaders(dspy.Module):
    """
    Predict the column headers that may contain the total payment amount.

    Thin wrapper around a dspy.Predict over FindTotalAmountColumns; the raw
    prediction is returned without any parsing.
    """

    def __init__(self):
        super().__init__()
        self.potential_total_amount_column_headers = dspy.Predict(FindTotalAmountColumns)

    def forward(self, file_content):
        # The caller reads the comma-separated `total_column_header_names` field
        return self.potential_total_amount_column_headers(content=file_content)
class FindTotalAmount(dspy.Signature):
    """Given an input remittance letter and a column header name output the total payment amount that belongs to that column."""
    # The docstring is deliberately one literal: two adjacent triple-quoted
    # strings on separate lines are separate statements, so only the first
    # would become the class docstring and the end of the sentence would be lost.
    # Input: full text of the remittance letter.
    content = dspy.InputField(desc="remittance letter", format=lambda s:s) # s:s so it doesn't skip the new lines
    # Input: the column header whose total amount is requested.
    total_amount_column_header = dspy.InputField(desc="total amount header name")
    # Output: the total payment amount as text.
    total_amount = dspy.OutputField(desc="total payment amount")
class RemittanceLetterTotalAmount(dspy.Module):
    """
    Collect candidate total payment amounts from a remittance letter.

    The column headers that may hold the total amount are predicted first;
    the amount is then requested once per header.
    Returns:
        A Prediction object with the following fields:
            candidate_total_amounts: A list of unique total amounts (strings).
    """

    def __init__(self):
        super().__init__()
        self.find_total_amount_header = TotalAmountColumnHeaders()
        self.find_total_amount = dspy.Predict(FindTotalAmount)

    def forward(self, file_content):
        # The header prediction carries a CSV string in "total_column_header_names"
        header_prediction = self.find_total_amount_header(file_content=file_content)
        headers = parse_CSV_string_to_unique(header_prediction.total_column_header_names)
        # One raw amount per predicted header
        amounts = [
            self.find_total_amount(
                content=file_content, total_amount_column_header=header
            ).total_amount
            for header in headers
        ]
        # Remove duplicates
        return Prediction(candidate_total_amounts=list(set(amounts)))
| # Pipeline with Verification | |
def poc_production_pipeline_with_verification(file_content, verification_invoices='', verification_total_amount=''):
    """
    Extract invoice and total-amount candidates and check them against user input.

    Candidates are validated only when the matching verification value is
    non-empty; an empty verification input therefore never "validates" an
    empty candidate.
    Args:
        file_content: text of the remittance letter (string)
        verification_invoices: comma-separated invoice numbers to verify against (string, optional)
        verification_total_amount: total amount to verify against (string, optional)
    Returns:
        a 4-tuple of row lists (each row a 1-tuple, for single-column tables):
        candidate invoice lists, candidate total amounts, validated invoice
        lists, validated total amounts
    """
    # Get invoice candidates; each candidate list becomes one sorted CSV string
    invoice_candidates = InvoiceList()(file_content=file_content).candidate_invoice_numbers
    candidate_invoices = [','.join(sorted(numbers)) for numbers in invoice_candidates]

    # Get total amount candidates and format all decimals
    raw_total_amounts = RemittanceLetterTotalAmount()(file_content=file_content).candidate_total_amounts
    # Only keep unique amounts; dict.fromkeys keeps a stable first-seen order
    candidate_total_amounts = list(dict.fromkeys(map(format_text_decimal, raw_total_amounts)))

    # Verify invoices: compare in the same normalized form as the candidates
    expected_invoices = ','.join(sorted(parse_CSV_string_to_unique(verification_invoices)))
    if expected_invoices:
        validated_invoices = [candidate for candidate in candidate_invoices
                              if candidate == expected_invoices]
    else:
        validated_invoices = []

    # Verify total amount
    expected_total_amount = format_text_decimal(verification_total_amount)
    if expected_total_amount:
        validated_total_amount = [candidate for candidate in candidate_total_amounts
                                  if candidate == expected_total_amount]
    else:
        validated_total_amount = []

    # Prepare output for UI
    candidate_invoices_for_UI = [(candidate,) for candidate in candidate_invoices]
    candidate_total_amounts_for_UI = [(candidate,) for candidate in candidate_total_amounts]
    validated_invoices_for_UI = [(validated,) for validated in validated_invoices]
    validated_total_amount_for_UI = [(validated,) for validated in validated_total_amount]
    return candidate_invoices_for_UI, candidate_total_amounts_for_UI, validated_invoices_for_UI, validated_total_amount_for_UI
def poc_production_pipeline_with_verification_from_PDF(file_path, verification_invoices='', verification_total_amount=''):
    """
    Run the verification pipeline on a PDF file.

    The verification arguments default to empty strings so the function can
    also be called with the PDF path alone, which is how the examples widget
    in main() invokes it.
    Args:
        file_path: path of the remittance letter PDF
        verification_invoices: comma-separated invoice numbers to verify against (string, optional)
        verification_total_amount: total amount to verify against (string, optional)
    Returns:
        the result of poc_production_pipeline_with_verification for the extracted text
    """
    file_content = extract_text_using_pdfplumber(file_path)
    return poc_production_pipeline_with_verification(file_content, verification_invoices, verification_total_amount)
| # Main app function | |
def main():
    """Build the Gradio UI for the remittance PDF processor and launch it."""
    fake_PDF_examples = get_PDF_examples(pdf_examples_dir)

    def run_example(pdf_path):
        # The examples widget supplies only the PDF input, so the two
        # verification values are passed explicitly as empty strings; handing
        # the three-argument pipeline function to gr.Examples directly would
        # make the one-argument call fail.
        return poc_production_pipeline_with_verification_from_PDF(pdf_path, '', '')

    # NOTE(review): the indentation of this layout was reconstructed from a
    # flattened copy of the file - confirm the Row/Column nesting against the
    # intended UI.
    with gr.Blocks() as remittance_demo:
        gr.Markdown("# Remittance PDF Processor")
        gr.Markdown("Upload a PDF file to extract invoice numbers and payment amounts. Provide verification data if available for comparison.")
        with gr.Row():
            with gr.Column():
                pdf_input = PDF(label="Remittance advice", height=900)
            with gr.Column():
                with gr.Accordion("Verification Inputs", open=False):
                    verification_invoices = gr.Textbox(label="Verification Invoices (comma-separated)", placeholder="Enter invoice numbers here...")
                    verification_total_amount = gr.Textbox(label="Verification Total Amount", placeholder="Enter total amount here...")
                retrieved_invoices = gr.Dataframe(col_count=(1, 'fixed'), label="", headers=["Retrieved Invoice Proposals"], wrap=True)
                retrieved_amounts = gr.Dataframe(col_count=(1, 'fixed'), label="", headers=["Retrieved Total Amount Proposals"], wrap=True)
                validated_invoices = gr.Dataframe(col_count=(1, 'fixed'), label="", headers=["Validated Invoices"], wrap=True)
                validated_total_amount = gr.Dataframe(col_count=(1, 'fixed'), label="", headers=["Validated Total Amount"], wrap=True)
                submit_button = gr.Button("Process document")
        submit_button.click(
            poc_production_pipeline_with_verification_from_PDF,
            inputs=[pdf_input, verification_invoices, verification_total_amount],
            outputs=[retrieved_invoices, retrieved_amounts, validated_invoices, validated_total_amount]
        )
        gr.Examples(
            examples=[[pdf[0]] for pdf in fake_PDF_examples],  # We do this so only PDFs are shown
            inputs=[pdf_input],
            outputs=[retrieved_invoices, retrieved_amounts, validated_invoices, validated_total_amount],
            fn=run_example,
            cache_examples=True
        )
    remittance_demo.launch()
# Build and launch the app only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()