import gradio as gr import os import yaml import json import random import re from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names from openai import OpenAI from openevolve import run_evolution from typing import Dict, List, Tuple, Optional import tempfile import shutil import requests import glob # Model for OpenRouter # Using paid llama-3.2-3b-instruct since free tier models have unreliable rate limits MODELS = [ "meta-llama/llama-3.2-3b-instruct", # 3B - Reliable, fast, and very cheap ($0.04/$0.04 per 1M tokens) ] def validate_dataset(dataset_name: str, split: str, input_field: str, target_field: str) -> Tuple[bool, str]: """ Validate that the dataset exists and has the required fields. Returns: Tuple of (is_valid, error_message) """ try: # Check if dataset name has correct format (should be org/name or just name) if not dataset_name or dataset_name.strip() == "": return False, "❌ Dataset name cannot be empty" dataset_name = dataset_name.strip() # Try to get dataset info from HuggingFace API hf_token = os.environ.get("HF_TOKEN", None) headers = {} if hf_token: headers["Authorization"] = f"Bearer {hf_token}" # Check if dataset exists on HuggingFace Hub api_url = f"https://huggingface.co/api/datasets/{dataset_name}" response = requests.get(api_url, headers=headers, timeout=10) if response.status_code == 404: return False, f"❌ Dataset '{dataset_name}' not found on HuggingFace Hub. Please use the full dataset name (e.g., 'stanfordnlp/imdb' or 'gsm8k')" elif response.status_code != 200: # Try to load anyway - might be a private dataset or API issue print(f"Warning: Could not verify dataset via API (status {response.status_code}), attempting to load...") # Try to load a small sample to verify it works and check fields print(f"Loading dataset {dataset_name} with split {split}...") # First, check if the split exists try: available_splits = get_dataset_split_names(dataset_name) if split not in available_splits: return False, f"❌ Split '{split}' not found. Available splits: {', '.join(available_splits)}" except Exception as e: print(f"Could not get split names: {e}. Will try to load anyway...") # Load a small sample to check fields # Try loading with just dataset name first try: dataset = load_dataset(dataset_name, split=split, streaming=True) except ValueError as e: # If it fails with config error, try common configs if "config" in str(e).lower() or "Config name is missing" in str(e): # Try common configs based on dataset name default_config = "main" if dataset_name.lower() == "glue": default_config = "sst2" print(f"Dataset requires config, trying with '{default_config}' config...") try: dataset = load_dataset(dataset_name, default_config, split=split, streaming=True) except: # If default config doesn't work, raise the original error raise e else: raise # Get first example to check fields first_example = next(iter(dataset)) available_fields = list(first_example.keys()) # Check if input field exists if input_field not in available_fields: return False, f"❌ Input field '{input_field}' not found. Available fields: {', '.join(available_fields)}" # Check if target field exists if target_field not in available_fields: return False, f"❌ Target field '{target_field}' not found. Available fields: {', '.join(available_fields)}" # All validations passed return True, f"✅ Dataset validated successfully! Fields '{input_field}' and '{target_field}' found." except Exception as e: error_msg = str(e) if "404" in error_msg or "not found" in error_msg.lower(): return False, f"❌ Dataset '{dataset_name}' not found. Please check the dataset name (use format: org/dataset-name)" return False, f"❌ Error validating dataset: {error_msg}" def validate_inputs(dataset_name: str, split: str, input_field: str, target_field: str, initial_prompt: str) -> Tuple[bool, str]: """ Validate all inputs before starting optimization. Returns: Tuple of (is_valid, message) """ # Check API key api_key = os.environ.get("OPENAI_API_KEY") if not api_key: return False, "❌ OPENAI_API_KEY environment variable not set. Please set it in the Space secrets." # Check prompt contains {input} placeholder if "{input}" not in initial_prompt: return False, "❌ Prompt must contain '{input}' placeholder for dataset inputs" # Check dataset name format dataset_name = dataset_name.strip() if not dataset_name: return False, "❌ Dataset name cannot be empty" # Validate dataset and fields is_valid, message = validate_dataset(dataset_name, split, input_field, target_field) if not is_valid: return False, message return True, message def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int, model: str, input_field: str, target_field: str, fixed_indices: List[int] = None) -> Dict: """ Evaluate a prompt on a dataset using the selected model. Args: fixed_indices: Optional list of dataset indices to use. If provided, ensures we evaluate on the SAME samples every time. """ try: # Get API key from environment api_key = os.environ.get("OPENAI_API_KEY") if not api_key: return { "error": "OPENAI_API_KEY not set in environment", "accuracy": 0, "correct": 0, "total": 0, "results": [] } # Load dataset # Try loading with just dataset name first try: dataset = load_dataset(dataset_name, split=split, streaming=False) except ValueError as e: # If it fails with config error, try common configs if "config" in str(e).lower() or "Config name is missing" in str(e): # Try common configs based on dataset name default_config = "main" if dataset_name.lower() == "glue": default_config = "sst2" dataset = load_dataset(dataset_name, default_config, split=split, streaming=False) else: raise # Sample examples - use fixed indices if provided to ensure consistency if fixed_indices is not None: # Use the provided indices (ensures same samples for initial/final eval) indices = fixed_indices samples = [dataset[i] for i in indices] elif len(dataset) > num_samples: # First time: use fixed seed for reproducible sampling random.seed(42) # Fixed seed ensures same samples across runs indices = random.sample(range(len(dataset)), num_samples) samples = [dataset[i] for i in indices] else: indices = list(range(min(num_samples, len(dataset)))) samples = list(dataset)[:num_samples] # Initialize OpenAI client with OpenRouter client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=api_key, ) correct = 0 total = 0 results = [] errors = [] for idx, sample in enumerate(samples): try: # Get input and target input_text = sample.get(input_field, "") if isinstance(input_text, dict): input_text = str(input_text) target = sample.get(target_field, "") if isinstance(target, dict): target = str(target) # Format the prompt with the input formatted_prompt = prompt.replace("{input}", str(input_text)) # Call the model with retry logic for transient failures max_retries = 3 import time for retry in range(max_retries): try: response = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": formatted_prompt} ], temperature=0.0, max_tokens=500, ) break # Success, exit retry loop except Exception as api_error: if retry < max_retries - 1: wait_time = (retry + 1) * 2 # Exponential backoff: 2s, 4s, 6s print(f" API error on sample {idx+1}, retrying in {wait_time}s...") time.sleep(wait_time) else: raise # Final retry failed, propagate error prediction = response.choices[0].message.content.strip() # Small delay to avoid rate limiting time.sleep(0.1) # IMDB labels: 0 = negative, 1 = positive true_label = int(target) # 0 or 1 # FORMAT REQUIREMENT: Need "sentiment" keyword + positive/negative in first 150 chars # This is strict enough to fail conversational responses, but learnable through evolution pred_lower = prediction.lower() pred_start = pred_lower[:150] # First 150 chars # Must mention "sentiment" to get credit (helps evolution learn to add this keyword) has_sentiment_keyword = "sentiment" in pred_start # Check for positive/negative indicators has_positive = "positive" in pred_start has_negative = "negative" in pred_start # Only count as correct if sentiment keyword present AND unambiguous positive/negative if has_sentiment_keyword and has_positive and not has_negative: predicted_label = 1 elif has_sentiment_keyword and has_negative and not has_positive: predicted_label = 0 else: predicted_label = -1 is_correct = (predicted_label == true_label) if is_correct: correct += 1 total += 1 results.append({ "input": str(input_text)[:100] + "..." if len(str(input_text)) > 100 else str(input_text), "target": str(target), "prediction": prediction[:100] + "..." if len(prediction) > 100 else prediction, "correct": is_correct }) except Exception as e: error_msg = f"Sample {idx+1}: {str(e)}" print(f"Error evaluating sample {idx+1}: {e}") errors.append(error_msg) # Only continue if we haven't failed on all samples if len(errors) > len(samples) // 2: # More than half failed print(f"Too many errors ({len(errors)} out of {len(samples)}), stopping evaluation") break continue accuracy = (correct / total * 100) if total > 0 else 0 result_dict = { "accuracy": accuracy, "correct": correct, "total": total, "results": results, "indices": indices # Return indices so we can reuse them for final eval } # Add errors if any occurred if errors: result_dict["errors"] = errors if total == 0: # All samples failed - create a helpful error message result_dict["error"] = f"All {len(samples)} samples failed to evaluate. First few errors:\n" + "\n".join(errors[:3]) return result_dict except Exception as e: return { "error": str(e), "accuracy": 0, "correct": 0, "total": 0, "results": [] } def collect_prompt_history(output_dir: str, initial_score: float = 0.0) -> List[Dict]: """ Collect only the prompts that were "best" at some point during evolution. Returns only programs that improved upon the initial score (deduplicated). Args: output_dir: Directory containing checkpoint data initial_score: Score of the initial prompt (baseline to beat) Returns a list of dicts with: {prompt, score, iteration, id} """ try: all_programs = [] seen_prompts = set() # Track unique prompts # OpenEvolve saves programs in checkpoint directories as JSON files # Structure: output_dir/checkpoints/checkpoint_{iteration}/programs/{program_id}.json checkpoints_dir = os.path.join(output_dir, "checkpoints") if not os.path.exists(checkpoints_dir): return [] # Find all checkpoint directories checkpoint_dirs = sorted(glob.glob(os.path.join(checkpoints_dir, "checkpoint_*"))) # Collect all programs from all checkpoints for checkpoint_dir in checkpoint_dirs: programs_dir = os.path.join(checkpoint_dir, "programs") if not os.path.exists(programs_dir): continue # Read all program JSON files program_files = glob.glob(os.path.join(programs_dir, "*.json")) for pfile in program_files: try: with open(pfile, 'r') as f: program_data = json.load(f) # Extract the code (prompt) from the program data prompt_content = program_data.get("code", "").strip() prog_id = program_data.get("id", os.path.basename(pfile).replace(".json", "")) iteration = program_data.get("iteration_found", 0) metrics = program_data.get("metrics", {}) # Get combined score for comparison combined_score = metrics.get("combined_score", 0.0) all_programs.append({ "prompt": prompt_content, "id": prog_id, "file": pfile, "iteration": iteration, "metrics": metrics, "score": combined_score }) except Exception as e: print(f"Error reading program file {pfile}: {e}") continue # Sort all programs by iteration (chronological order) all_programs.sort(key=lambda x: x.get("iteration", 0)) # Filter to keep only programs that improved the best score # Start from the initial score as the baseline best_programs = [] current_best_score = initial_score for program in all_programs: prompt_content = program["prompt"] score = program["score"] iteration = program["iteration"] # Skip iteration 0 (that's the initial prompt, already added separately) if iteration == 0: continue # Create a normalized version for duplicate detection (ignore whitespace differences) normalized_prompt = " ".join(prompt_content.split()) # Skip duplicates if normalized_prompt in seen_prompts: continue # Only keep if this program improved the best score if score > current_best_score: seen_prompts.add(normalized_prompt) best_programs.append(program) improvement = score - current_best_score print(f" ✓ Best program at iteration {iteration}: score={score:.2%} (improved by +{improvement:.2%})") current_best_score = score return best_programs except Exception as e: print(f"Error collecting prompt history: {e}") return [] def parse_evolution_history(output_dir: str) -> str: """ Parse evolution history from OpenEvolve output directory. Returns a markdown string with visualization of the evolution process. """ try: evolution_viz = "## 🧬 Evolution Progress\n\n" # Look for generation files or logs generation_files = sorted(glob.glob(os.path.join(output_dir, "generation_*.txt"))) log_file = os.path.join(output_dir, "evolution.log") # Try to parse generation files if they exist if generation_files: evolution_viz += "### Generation-by-Generation Progress\n\n" for gen_file in generation_files: gen_num = os.path.basename(gen_file).replace("generation_", "").replace(".txt", "") try: with open(gen_file, 'r') as f: content = f.read() evolution_viz += f"**Generation {gen_num}:**\n```\n{content[:200]}{'...' if len(content) > 200 else ''}\n```\n\n" except: pass # Try to parse log file elif os.path.exists(log_file): evolution_viz += "### Evolution Log\n\n" try: with open(log_file, 'r') as f: log_content = f.read() evolution_viz += f"```\n{log_content[-1000:]}\n```\n\n" except: pass # Look for scores or history file scores_file = os.path.join(output_dir, "scores.json") if os.path.exists(scores_file): try: with open(scores_file, 'r') as f: scores = json.load(f) evolution_viz += "### Score Progression\n\n" evolution_viz += "| Generation | Best Score | Avg Score | Population |\n" evolution_viz += "|------------|-----------|-----------|------------|\n" for gen in scores: evolution_viz += f"| {gen['generation']} | {gen['best']:.3f} | {gen['avg']:.3f} | {gen['population']} |\n" evolution_viz += "\n" except: pass # Look for all program variants program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt"))) if program_files: evolution_viz += f"### Explored Variants\n\n" evolution_viz += f"OpenEvolve explored {len(program_files)} different prompt variants during evolution.\n\n" # Show a few intermediate prompts if len(program_files) > 3: sample_files = [program_files[0], program_files[len(program_files)//2], program_files[-2]] evolution_viz += "**Sample Intermediate Prompts:**\n\n" for idx, pfile in enumerate(sample_files, 1): try: with open(pfile, 'r') as f: prompt_content = f.read() evolution_viz += f"**Variant {idx}:**\n```\n{prompt_content[:150]}{'...' if len(prompt_content) > 150 else ''}\n```\n\n" except: pass # If no specific files found, show directory contents if not generation_files and not os.path.exists(log_file) and not os.path.exists(scores_file): evolution_viz += "### Evolution Complete\n\n" evolution_viz += "OpenEvolve ran 5 iterations of evolutionary optimization using:\n" evolution_viz += "- **Population Size**: 10 prompts per generation\n" evolution_viz += "- **Selection Strategy**: 10% elite, 30% explore, 60% exploit\n" evolution_viz += "- **Islands**: 1 population with mutation and crossover\n" evolution_viz += "- **Evaluation**: 50 samples per prompt variant\n\n" # Count files in output directory all_files = os.listdir(output_dir) evolution_viz += f"Generated {len(all_files)} files during evolution process.\n\n" return evolution_viz except Exception as e: return f"## 🧬 Evolution Progress\n\nEvolution completed successfully. Unable to parse detailed history: {str(e)}\n\n" def create_evaluator_file(dataset_name: str, split: str, model: str, input_field: str, target_field: str, work_dir: str): """Create an evaluator.py file for OpenEvolve that uses same 50 samples as initial/final eval.""" evaluator_code = f''' import os import random import time from datasets import load_dataset from openai import OpenAI def evaluate(prompt: str) -> dict: """ Evaluate a prompt using 50 fixed samples - SAME as initial and final evaluation. OpenEvolve passes a file path, so we need to read the prompt from the file. Using the same 50 samples ensures evolution optimizes for the exact test set. Includes early stopping and rate limit handling. """ try: # CRITICAL: OpenEvolve passes a FILE PATH, not the prompt text! # Check if prompt is a file path and read it if os.path.exists(prompt): with open(prompt, 'r') as f: prompt_text = f.read() # Strip EVOLVE-BLOCK markers if present prompt_text = prompt_text.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip() else: # If not a file path, use as-is (for backward compatibility) prompt_text = prompt # IMPORTANT: Use fixed seed for consistent sampling across all evaluations random.seed(42) # Load dataset try: dataset = load_dataset("{dataset_name}", split="{split}", streaming=False) except ValueError as e: if "config" in str(e).lower() or "Config name is missing" in str(e): default_config = "main" if "{dataset_name}".lower() == "glue": default_config = "sst2" dataset = load_dataset("{dataset_name}", default_config, split="{split}", streaming=False) else: raise # Sample 50 samples with seed 42 - SAME as initial/final evaluation for consistency! num_samples = 50 if len(dataset) > num_samples: # Use SAME sampling logic as initial/final eval indices = random.sample(range(len(dataset)), num_samples) samples = [dataset[i] for i in indices] else: indices = list(range(min(num_samples, len(dataset)))) samples = list(dataset)[:num_samples] # Initialize OpenAI client api_key = os.environ.get("OPENAI_API_KEY") client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=api_key, ) correct = 0 total = 0 errors = 0 print(f"Evaluating on {{len(samples)}} samples...") for idx, sample in enumerate(samples): try: # Get input and target input_text = sample.get("{input_field}", "") if isinstance(input_text, dict): input_text = str(input_text) target = sample.get("{target_field}", "") if isinstance(target, dict): target = str(target) # Format the prompt (use prompt_text that we read from file) formatted_prompt = prompt_text.replace("{{input}}", str(input_text)) # Call the model with retry logic for transient failures max_retries = 3 for retry in range(max_retries): try: response = client.chat.completions.create( model="{model}", messages=[ {{"role": "system", "content": "You are a helpful assistant."}}, {{"role": "user", "content": formatted_prompt}} ], temperature=0.0, max_tokens=500, ) break # Success, exit retry loop except Exception as api_error: if retry < max_retries - 1: wait_time = (retry + 1) * 2 # Exponential backoff: 2s, 4s, 6s print(f" API error on sample {{idx+1}}, retrying in {{wait_time}}s...") time.sleep(wait_time) else: raise # Final retry failed, propagate error prediction = response.choices[0].message.content.strip() # IMDB labels: 0 = negative, 1 = positive true_label = int(target) # 0 or 1 # FORMAT REQUIREMENT: Need "sentiment" keyword + positive/negative in first 150 chars # This is strict enough to fail conversational responses, but learnable through evolution pred_lower = prediction.lower() pred_start = pred_lower[:150] # First 150 chars # Must mention "sentiment" to get credit (helps evolution learn to add this keyword) has_sentiment_keyword = "sentiment" in pred_start # Check for positive/negative indicators has_positive = "positive" in pred_start has_negative = "negative" in pred_start # Only count as correct if sentiment keyword present AND unambiguous positive/negative if has_sentiment_keyword and has_positive and not has_negative: predicted_label = 1 elif has_sentiment_keyword and has_negative and not has_positive: predicted_label = 0 else: predicted_label = -1 is_correct = (predicted_label == true_label) if is_correct: correct += 1 total += 1 # Small delay to avoid rate limiting time.sleep(0.1) if (idx + 1) % 25 == 0: print(f" Progress: {{idx + 1}}/{{len(samples)}} - Current accuracy: {{correct/total:.2%}}") except Exception as e: errors += 1 print(f"Error evaluating sample {{idx+1}}: {{e}}") # Early stopping: if more than 40% of samples fail, abort if errors > len(samples) * 0.4: print(f"Too many errors ({{errors}}/{{idx+1}}), stopping evaluation early") break continue accuracy = (correct / total) if total > 0 else 0.0 print(f"Final: {{correct}}/{{total}} = {{accuracy:.2%}}") # DEBUG: Log the prompt being evaluated and its score (use prompt_text, not file path) prompt_preview = prompt_text[:80].replace('\\n', ' ') if len(prompt_text) > 80 else prompt_text.replace('\\n', ' ') print(f"[EVAL DEBUG] Prompt: '{{prompt_preview}}...' → Score: {{accuracy:.2%}}") return {{ "combined_score": accuracy, "accuracy": accuracy, "correct": correct, "total": total }} except Exception as e: print(f"Error in evaluation: {{e}}") return {{ "combined_score": 0.0, "accuracy": 0.0, "correct": 0, "total": 0, "error": str(e) }} ''' evaluator_path = os.path.join(work_dir, "evaluator.py") with open(evaluator_path, "w") as f: f.write(evaluator_code) return evaluator_path def create_config_file(model: str, work_dir: str): """Create a config.yaml file for OpenEvolve.""" # Create custom templates directory for prompt optimization templates_dir = os.path.join(work_dir, "templates") os.makedirs(templates_dir, exist_ok=True) # Create custom system template for PROMPT optimization (not code) system_template = """You are an expert prompt engineer tasked with iteratively improving prompts for language models. Your job is to analyze the current prompt and suggest improvements based on performance feedback. CRITICAL RULES: 1. Keep prompts BRIEF and DIRECT - shorter is usually better 2. Preserve the EXACT output format that the evaluation expects 3. Do NOT make prompts conversational or verbose 4. Do NOT ask for explanations - just ask for the answer 5. Maintain all placeholder variables like {input}, {text}, etc. 6. Focus on clarity and directness, not linguistic elegance 7. Avoid prompts that might cause the model to discuss multiple possibilities For classification tasks: - Ask for direct classification (e.g., "The sentiment is positive") - Avoid asking "what", "why", or "explain" - just ask for the label - Ensure the response will include the label word (positive/negative/neutral) - Keep prompts short enough that responses stay focused - IMPORTANT: The prompt should naturally cause the model to echo the task type in its response (e.g., if classifying sentiment, the response should include the word "sentiment") Good patterns for classification prompts: - "[Action] [task_type] [delimiter] {input}" - e.g., "Classify sentiment: {input}" - "[Task_type] of [delimiter] {input}" - e.g., "Sentiment of: {input}" - "[Action] the [task_type]: {input}" - e.g., "Determine the sentiment: {input}" Bad patterns to avoid: - Questions ("Is this X or Y?", "What is the X?") - too conversational - No task type mentioned - response won't include the keyword - Verbose explanations - pushes keywords past evaluation window - Multiple questions - confuses the model """ with open(os.path.join(templates_dir, "system_message.txt"), "w") as f: f.write(system_template) # Create custom user template for prompt rewriting user_template = """# Current Prompt Performance - Current metrics: {metrics} - Areas for improvement: {improvement_areas} {artifacts} # Prompt Evolution History {evolution_history} # Current Prompt ```text {current_program} ``` # Task Rewrite the prompt to MAXIMIZE accuracy on sentiment classification. CRITICAL REQUIREMENTS (these DIRECTLY affect score): 1. ✓ MUST include word "sentiment" → model response will contain "sentiment" keyword 2. ✓ MUST use pattern "[Action] sentiment: {{input}}" → triggers correct response format 3. ✓ Keep it reasonable (under 1000 chars) → focus on clarity and effectiveness 4. ✓ MUST keep {{input}} placeholder EXACTLY as-is PROVEN WORKING PATTERNS (use these!): - "Classify sentiment: {{input}}" ← BEST (scores ~90%) - "Determine sentiment: {{input}}" ← Also works well (~85%) - "Sentiment of: {{input}}" ← Good (~80%) PATTERNS THAT FAIL (avoid!): - ❌ "What is the sentiment?" - question format, no {{input}} - ❌ "Review: {{input}}" - missing "sentiment" keyword - ❌ "Please analyze the sentiment..." - too long, word "please" Generate a DIRECT, EFFECTIVE prompt using the working pattern above. You have up to 1000 characters to craft the best possible prompt. Output ONLY the new prompt between ```text markers: ```text Your improved prompt here ``` """ with open(os.path.join(templates_dir, "full_rewrite_user.txt"), "w") as f: f.write(user_template) config = { "llm": { "primary_model": "meta-llama/llama-3.1-8b-instruct", # Use STRONGER model for prompt generation "api_base": "https://openrouter.ai/api/v1", # Use OpenRouter endpoint "temperature": 1.2, # Even higher temperature for more creative variations }, "max_iterations": 10, # More iterations for better convergence "checkpoint_interval": 1, # Save checkpoints every iteration to preserve prompt history "diff_based_evolution": False, # Use full rewrite mode for prompts (not diff/patch mode) "language": "text", # CRITICAL: Optimize text/prompts, not Python code! "max_code_length": 40000, # Allow long prompts (default 10000 is too short) "num_islands": 1, # IMPORTANT: Use only 1 island (not 5) for simpler evolution "prompt": { "template_dir": templates_dir, # Use our custom prompt engineering templates }, "evolution": { "population_size": 15, # Larger population = more variants per generation "num_islands": 1, # Single island for simpler evolution "elite_ratio": 0.4, # Keep top 40% (6 best prompts) "explore_ratio": 0.1, # Minimal random exploration (only 1-2 prompts) "exploit_ratio": 0.5, # 50% exploitation of best prompts }, "database": { "log_prompts": True, # Save prompts used to generate each program "num_islands": 1, # CRITICAL: This is where island count is actually read from! }, "evaluator": { "timeout": 3600, # 1 hour timeout (effectively disabled, but prevents NoneType arithmetic errors) "cascade_evaluation": False, # Disable cascade to prevent signal errors "parallel_evaluations": 1, # Single worker to avoid multiprocessing complexity "distributed": False, # No distributed processing } } config_path = os.path.join(work_dir, "config.yaml") with open(config_path, "w") as f: yaml.dump(config, f) return config_path def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str, model: str, input_field: str, target_field: str, progress=gr.Progress()) -> Tuple[str, str, str]: """Run OpenEvolve to optimize the prompt.""" progress(0, desc="Validating inputs...") # Validate all inputs is_valid, validation_message = validate_inputs( dataset_name, dataset_split, input_field, target_field, initial_prompt ) if not is_valid: return f"## Validation Failed\n\n{validation_message}", "", "" progress(0.05, desc=f"Validation passed: {validation_message}") # Create temporary working directory work_dir = tempfile.mkdtemp(prefix="openevolve_") try: # Save initial prompt with EVOLVE-BLOCK markers for OpenEvolve # These markers tell OpenEvolve which part to optimize initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt") with open(initial_prompt_path, "w") as f: # Wrap prompt in evolve markers so OpenEvolve knows what to optimize f.write("# EVOLVE-BLOCK-START\n") f.write(initial_prompt) f.write("\n# EVOLVE-BLOCK-END\n") # Create evaluator progress(0.1, desc="Creating evaluator...") evaluator_path = create_evaluator_file(dataset_name, dataset_split, model, input_field, target_field, work_dir) # Create config progress(0.15, desc="Creating configuration...") config_path = create_config_file(model, work_dir) # Run initial evaluation with 50 samples # IMPORTANT: We save the indices to ensure final eval uses THE SAME samples progress(0.2, desc="Running initial evaluation on 50 samples...") initial_eval = evaluate_prompt( initial_prompt, dataset_name, dataset_split, 50, model, input_field, target_field ) if "error" in initial_eval: return f"## Error\n\n❌ Initial evaluation failed: {initial_eval['error']}", "", "" if initial_eval["total"] == 0: return f"## Error\n\n❌ Initial evaluation failed: No samples could be evaluated. This usually means:\n- API key is invalid or has no credits\n- Model is unavailable or rate-limited\n- Dataset fields are incorrect\n- Network connectivity issues\n\nPlease check your configuration and try again.", "", "" # Save the indices for final evaluation (ensures fair comparison) eval_indices = initial_eval.get("indices", []) initial_results = f""" ### Initial Prompt Evaluation **Prompt:** ``` {initial_prompt} ``` **Results:** - Accuracy: {initial_eval['accuracy']:.2f}% - Correct: {initial_eval['correct']}/{initial_eval['total']} **Sample Results:** """ for i, result in enumerate(initial_eval['results'][:5], 1): initial_results += f"\n{i}. Input: {result['input']}\n" initial_results += f" Target: {result['target']}\n" initial_results += f" Prediction: {result['prediction']}\n" initial_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n" # Run OpenEvolve progress(0.3, desc="Starting evolution: 10 iterations, 10 variants per generation...") output_dir = os.path.join(work_dir, "output") os.makedirs(output_dir, exist_ok=True) try: # Comprehensive fix for "signal only works in main thread" in Gradio # We need to prevent OpenEvolve from using signal handlers entirely # Step 1: Set environment variable to disable process pool import os as os_env os_env.environ['OPENEVOLVE_NO_PARALLEL'] = '1' # Step 2: Monkey-patch signal module to ignore signal calls in threads import signal import threading original_signal = signal.signal def safe_signal(signum, handler): """Only set signal handlers in main thread""" if threading.current_thread() is threading.main_thread(): return original_signal(signum, handler) else: # Return a dummy handler in non-main threads return signal.SIG_DFL signal.signal = safe_signal # Run evolution with signal patch in place result = run_evolution( initial_program=initial_prompt_path, evaluator=evaluator_path, config=config_path, output_dir=output_dir ) # Restore signal handler signal.signal = original_signal progress(0.80, desc="Parsing evolution history...") # Parse evolution history for visualization evolution_viz = parse_evolution_history(output_dir) progress(0.85, desc="Evaluating best evolved prompt...") # Get the best prompt (OpenEvolve saves to output_dir/best/best_program.txt) best_prompt_path = os.path.join(output_dir, "best", "best_program.txt") if os.path.exists(best_prompt_path): with open(best_prompt_path, "r") as f: best_prompt_raw = f.read() # Strip EVOLVE-BLOCK markers that we added best_prompt = best_prompt_raw.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip() print(f"\n[SELECTION] OpenEvolve selected best prompt from: {best_prompt_path}") print(f"[SELECTION] Raw prompt length: {len(best_prompt_raw)} chars") print(f"[SELECTION] Best prompt: '{best_prompt[:100].replace(chr(10), ' ')}...'") else: # Fallback: try without the "best" subdirectory best_prompt_path_alt = os.path.join(output_dir, "best_program.txt") if os.path.exists(best_prompt_path_alt): with open(best_prompt_path_alt, "r") as f: best_prompt_raw = f.read() # Strip EVOLVE-BLOCK markers best_prompt = best_prompt_raw.replace("# EVOLVE-BLOCK-START", "").replace("# EVOLVE-BLOCK-END", "").strip() print(f"\n[SELECTION] OpenEvolve selected best prompt from: {best_prompt_path_alt}") print(f"[SELECTION] Raw prompt length: {len(best_prompt_raw)} chars") print(f"[SELECTION] Best prompt: '{best_prompt[:100].replace(chr(10), ' ')}...'") else: best_prompt = initial_prompt print(f"\n[SELECTION] WARNING: No best_program.txt found, using initial prompt") # Final evaluation: Use same 50 samples as initial eval for fair comparison progress(0.85, desc="Evaluating best prompt on 50 samples (same as initial)...") final_eval = evaluate_prompt( best_prompt, dataset_name, dataset_split, 50, model, input_field, target_field, fixed_indices=eval_indices # Use same 50 samples as initial eval! ) progress(0.95, desc=f"Evaluation complete: {final_eval['correct']}/{final_eval['total']} = {final_eval['accuracy']:.1f}%") final_results = f""" ### Evolved Prompt Evaluation **Prompt:** ``` {best_prompt} ``` **Validation:** - Contains {{input}} placeholder: {'✓ Yes' if '{input}' in best_prompt else '❌ NO - This will break evaluation!'} - Prompt length: {len(best_prompt)} characters **Results:** - Accuracy: {final_eval['accuracy']:.2f}% - Correct: {final_eval['correct']}/{final_eval['total']} - Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}% **Sample Results:** """ for i, result in enumerate(final_eval['results'][:5], 1): final_results += f"\n{i}. Input: {result['input']}\n" final_results += f" Target: {result['target']}\n" final_results += f" Prediction: {result['prediction']}\n" final_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n" summary = f""" ## 🎉 Optimization Complete! ### Summary - **Dataset**: {dataset_name} ({dataset_split} split) - **Evaluation Model**: {model} - **Evolution Model**: meta-llama/llama-3.1-8b-instruct (larger model for better prompt generation) - **Initial Eval**: 50 samples - **Final Eval**: 50 samples (same samples for fair comparison) - **Evolution**: 50 samples per variant (SAME samples as initial/final!) - **Iterations**: 10 (population: 15, elite: 40%, explore: 10%, exploit: 50%) ### Results - **Initial Accuracy**: {initial_eval['accuracy']:.2f}% ({initial_eval['correct']}/{initial_eval['total']}) - **Final Accuracy**: {final_eval['accuracy']:.2f}% ({final_eval['correct']}/{final_eval['total']}) - **Improvement**: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}% {validation_message} """ progress(1.0, desc="Complete!") return summary, initial_results, final_results except Exception as e: return f"## Error During Evolution\n\n❌ {str(e)}", initial_results, "" finally: # Don't clean up - keep prompts for browsing # User can manually clean /tmp if needed pass # Create Gradio interface with gr.Blocks(title="OpenEvolve Prompt Optimizer", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🧬 OpenEvolve Prompt Optimizer Automatically optimize prompts using evolutionary algorithms. Evolves better prompts by testing on real datasets. **Setup**: Duplicate this space, add your OpenRouter API key (`OPENAI_API_KEY`) in Settings → Secrets. Get free key at [openrouter.ai](https://openrouter.ai/) **Usage**: Enter initial prompt with `{input}` placeholder → Click optimize → Compare results **Model**: `meta-llama/llama-3.2-3b-instruct` (~$0.04 per 1M tokens) """) with gr.Row(): with gr.Column(): gr.Markdown("### Configuration") dataset_name = gr.Textbox( label="HuggingFace Dataset (Full Name)", value="stanfordnlp/imdb", placeholder="e.g., stanfordnlp/imdb, gsm8k, MathArena/aime_2025", info="Dataset name from HuggingFace Hub. Default: IMDB (sentiment classification)" ) dataset_split = gr.Textbox( label="Dataset Split", value="test", placeholder="e.g., train, test, validation" ) input_field = gr.Textbox( label="Input Field Name", value="text", placeholder="e.g., text, question, sentence", info="The field containing inputs to process" ) target_field = gr.Textbox( label="Target Field Name", value="label", placeholder="e.g., label, answer, target", info="The field containing expected outputs" ) initial_prompt = gr.TextArea( label="Initial Prompt", value="Review sentiment {input}", lines=5, info="Use {input} as placeholder. This baseline scores ~60% - evolution will improve it!" ) # Button outside the column for better visibility with gr.Row(): with gr.Column(): optimize_btn = gr.Button("🚀 Validate & Optimize Prompt", variant="primary", size="lg") # Results section - clearly separated gr.Markdown("---") gr.Markdown("## 📊 Results") with gr.Row(): with gr.Column(): summary = gr.Markdown("Click 'Validate & Optimize Prompt' to start optimization...", visible=True) # Side-by-side comparison: Initial vs Best Prompt gr.Markdown("---") gr.Markdown("## 🔍 Prompt Comparison: Initial vs Best") with gr.Row(): with gr.Column(): initial_results = gr.Markdown("### Initial Prompt\nWill appear here after validation...", visible=True) with gr.Column(): final_results = gr.Markdown("### Best Prompt\nWill appear here after optimization...", visible=True) # Wire up the optimize button with hardcoded model def optimize_with_fixed_model(initial_prompt, dataset_name, dataset_split, input_field, target_field, progress=gr.Progress()): """Wrapper to use fixed model instead of dropdown""" return optimize_prompt( initial_prompt, dataset_name, dataset_split, MODELS[0], # Use fixed llama-3.2-3b model input_field, target_field, progress ) optimize_btn.click( fn=optimize_with_fixed_model, inputs=[initial_prompt, dataset_name, dataset_split, input_field, target_field], outputs=[summary, initial_results, final_results] ) if __name__ == "__main__": demo.launch()