import gradio as gr
import os
import yaml
import json
import random
from datasets import load_dataset, get_dataset_config_names, get_dataset_split_names
from openai import OpenAI
from openevolve import run_evolution
from typing import Dict, List, Tuple, Optional
import tempfile
import shutil
import requests
import glob
# Free models from OpenRouter - Curated selection (verified as of 2025)
# IMPORTANT: The :free suffix is REQUIRED to use the free tier. Without it, requests are charged!
FREE_MODELS = [
"qwen/qwen-2.5-72b-instruct:free", # 72B - Strong in coding/math/multilingual (default - better rate limits)
"meta-llama/llama-3.3-70b-instruct:free", # 70B - Advanced reasoning
"google/gemma-3-27b-it:free", # 27B - Strong instruction-tuned
"mistralai/mistral-small-3.1-24b-instruct:free", # 24B - Efficient and capable
"deepseek/deepseek-r1:free", # 671B (37B active) - Top-tier but heavily rate-limited
]
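# Usage sketch (for reference only; the app constructs this client inside the
# evaluation functions below). The model IDs above are passed verbatim as the
# `model` argument to chat.completions.create, with the ":free" suffix kept:
#
#     client = OpenAI(base_url="https://openrouter.ai/api/v1",
#                     api_key=os.environ["OPENAI_API_KEY"])
#     client.chat.completions.create(model=FREE_MODELS[0], messages=[...])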
def validate_dataset(dataset_name: str, split: str, input_field: str, target_field: str) -> Tuple[bool, str]:
"""
Validate that the dataset exists and has the required fields.
Returns:
Tuple of (is_valid, error_message)
"""
try:
        # Check that a dataset name was provided
if not dataset_name or dataset_name.strip() == "":
return False, "❌ Dataset name cannot be empty"
dataset_name = dataset_name.strip()
# Try to get dataset info from HuggingFace API
hf_token = os.environ.get("HF_TOKEN", None)
headers = {}
if hf_token:
headers["Authorization"] = f"Bearer {hf_token}"
# Check if dataset exists on HuggingFace Hub
api_url = f"https://huggingface.co/api/datasets/{dataset_name}"
response = requests.get(api_url, headers=headers, timeout=10)
if response.status_code == 404:
return False, f"❌ Dataset '{dataset_name}' not found on HuggingFace Hub. Please use the full dataset name (e.g., 'stanfordnlp/imdb' or 'gsm8k')"
elif response.status_code != 200:
# Try to load anyway - might be a private dataset or API issue
print(f"Warning: Could not verify dataset via API (status {response.status_code}), attempting to load...")
# Try to load a small sample to verify it works and check fields
print(f"Loading dataset {dataset_name} with split {split}...")
# First, check if the split exists
try:
available_splits = get_dataset_split_names(dataset_name)
if split not in available_splits:
return False, f"❌ Split '{split}' not found. Available splits: {', '.join(available_splits)}"
except Exception as e:
print(f"Could not get split names: {e}. Will try to load anyway...")
# Load a small sample to check fields
dataset = load_dataset(dataset_name, split=split, streaming=True)
# Get first example to check fields
first_example = next(iter(dataset))
available_fields = list(first_example.keys())
# Check if input field exists
if input_field not in available_fields:
return False, f"❌ Input field '{input_field}' not found. Available fields: {', '.join(available_fields)}"
# Check if target field exists
if target_field not in available_fields:
return False, f"❌ Target field '{target_field}' not found. Available fields: {', '.join(available_fields)}"
# All validations passed
return True, f"✅ Dataset validated successfully! Fields '{input_field}' and '{target_field}' found."
except Exception as e:
error_msg = str(e)
if "404" in error_msg or "not found" in error_msg.lower():
return False, f"❌ Dataset '{dataset_name}' not found. Please check the dataset name (use format: org/dataset-name)"
return False, f"❌ Error validating dataset: {error_msg}"
def validate_inputs(dataset_name: str, split: str, input_field: str, target_field: str,
initial_prompt: str) -> Tuple[bool, str]:
"""
Validate all inputs before starting optimization.
Returns:
Tuple of (is_valid, message)
"""
# Check API key
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
return False, "❌ OPENAI_API_KEY environment variable not set. Please set it in the Space secrets."
# Check prompt contains {input} placeholder
if "{input}" not in initial_prompt:
return False, "❌ Prompt must contain '{input}' placeholder for dataset inputs"
# Check dataset name format
dataset_name = dataset_name.strip()
if not dataset_name:
return False, "❌ Dataset name cannot be empty"
# Validate dataset and fields
is_valid, message = validate_dataset(dataset_name, split, input_field, target_field)
if not is_valid:
return False, message
return True, message
def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int,
model: str, input_field: str, target_field: str) -> Dict:
"""Evaluate a prompt on a dataset using the selected model."""
try:
# Get API key from environment
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
return {
"error": "OPENAI_API_KEY not set in environment",
"accuracy": 0,
"correct": 0,
"total": 0,
"results": []
}
# Load dataset
dataset = load_dataset(dataset_name, split=split, streaming=False)
# Sample random examples
if len(dataset) > num_samples:
indices = random.sample(range(len(dataset)), num_samples)
samples = [dataset[i] for i in indices]
else:
samples = list(dataset)[:num_samples]
# Initialize OpenAI client with OpenRouter
client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key,
)
correct = 0
total = 0
results = []
errors = []
for idx, sample in enumerate(samples):
try:
# Get input and target
input_text = sample.get(input_field, "")
if isinstance(input_text, dict):
input_text = str(input_text)
target = sample.get(target_field, "")
if isinstance(target, dict):
target = str(target)
# Format the prompt with the input
formatted_prompt = prompt.replace("{input}", str(input_text))
# Call the model
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": formatted_prompt}
],
temperature=0.1,
max_tokens=500,
)
prediction = response.choices[0].message.content.strip()
                # Lenient scoring: substring match plus simple semantic equivalents
target_str = str(target).lower().strip()
pred_lower = prediction.lower()
                # First, check whether the target string appears anywhere in the prediction
is_correct = target_str in pred_lower
                # If no substring match, fall back to common semantic equivalents (e.g., "1" == "positive")
if not is_correct:
# Common sentiment mappings
if target_str in ["1", "positive", "pos"]:
is_correct = any(word in pred_lower for word in ["positive", "good", "great"])
elif target_str in ["0", "negative", "neg"]:
is_correct = any(word in pred_lower for word in ["negative", "bad", "poor"])
if is_correct:
correct += 1
total += 1
results.append({
"input": str(input_text)[:100] + "..." if len(str(input_text)) > 100 else str(input_text),
"target": str(target),
"prediction": prediction[:100] + "..." if len(prediction) > 100 else prediction,
"correct": is_correct
})
except Exception as e:
error_msg = f"Sample {idx+1}: {str(e)}"
print(f"Error evaluating sample {idx+1}: {e}")
errors.append(error_msg)
                # Stop early if more than half of the samples have failed
if len(errors) > len(samples) // 2: # More than half failed
print(f"Too many errors ({len(errors)} out of {len(samples)}), stopping evaluation")
break
continue
accuracy = (correct / total * 100) if total > 0 else 0
result_dict = {
"accuracy": accuracy,
"correct": correct,
"total": total,
"results": results
}
# Add errors if any occurred
if errors:
result_dict["errors"] = errors
if total == 0:
# All samples failed - create a helpful error message
result_dict["error"] = f"All {len(samples)} samples failed to evaluate. First few errors:\n" + "\n".join(errors[:3])
return result_dict
except Exception as e:
return {
"error": str(e),
"accuracy": 0,
"correct": 0,
"total": 0,
"results": []
}
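# Shape of the dict returned by evaluate_prompt (values below are illustrative);
# the "errors"/"error" keys are only present when individual samples fail:
#
#     {"accuracy": 85.0, "correct": 17, "total": 20,
#      "results": [{"input": "...", "target": "1",
#                   "prediction": "positive", "correct": True}, ...]}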
def collect_prompt_history(output_dir: str) -> List[Dict]:
"""
    Collect all prompt variants discovered during evolution.
    Returns a list of dicts with: {prompt, id, file}
"""
try:
prompts = []
# Look for all program files
program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))
# Also check for logs to get scores
log_dir = os.path.join(output_dir, "logs")
for pfile in program_files:
try:
with open(pfile, 'r') as f:
prompt_content = f.read()
# Extract program ID from filename
prog_id = os.path.basename(pfile).replace("program_", "").replace(".txt", "")
prompts.append({
"prompt": prompt_content,
"id": prog_id,
"file": pfile
})
            except Exception:
continue
return prompts
except Exception as e:
print(f"Error collecting prompt history: {e}")
return []
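# Assumed OpenEvolve output layout, inferred from the filenames this app reads
# (program_*.txt here, best_program.txt in optimize_prompt, plus optional
# generation_*.txt / evolution.log / scores.json in parse_evolution_history):
#
#     output/
#         program_<id>.txt      # one file per explored prompt variant
#         best_program.txt      # best prompt found during evolution
#         evolution.log, scores.json, generation_*.txt   # optional artifacts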
def parse_evolution_history(output_dir: str) -> str:
"""
Parse evolution history from OpenEvolve output directory.
    Returns a markdown string summarizing the evolution process.
"""
try:
evolution_viz = "## 🧬 Evolution Progress\n\n"
# Look for generation files or logs
generation_files = sorted(glob.glob(os.path.join(output_dir, "generation_*.txt")))
log_file = os.path.join(output_dir, "evolution.log")
# Try to parse generation files if they exist
if generation_files:
evolution_viz += "### Generation-by-Generation Progress\n\n"
for gen_file in generation_files:
gen_num = os.path.basename(gen_file).replace("generation_", "").replace(".txt", "")
try:
with open(gen_file, 'r') as f:
content = f.read()
evolution_viz += f"**Generation {gen_num}:**\n```\n{content[:200]}{'...' if len(content) > 200 else ''}\n```\n\n"
                except Exception:
pass
# Try to parse log file
elif os.path.exists(log_file):
evolution_viz += "### Evolution Log\n\n"
try:
with open(log_file, 'r') as f:
log_content = f.read()
evolution_viz += f"```\n{log_content[-1000:]}\n```\n\n"
            except Exception:
pass
# Look for scores or history file
scores_file = os.path.join(output_dir, "scores.json")
if os.path.exists(scores_file):
try:
with open(scores_file, 'r') as f:
scores = json.load(f)
evolution_viz += "### Score Progression\n\n"
evolution_viz += "| Generation | Best Score | Avg Score | Population |\n"
evolution_viz += "|------------|-----------|-----------|------------|\n"
for gen in scores:
evolution_viz += f"| {gen['generation']} | {gen['best']:.3f} | {gen['avg']:.3f} | {gen['population']} |\n"
evolution_viz += "\n"
            except Exception:
pass
# Look for all program variants
program_files = sorted(glob.glob(os.path.join(output_dir, "program_*.txt")))
if program_files:
evolution_viz += f"### Explored Variants\n\n"
evolution_viz += f"OpenEvolve explored {len(program_files)} different prompt variants during evolution.\n\n"
# Show a few intermediate prompts
if len(program_files) > 3:
sample_files = [program_files[0], program_files[len(program_files)//2], program_files[-2]]
evolution_viz += "**Sample Intermediate Prompts:**\n\n"
for idx, pfile in enumerate(sample_files, 1):
try:
with open(pfile, 'r') as f:
prompt_content = f.read()
evolution_viz += f"**Variant {idx}:**\n```\n{prompt_content[:150]}{'...' if len(prompt_content) > 150 else ''}\n```\n\n"
                    except Exception:
pass
# If no specific files found, show directory contents
if not generation_files and not os.path.exists(log_file) and not os.path.exists(scores_file):
evolution_viz += "### Evolution Complete\n\n"
evolution_viz += "OpenEvolve ran 10 iterations of evolutionary optimization using:\n"
evolution_viz += "- **Population Size**: 10 prompts per generation\n"
evolution_viz += "- **Selection Strategy**: 10% elite, 30% explore, 60% exploit\n"
evolution_viz += "- **Islands**: 1 population with mutation and crossover\n"
evolution_viz += "- **Evaluation**: 100 samples per prompt variant\n\n"
# Count files in output directory
all_files = os.listdir(output_dir)
evolution_viz += f"Generated {len(all_files)} files during evolution process.\n\n"
return evolution_viz
except Exception as e:
return f"## 🧬 Evolution Progress\n\nEvolution completed successfully. Unable to parse detailed history: {str(e)}\n\n"
def create_evaluator_file(dataset_name: str, split: str, model: str,
input_field: str, target_field: str, work_dir: str):
"""Create an evaluator.py file for OpenEvolve with staged/cascading evaluation."""
evaluator_code = f'''
import os
import random
from datasets import load_dataset
from openai import OpenAI
def evaluate(prompt: str) -> dict:
"""
Evaluate a prompt using 2-stage cascading evaluation to save API calls.
Stage 1: Evaluate with 20 samples
- If accuracy >= 0.5, proceed to Stage 2
- If accuracy < 0.5, return early (no point wasting 80 more samples)
Stage 2: Evaluate with 80 more samples (total 100)
- Combine results for final score
Returns dict with combined_score (0-1), accuracy, correct, and total.
"""
try:
# Load dataset
dataset = load_dataset("{dataset_name}", split="{split}", streaming=False)
# Initialize OpenAI client
api_key = os.environ.get("OPENAI_API_KEY")
client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key,
)
def evaluate_samples(samples, correct_so_far=0, total_so_far=0):
"""Helper function to evaluate a batch of samples."""
correct = correct_so_far
total = total_so_far
for sample in samples:
try:
# Get input and target
input_text = sample.get("{input_field}", "")
if isinstance(input_text, dict):
input_text = str(input_text)
target = sample.get("{target_field}", "")
if isinstance(target, dict):
target = str(target)
# Format the prompt
formatted_prompt = prompt.replace("{{input}}", str(input_text))
# Call the model
response = client.chat.completions.create(
model="{model}",
messages=[
{{"role": "system", "content": "You are a helpful assistant."}},
{{"role": "user", "content": formatted_prompt}}
],
temperature=0.1,
max_tokens=500,
)
prediction = response.choices[0].message.content.strip()
                    # Lenient scoring: substring match plus simple semantic equivalents
target_str = str(target).lower().strip()
pred_lower = prediction.lower()
                    # First, check whether the target string appears anywhere in the prediction
is_correct = target_str in pred_lower
                    # If no substring match, fall back to common semantic equivalents (e.g., "1" == "positive")
if not is_correct:
# Common sentiment mappings
if target_str in ["1", "positive", "pos"]:
is_correct = any(word in pred_lower for word in ["positive", "good", "great"])
elif target_str in ["0", "negative", "neg"]:
is_correct = any(word in pred_lower for word in ["negative", "bad", "poor"])
if is_correct:
correct += 1
total += 1
except Exception as e:
print(f"Error evaluating sample: {{e}}")
continue
return correct, total
# STAGE 1: Evaluate with 20 samples first
stage1_size = 20
stage1_samples_count = min(stage1_size, len(dataset))
if len(dataset) > stage1_samples_count:
stage1_indices = random.sample(range(len(dataset)), stage1_samples_count)
stage1_samples = [dataset[i] for i in stage1_indices]
else:
stage1_samples = list(dataset)[:stage1_samples_count]
print(f"[Stage 1/2] Evaluating with {{len(stage1_samples)}} samples...")
correct, total = evaluate_samples(stage1_samples)
stage1_score = (correct / total) if total > 0 else 0.0
print(f"[Stage 1/2] Score: {{stage1_score:.3f}} ({{correct}}/{{total}})")
# Early exit if Stage 1 score is below threshold
if stage1_score < 0.5:
print(f"[Stage 1/2] Score below 0.5 threshold - skipping Stage 2 (saved 80 API calls)")
return {{
"combined_score": stage1_score,
"accuracy": stage1_score,
"correct": correct,
"total": total,
"stage": "stage1_early_exit"
}}
# STAGE 2: Continue with 80 more samples
print(f"[Stage 2/2] Score >= 0.5 - proceeding with 80 more samples...")
stage2_size = 80
stage2_samples_count = min(stage2_size, max(0, len(dataset) - stage1_samples_count))
if stage2_samples_count > 0:
# Get different samples from Stage 1
remaining_indices = list(set(range(len(dataset))) - set(stage1_indices if 'stage1_indices' in locals() else []))
if len(remaining_indices) >= stage2_samples_count:
stage2_indices = random.sample(remaining_indices, stage2_samples_count)
stage2_samples = [dataset[i] for i in stage2_indices]
else:
stage2_samples = [dataset[i] for i in remaining_indices[:stage2_samples_count]]
correct, total = evaluate_samples(stage2_samples, correct, total)
final_score = (correct / total) if total > 0 else stage1_score
print(f"[Stage 2/2] Final score: {{final_score:.3f}} ({{correct}}/{{total}})")
return {{
"combined_score": final_score,
"accuracy": final_score,
"correct": correct,
"total": total,
"stage": "stage2_complete"
}}
else:
print(f"[Stage 2/2] Not enough samples in dataset for Stage 2")
return {{
"combined_score": stage1_score,
"accuracy": stage1_score,
"correct": correct,
"total": total,
"stage": "stage1_complete"
}}
except Exception as e:
print(f"Error in evaluation: {{e}}")
return {{
"combined_score": 0.0,
"accuracy": 0.0,
"correct": 0,
"total": 0,
"error": str(e)
}}
'''
evaluator_path = os.path.join(work_dir, "evaluator.py")
with open(evaluator_path, "w") as f:
f.write(evaluator_code)
return evaluator_path
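# The generated evaluator module exposes a single evaluate(prompt) -> dict
# function whose "combined_score" (0-1) drives selection in OpenEvolve.
# Minimal sketch of exercising it on its own (illustrative only, assuming
# OPENAI_API_KEY is set and the file has been written to work_dir):
#
#     import importlib.util
#     spec = importlib.util.spec_from_file_location("evaluator", evaluator_path)
#     mod = importlib.util.module_from_spec(spec)
#     spec.loader.exec_module(mod)
#     print(mod.evaluate("Classify the sentiment: {input}\n\nAnswer:"))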
def create_config_file(model: str, work_dir: str):
"""Create a config.yaml file for OpenEvolve."""
config = {
"llm": {
"primary_model": model,
"api_base": "https://openrouter.ai/api/v1", # Use OpenRouter endpoint
"temperature": 0.7,
},
"max_iterations": 10,
"evolution": {
"population_size": 10,
"num_islands": 1,
"elite_ratio": 0.1,
"explore_ratio": 0.3,
"exploit_ratio": 0.6,
},
"evaluator": {
"timeout": None, # Disable timeout to avoid signal handling issues
"cascade_evaluation": False, # Disable cascade to prevent signal errors
"parallel_evaluations": 1, # Single worker
"distributed": False, # No distributed processing
}
}
config_path = os.path.join(work_dir, "config.yaml")
with open(config_path, "w") as f:
yaml.dump(config, f)
return config_path
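# For reference, yaml.dump(config) serializes the dict above into a YAML
# document equivalent to the following (key order may differ, since yaml.dump
# sorts keys alphabetically by default):
#
#     llm:
#       primary_model: <selected model>
#       api_base: https://openrouter.ai/api/v1
#       temperature: 0.7
#     max_iterations: 10
#     evolution:
#       population_size: 10
#       num_islands: 1
#       elite_ratio: 0.1
#       explore_ratio: 0.3
#       exploit_ratio: 0.6
#     evaluator:
#       timeout: null
#       cascade_evaluation: false
#       parallel_evaluations: 1
#       distributed: false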
def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str,
model: str, input_field: str, target_field: str,
progress=gr.Progress()) -> Tuple[str, str, str, str, List[str], int, int]:
"""Run OpenEvolve to optimize the prompt."""
progress(0, desc="Validating inputs...")
# Validate all inputs
is_valid, validation_message = validate_inputs(
dataset_name, dataset_split, input_field, target_field, initial_prompt
)
if not is_valid:
return f"## Validation Failed\n\n{validation_message}", "", "", "", [], 0, 0
progress(0.05, desc=f"Validation passed: {validation_message}")
# Create temporary working directory
work_dir = tempfile.mkdtemp(prefix="openevolve_")
try:
# Save initial prompt
initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt")
with open(initial_prompt_path, "w") as f:
f.write(initial_prompt)
# Create evaluator
progress(0.1, desc="Creating evaluator...")
evaluator_path = create_evaluator_file(dataset_name, dataset_split, model,
input_field, target_field, work_dir)
# Create config
progress(0.15, desc="Creating configuration...")
config_path = create_config_file(model, work_dir)
# Run initial evaluation (using 20 samples to save API calls)
progress(0.2, desc="Running initial evaluation on 20 samples...")
initial_eval = evaluate_prompt(
initial_prompt, dataset_name, dataset_split, 20,
model, input_field, target_field
)
if "error" in initial_eval:
return f"## Error\n\n❌ Initial evaluation failed: {initial_eval['error']}", "", "", "", [initial_prompt], 0, 1
if initial_eval["total"] == 0:
return f"## Error\n\n❌ Initial evaluation failed: No samples could be evaluated. This usually means:\n- API key is invalid or has no credits\n- Model is unavailable or rate-limited\n- Dataset fields are incorrect\n- Network connectivity issues\n\nPlease check your configuration and try again.", "", "", "", [initial_prompt], 0, 1
initial_results = f"""
### Initial Prompt Evaluation
**Prompt:**
```
{initial_prompt}
```
**Results:**
- Accuracy: {initial_eval['accuracy']:.2f}%
- Correct: {initial_eval['correct']}/{initial_eval['total']}
**Sample Results:**
"""
for i, result in enumerate(initial_eval['results'][:5], 1):
initial_results += f"\n{i}. Input: {result['input']}\n"
initial_results += f" Target: {result['target']}\n"
initial_results += f" Prediction: {result['prediction']}\n"
initial_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n"
# Run OpenEvolve
progress(0.3, desc="Starting OpenEvolve optimization (10 iterations with staged evaluation)...")
output_dir = os.path.join(work_dir, "output")
os.makedirs(output_dir, exist_ok=True)
try:
# Run evolution
# Note: OpenEvolve may show "Initialized process parallel controller" warnings
# but they are harmless in this single-worker configuration
result = run_evolution(
initial_program=initial_prompt_path,
evaluator=evaluator_path,
config=config_path,
output_dir=output_dir
)
progress(0.80, desc="Parsing evolution history...")
# Parse evolution history for visualization
evolution_viz = parse_evolution_history(output_dir)
progress(0.85, desc="Evaluating best evolved prompt on 20 samples...")
# Get the best prompt
best_prompt_path = os.path.join(output_dir, "best_program.txt")
if os.path.exists(best_prompt_path):
with open(best_prompt_path, "r") as f:
best_prompt = f.read()
else:
best_prompt = initial_prompt
# Evaluate best prompt (using 20 samples like initial eval for consistency)
final_eval = evaluate_prompt(
best_prompt, dataset_name, dataset_split, 20,
model, input_field, target_field
)
final_results = f"""
### Evolved Prompt Evaluation
**Prompt:**
```
{best_prompt}
```
**Results:**
- Accuracy: {final_eval['accuracy']:.2f}%
- Correct: {final_eval['correct']}/{final_eval['total']}
- Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}%
**Sample Results:**
"""
for i, result in enumerate(final_eval['results'][:5], 1):
final_results += f"\n{i}. Input: {result['input']}\n"
final_results += f" Target: {result['target']}\n"
final_results += f" Prediction: {result['prediction']}\n"
final_results += f" ✓ Correct\n" if result['correct'] else f" ✗ Incorrect\n"
summary = f"""
## 🎉 Optimization Complete!
### Summary
- **Dataset**: {dataset_name} ({dataset_split} split)
- **Model**: {model}
- **Initial/Final Eval**: 20 samples each
- **Evolution Eval**: Staged (20 → 100 if score ≥ 0.5)
- **Iterations**: 10
### Results
- **Initial Accuracy**: {initial_eval['accuracy']:.2f}%
- **Final Accuracy**: {final_eval['accuracy']:.2f}%
- **Improvement**: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f}%
{validation_message}
"""
progress(1.0, desc="Complete!")
# Collect all discovered prompts for browsing
all_prompts = [initial_prompt] # Start with initial prompt
prompt_history = collect_prompt_history(output_dir)
for p in prompt_history:
all_prompts.append(p["prompt"])
# Ensure we have the best prompt at the end
if best_prompt not in all_prompts:
all_prompts.append(best_prompt)
return summary, initial_results, evolution_viz, final_results, all_prompts, 0, len(all_prompts)
except Exception as e:
return f"## Error During Evolution\n\n❌ {str(e)}", initial_results, "", "", [initial_prompt], 0, 1
finally:
# Don't clean up - keep prompts for browsing
# User can manually clean /tmp if needed
pass
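# optimize_prompt returns a 7-tuple that maps one-to-one onto the Gradio
# outputs wired up below: (summary, initial_results, evolution_viz,
# final_results, prompt_history, current_index, total_prompts). The last
# element goes to a throwaway gr.State and is never displayed directly.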
# Create Gradio interface
with gr.Blocks(title="OpenEvolve Prompt Optimizer", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🧬 OpenEvolve Prompt Optimizer
Automatically evolve and optimize your prompts using evolutionary algorithms!
This space uses [OpenEvolve](https://github.com/algorithmicsuperintelligence/openevolve) to iteratively improve prompts
by testing them on real datasets and evolving better versions.
## How it works:
1. Enter an initial prompt (use `{input}` as a placeholder for dataset inputs)
2. Enter the full HuggingFace dataset name (e.g., `stanfordnlp/imdb`, `gsm8k`)
3. Specify the dataset split and field names
4. Choose a free model from OpenRouter
5. Click "Optimize Prompt" - the system will validate everything first!
6. Watch the evolution progress in real-time
7. Compare initial vs. evolved performance!
**Note**: API key is read from `OPENAI_API_KEY` environment variable (set in Space secrets)
""")
with gr.Row():
with gr.Column():
gr.Markdown("### Configuration")
model = gr.Dropdown(
choices=FREE_MODELS,
value=FREE_MODELS[0],
label="Select Model",
info="Choose from 5 curated free models on OpenRouter (24B to 671B parameters)"
)
dataset_name = gr.Textbox(
label="HuggingFace Dataset (Full Name)",
value="stanfordnlp/imdb",
placeholder="e.g., stanfordnlp/imdb, openai/gsm8k, SetFit/sst5",
info="Full dataset name from HuggingFace Hub (org/dataset-name or dataset-name)"
)
dataset_split = gr.Textbox(
label="Dataset Split",
value="test",
placeholder="e.g., train, test, validation"
)
input_field = gr.Textbox(
label="Input Field Name",
value="text",
placeholder="e.g., text, question, context",
info="The field containing inputs to process"
)
target_field = gr.Textbox(
label="Target Field Name",
value="label",
placeholder="e.g., label, answer, target",
info="The field containing expected outputs"
)
initial_prompt = gr.TextArea(
label="Initial Prompt",
value="Analyze the sentiment of the following text and classify it as positive or negative:\n\n{input}\n\nClassification:",
lines=6,
info="Use {input} as placeholder for dataset inputs"
)
# Button outside the column for better visibility
with gr.Row():
with gr.Column():
optimize_btn = gr.Button("🚀 Validate & Optimize Prompt", variant="primary", size="lg")
# Results section - clearly separated
gr.Markdown("---")
gr.Markdown("## 📊 Results")
with gr.Row():
with gr.Column():
summary = gr.Markdown("Click 'Validate & Optimize Prompt' to start optimization...", visible=True)
with gr.Row():
with gr.Column():
initial_results = gr.Markdown("### Initial Results\nWill appear here after validation...", visible=True)
with gr.Column():
final_results = gr.Markdown("### Final Results\nWill appear here after optimization...", visible=True)
with gr.Row():
with gr.Column():
evolution_progress = gr.Markdown("### Evolution Progress\nEvolution progress will appear here during optimization...", visible=True)
# Prompt History Browser
gr.Markdown("---")
gr.Markdown("## 📜 Prompt History Browser")
gr.Markdown("Browse through all prompts discovered during evolution (initial → intermediate → final)")
with gr.Row():
with gr.Column(scale=8):
prompt_display = gr.TextArea(
label="",
lines=10,
interactive=False,
placeholder="Prompts will appear here after optimization completes...",
show_label=False
)
with gr.Column(scale=2):
prompt_counter = gr.Markdown("**Prompt**: -/-")
prev_btn = gr.Button("⬅️ Previous", size="sm")
next_btn = gr.Button("Next ➡️", size="sm")
gr.Markdown("**Prompt Types:**\n- First = Initial\n- Middle = Intermediate\n- Last = Final Best")
# Hidden state to store prompt history and current index
prompt_history_state = gr.State([])
current_prompt_index = gr.State(0)
# Documentation section - in collapsible accordion
gr.Markdown("---")
with gr.Accordion("📚 Documentation & Examples", open=False):
gr.Markdown("""
### Example Datasets & Fields:
| Dataset | Split | Input Field | Target Field | Task |
|---------|-------|-------------|--------------|------|
| stanfordnlp/imdb | test | text | label | Sentiment Analysis |
| rajpurkar/squad | validation | question | answers | Question Answering |
| dair-ai/emotion | test | text | label | Emotion Classification |
| openai/gsm8k | test | question | answer | Math Reasoning |
| fancyzhx/ag_news | test | text | label | News Classification |
### About This Demo Space:
**This is a demonstration space** showcasing OpenEvolve's prompt optimization capabilities.
The interface shows you how the system works, but **you'll need to set up your own instance to run optimizations**.
### How to Run This Yourself:
1. **Clone this Space**: Click "⋮" (three dots) at top-right → "Duplicate this Space"
2. **Set Environment Variables** in your cloned Space's settings:
- `OPENAI_API_KEY`: Your OpenRouter API key (get free key at [openrouter.ai/keys](https://openrouter.ai/keys))
- `HF_TOKEN`: (Optional) HuggingFace token for private datasets
3. **Configure Your Optimization**:
- Dataset: Use full name format (e.g., `stanfordnlp/imdb` or `openai/gsm8k`)
- Fields: Specify exact field names from the dataset schema
- Model: Choose from 5 curated free models (larger models = better results but slower/rate-limited)
4. **Run & Monitor**:
- All inputs are validated before starting
- Evolution uses staged evaluation (20 samples first, then 80 more if promising)
- Saves API calls by early-stopping poor prompts (< 50% accuracy)
- Watch evolution progress visualization in real-time
### About OpenEvolve:
OpenEvolve is an open-source evolutionary optimization framework. Learn more at:
- [GitHub Repository](https://github.com/algorithmicsuperintelligence/openevolve)
- [Documentation](https://github.com/algorithmicsuperintelligence/openevolve#readme)
""")
# Navigation functions for prompt browser
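    # The three helpers below share the same labeling convention: index 0 is
    # the initial prompt, the last index is the final best prompt, and
    # everything in between is an intermediate variant.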
def show_previous_prompt(prompts, current_idx):
if not prompts or len(prompts) == 0:
return "", "**Prompt**: -/-", 0
new_idx = max(0, current_idx - 1)
counter_text = f"**Prompt**: {new_idx + 1}/{len(prompts)}"
if new_idx == 0:
counter_text += " (Initial)"
elif new_idx == len(prompts) - 1:
counter_text += " (Final Best)"
else:
counter_text += " (Intermediate)"
return prompts[new_idx], counter_text, new_idx
def show_next_prompt(prompts, current_idx):
if not prompts or len(prompts) == 0:
return "", "**Prompt**: -/-", 0
new_idx = min(len(prompts) - 1, current_idx + 1)
counter_text = f"**Prompt**: {new_idx + 1}/{len(prompts)}"
if new_idx == 0:
counter_text += " (Initial)"
elif new_idx == len(prompts) - 1:
counter_text += " (Final Best)"
else:
counter_text += " (Intermediate)"
return prompts[new_idx], counter_text, new_idx
def update_prompt_display(prompts, idx, total):
if not prompts or len(prompts) == 0:
return "", "**Prompt**: -/-"
idx = min(idx, len(prompts) - 1)
counter_text = f"**Prompt**: {idx + 1}/{len(prompts)}"
if idx == 0:
counter_text += " (Initial)"
elif idx == len(prompts) - 1:
counter_text += " (Final Best)"
else:
counter_text += " (Intermediate)"
return prompts[idx], counter_text
# Wire up the optimize button
optimize_result = optimize_btn.click(
fn=optimize_prompt,
inputs=[initial_prompt, dataset_name, dataset_split, model,
input_field, target_field],
outputs=[summary, initial_results, evolution_progress, final_results,
prompt_history_state, current_prompt_index, gr.State()] # dummy for total
)
# Update prompt display when optimization completes
optimize_result.then(
fn=update_prompt_display,
inputs=[prompt_history_state, current_prompt_index, gr.State()],
outputs=[prompt_display, prompt_counter]
)
# Wire up navigation buttons
prev_btn.click(
fn=show_previous_prompt,
inputs=[prompt_history_state, current_prompt_index],
outputs=[prompt_display, prompt_counter, current_prompt_index]
)
next_btn.click(
fn=show_next_prompt,
inputs=[prompt_history_state, current_prompt_index],
outputs=[prompt_display, prompt_counter, current_prompt_index]
)
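# Entry point: Hugging Face Spaces executes this file directly, so demo.launch()
# starts the Gradio server when the script is run.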
if __name__ == "__main__":
demo.launch()