import gradio as gr
import os
import re
import json
import tempfile
import zipfile
import traceback
from huggingface_hub import hf_hub_download
import base64
from PIL import Image
from io import BytesIO

print("=" * 50)
print("Starting VisualQuality-R1 GGUF")
print("=" * 50)

# Constants
REPO_ID = "mradermacher/VisualQuality-R1-7B-GGUF"
MODEL_FILE = "VisualQuality-R1-7B.Q4_K_M.gguf"
MMPROJ_FILE = "VisualQuality-R1-7B.mmproj-Q8_0.gguf"

# Prompts
PROMPT = (
    "You are doing the image quality assessment task. Here is the question: "
    "What is your overall rating on the quality of this picture? The rating should be a float between 1 and 5, "
    "rounded to two decimal places, with 1 representing very poor quality and 5 representing excellent quality."
)
QUESTION_TEMPLATE_THINKING = (
    "{Question} First output the thinking process in <think> </think> tags "
    "and then output the final answer with only one score in <answer> </answer> tags."
)
QUESTION_TEMPLATE_NO_THINKING = (
    "{Question} Please only output the final answer with only one score in <answer> </answer> tags."
)

# Global state
llm = None

print("Importing llama_cpp...")
try:
    from llama_cpp import Llama
    import llama_cpp
    print(f"llama_cpp version: {getattr(llama_cpp, '__version__', 'unknown')}")
except Exception as e:
    print(f"Error importing llama_cpp: {e}")
    traceback.print_exc()

# Try to import the chat handler for Qwen2-VL
chat_handler_class = None
chat_handler_name = None
try:
    from llama_cpp.llama_chat_format import Qwen2VLChatHandler
    chat_handler_class = Qwen2VLChatHandler
    chat_handler_name = "Qwen2VLChatHandler"
    print(f"✓ Found {chat_handler_name}")
except ImportError as e:
    print(f"✗ Qwen2VLChatHandler not found: {e}")

# Diagnostic fallback: list the chat handlers this build actually ships
if chat_handler_class is None:
    print("\nListing available chat handlers...")
    try:
        from llama_cpp import llama_chat_format
        handlers = [name for name in dir(llama_chat_format) if 'Handler' in name or 'Chat' in name]
        print(f"Available handlers: {handlers}")
    except Exception as e:
        print(f"Could not list handlers: {e}")


def download_models():
    """Download the GGUF model and the multimodal projector from the Hub."""
    print(f"Downloading {MODEL_FILE}...")
    model_path = hf_hub_download(repo_id=REPO_ID, filename=MODEL_FILE)
    print(f"Model downloaded: {model_path}")

    print(f"Downloading {MMPROJ_FILE}...")
    mmproj_path = hf_hub_download(repo_id=REPO_ID, filename=MMPROJ_FILE)
    print(f"MMProj downloaded: {mmproj_path}")

    return model_path, mmproj_path


def load_model():
    """Load the model lazily, on first use."""
    global llm, chat_handler_class, chat_handler_name

    if llm is not None:
        return True

    if chat_handler_class is None:
        print("ERROR: No suitable chat handler found for Qwen2-VL!")
        print("Please ensure llama-cpp-python >= 0.3.2 is installed")
        return False

    try:
        model_path, mmproj_path = download_models()

        print(f"Creating {chat_handler_name}...")
        chat_handler = chat_handler_class(
            clip_model_path=mmproj_path,
            verbose=True,
        )
        print("Chat handler created")

        print("Loading LLM...")
        llm = Llama(
            model_path=model_path,
            chat_handler=chat_handler,
            n_ctx=4096,
            n_threads=4,
            n_gpu_layers=0,
            verbose=True,
        )
        print("Model loaded successfully!")
        return True
    except Exception as e:
        print(f"Error loading model: {e}")
        traceback.print_exc()
        return False


def image_to_data_uri(image):
    """Convert a PIL Image to a base64-encoded JPEG data URI."""
    if image is None:
        return None
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Downscale large images to keep CPU prompt processing tolerable
    max_size = 768
    if max(image.size) > max_size:
        ratio = max_size / max(image.size)
        new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
        image = image.resize(new_size, Image.LANCZOS)

    buffered = BytesIO()
    image.save(buffered, format="JPEG", quality=85)
    img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{img_base64}"
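
# A minimal round-trip sketch for testing image_to_data_uri locally. The helper
# name `data_uri_to_image` is ours (it is not part of the app and is never
# called by the interface); it simply decodes the "data:image/jpeg;base64,..."
# payload produced above back into a PIL Image.
def data_uri_to_image(uri):
    """Decode a data URI (as produced by image_to_data_uri) into a PIL Image."""
    header, _, payload = uri.partition(",")
    assert header.startswith("data:image/"), "not an image data URI"
    return Image.open(BytesIO(base64.b64decode(payload)))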
def extract_score(text):
    """Extract the numeric score from the <answer> block (or the raw text)."""
    try:
        matches = re.findall(r'<answer>(.*?)</answer>', text, re.DOTALL)
        if matches:
            answer = matches[-1].strip()
        else:
            answer = text.strip()
        score_match = re.search(r'\d+(\.\d+)?', answer)
        if score_match:
            score = float(score_match.group())
            return min(max(score, 1.0), 5.0)  # clamp to the 1..5 scale
    except Exception:
        pass
    return None


def extract_thinking(text):
    """Extract the reasoning from the <think> block."""
    matches = re.findall(r'<think>(.*?)</think>', text, re.DOTALL)
    if matches:
        return matches[-1].strip()
    return ""


def score_single_image(image, use_thinking=True):
    """Score a single image, streaming partial output to the UI."""
    global llm
    print(f"score_single_image called, use_thinking={use_thinking}")

    # This is a generator, so early exits must yield: a plain `return value`
    # inside a generator is never delivered to Gradio.
    if image is None:
        yield "❌ Upload an image first", "", ""
        return
    if not load_model():
        yield "❌ Failed to load model. Qwen2VLChatHandler not available. Check logs.", "", ""
        return

    template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
    prompt_text = template.format(Question=PROMPT)

    print("Converting image...")
    image_uri = image_to_data_uri(image)
    print(f"Image converted, URI length: {len(image_uri)}")

    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": image_uri}},
                {"type": "text", "text": prompt_text},
            ],
        }
    ]

    print("Starting generation...")
    generated_text = ""
    try:
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=2048 if use_thinking else 256,
            temperature=0.7,
            top_p=0.95,
            stream=True,
        )
        for chunk in response:
            delta = chunk.get("choices", [{}])[0].get("delta", {})
            content = delta.get("content", "")
            if content:
                generated_text += content
                thinking = extract_thinking(generated_text)
                score = extract_score(generated_text)
                score_display = (
                    f"⭐ **Score: {score:.2f} / 5.00**" if score is not None else "*Analyzing...*"
                )
                yield generated_text, thinking, score_display

        print(f"Generation complete, length: {len(generated_text)}")
        final_score = extract_score(generated_text)
        final_thinking = extract_thinking(generated_text) if use_thinking else ""
        if final_score is not None:
            score_display = (
                f"⭐ **Quality Score: {final_score:.2f} / 5.00**\n\n"
                f"📊 **For Leaderboard:** `{final_score:.2f}`"
            )
        else:
            score_display = "❌ Could not extract score"
        yield generated_text, final_thinking, score_display
    except Exception as e:
        error_msg = f"❌ Error: {str(e)}"
        print(error_msg)
        traceback.print_exc()
        yield error_msg, "", ""
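
# A minimal self-check sketch for the extractors (ours, not part of the app and
# not wired into the UI). It assumes the model follows the <think>/<answer>
# format requested by the prompt templates above; run it manually if needed.
def _demo_extractors():
    sample = "<think>Sharp edges, mild noise in shadows.</think><answer>4.25</answer>"
    assert extract_score(sample) == 4.25
    assert extract_thinking(sample) == "Sharp edges, mild noise in shadows."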
def process_batch(files, use_thinking=True, progress=gr.Progress()):
    """Score a batch of uploaded images and package the results for download."""
    global llm
    print(f"process_batch: {len(files) if files else 0} files")

    if not files:
        return "❌ No files", None
    if not load_model():
        return "❌ Failed to load model", None

    results = []
    template = QUESTION_TEMPLATE_THINKING if use_thinking else QUESTION_TEMPLATE_NO_THINKING
    prompt_text = template.format(Question=PROMPT)

    for i, file in enumerate(files):
        filename = "unknown"
        try:
            if hasattr(file, 'name'):
                image = Image.open(file.name)
                filename = os.path.basename(file.name)
            else:
                image = Image.open(file)
                filename = f"image_{i+1}.jpg"
            print(f"Processing {i+1}/{len(files)}: {filename}")

            image_uri = image_to_data_uri(image)
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": image_uri}},
                        {"type": "text", "text": prompt_text},
                    ],
                }
            ]
            response = llm.create_chat_completion(
                messages=messages,
                max_tokens=2048 if use_thinking else 256,
                temperature=0.7,
                top_p=0.95,
            )
            generated_text = response["choices"][0]["message"]["content"]
            score = extract_score(generated_text)
            thinking = extract_thinking(generated_text) if use_thinking else ""
            results.append({
                "filename": filename,
                "score": score if score is not None else "N/A",
                "thinking": thinking,
                "raw_output": generated_text,
            })
            print(f"  Score: {score}")
            progress((i + 1) / len(files), desc=f"{i+1}/{len(files)}: {filename}")
        except Exception as e:
            print(f"  Error: {e}")
            results.append({
                "filename": filename,
                "score": "ERROR",
                "thinking": "",
                "raw_output": str(e),
            })

    def fmt(score):
        """Format a float score as two decimals; pass 'N/A'/'ERROR' through."""
        return f"{score:.2f}" if isinstance(score, float) else str(score)

    # Create downloadable result files (TXT, JSON, CSV, bundled into a ZIP)
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            txt_file = os.path.join(tmpdir, "leaderboard_scores.txt")
            with open(txt_file, "w") as f:
                for r in results:
                    f.write(f"{r['filename']}\t{fmt(r['score'])}\n")

            json_file = os.path.join(tmpdir, "results.json")
            with open(json_file, "w") as f:
                json.dump(results, f, indent=2, ensure_ascii=False)

            csv_file = os.path.join(tmpdir, "scores.csv")
            with open(csv_file, "w") as f:
                f.write("filename,score\n")
                for r in results:
                    f.write(f"{r['filename']},{fmt(r['score'])}\n")

            zip_path = os.path.join(tmpdir, "results.zip")
            with zipfile.ZipFile(zip_path, 'w') as zipf:
                zipf.write(txt_file, "leaderboard_scores.txt")
                zipf.write(json_file, "results.json")
                zipf.write(csv_file, "scores.csv")

            # Copy the ZIP out of the temporary directory before it is deleted
            final_zip = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
            with open(zip_path, 'rb') as f:
                final_zip.write(f.read())
            final_zip.close()
    except Exception as e:
        return f"❌ Error saving: {e}", None

    valid_scores = [r['score'] for r in results if isinstance(r['score'], float)]
    avg = sum(valid_scores) / len(valid_scores) if valid_scores else 0
    min_s = f"{min(valid_scores):.2f}" if valid_scores else "N/A"
    max_s = f"{max(valid_scores):.2f}" if valid_scores else "N/A"

    summary = f"""## ✅ Done!

**Processed:** {len(results)} | **OK:** {len(valid_scores)} | **Failed:** {len(results) - len(valid_scores)}

**Avg:** {avg:.2f} | **Min:** {min_s} | **Max:** {max_s}

| File | Score |
|------|-------|
""" + "\n".join(f"| {r['filename'][:40]} | {fmt(r['score'])} |" for r in results[:10])

    return summary, final_zip.name


# Interface
print("Creating interface...")

with gr.Blocks(title="VisualQuality-R1") as demo:
    gr.Markdown(
        """
# 🎨 VisualQuality-R1 (GGUF/CPU)

**Image Quality Assessment** | ~30-60 sec/image on CPU

[![Paper](https://img.shields.io/badge/arXiv-2505.14460-red)](https://arxiv.org/abs/2505.14460)
"""
    )

    with gr.Tabs():
        with gr.TabItem("📷 Single"):
            with gr.Row():
                with gr.Column():
                    img = gr.Image(label="Image", type="pil", height=350)
                    think = gr.Checkbox(label="🧠 Thinking", value=True)
                    btn = gr.Button("🔍 Analyze", variant="primary", size="lg")
                with gr.Column():
                    score = gr.Markdown("*Upload image*")
                    thinking = gr.Textbox(label="Thinking", lines=6)
                    output = gr.Textbox(label="Output", lines=8)
            btn.click(score_single_image, [img, think], [output, thinking, score])

        with gr.TabItem("📁 Batch"):
            with gr.Row():
                with gr.Column():
                    files = gr.File(label="Images", file_count="multiple", file_types=["image"])
                    batch_think = gr.Checkbox(label="🧠 Thinking", value=False)
                    batch_btn = gr.Button("🚀 Process", variant="primary", size="lg")
                with gr.Column():
                    summary = gr.Markdown("*Upload & Process*")
                    download = gr.File(label="📥 Results")
            batch_btn.click(process_batch, [files, batch_think], [summary, download])

print("Starting server...")
if __name__ == "__main__":
    demo.queue(max_size=5)
    demo.launch(server_name="0.0.0.0", server_port=7860)
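
# Local usage sketch (assumed setup; on a Hugging Face Space the dependencies
# come from requirements.txt instead, and the filename app.py is an assumption):
#   pip install gradio pillow huggingface_hub llama-cpp-python
#   python app.py
# The first request triggers the model download (several GB for the Q4_K_M
# quant) and load, so it is much slower than subsequent ones.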