Spaces:
Sleeping
Sleeping
| import subprocess | |
| import sys | |
| try: | |
| import fitz # PyMuPDF | |
| except ModuleNotFoundError: | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "PyMuPDF"]) | |
| import fitz | |
| # --------------------------------------------------------------------- | |
| # 0. Hot‑patch: ensure Gradio‑compatible Pydantic (<2.11) | |
| # --------------------------------------------------------------------- | |
| import os, sys, subprocess | |
| from importlib import metadata | |
| try: | |
| from packaging import version | |
| except ModuleNotFoundError: | |
| subprocess.check_call([sys.executable, "-m", "pip", "install", "packaging"]) | |
| from packaging import version | |
def _ensure_compatible_pydantic():
    """Install a pydantic release below 2.11 and restart the interpreter if needed.

    The installed pydantic version is read through ``importlib.metadata``.
    When the package is missing, or its version is 2.11 or newer, pip is run
    to install ``pydantic<2.11`` and ``pydantic-core<2.11`` and the current
    process is then replaced by a fresh interpreter (``os.execv``) running the
    same command line.

    A marker environment variable is set right before the restart. If the
    restarted process still sees an incompatible version, it prints a warning
    and carries on instead of installing and restarting again, so a failed
    downgrade cannot turn into an endless restart loop.

    Raises:
        subprocess.CalledProcessError: if the pip command exits with a
            non-zero status.
    """
    reexec_marker = "PYDANTIC_HOTPATCH_REEXEC"

    try:
        cur = version.parse(metadata.version("pydantic"))
    except metadata.PackageNotFoundError:
        cur = None

    if cur is not None and cur < version.parse("2.11"):
        return  # already compatible, nothing to do

    if os.environ.get(reexec_marker):
        # We already installed and restarted once in this process chain.
        print(
            f"[patch] pydantic still incompatible after reinstall (current: {cur}); "
            "continuing without another restart.",
            flush=True,
        )
        return

    print(f"[patch] Installing pydantic<2.11 (current: {cur}) …", flush=True)
    # NOTE(review): pydantic-core appears to follow its own version scheme
    # (not aligned with pydantic's), so "pydantic-core<2.11" may force a much
    # older pydantic than intended -- confirm the intended pin.
    subprocess.check_call([
        sys.executable,
        "-m",
        "pip",
        "install",
        "--no-cache-dir",
        "pydantic<2.11",
        "pydantic-core<2.11",
    ])

    # The environment is inherited by the re-executed interpreter.
    os.environ[reexec_marker] = "1"
    os.execv(sys.executable, [sys.executable] + sys.argv)
| _ensure_compatible_pydantic() | |
| import re | |
| import random | |
| import io | |
| import os | |
| import tempfile | |
| import logging | |
| from datetime import datetime | |
| import gradio as gr | |
| import shutil | |
| # Configura il logging di base | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| # --- (Keep the existing update_pdf_bytes function - using the refined version from previous step) --- | |
# Timestamp as printed in the PDFs: "YYYY-MM-DD HH:MM:SS" with optional fraction.
_DATE_PATTERN = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(\.\d+)?')
# Job description to replace; group 2 captures the optional "DF25 " prefix.
_JOB_PATTERN = re.compile(r"((DF25\s+)?- B2020 - Nulla osta/Comunicazione al lavoro subordinato non stagionale nei settori elencati nel DPCM Flussi)")
# The only job_option value that triggers the job-description replacement.
_REPLACE_JOB_OPTION = "Sostituisci con C-Stag"


def _randomized_timestamp(base_date):
    """Return base_date with random seconds and a random 9-digit fraction appended."""
    try:
        base = datetime.strptime(base_date, "%Y-%m-%d %H:%M:%S")
    except ValueError as e:
        raise ValueError("Il formato della data base deve essere 'YYYY-MM-DD HH:MM:SS'.") from e
    new_seconds = random.randint(0, 59)
    new_fraction = random.randint(0, 999999999)
    return base.replace(second=new_seconds).strftime("%Y-%m-%d %H:%M:%S") + f".{new_fraction:09d}"


def _collect_page_edits(page, page_num, new_date, date_offset, job_offset, job_option):
    """Scan one page and return the edits as (rect, point, text, size, font) tuples."""
    text_dict = page.get_text("dict")
    if not text_dict:
        return []

    edits = []
    for block in text_dict.get("blocks", []):
        for line in block.get("lines", []):
            for span in line.get("spans", []):
                bbox = span.get("bbox")
                if not bbox:
                    continue
                text = span.get("text", "")
                rect = fitz.Rect(bbox)
                size = span.get("size", 12)
                font = span.get("font", "helv")

                if _DATE_PATTERN.search(text):
                    logging.info("Trovata data da aggiornare nella pagina %d", page_num + 1)
                    point = fitz.Point(rect.x0, rect.y0 + date_offset)
                    edits.append((rect, point, new_date, size, font))
                    continue

                if job_option != _REPLACE_JOB_OPTION:
                    continue
                match = _JOB_PATTERN.search(text)
                if match is None:
                    continue
                prefix = match.group(2) if match.group(2) is not None else ""
                new_job_text = f"{prefix}- C-Stag - Richiesta di nulla osta/comunicazione al lavoro subordinato stagionale"
                logging.info("Trovato job description da aggiornare nella pagina %d", page_num + 1)
                point = fitz.Point(rect.x0, rect.y0 + job_offset)
                edits.append((rect, point, new_job_text, size, font))
    return edits


def _apply_page_edits(page, page_num, edits):
    """White out every collected span rectangle, then write the replacement texts."""
    if not edits:
        return
    logging.info("Applicazione redazioni alla pagina %d", page_num + 1)
    for rect, _point, _text, _size, _font in edits:
        page.add_redact_annot(rect, text=' ', fill=(1, 1, 1))  # white fill
    page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE)

    logging.info("Inserimento testo aggiornato nella pagina %d", page_num + 1)
    for _rect, point, text, size, font in edits:
        page.insert_text(point, text, fontsize=size, fontname=font, color=(0, 0, 0))


def update_pdf_bytes(pdf_bytes, base_date, date_offset=11, job_offset=11, job_option="Sostituisci con C-Stag"):
    """Replace timestamps (and optionally the job description) in a PDF.

    Every text span that contains a timestamp is redacted with a white box and
    overwritten with ``base_date`` carrying random seconds and a random
    nine-digit fraction. When ``job_option`` is "Sostituisci con C-Stag",
    spans containing the B2020 job description are replaced the same way with
    the C-Stag wording, keeping an optional "DF25 " prefix. Spans are first
    collected for a page and only then modified, so the page text is not
    changed while it is being scanned.

    Parameters:
        pdf_bytes: content of the source PDF.
        base_date (str): timestamp in the form 'YYYY-MM-DD HH:MM:SS'.
        date_offset: vertical shift, added to the top of the span rectangle,
            for the insertion point of the new date.
        job_offset: same shift for the new job description.
        job_option (str): "Sostituisci con C-Stag" enables the job replacement;
            any other value leaves job descriptions untouched.

    Returns:
        bytes: the updated PDF.

    Raises:
        ValueError: if ``base_date`` does not match the expected format.
        Exception: any error raised while opening, editing or saving the PDF
            is logged and re-raised.
    """
    try:
        new_date = _randomized_timestamp(base_date)

        try:
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        except Exception as e:
            logging.error("Errore nell'apertura del PDF: %s", e)
            raise

        try:
            for page_num, page in enumerate(doc):
                edits = _collect_page_edits(
                    page, page_num, new_date, date_offset, job_offset, job_option
                )
                _apply_page_edits(page, page_num, edits)

            output_stream = io.BytesIO()
            try:
                # Light garbage collection plus stream compression.
                doc.save(output_stream, garbage=1, deflate=True)
            except Exception as e:
                logging.error("Errore nel salvataggio del PDF aggiornato: %s", e)
                raise
            return output_stream.getvalue()
        finally:
            # Single place where the document is released, on success or failure.
            doc.close()
    except Exception:
        logging.exception("Errore durante l'aggiornamento del PDF:")
        raise
| # --- NEW ITERATIVE SIZE ADJUSTMENT FUNCTION --- | |
def _padding_info_xref(doc, create):
    """Return the xref of the PDF /Info dictionary; optionally create it (0 if absent)."""
    kind, value = doc.xref_get_key(-1, "Info")  # xref -1 addresses the PDF trailer
    if kind == "xref":
        return int(value.split()[0])  # value looks like "12 0 R"
    if not create:
        return 0
    xref = doc.get_new_xref()
    doc.update_object(xref, "<<>>")
    doc.xref_set_key(-1, "Info", f"{xref} 0 R")
    return xref


def _save_pdf_to_bytes(doc, **save_options):
    """Save doc into memory with the given Document.save options and return the bytes."""
    buffer = io.BytesIO()
    doc.save(buffer, **save_options)
    return buffer.getvalue()


def _resize_attempt(pdf_bytes, diff, padding_key):
    """Run one resize step and return the new PDF bytes.

    diff > 0 grows the padding string by diff characters. diff < 0 trims the
    padding by the excess; when no padding is left to trim, the file is
    re-saved with the strongest garbage collection and compression instead.
    """
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        info_xref = _padding_info_xref(doc, create=diff > 0)

        padding_len = 0
        if info_xref:
            kind, value = doc.xref_get_key(info_xref, padding_key)
            if kind == "string":
                padding_len = len(value)

        if diff > 0:
            logging.debug("Azione: Aggiungere padding.")
            new_len = padding_len + diff
        elif padding_len > 0:
            new_len = max(0, padding_len + diff)  # diff is negative here
            logging.debug(f"Riduzione padding in '{padding_key}' a {new_len} caratteri.")
        else:
            logging.debug("Esecuzione pulizia aggressiva (garbage=4, deflate=True)")
            return _save_pdf_to_bytes(doc, garbage=4, deflate=True)

        # The padding lives in a custom key of the /Info dictionary. It is
        # written through the low-level xref API because Document.set_metadata
        # only accepts the standard metadata keys. Spaces need no escaping
        # inside a PDF literal string; "null" deletes the key.
        padding_value = f"({' ' * new_len})" if new_len > 0 else "null"
        doc.xref_set_key(info_xref, padding_key, padding_value)
        return _save_pdf_to_bytes(doc, garbage=1, deflate=False)
    finally:
        doc.close()


def adjust_pdf_size_iterative(pdf_bytes: bytes, target_kb: float, tolerance_kb: float = 0.2, max_iterations: int = 10) -> bytes:
    """
    Iteratively move the PDF size towards target_kb.

    Each iteration compares the current size with the target. A file that is
    too small gets a padding string of spaces added to (or extended in) a
    custom key of its /Info dictionary. A file that is too large first has
    that padding trimmed; once no padding is left it is re-saved with
    garbage=4 and deflate=True. The loop stops as soon as the size is within
    the tolerance, when an attempt fails, when cleanup no longer shrinks the
    file, or after max_iterations attempts.

    Parameters:
        pdf_bytes (bytes): The initial PDF content.
        target_kb (float): Target size in kilobytes; values <= 0 disable the adjustment.
        tolerance_kb (float): Allowed deviation from the target, in kilobytes.
        max_iterations (int): Maximum number of attempts.

    Returns:
        bytes: The size-adjusted PDF, or the best state reached when the
        target could not be met. Errors during an attempt are logged and the
        bytes from before that attempt are returned.
    """
    if target_kb <= 0:
        logging.info("Target KB non valido, saltando l'aggiustamento iterativo.")
        return pdf_bytes

    target_bytes = int(target_kb * 1024)
    tolerance_bytes = int(tolerance_kb * 1024)
    current_pdf_bytes = pdf_bytes
    padding_key = "X_IterativePaddingData"  # custom /Info key that holds the padding

    logging.info(f"--- Inizio Aggiustamento Iterativo --- Target: {target_kb:.2f} KB ({target_bytes} bytes), Tolleranza: {tolerance_kb:.2f} KB ({tolerance_bytes} bytes)")

    iterations = int(max_iterations)  # UI widgets may deliver the count as a float
    for i in range(iterations):
        current_size = len(current_pdf_bytes)
        diff = target_bytes - current_size
        logging.info(f"Iterazione {i+1}/{iterations}: Dimensione attuale={current_size / 1024:.2f} KB ({current_size} bytes), Diff dal target={diff} bytes")

        if abs(diff) <= tolerance_bytes:
            logging.info(f"Dimensione raggiunta entro la tolleranza. ({current_size / 1024:.2f} KB)")
            return current_pdf_bytes

        try:
            candidate = _resize_attempt(current_pdf_bytes, diff, padding_key)
        except Exception:
            logging.exception(f"Errore durante l'aggiustamento nella iterazione {i+1}:")
            logging.warning("Ripristino dei bytes dalla iterazione precedente.")
            return current_pdf_bytes  # last known good state

        # Safety net: a result this small is assumed to be a broken save.
        if len(candidate) < 100:
            logging.error(f"Dimensione del PDF diventata inaspettatamente piccola ({len(candidate)} bytes) dopo l'iterazione {i+1}. Interruzione.")
            return current_pdf_bytes

        # Shrinking made no progress: further attempts would repeat the same work.
        if diff < 0 and len(candidate) >= current_size:
            logging.warning("Nessuna riduzione ottenuta; interruzione dell'aggiustamento.")
            return current_pdf_bytes

        current_pdf_bytes = candidate

    final_size = len(current_pdf_bytes)
    if abs(target_bytes - final_size) <= tolerance_bytes:
        # The last attempt hit the target; do not report it as a failure.
        logging.info(f"Dimensione raggiunta entro la tolleranza. ({final_size / 1024:.2f} KB)")
        return current_pdf_bytes

    logging.warning(f"Raggiunto limite massimo di {max_iterations} iterazioni. Dimensione finale: {final_size / 1024:.2f} KB")
    return current_pdf_bytes
| # --- Updated process_batch Function --- | |
def _unique_output_name(name, used_names):
    """Return name, or name with a numeric suffix, so that it is not in used_names yet."""
    stem, ext = os.path.splitext(name)
    candidate = name
    counter = 1
    while candidate in used_names:
        candidate = f"{stem}_{counter}{ext}"
        counter += 1
    used_names.add(candidate)
    return candidate


def process_batch(pdf_files, base_date, date_offset, job_offset, job_option, adjust_size, target_kb, tolerance_kb, max_iterations):
    """
    Process a batch of PDFs: update date/job text and optionally adjust the file size.

    Each input is either a path string or an object exposing ``read`` and
    ``name``. Every readable file goes through ``update_pdf_bytes`` and, when
    ``adjust_size`` is set and ``target_kb`` is positive, through
    ``adjust_pdf_size_iterative``. Results are written into a fresh temporary
    directory, which is created only when the first file is saved and is
    deliberately not deleted here, because the caller still needs to read the
    returned files. Inputs sharing the same base name get a numeric suffix so
    they do not overwrite each other.

    Parameters:
        pdf_files: iterable of path strings / file-like objects, or None.
        base_date, date_offset, job_offset, job_option: forwarded to ``update_pdf_bytes``.
        adjust_size: whether to run the size adjustment.
        target_kb: target size in KB; None or <= 0 skips the adjustment.
        tolerance_kb: forwarded as ``tolerance_kb`` unless None.
        max_iterations: forwarded as ``max_iterations`` (as int) unless None.

    Returns:
        list[str]: paths of the files written. A file that fails is logged
        and skipped; an unexpected batch-level error yields an empty list.
    """
    if not pdf_files:
        logging.info("Nessun file PDF da elaborare.")
        return []

    # Only forward the optional settings that actually carry a value, so a
    # cleared input field falls back to the defaults of the adjust function.
    adjust_options = {}
    if tolerance_kb is not None:
        adjust_options["tolerance_kb"] = tolerance_kb
    if max_iterations is not None:
        adjust_options["max_iterations"] = int(max_iterations)
    wants_adjustment = bool(adjust_size) and target_kb is not None and target_kb > 0

    output_dir = None
    results = []
    used_names = set()
    try:
        for file_obj in pdf_files:
            original_name = "unknown_file.pdf"
            try:
                if isinstance(file_obj, str):
                    if not os.path.exists(file_obj):
                        logging.error(f"File non trovato: {file_obj}")
                        continue
                    with open(file_obj, "rb") as f:
                        pdf_bytes = f.read()
                    original_name = os.path.basename(file_obj)
                elif hasattr(file_obj, 'read') and hasattr(file_obj, 'name'):
                    pdf_bytes = file_obj.read()
                    original_name = os.path.basename(getattr(file_obj, "name", "uploaded_file.pdf"))
                else:
                    logging.error(f"Input file non riconosciuto: {type(file_obj)}")
                    continue

                logging.info(f"--- Elaborazione file: {original_name} (Dimensione iniziale: {len(pdf_bytes)/1024:.2f} KB) ---")

                # 1. Update date and job description.
                updated_bytes = update_pdf_bytes(pdf_bytes, base_date, date_offset, job_offset, job_option)
                logging.info(f"Dimensione dopo aggiornamenti: {len(updated_bytes)/1024:.2f} KB")

                # 2. Adjust the size if requested.
                if wants_adjustment:
                    logging.info(f"Richiesto aggiustamento dimensione per {original_name} a {target_kb} KB (Tolleranza: {tolerance_kb} KB, Max Iter: {max_iterations})")
                    final_bytes = adjust_pdf_size_iterative(updated_bytes, target_kb, **adjust_options)
                else:
                    final_bytes = updated_bytes

                # 3. Save the result; the directory is created on first use.
                if output_dir is None:
                    output_dir = tempfile.mkdtemp(prefix="updated_pdfs_")
                    logging.info(f"Creato directory temporanea: {output_dir}")
                new_file_path = os.path.join(output_dir, _unique_output_name(original_name, used_names))
                with open(new_file_path, "wb") as f:
                    f.write(final_bytes)
                results.append(new_file_path)
                logging.info(f"File aggiornato salvato in: {new_file_path} (Dimensione finale: {len(final_bytes)/1024:.2f} KB)")
            except Exception:
                # One broken file must not abort the rest of the batch.
                logging.exception(f"Errore nel processamento del file '{original_name}':")
                continue
        return results
    except Exception:
        logging.exception("Errore generale durante l'elaborazione batch:")
        return []
| # --- Updated Gradio Interface --- | |
with gr.Blocks() as demo:
    # Usage guide shown at the top of the page.
    gr.Markdown(
        """
## Aggiornamento PDF: Data, Job Description e Dimensione (Iterativo)
**Guida:**
1. Carica PDF.
2. Imposta Data Base (YYYY-MM-DD HH:MM:SS).
3. Regola Spostamenti Verticali (offset).
4. Scegli opzione Job Description.
5. **(Opzionale) Aggiusta Dimensione:**
- Seleziona la casella.
- Imposta la **Dimensione Target (KB)**.
- Imposta la **Tolleranza (KB)** (quanto può discostarsi dal target, es. 0.2).
- Imposta le **Max Iterazioni** (quanti tentativi fare, es. 10).
- Il sistema tenterà di aggiungere/rimuovere dati invisibili (metadata/pulizia) per avvicinarsi al target.
6. Clicca **Elabora PDF**.
7. Scarica i file aggiornati.
"""
    )

    # Upload widget for one or more PDFs.
    with gr.Row():
        pdf_input = gr.File(label="Carica file PDF", file_count="multiple")

    # Base timestamp written into the documents.
    with gr.Row():
        base_date_input = gr.Textbox(
            label="Data base (YYYY-MM-DD HH:MM:SS)",
            value="2025-04-01 10:00:00",
        )

    # Vertical offsets (in points) for the replacement texts.
    with gr.Row():
        date_offset_input = gr.Slider(
            label="Spostamento Data (pt)", minimum=-10, maximum=30, value=11, step=1
        )
        job_offset_input = gr.Slider(
            label="Spostamento Job (pt)", minimum=-10, maximum=30, value=11, step=1
        )

    # Whether the job description is replaced or kept.
    with gr.Row():
        job_option_input = gr.Dropdown(
            label="Aggiornamento Job",
            choices=["Sostituisci con C-Stag", "Mantieni B2020"],
            value="Sostituisci con C-Stag",
        )

    # Optional size adjustment, collapsed by default.
    with gr.Accordion("Opzioni Aggiustamento Dimensione", open=False):
        adjust_size_checkbox = gr.Checkbox(label="Aggiusta dimensione file?", value=False)
        with gr.Row():
            target_kb_input = gr.Number(
                label="Dimensione Target (KB)", value=33.0, minimum=1.0, step=0.1
            )
            tolerance_kb_input = gr.Number(
                label="Tolleranza (KB)", value=0.1, minimum=0.05, step=0.05
            )
        max_iterations_input = gr.Slider(
            label="Max Iterazioni", minimum=1, maximum=20, value=10, step=1
        )

    output_files = gr.File(label="Scarica i PDF aggiornati", file_count="multiple")
    btn = gr.Button("Elabora PDF")

    # Order must match the positional parameters of process_batch.
    process_inputs = [
        pdf_input,
        base_date_input,
        date_offset_input,
        job_offset_input,
        job_option_input,
        adjust_size_checkbox,
        target_kb_input,
        tolerance_kb_input,
        max_iterations_input,
    ]
    btn.click(fn=process_batch, inputs=process_inputs, outputs=output_files)

# Start the Gradio app.
demo.launch()