import gradio as gr
import os
import yaml
import json
import random
from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names
from openai import OpenAI
from openevolve import run_evolution
from typing import Dict, List, Tuple, Optional
import tempfile
import shutil
import requests
import glob

# Free models from OpenRouter - Curated selection (verified as of 2025)
# IMPORTANT: The :free suffix is REQUIRED to use the free tier. Without it, requests are charged!
FREE_MODELS = [
    "qwen/qwen-2.5-72b-instruct:free",  # 72B - Strong in coding/math/multilingual (default - better rate limits)
    "meta-llama/llama-3.3-70b-instruct:free",  # 70B - Advanced reasoning
    "google/gemma-3-27b-it:free",  # 27B - Strong instruction-tuned
    "mistralai/mistral-small-3.1-24b-instruct:free",  # 24B - Efficient and capable
    "deepseek/deepseek-r1:free",  # 671B (37B active) - Top-tier but heavily rate-limited
]


def validate_dataset(dataset_name: str, split: str, input_field: str, target_field: str) -> Tuple[bool, str]:
    """
    Validate that the dataset exists and has the required fields.

    Returns:
        Tuple of (is_valid, error_message)
    """
    try:
        # Check if dataset name has correct format (should be org/name or just name)
        if not dataset_name or dataset_name.strip() == "":
            return False, "❌ Dataset name cannot be empty"

        dataset_name = dataset_name.strip()

        # Try to get dataset info from HuggingFace API
        hf_token = os.environ.get("HF_TOKEN", None)
        headers = {}
        if hf_token:
            headers["Authorization"] = f"Bearer {hf_token}"

        # Check if dataset exists on HuggingFace Hub
        api_url = f"https://huggingface.co/api/datasets/{dataset_name}"
        response = requests.get(api_url, headers=headers, timeout=10)

        if response.status_code == 404:
            return False, f"❌ Dataset '{dataset_name}' not found on HuggingFace Hub. Please use the full dataset name (e.g., 'stanfordnlp/imdb' or 'gsm8k')"
        elif response.status_code != 200:
            # Try to load anyway - might be a private dataset or API issue
            print(f"Warning: Could not verify dataset via API (status {response.status_code}), attempting to load...")

        # Try to load a small sample to verify it works and check fields
        print(f"Loading dataset {dataset_name} with split {split}...")

        # First, check if the split exists
        try:
            available_splits = get_dataset_split_names(dataset_name)
            if split not in available_splits:
                return False, f"❌ Split '{split}' not found. Available splits: {', '.join(available_splits)}"
        except Exception as e:
            print(f"Could not get split names: {e}. Will try to load anyway...")

        # Load a small sample to check fields
        dataset = load_dataset(dataset_name, split=split, streaming=True)

        # Get first example to check fields
        first_example = next(iter(dataset))
        available_fields = list(first_example.keys())

        # Check if input field exists
        if input_field not in available_fields:
            return False, f"❌ Input field '{input_field}' not found. Available fields: {', '.join(available_fields)}"

        # Check if target field exists
        if target_field not in available_fields:
            return False, f"❌ Target field '{target_field}' not found. Available fields: {', '.join(available_fields)}"

        # All validations passed
        return True, f"✅ Dataset validated successfully! Fields '{input_field}' and '{target_field}' found."

    except Exception as e:
        error_msg = str(e)
        if "404" in error_msg or "not found" in error_msg.lower():
            return False, f"❌ Dataset '{dataset_name}' not found. Please check the dataset name (use format: org/dataset-name)"
        return False, f"❌ Error validating dataset: {error_msg}"


def validate_inputs(dataset_name: str, split: str, input_field: str, target_field: str, initial_prompt: str) -> Tuple[bool, str]:
    """
    Validate all inputs before starting optimization.

    Returns:
        Tuple of (is_valid, message)
    """
    # Check API key
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        return False, "❌ OPENAI_API_KEY environment variable not set. Please set it in the Space secrets."

    # Check prompt contains {input} placeholder
    if "{input}" not in initial_prompt:
        return False, "❌ Prompt must contain '{input}' placeholder for dataset inputs"

    # Check dataset name format
    dataset_name = dataset_name.strip()
    if not dataset_name:
        return False, "❌ Dataset name cannot be empty"

    # Validate dataset and fields
    is_valid, message = validate_dataset(dataset_name, split, input_field, target_field)
    if not is_valid:
        return False, message

    return True, message


def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int, model: str, input_field: str, target_field: str) -> Dict:
    """Evaluate a prompt on a dataset using the selected model."""
    try:
        # Get API key from environment
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            return {
                "error": "OPENAI_API_KEY not set in environment",
                "accuracy": 0,
                "correct": 0,
                "total": 0,
                "results": []
            }

        # Load dataset
        dataset = load_dataset(dataset_name, split=split, streaming=False)

        # Sample random examples
        if len(dataset) > num_samples:
            indices = random.sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in indices]
        else:
            samples = list(dataset)[:num_samples]

        # Initialize OpenAI client with OpenRouter
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0
        results = []
        errors = []

        for idx, sample in enumerate(samples):
            try:
                # Get input and target
                input_text = sample.get(input_field, "")
                if isinstance(input_text, dict):
                    input_text = str(input_text)
                target = sample.get(target_field, "")
                if isinstance(target, dict):
                    target = str(target)

                # Format the prompt with the input
                formatted_prompt = prompt.replace("{input}", str(input_text))

                # Call the model
                response = client.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": formatted_prompt}
                    ],
                    temperature=0.1,
                    max_tokens=500,
                )

                prediction = response.choices[0].message.content.strip()

                # Smart evaluation - handle both exact match and semantic match
                target_str = str(target).lower().strip()
                pred_lower = prediction.lower()

                # Check exact match first
                is_correct = target_str in pred_lower

                # If not exact match, check for semantic equivalents (e.g., "1" = "positive")
                if not is_correct:
                    # Common sentiment mappings
                    if target_str in ["1", "positive", "pos"]:
                        is_correct = any(word in pred_lower for word in ["positive", "good", "great"])
                    elif target_str in ["0", "negative", "neg"]:
                        is_correct = any(word in pred_lower for word in ["negative", "bad", "poor"])

                if is_correct:
                    correct += 1
                total += 1

                results.append({
                    "input": str(input_text)[:100] + "..." if len(str(input_text)) > 100 else str(input_text),
                    "target": str(target),
                    "prediction": prediction[:100] + "..." if len(prediction) > 100 else prediction,
                    "correct": is_correct
                })

            except Exception as e:
                error_msg = f"Sample {idx+1}: {str(e)}"
                print(f"Error evaluating sample {idx+1}: {e}")
                errors.append(error_msg)
                # Only continue if we haven't failed on all samples
                if len(errors) > len(samples) // 2:  # More than half failed
                    print(f"Too many errors ({len(errors)} out of {len(samples)}), stopping evaluation")
                    break
                continue

        accuracy = (correct / total * 100) if total > 0 else 0

        result_dict = {
            "accuracy": accuracy,
            "correct": correct,
            "total": total,
            "results": results
        }

        # Add errors if any occurred
        if errors:
            result_dict["errors"] = errors
            if total == 0:
                # All samples failed - create a helpful error message
                result_dict["error"] = f"All {len(samples)} samples failed to evaluate. First few errors:\n" + "\n".join(errors[:3])

        return result_dict

    except Exception as e:
        return {
            "error": str(e),
            "accuracy": 0,
            "correct": 0,
            "total": 0,
            "results": []
        }
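
# A quick local smoke test (sketch, kept as a comment so the Space never executes it):
# with OPENAI_API_KEY set to an OpenRouter key, evaluate_prompt can be called directly.
# The dataset, fields, and prompt below simply mirror the interface defaults and are
# illustrative only.
#
#   result = evaluate_prompt(
#       prompt="Analyze the sentiment of the following text and classify it as positive or negative:\n\n{input}\n\nClassification:",
#       dataset_name="stanfordnlp/imdb",
#       split="test",
#       num_samples=5,
#       model=FREE_MODELS[0],
#       input_field="text",
#       target_field="label",
#   )
#   print(f"{result['accuracy']:.1f}% ({result['correct']}/{result['total']})")
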

def collect_prompt_history(output_dir: str) -> List[Dict]:
    """
    Collect all prompts discovered during evolution.

    Returns a list of dicts with: {prompt, id, file}
    """
    try:
        prompts = []

        # Look for all program files
        program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))

        # Also check for logs to get scores
        log_dir = os.path.join(output_dir, "logs")

        for pfile in program_files:
            try:
                with open(pfile, 'r') as f:
                    prompt_content = f.read()

                # Extract program ID from filename
                prog_id = os.path.basename(pfile).replace("program_", "").replace(".txt", "")

                prompts.append({
                    "prompt": prompt_content,
                    "id": prog_id,
                    "file": pfile
                })
            except:
                continue

        return prompts
    except Exception as e:
        print(f"Error collecting prompt history: {e}")
        return []


def parse_evolution_history(output_dir: str) -> str:
    """
    Parse evolution history from OpenEvolve output directory.
    Returns a markdown string with visualization of the evolution process.
    """
    try:
        evolution_viz = "## 🧬 Evolution Progress\n\n"

        # Look for generation files or logs
        generation_files = sorted(glob.glob(os.path.join(output_dir, "generation_*.txt")))
        log_file = os.path.join(output_dir, "evolution.log")

        # Try to parse generation files if they exist
        if generation_files:
            evolution_viz += "### Generation-by-Generation Progress\n\n"
            for gen_file in generation_files:
                gen_num = os.path.basename(gen_file).replace("generation_", "").replace(".txt", "")
                try:
                    with open(gen_file, 'r') as f:
                        content = f.read()
                    evolution_viz += f"**Generation {gen_num}:**\n```\n{content[:200]}{'...' if len(content) > 200 else ''}\n```\n\n"
                except:
                    pass

        # Try to parse log file
        elif os.path.exists(log_file):
            evolution_viz += "### Evolution Log\n\n"
            try:
                with open(log_file, 'r') as f:
                    log_content = f.read()
                evolution_viz += f"```\n{log_content[-1000:]}\n```\n\n"
            except:
                pass

        # Look for scores or history file
        scores_file = os.path.join(output_dir, "scores.json")
        if os.path.exists(scores_file):
            try:
                with open(scores_file, 'r') as f:
                    scores = json.load(f)
                evolution_viz += "### Score Progression\n\n"
                evolution_viz += "| Generation | Best Score | Avg Score | Population |\n"
                evolution_viz += "|------------|-----------|-----------|------------|\n"
                for gen in scores:
                    evolution_viz += f"| {gen['generation']} | {gen['best']:.3f} | {gen['avg']:.3f} | {gen['population']} |\n"
                evolution_viz += "\n"
            except:
                pass

        # Look for all program variants
        program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))
        if program_files:
            evolution_viz += f"### Explored Variants\n\n"
            evolution_viz += f"OpenEvolve explored {len(program_files)} different prompt variants during evolution.\n\n"

            # Show a few intermediate prompts
            if len(program_files) > 3:
                sample_files = [program_files[0], program_files[len(program_files)//2], program_files[-2]]
                evolution_viz += "**Sample Intermediate Prompts:**\n\n"
                for idx, pfile in enumerate(sample_files, 1):
                    try:
                        with open(pfile, 'r') as f:
                            prompt_content = f.read()
                        evolution_viz += f"**Variant {idx}:**\n```\n{prompt_content[:150]}{'...' if len(prompt_content) > 150 else ''}\n```\n\n"
                    except:
                        pass

        # If no specific files found, show directory contents
        if not generation_files and not os.path.exists(log_file) and not os.path.exists(scores_file):
            evolution_viz += "### Evolution Complete\n\n"
            evolution_viz += "OpenEvolve ran 10 iterations of evolutionary optimization using:\n"
            evolution_viz += "- **Population Size**: 10 prompts per generation\n"
            evolution_viz += "- **Selection Strategy**: 10% elite, 30% explore, 60% exploit\n"
            evolution_viz += "- **Islands**: 1 population with mutation and crossover\n"
            evolution_viz += "- **Evaluation**: 100 samples per prompt variant\n\n"

            # Count files in output directory
            all_files = os.listdir(output_dir)
            evolution_viz += f"Generated {len(all_files)} files during evolution process.\n\n"

        return evolution_viz
    except Exception as e:
        return f"## 🧬 Evolution Progress\n\nEvolution completed successfully. Unable to parse detailed history: {str(e)}\n\n"


def create_evaluator_file(dataset_name: str, split: str, model: str, input_field: str, target_field: str, work_dir: str):
    """Create an evaluator.py file for OpenEvolve with staged/cascading evaluation."""
    evaluator_code = f'''
import os
import random
from datasets import load_dataset
from openai import OpenAI


def evaluate(prompt: str) -> dict:
    """
    Evaluate a prompt using 2-stage cascading evaluation to save API calls.

    Stage 1: Evaluate with 20 samples
    - If accuracy >= 0.5, proceed to Stage 2
    - If accuracy < 0.5, return early (no point wasting 80 more samples)

    Stage 2: Evaluate with 80 more samples (total 100)
    - Combine results for final score

    Returns dict with combined_score (0-1), accuracy, correct, and total.
    """
    try:
        # Load dataset
        dataset = load_dataset("{dataset_name}", split="{split}", streaming=False)

        # Initialize OpenAI client
        api_key = os.environ.get("OPENAI_API_KEY")
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        def evaluate_samples(samples, correct_so_far=0, total_so_far=0):
            """Helper function to evaluate a batch of samples."""
            correct = correct_so_far
            total = total_so_far

            for sample in samples:
                try:
                    # Get input and target
                    input_text = sample.get("{input_field}", "")
                    if isinstance(input_text, dict):
                        input_text = str(input_text)
                    target = sample.get("{target_field}", "")
                    if isinstance(target, dict):
                        target = str(target)

                    # Format the prompt
                    formatted_prompt = prompt.replace("{{input}}", str(input_text))

                    # Call the model
                    response = client.chat.completions.create(
                        model="{model}",
                        messages=[
                            {{"role": "system", "content": "You are a helpful assistant."}},
                            {{"role": "user", "content": formatted_prompt}}
                        ],
                        temperature=0.1,
                        max_tokens=500,
                    )

                    prediction = response.choices[0].message.content.strip()

                    # Smart evaluation - handle both exact match and semantic match
                    target_str = str(target).lower().strip()
                    pred_lower = prediction.lower()

                    # Check exact match first
                    is_correct = target_str in pred_lower

                    # If not exact match, check for semantic equivalents (e.g., "1" = "positive")
                    if not is_correct:
                        # Common sentiment mappings
                        if target_str in ["1", "positive", "pos"]:
                            is_correct = any(word in pred_lower for word in ["positive", "good", "great"])
                        elif target_str in ["0", "negative", "neg"]:
                            is_correct = any(word in pred_lower for word in ["negative", "bad", "poor"])

                    if is_correct:
                        correct += 1
                    total += 1

                except Exception as e:
                    print(f"Error evaluating sample: {{e}}")
                    continue

            return correct, total

        # STAGE 1: Evaluate with 20 samples first
        stage1_size = 20
        stage1_samples_count = min(stage1_size, len(dataset))

        if len(dataset) > stage1_samples_count:
            stage1_indices = random.sample(range(len(dataset)), stage1_samples_count)
            stage1_samples = [dataset[i] for i in stage1_indices]
        else:
            stage1_samples = list(dataset)[:stage1_samples_count]

        print(f"[Stage 1/2] Evaluating with {{len(stage1_samples)}} samples...")
        correct, total = evaluate_samples(stage1_samples)
        stage1_score = (correct / total) if total > 0 else 0.0
        print(f"[Stage 1/2] Score: {{stage1_score:.3f}} ({{correct}}/{{total}})")

        # Early exit if Stage 1 score is below threshold
        if stage1_score < 0.5:
            print(f"[Stage 1/2] Score below 0.5 threshold - skipping Stage 2 (saved 80 API calls)")
            return {{
                "combined_score": stage1_score,
                "accuracy": stage1_score,
                "correct": correct,
                "total": total,
                "stage": "stage1_early_exit"
            }}

        # STAGE 2: Continue with 80 more samples
        print(f"[Stage 2/2] Score >= 0.5 - proceeding with 80 more samples...")
        stage2_size = 80
        stage2_samples_count = min(stage2_size, max(0, len(dataset) - stage1_samples_count))

        if stage2_samples_count > 0:
            # Get different samples from Stage 1
            remaining_indices = list(set(range(len(dataset))) - set(stage1_indices if 'stage1_indices' in locals() else []))
            if len(remaining_indices) >= stage2_samples_count:
                stage2_indices = random.sample(remaining_indices, stage2_samples_count)
                stage2_samples = [dataset[i] for i in stage2_indices]
            else:
                stage2_samples = [dataset[i] for i in remaining_indices[:stage2_samples_count]]

            correct, total = evaluate_samples(stage2_samples, correct, total)
            final_score = (correct / total) if total > 0 else stage1_score
            print(f"[Stage 2/2] Final score: {{final_score:.3f}} ({{correct}}/{{total}})")

            return {{
                "combined_score": final_score,
                "accuracy": final_score,
                "correct": correct,
                "total": total,
                "stage": "stage2_complete"
            }}
        else:
            print(f"[Stage 2/2] Not enough samples in dataset for Stage 2")
            return {{
                "combined_score": stage1_score,
                "accuracy": stage1_score,
                "correct": correct,
                "total": total,
                "stage": "stage1_complete"
            }}

    except Exception as e:
        print(f"Error in evaluation: {{e}}")
        return {{
            "combined_score": 0.0,
            "accuracy": 0.0,
            "correct": 0,
            "total": 0,
            "error": str(e)
        }}
'''

    evaluator_path = os.path.join(work_dir, "evaluator.py")
    with open(evaluator_path, "w") as f:
        f.write(evaluator_code)

    return evaluator_path


def create_config_file(model: str, work_dir: str):
    """Create a config.yaml file for OpenEvolve."""
    config = {
        "llm": {
            "primary_model": model,
            "api_base": "https://openrouter.ai/api/v1",  # Use OpenRouter endpoint
            "temperature": 0.7,
        },
        "max_iterations": 10,
        "evolution": {
            "population_size": 10,
            "num_islands": 1,
            "elite_ratio": 0.1,
            "explore_ratio": 0.3,
            "exploit_ratio": 0.6,
        },
        "evaluator": {
            "timeout": None,  # Disable timeout to avoid signal handling issues
            "cascade_evaluation": False,  # Disable cascade to prevent signal errors
            "parallel_evaluations": 1,  # Single worker
            "distributed": False,  # No distributed processing
        }
    }

    config_path = os.path.join(work_dir, "config.yaml")
    with open(config_path, "w") as f:
        yaml.dump(config, f)

    return config_path
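
# For reference, the config.yaml written above serializes roughly as follows for the defaults
# (yaml.dump sorts keys alphabetically; primary_model is whichever model the user selected):
#
#   evaluator:
#     cascade_evaluation: false
#     distributed: false
#     parallel_evaluations: 1
#     timeout: null
#   evolution:
#     elite_ratio: 0.1
#     exploit_ratio: 0.6
#     explore_ratio: 0.3
#     num_islands: 1
#     population_size: 10
#   llm:
#     api_base: https://openrouter.ai/api/v1
#     primary_model: qwen/qwen-2.5-72b-instruct:free
#     temperature: 0.7
#   max_iterations: 10
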

def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str, model: str, input_field: str, target_field: str, progress=gr.Progress()) -> Tuple[str, str, str, str, List[str], int, int]:
    """Run OpenEvolve to optimize the prompt."""

    progress(0, desc="Validating inputs...")

    # Validate all inputs
    is_valid, validation_message = validate_inputs(
        dataset_name, dataset_split, input_field, target_field, initial_prompt
    )

    if not is_valid:
        return f"## Validation Failed\n\n{validation_message}", "", "", "", [], 0, 0

    progress(0.05, desc=f"Validation passed: {validation_message}")

    # Create temporary working directory
    work_dir = tempfile.mkdtemp(prefix="openevolve_")

    try:
        # Save initial prompt
        initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt")
        with open(initial_prompt_path, "w") as f:
            f.write(initial_prompt)

        # Create evaluator
        progress(0.1, desc="Creating evaluator...")
        evaluator_path = create_evaluator_file(dataset_name, dataset_split, model, input_field, target_field, work_dir)

        # Create config
        progress(0.15, desc="Creating configuration...")
        config_path = create_config_file(model, work_dir)

        # Run initial evaluation (using 20 samples to save API calls)
        progress(0.2, desc="Running initial evaluation on 20 samples...")
        initial_eval = evaluate_prompt(
            initial_prompt, dataset_name, dataset_split, 20, model, input_field, target_field
        )

        if "error" in initial_eval:
            return f"## Error\n\n❌ Initial evaluation failed: {initial_eval['error']}", "", "", "", [initial_prompt], 0, 1

        if initial_eval["total"] == 0:
            return f"## Error\n\n❌ Initial evaluation failed: No samples could be evaluated. This usually means:\n- API key is invalid or has no credits\n- Model is unavailable or rate-limited\n- Dataset fields are incorrect\n- Network connectivity issues\n\nPlease check your configuration and try again.", "", "", "", [initial_prompt], 0, 1

        initial_results = f"""
### Initial Prompt Evaluation

**Prompt:**
```
{initial_prompt}
```

**Results:**
- Accuracy: {initial_eval['accuracy']:.2f}%
- Correct: {initial_eval['correct']}/{initial_eval['total']}

**Sample Results:**
"""
        for i, result in enumerate(initial_eval['results'][:5], 1):
            initial_results += f"\n{i}. Input: {result['input']}\n"
            initial_results += f"   Target: {result['target']}\n"
            initial_results += f"   Prediction: {result['prediction']}\n"
            initial_results += f"   ✓ Correct\n" if result['correct'] else f"   ✗ Incorrect\n"

        # Run OpenEvolve
        progress(0.3, desc="Starting OpenEvolve optimization (10 iterations with staged evaluation)...")

        output_dir = os.path.join(work_dir, "output")
        os.makedirs(output_dir, exist_ok=True)

        try:
            # Run evolution
            # Note: OpenEvolve may show "Initialized process parallel controller" warnings
            # but they are harmless in this single-worker configuration
            result = run_evolution(
                initial_program=initial_prompt_path,
                evaluator=evaluator_path,
                config=config_path,
                output_dir=output_dir
            )

            progress(0.80, desc="Parsing evolution history...")

            # Parse evolution history for visualization
            evolution_viz = parse_evolution_history(output_dir)

            progress(0.85, desc="Evaluating best evolved prompt on 20 samples...")

            # Get the best prompt
            best_prompt_path = os.path.join(output_dir, "best_program.txt")
            if os.path.exists(best_prompt_path):
                with open(best_prompt_path, "r") as f:
                    best_prompt = f.read()
            else:
                best_prompt = initial_prompt

            # Evaluate best prompt (using 20 samples like initial eval for consistency)
            final_eval = evaluate_prompt(
                best_prompt, dataset_name, dataset_split, 20, model, input_field, target_field
            )

            final_results = f"""
### Evolved Prompt Evaluation

**Prompt:**
```
{best_prompt}
```

**Results:**
- Accuracy: {final_eval['accuracy']:.2f}%
- Correct: {final_eval['correct']}/{final_eval['total']}
- Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}%

**Sample Results:**
"""
            for i, result in enumerate(final_eval['results'][:5], 1):
                final_results += f"\n{i}. Input: {result['input']}\n"
                final_results += f"   Target: {result['target']}\n"
                final_results += f"   Prediction: {result['prediction']}\n"
                final_results += f"   ✓ Correct\n" if result['correct'] else f"   ✗ Incorrect\n"

            summary = f"""
## 🎉 Optimization Complete!

### Summary
- **Dataset**: {dataset_name} ({dataset_split} split)
- **Model**: {model}
- **Initial/Final Eval**: 20 samples each
- **Evolution Eval**: Staged (20 → 100 if score ≥ 0.5)
- **Iterations**: 10

### Results
- **Initial Accuracy**: {initial_eval['accuracy']:.2f}%
- **Final Accuracy**: {final_eval['accuracy']:.2f}%
- **Improvement**: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}%

{validation_message}
"""

            progress(1.0, desc="Complete!")

            # Collect all discovered prompts for browsing
            all_prompts = [initial_prompt]  # Start with initial prompt
            prompt_history = collect_prompt_history(output_dir)
            for p in prompt_history:
                all_prompts.append(p["prompt"])

            # Ensure we have the best prompt at the end
            if best_prompt not in all_prompts:
                all_prompts.append(best_prompt)

            return summary, initial_results, evolution_viz, final_results, all_prompts, 0, len(all_prompts)

        except Exception as e:
            return f"## Error During Evolution\n\n❌ {str(e)}", initial_results, "", "", [initial_prompt], 0, 1

    finally:
        # Don't clean up - keep prompts for browsing
        # User can manually clean /tmp if needed
        pass


# Create Gradio interface
with gr.Blocks(title="OpenEvolve Prompt Optimizer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# 🧬 OpenEvolve Prompt Optimizer

Automatically evolve and optimize your prompts using evolutionary algorithms!

This space uses [OpenEvolve](https://github.com/algorithmicsuperintelligence/openevolve) to iteratively improve prompts by testing them on real datasets and evolving better versions.

## How it works:
1. Enter an initial prompt (use `{input}` as a placeholder for dataset inputs)
2. Enter the full HuggingFace dataset name (e.g., `stanfordnlp/imdb`, `gsm8k`)
3. Specify the dataset split and field names
4. Choose a free model from OpenRouter
5. Click "Optimize Prompt" - the system will validate everything first!
6. Watch the evolution progress in real-time
7. Compare initial vs. evolved performance!

**Note**: API key is read from `OPENAI_API_KEY` environment variable (set in Space secrets)
""")

    with gr.Row():
        with gr.Column():
            gr.Markdown("### Configuration")

            model = gr.Dropdown(
                choices=FREE_MODELS,
                value=FREE_MODELS[0],
                label="Select Model",
                info="Choose from 5 curated free models on OpenRouter (24B to 671B parameters)"
            )

            dataset_name = gr.Textbox(
                label="HuggingFace Dataset (Full Name)",
                value="stanfordnlp/imdb",
                placeholder="e.g., stanfordnlp/imdb, openai/gsm8k, SetFit/sst5",
                info="Full dataset name from HuggingFace Hub (org/dataset-name or dataset-name)"
            )

            dataset_split = gr.Textbox(
                label="Dataset Split",
                value="test",
                placeholder="e.g., train, test, validation"
            )

            input_field = gr.Textbox(
                label="Input Field Name",
                value="text",
                placeholder="e.g., text, question, context",
                info="The field containing inputs to process"
            )

            target_field = gr.Textbox(
                label="Target Field Name",
                value="label",
                placeholder="e.g., label, answer, target",
                info="The field containing expected outputs"
            )

            initial_prompt = gr.TextArea(
                label="Initial Prompt",
                value="Analyze the sentiment of the following text and classify it as positive or negative:\n\n{input}\n\nClassification:",
                lines=6,
                info="Use {input} as placeholder for dataset inputs"
            )

    # Button outside the column for better visibility
    with gr.Row():
        with gr.Column():
            optimize_btn = gr.Button("🚀 Validate & Optimize Prompt", variant="primary", size="lg")

    # Results section - clearly separated
    gr.Markdown("---")
    gr.Markdown("## 📊 Results")

    with gr.Row():
        with gr.Column():
            summary = gr.Markdown("Click 'Validate & Optimize Prompt' to start optimization...", visible=True)

    with gr.Row():
        with gr.Column():
            initial_results = gr.Markdown("### Initial Results\nWill appear here after validation...", visible=True)
        with gr.Column():
            final_results = gr.Markdown("### Final Results\nWill appear here after optimization...", visible=True)

    with gr.Row():
        with gr.Column():
            evolution_progress = gr.Markdown("### Evolution Progress\nEvolution progress will appear here during optimization...", visible=True)

    # Prompt History Browser
    gr.Markdown("---")
    gr.Markdown("## 📜 Prompt History Browser")
    gr.Markdown("Browse through all prompts discovered during evolution (initial → intermediate → final)")

    with gr.Row():
        with gr.Column(scale=8):
            prompt_display = gr.TextArea(
                label="",
                lines=10,
                interactive=False,
                placeholder="Prompts will appear here after optimization completes...",
                show_label=False
            )
        with gr.Column(scale=2):
            prompt_counter = gr.Markdown("**Prompt**: -/-")
            prev_btn = gr.Button("⬅️ Previous", size="sm")
            next_btn = gr.Button("Next ➡️", size="sm")
            gr.Markdown("**Prompt Types:**\n- First = Initial\n- Middle = Intermediate\n- Last = Final Best")

    # Hidden state to store prompt history and current index
    prompt_history_state = gr.State([])
    current_prompt_index = gr.State(0)
    # Documentation section - in collapsible accordion
    gr.Markdown("---")
    with gr.Accordion("📚 Documentation & Examples", open=False):
        gr.Markdown("""
### Example Datasets & Fields:

| Dataset | Split | Input Field | Target Field | Task |
|---------|-------|-------------|--------------|------|
| stanfordnlp/imdb | test | text | label | Sentiment Analysis |
| rajpurkar/squad | validation | question | answers | Question Answering |
| dair-ai/emotion | test | text | label | Emotion Classification |
| openai/gsm8k | test | question | answer | Math Reasoning |
| fancyzhx/ag_news | test | text | label | News Classification |

### About This Demo Space:

**This is a demonstration space** showcasing OpenEvolve's prompt optimization capabilities. The interface shows you how the system works, but **you'll need to set up your own instance to run optimizations**.

### How to Run This Yourself:

1. **Clone this Space**: Click "⋮" (three dots) at top-right → "Duplicate this Space"
2. **Set Environment Variables** in your cloned Space's settings:
   - `OPENAI_API_KEY`: Your OpenRouter API key (get free key at [openrouter.ai/keys](https://openrouter.ai/keys))
   - `HF_TOKEN`: (Optional) HuggingFace token for private datasets
3. **Configure Your Optimization**:
   - Dataset: Use full name format (e.g., `stanfordnlp/imdb` or `openai/gsm8k`)
   - Fields: Specify exact field names from the dataset schema
   - Model: Choose from 5 curated free models (larger models = better results but slower/rate-limited)
4. **Run & Monitor**:
   - All inputs are validated before starting
   - Evolution uses staged evaluation (20 samples first, then 80 more if promising)
   - Saves API calls by early-stopping poor prompts (< 50% accuracy)
   - Watch evolution progress visualization in real-time

### About OpenEvolve:

OpenEvolve is an open-source evolutionary optimization framework. Learn more at:
- [GitHub Repository](https://github.com/algorithmicsuperintelligence/openevolve)
- [Documentation](https://github.com/algorithmicsuperintelligence/openevolve#readme)
""")

    # Navigation functions for prompt browser
    def show_previous_prompt(prompts, current_idx):
        if not prompts or len(prompts) == 0:
            return "", "**Prompt**: -/-", 0
        new_idx = max(0, current_idx - 1)
        counter_text = f"**Prompt**: {new_idx + 1}/{len(prompts)}"
        if new_idx == 0:
            counter_text += " (Initial)"
        elif new_idx == len(prompts) - 1:
            counter_text += " (Final Best)"
        else:
            counter_text += " (Intermediate)"
        return prompts[new_idx], counter_text, new_idx

    def show_next_prompt(prompts, current_idx):
        if not prompts or len(prompts) == 0:
            return "", "**Prompt**: -/-", 0
        new_idx = min(len(prompts) - 1, current_idx + 1)
        counter_text = f"**Prompt**: {new_idx + 1}/{len(prompts)}"
        if new_idx == 0:
            counter_text += " (Initial)"
        elif new_idx == len(prompts) - 1:
            counter_text += " (Final Best)"
        else:
            counter_text += " (Intermediate)"
        return prompts[new_idx], counter_text, new_idx

    def update_prompt_display(prompts, idx, total):
        if not prompts or len(prompts) == 0:
            return "", "**Prompt**: -/-"
        idx = min(idx, len(prompts) - 1)
        counter_text = f"**Prompt**: {idx + 1}/{len(prompts)}"
        if idx == 0:
            counter_text += " (Initial)"
        elif idx == len(prompts) - 1:
            counter_text += " (Final Best)"
        else:
            counter_text += " (Intermediate)"
        return prompts[idx], counter_text

    # Wire up the optimize button
    optimize_result = optimize_btn.click(
        fn=optimize_prompt,
        inputs=[initial_prompt, dataset_name, dataset_split, model, input_field, target_field],
        outputs=[summary, initial_results, evolution_progress, final_results, prompt_history_state, current_prompt_index, gr.State()]  # dummy for total
    )

    # Update prompt display when optimization completes
    optimize_result.then(
        fn=update_prompt_display,
        inputs=[prompt_history_state, current_prompt_index, gr.State()],
        outputs=[prompt_display, prompt_counter]
    )

    # Wire up navigation buttons
    prev_btn.click(
        fn=show_previous_prompt,
        inputs=[prompt_history_state, current_prompt_index],
        outputs=[prompt_display, prompt_counter, current_prompt_index]
    )

    next_btn.click(
        fn=show_next_prompt,
        inputs=[prompt_history_state, current_prompt_index],
        outputs=[prompt_display, prompt_counter, current_prompt_index]
    )


if __name__ == "__main__":
    demo.launch()
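
# Local development note (sketch): this script assumes it is saved as app.py, the usual
# Space entry point, though that filename is not guaranteed here. With the same environment
# variables the Space uses, a local run would look like:
#
#   export OPENAI_API_KEY=<your OpenRouter API key>
#   export HF_TOKEN=<optional HuggingFace token for private datasets>
#   python app.py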