import subprocess
import sys

try:
    import fitz  # PyMuPDF
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "PyMuPDF"])
    import fitz

# ---------------------------------------------------------------------
# 0. Hot-patch: ensure Gradio-compatible Pydantic (<2.11)
# ---------------------------------------------------------------------
import os, sys, subprocess
from importlib import metadata

try:
    from packaging import version
except ModuleNotFoundError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "packaging"])
    from packaging import version


def _ensure_compatible_pydantic():
    try:
        cur = version.parse(metadata.version("pydantic"))
    except metadata.PackageNotFoundError:
        cur = None
    if cur is None or cur >= version.parse("2.11"):
        print(f"[patch] Installing pydantic<2.11 (current: {cur}) …", flush=True)
        subprocess.check_call([
            sys.executable, "-m", "pip", "install", "--no-cache-dir",
            "pydantic<2.11", "pydantic-core<2.11",
        ])
        # Re-exec the script so the freshly installed pydantic is actually imported
        os.execv(sys.executable, [sys.executable] + sys.argv)


_ensure_compatible_pydantic()

import re
import random
import io
import os
import tempfile
import logging
from datetime import datetime
import gradio as gr
import shutil

# Basic logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


# --- update_pdf_bytes: update timestamps and job-description text ---
def update_pdf_bytes(pdf_bytes, base_date, date_offset=11, job_offset=11, job_option="Sostituisci con C-Stag"):
    """
    Replace every timestamp matching 'YYYY-MM-DD HH:MM:SS[.fraction]' with a new
    timestamp derived from base_date (random seconds and fraction), and optionally
    replace the B2020 job-description line with the C-Stag text. Matched spans are
    collected first, then redacted and re-inserted page by page.
    """
    try:
        date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(\.\d+)?')
        job_pattern = re.compile(
            r"((DF25\s+)?- B2020 - Nulla osta/Comunicazione al lavoro subordinato non stagionale nei settori elencati nel DPCM Flussi)"
        )

        try:
            base = datetime.strptime(base_date, "%Y-%m-%d %H:%M:%S")
        except ValueError as e:
            raise ValueError("Il formato della data base deve essere 'YYYY-MM-DD HH:MM:SS'.") from e

        new_seconds = random.randint(0, 59)
        new_fraction = random.randint(0, 999999999)
        new_base = base.replace(second=new_seconds)
        new_date = new_base.strftime("%Y-%m-%d %H:%M:%S") + f".{new_fraction:09d}"

        base_job_text = "- C-Stag - Richiesta di nulla osta/comunicazione al lavoro subordinato stagionale"

        doc = None  # Initialize doc to None
        try:
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        except Exception as e:
            logging.error("Errore nell'apertura del PDF: %s", e)
            raise

        redactions_to_apply = {}  # page_num -> [rect1, rect2, ...]
        insertions_to_make = {}   # page_num -> [item1, item2, ...]

        for page_num, page in enumerate(doc):
            text_dict = page.get_text("dict")
            if not text_dict:
                continue
            page_redactions = []
            page_insertions = []
            for block in text_dict.get("blocks", []):
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span.get("text", "")
                        bbox = span.get("bbox")
                        if not bbox:
                            continue
                        rect = fitz.Rect(bbox)
                        if date_pattern.search(text):
                            logging.info("Trovata data da aggiornare nella pagina %d", page_num + 1)
                            page_redactions.append(rect)
                            insert_point = fitz.Point(rect.x0, rect.y0 + date_offset)
                            page_insertions.append({
                                "point": insert_point,
                                "text": new_date,
                                "size": span.get("size", 12),
                                "font": span.get("font", "helv"),
                                "color": (0, 0, 0),
                            })
                        elif job_option == "Sostituisci con C-Stag" and job_pattern.search(text):
                            match = job_pattern.search(text)
                            prefix = match.group(2) if match.group(2) is not None else ""
                            new_job_text = f"{prefix}{base_job_text}"
                            logging.info("Trovato job description da aggiornare nella pagina %d", page_num + 1)
                            page_redactions.append(rect)
                            insert_point = fitz.Point(rect.x0, rect.y0 + job_offset)
                            page_insertions.append({
                                "point": insert_point,
                                "text": new_job_text,
                                "size": span.get("size", 12),
                                "font": span.get("font", "helv"),
                                "color": (0, 0, 0),
                            })
            if page_redactions:
                redactions_to_apply[page_num] = page_redactions
            if page_insertions:
                insertions_to_make[page_num] = page_insertions

        # Apply changes page by page after iterating spans
        for page_num, page in enumerate(doc):
            if page_num in redactions_to_apply:
                logging.info("Applicazione redazioni alla pagina %d", page_num + 1)
                for rect in redactions_to_apply[page_num]:
                    page.add_redact_annot(rect, text=' ', fill=(1, 1, 1))  # White out
                page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE)
            if page_num in insertions_to_make:
                logging.info("Inserimento testo aggiornato nella pagina %d", page_num + 1)
                for item in insertions_to_make[page_num]:
                    page.insert_text(item["point"], item["text"],
                                     fontsize=item["size"], fontname=item["font"],
                                     color=item["color"])

        output_stream = io.BytesIO()
        try:
            # Save with basic garbage collection initially
            doc.save(output_stream, garbage=1, deflate=True)
        except Exception as e:
            logging.error("Errore nel salvataggio del PDF aggiornato: %s", e)
            raise
        finally:
            if doc:
                doc.close()
        return output_stream.getvalue()

    except Exception as e:
        logging.exception("Errore durante l'aggiornamento del PDF:")
        if 'doc' in locals() and doc is not None:
            try:
                doc.close()
            except Exception:
                pass
        raise
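
# Usage sketch (not wired into the Gradio app): a minimal example of running
# update_pdf_bytes on a single file from disk. The path, base date, and output
# name below are illustrative assumptions, not values used elsewhere in this script.
def _example_update_single_pdf(path="example_input.pdf", base_date="2025-04-01 10:00:00"):
    """Hypothetical helper: read a PDF, update date/job text, write the result alongside it."""
    with open(path, "rb") as f:
        original_bytes = f.read()
    # Defaults: offsets of 11 pt and job_option="Sostituisci con C-Stag"
    updated = update_pdf_bytes(original_bytes, base_date)
    out_path = os.path.join(os.path.dirname(path) or ".", "updated_" + os.path.basename(path))
    with open(out_path, "wb") as f:
        f.write(updated)
    logging.info("Example: updated PDF written to %s", out_path)
    return out_path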
None else "" new_job_text = f"{prefix}- C-Stag - Richiesta di nulla osta/comunicazione al lavoro subordinato stagionale" logging.info("Trovato job description da aggiornare nella pagina %d", page_num + 1) page_redactions.append(rect) insert_point = fitz.Point(rect.x0, rect.y0 + job_offset) page_insertions.append({ "point": insert_point, "text": new_job_text, "size": span.get("size", 12), "font": span.get("font", "helv"), "color": (0,0,0) }) if page_redactions: redactions_to_apply[page_num] = page_redactions if page_insertions: insertions_to_make[page_num] = page_insertions # Apply changes page by page after iterating spans for page_num, page in enumerate(doc): if page_num in redactions_to_apply: logging.info("Applicazione redazioni alla pagina %d", page_num + 1) for rect in redactions_to_apply[page_num]: page.add_redact_annot(rect, text=' ', fill=(1, 1, 1)) # White out page.apply_redactions(images=fitz.PDF_REDACT_IMAGE_NONE) if page_num in insertions_to_make: logging.info("Inserimento testo aggiornato nella pagina %d", page_num + 1) for item in insertions_to_make[page_num]: page.insert_text(item["point"], item["text"], fontsize=item["size"], fontname=item["font"], color=item["color"]) output_stream = io.BytesIO() try: # Save with basic garbage collection initially doc.save(output_stream, garbage=1, deflate=True) except Exception as e: logging.error("Errore nel salvataggio del PDF aggiornato: %s", e) raise finally: if doc: doc.close() return output_stream.getvalue() except Exception as e: logging.exception("Errore durante l'aggiornamento del PDF:") if 'doc' in locals() and doc is not None: try: doc.close() except Exception: pass raise # --- NEW ITERATIVE SIZE ADJUSTMENT FUNCTION --- def adjust_pdf_size_iterative(pdf_bytes: bytes, target_kb: float, tolerance_kb: float = 0.2, max_iterations: int = 10) -> bytes: """ Iteratively adjusts PDF size towards target_kb by adding/removing metadata padding or cleaning. Parameters: pdf_bytes (bytes): The initial PDF content. target_kb (float): Target size in kilobytes. tolerance_kb (float): Allowable deviation from target (e.g., 0.2 KB). max_iterations (int): Maximum attempts to reach the target size. Returns: bytes: The size-adjusted PDF, or the last attempt if target not reached within max_iterations. """ if target_kb <= 0: logging.info("Target KB non valido, saltando l'aggiustamento iterativo.") return pdf_bytes target_bytes = int(target_kb * 1024) tolerance_bytes = int(tolerance_kb * 1024) current_pdf_bytes = pdf_bytes padding_key = "X_IterativePaddingData" # Consistent key for padding logging.info(f"--- Inizio Aggiustamento Iterativo --- Target: {target_kb:.2f} KB ({target_bytes} bytes), Tolleranza: {tolerance_kb:.2f} KB ({tolerance_bytes} bytes)") for i in range(max_iterations): current_size = len(current_pdf_bytes) diff = target_bytes - current_size abs_diff = abs(diff) logging.info(f"Iterazione {i+1}/{max_iterations}: Dimensione attuale={current_size / 1024:.2f} KB ({current_size} bytes), Diff dal target={diff} bytes") # Check if within tolerance if abs_diff <= tolerance_bytes: logging.info(f"Dimensione raggiunta entro la tolleranza. 
({current_size / 1024:.2f} KB)") return current_pdf_bytes doc = None # Ensure doc is reset/closed each iteration previous_pdf_bytes = current_pdf_bytes # Keep track in case of error try: # --- Action: Decide whether to add padding or clean --- if diff > 0: # --- Need to INCREASE size (Add Padding) --- logging.debug("Azione: Aggiungere padding.") doc = fitz.open(stream=current_pdf_bytes, filetype="pdf") metadata = doc.metadata or {} # Estimate padding needed - add slightly more than diff to overshoot a bit # Simple approach: add diff + small buffer (e.g., 50 bytes) # More adaptive might be diff * 1.1, but let's keep it simple chars_to_add = max(1, diff + 50) # Add difference plus a small buffer current_padding = metadata.get(padding_key, "") new_padding = current_padding + (" " * chars_to_add) metadata[padding_key] = new_padding logging.debug(f"Aggiunta di {chars_to_add} caratteri di padding a '{padding_key}'.") doc.set_metadata(metadata) output_stream = io.BytesIO() # Save *without* strong compression when adding padding # garbage=1 does basic cleanup but shouldn't drastically shrink doc.save(output_stream, garbage=1, deflate=False) current_pdf_bytes = output_stream.getvalue() doc.close() doc = None # Mark as closed else: # diff < 0 # --- Need to DECREASE size (Cleanup) --- logging.debug("Azione: Pulizia aggressiva.") doc = fitz.open(stream=current_pdf_bytes, filetype="pdf") # Option 1: Remove our own padding first if it exists metadata = doc.metadata or {} if padding_key in metadata and len(metadata[padding_key]) > 0: padding_len = len(metadata[padding_key]) # Try removing a chunk of padding roughly equal to the excess size bytes_to_remove = abs(diff) # Reduce padding, but don't remove more than exists keep_chars = max(0, padding_len - bytes_to_remove - 50) # Remove diff + buffer metadata[padding_key] = metadata[padding_key][:keep_chars] logging.debug(f"Riduzione padding in '{padding_key}' a {keep_chars} caratteri.") doc.set_metadata(metadata) # Save with minimal changes first to see effect of padding removal output_stream = io.BytesIO() doc.save(output_stream, garbage=1, deflate=False) current_pdf_bytes = output_stream.getvalue() # Check size again *before* aggressive cleanup if abs(target_bytes - len(current_pdf_bytes)) <= tolerance_bytes: logging.info("Dimensione raggiunta dopo rimozione padding.") doc.close() continue # Skip to next iteration's check # Option 2: If still too large or no padding to remove, do aggressive cleanup logging.debug("Esecuzione pulizia aggressiva (garbage=4, deflate=True)") # Need to reopen if we saved after removing padding if doc: doc.close() # Close previous handle if open doc = fitz.open(stream=current_pdf_bytes, filetype="pdf") output_stream = io.BytesIO() doc.save(output_stream, garbage=4, deflate=True, linearize=False) current_pdf_bytes = output_stream.getvalue() doc.close() doc = None # Mark as closed except Exception as e: logging.exception(f"Errore durante l'aggiustamento nella iterazione {i+1}:") if doc: # Ensure doc is closed on error try: doc.close() except: pass logging.warning("Ripristino dei bytes dalla iterazione precedente.") return previous_pdf_bytes # Return the last known good state # Check if size somehow became drastically smaller/larger than expected (e.g., save error) # This is a safety check, might need tuning if len(current_pdf_bytes) < 100: # Arbitrary small size check logging.error(f"Dimensione del PDF diventata inaspettatamente piccola ({len(current_pdf_bytes)} bytes) dopo l'iterazione {i+1}. 
Interruzione.") return previous_pdf_bytes # If loop finishes without reaching tolerance logging.warning(f"Raggiunto limite massimo di {max_iterations} iterazioni. Dimensione finale: {len(current_pdf_bytes) / 1024:.2f} KB") return current_pdf_bytes # --- Updated process_batch Function --- def process_batch(pdf_files, base_date, date_offset, job_offset, job_option, adjust_size, target_kb, tolerance_kb, max_iterations): """ Elabora un batch di PDF: aggiorna data/job, opzionalmente aggiusta la dimensione iterativamente. Parameters: (Includes new parameters: tolerance_kb, max_iterations) """ output_dir = None results = [] try: output_dir = tempfile.mkdtemp(prefix="updated_pdfs_") logging.info(f"Creato directory temporanea: {output_dir}") for file_obj in pdf_files: original_name = "unknown_file.pdf" try: # ... (file reading logic remains the same) ... if isinstance(file_obj, str): if not os.path.exists(file_obj): logging.error(f"File non trovato: {file_obj}") continue with open(file_obj, "rb") as f: pdf_bytes = f.read() original_name = os.path.basename(file_obj) elif hasattr(file_obj, 'read') and hasattr(file_obj, 'name'): pdf_bytes = file_obj.read() original_name = os.path.basename(getattr(file_obj, "name", "uploaded_file.pdf")) else: logging.error(f"Input file non riconosciuto: {type(file_obj)}") continue logging.info(f"--- Elaborazione file: {original_name} (Dimensione iniziale: {len(pdf_bytes)/1024:.2f} KB) ---") # 1. Update date and job description updated_bytes = update_pdf_bytes(pdf_bytes, base_date, date_offset, job_offset, job_option) logging.info(f"Dimensione dopo aggiornamenti: {len(updated_bytes)/1024:.2f} KB") # 2. Adjust size iteratively if requested if adjust_size and target_kb > 0: logging.info(f"Richiesto aggiustamento dimensione per {original_name} a {target_kb} KB (Tolleranza: {tolerance_kb} KB, Max Iter: {max_iterations})") final_bytes = adjust_pdf_size_iterative( updated_bytes, target_kb, tolerance_kb=tolerance_kb, max_iterations=max_iterations ) else: final_bytes = updated_bytes # 3. Save the final PDF new_file_path = os.path.join(output_dir, original_name) with open(new_file_path, "wb") as f: f.write(final_bytes) results.append(new_file_path) logging.info(f"File aggiornato salvato in: {new_file_path} (Dimensione finale: {len(final_bytes)/1024:.2f} KB)") except Exception as e: logging.exception(f"Errore nel processamento del file '{original_name}':") continue return results except Exception as e: logging.exception("Errore generale durante l'elaborazione batch:") return [] finally: # -------------- FIX -------------- # The cleanup that deleted the temp directory before Gradio # could read the files has been removed to avoid FileNotFoundError. # (You can clean up old folders with a scheduled task if desired.) pass # --- Updated Gradio Interface --- with gr.Blocks() as demo: gr.Markdown( """ ## Aggiornamento PDF: Data, Job Description e Dimensione (Iterativo) **Guida:** 1. Carica PDF. 2. Imposta Data Base (YYYY-MM-DD HH:MM:SS). 3. Regola Spostamenti Verticali (offset). 4. Scegli opzione Job Description. 5. **(Opzionale) Aggiusta Dimensione:** - Seleziona la casella. - Imposta la **Dimensione Target (KB)**. - Imposta la **Tolleranza (KB)** (quanto può discostarsi dal target, es. 0.2). - Imposta le **Max Iterazioni** (quanti tentativi fare, es. 10). - Il sistema tenterà di aggiungere/rimuovere dati invisibili (metadata/pulizia) per avvicinarsi al target. 6. Clicca **Elabora PDF**. 7. Scarica i file aggiornati. 
""" ) with gr.Row(): pdf_input = gr.File(label="Carica file PDF", file_count="multiple") with gr.Row(): base_date_input = gr.Textbox(label="Data base (YYYY-MM-DD HH:MM:SS)", value="2025-04-01 10:00:00") with gr.Row(): date_offset_input = gr.Slider(label="Spostamento Data (pt)", minimum=-10, maximum=30, value=11, step=1) job_offset_input = gr.Slider(label="Spostamento Job (pt)", minimum=-10, maximum=30, value=11, step=1) with gr.Row(): job_option_input = gr.Dropdown(label="Aggiornamento Job", choices=["Sostituisci con C-Stag", "Mantieni B2020"], value="Sostituisci con C-Stag") with gr.Accordion("Opzioni Aggiustamento Dimensione", open=False): # Use Accordion adjust_size_checkbox = gr.Checkbox(label="Aggiusta dimensione file?", value=False) with gr.Row(): target_kb_input = gr.Number(label="Dimensione Target (KB)", value=33.0, minimum=1.0, step=0.1) tolerance_kb_input = gr.Number(label="Tolleranza (KB)", value=0.1, minimum=0.05, step=0.05) max_iterations_input = gr.Slider(label="Max Iterazioni", minimum=1, maximum=20, value=10, step=1) output_files = gr.File(label="Scarica i PDF aggiornati", file_count="multiple") btn = gr.Button("Elabora PDF") btn.click(fn=process_batch, inputs=[ pdf_input, base_date_input, date_offset_input, job_offset_input, job_option_input, adjust_size_checkbox, target_kb_input, tolerance_kb_input, # Pass tolerance max_iterations_input # Pass max iterations ], outputs=output_files) # Launch the Gradio app demo.launch()