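"""Gradio Space: OpenEvolve Prompt Optimizer.

Evolves an initial prompt against a HuggingFace dataset with OpenEvolve,
calling free models through OpenRouter's OpenAI-compatible API, and reports
initial vs. evolved accuracy.
"""
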
import json
import os
import random
import shutil
import tempfile
from typing import Dict, List, Tuple

import gradio as gr
import yaml
from datasets import load_dataset
from openai import OpenAI
from openevolve import run_evolution


FREE_MODELS = [
    "google/gemini-2.0-flash-001:free",
    "google/gemini-flash-1.5-8b:free",
    "meta-llama/llama-3.2-3b-instruct:free",
    "meta-llama/llama-3.2-1b-instruct:free",
    "microsoft/phi-3-mini-128k-instruct:free",
    "microsoft/phi-3-medium-128k-instruct:free",
    "qwen/qwen-2-7b-instruct:free",
    "mistralai/mistral-7b-instruct:free",
]

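# The ":free" suffix marks OpenRouter's free-tier variants; the exact set of free
# models changes over time, so check https://openrouter.ai/models if a request fails.
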
SAMPLE_DATASETS = {
    "Question Answering": [
        "hotpot_qa",
        "squad",
        "trivia_qa",
    ],
    "Sentiment Analysis": [
        "imdb",
        "yelp_review_full",
        "emotion",
    ],
    "Text Classification": [
        "ag_news",
        "dbpedia_14",
        "SetFit/sst5",
    ],
    "Math Reasoning": [
        "gsm8k",
        "math_qa",
    ],
}

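# SAMPLE_DATASETS is informational only: the UI below accepts any dataset name as
# free text rather than reading from this mapping.

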
def evaluate_prompt(prompt: str, dataset_name: str, split: str, num_samples: int,
                    api_key: str, model: str, input_field: str, target_field: str) -> Dict:
    """Evaluate a prompt on a dataset using the selected model."""
    try:
        # Load the dataset and draw a random subset of examples
        dataset = load_dataset(dataset_name, split=split, streaming=False)

        if len(dataset) > num_samples:
            indices = random.sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in indices]
        else:
            samples = list(dataset)[:num_samples]

        # OpenRouter exposes an OpenAI-compatible endpoint
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0
        results = []

        for sample in samples:
            try:
                # Extract the input and target fields, coercing dicts to strings
                input_text = sample.get(input_field, "")
                if isinstance(input_text, dict):
                    input_text = str(input_text)

                target = sample.get(target_field, "")
                if isinstance(target, dict):
                    target = str(target)

                # Substitute the example into the prompt template
                formatted_prompt = prompt.replace("{input}", str(input_text))

                response = client.chat.completions.create(
                    model=model,
                    messages=[
                        {"role": "system", "content": "You are a helpful assistant."},
                        {"role": "user", "content": formatted_prompt},
                    ],
                    temperature=0.1,
                    max_tokens=500,
                )

                prediction = response.choices[0].message.content.strip()

                # Lenient scoring: the target string must appear in the prediction
                is_correct = str(target).lower().strip() in prediction.lower()
                if is_correct:
                    correct += 1
                total += 1

                results.append({
                    "input": str(input_text)[:100] + "...",
                    "target": str(target),
                    "prediction": prediction[:100] + "...",
                    "correct": is_correct,
                })

            except Exception as e:
                print(f"Error evaluating sample: {e}")
                continue

        accuracy = (correct / total * 100) if total > 0 else 0

        return {
            "accuracy": accuracy,
            "correct": correct,
            "total": total,
            "results": results,
        }

    except Exception as e:
        return {
            "error": str(e),
            "accuracy": 0,
            "correct": 0,
            "total": 0,
            "results": [],
        }

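# For reference: evaluate_prompt returns a dict with keys "accuracy" (a percentage),
# "correct", "total", and "results" (per-sample records); on failure it returns the
# same keys zeroed out plus an "error" message, so callers can always read the
# accuracy fields safely.

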
def create_evaluator_file(dataset_name: str, split: str, model: str,
                          input_field: str, target_field: str, work_dir: str):
    """Create an evaluator.py file for OpenEvolve."""
    evaluator_code = f'''
import os
import random
from datasets import load_dataset
from openai import OpenAI


def evaluate(prompt: str) -> float:
    """Evaluate a prompt and return a score between 0 and 1."""
    try:
        # Load dataset
        dataset = load_dataset("{dataset_name}", split="{split}", streaming=False)

        # Sample 100 random examples
        num_samples = min(100, len(dataset))
        if len(dataset) > num_samples:
            indices = random.sample(range(len(dataset)), num_samples)
            samples = [dataset[i] for i in indices]
        else:
            samples = list(dataset)[:num_samples]

        # Initialize OpenAI client
        api_key = os.environ.get("OPENAI_API_KEY")
        client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key,
        )

        correct = 0
        total = 0

        for sample in samples:
            try:
                # Get input and target
                input_text = sample.get("{input_field}", "")
                if isinstance(input_text, dict):
                    input_text = str(input_text)

                target = sample.get("{target_field}", "")
                if isinstance(target, dict):
                    target = str(target)

                # Format the prompt
                formatted_prompt = prompt.replace("{{input}}", str(input_text))

                # Call the model
                response = client.chat.completions.create(
                    model="{model}",
                    messages=[
                        {{"role": "system", "content": "You are a helpful assistant."}},
                        {{"role": "user", "content": formatted_prompt}}
                    ],
                    temperature=0.1,
                    max_tokens=500,
                )

                prediction = response.choices[0].message.content.strip()

                # Simple evaluation
                is_correct = str(target).lower().strip() in prediction.lower()
                if is_correct:
                    correct += 1
                total += 1

            except Exception as e:
                print(f"Error evaluating sample: {{e}}")
                continue

        # Return score between 0 and 1
        return (correct / total) if total > 0 else 0.0

    except Exception as e:
        print(f"Error in evaluation: {{e}}")
        return 0.0
'''

    evaluator_path = os.path.join(work_dir, "evaluator.py")
    with open(evaluator_path, "w") as f:
        f.write(evaluator_code)

    return evaluator_path

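# Note: the generated evaluator exposes evaluate(prompt) -> float in [0, 1]; the code
# below assumes this is the interface run_evolution() expects for evaluator_path.

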
def create_config_file(model: str, work_dir: str):
    """Create a config.yaml file for OpenEvolve."""
    config = {
        "llm": {
            "api_base": "https://openrouter.ai/api/v1",
            "model": model,
            "temperature": 0.7,
            "max_tokens": 4096,
        },
        "evolution": {
            "max_iterations": 10,
            "population_size": 10,
            "num_islands": 1,
            "elite_ratio": 0.1,
            "explore_ratio": 0.3,
            "exploit_ratio": 0.6,
        },
        "evaluation": {
            "timeout": 1800,
        },
    }

    config_path = os.path.join(work_dir, "config.yaml")
    with open(config_path, "w") as f:
        yaml.dump(config, f)

    return config_path

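# For reference, the config.yaml written above comes out roughly like this
# (yaml.dump sorts keys alphabetically by default):
#
#   evaluation:
#     timeout: 1800
#   evolution:
#     elite_ratio: 0.1
#     exploit_ratio: 0.6
#     explore_ratio: 0.3
#     max_iterations: 10
#     num_islands: 1
#     population_size: 10
#   llm:
#     api_base: https://openrouter.ai/api/v1
#     max_tokens: 4096
#     model: <selected model>
#     temperature: 0.7

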
def optimize_prompt(initial_prompt: str, dataset_name: str, dataset_split: str,
                    model: str, api_key: str, input_field: str, target_field: str,
                    progress=gr.Progress()) -> Tuple[str, str, str]:
    """Run OpenEvolve to optimize the prompt."""

    if not api_key:
        return "Error: an OpenRouter API key is required", "", ""

    # The OpenAI client (and the generated evaluator) read the key from this variable
    os.environ["OPENAI_API_KEY"] = api_key

    progress(0, desc="Setting up...")

    # All intermediate files live in a temporary working directory
    work_dir = tempfile.mkdtemp(prefix="openevolve_")

    try:
        # Write the initial prompt as the starting "program" for evolution
        initial_prompt_path = os.path.join(work_dir, "initial_prompt.txt")
        with open(initial_prompt_path, "w") as f:
            f.write(initial_prompt)

        progress(0.1, desc="Creating evaluator...")
        evaluator_path = create_evaluator_file(dataset_name, dataset_split, model,
                                               input_field, target_field, work_dir)

        progress(0.2, desc="Creating configuration...")
        config_path = create_config_file(model, work_dir)

        progress(0.3, desc="Running initial evaluation...")
        initial_eval = evaluate_prompt(
            initial_prompt, dataset_name, dataset_split, 100,
            api_key, model, input_field, target_field
        )

        initial_results = f"""
### Initial Prompt Evaluation

**Prompt:**
```
{initial_prompt}
```

**Results:**
- Accuracy: {initial_eval['accuracy']:.2f}%
- Correct: {initial_eval['correct']}/{initial_eval['total']}

**Sample Results:**
"""
        for i, result in enumerate(initial_eval['results'][:5], 1):
            initial_results += f"\n{i}. Input: {result['input']}\n"
            initial_results += f"   Target: {result['target']}\n"
            initial_results += f"   Prediction: {result['prediction']}\n"
            initial_results += "   ✓ Correct\n" if result['correct'] else "   ✗ Incorrect\n"

        progress(0.4, desc="Running OpenEvolve (this may take several minutes)...")

        output_dir = os.path.join(work_dir, "output")
        os.makedirs(output_dir, exist_ok=True)

        try:
            # Run the evolutionary search over prompts
            result = run_evolution(
                initial_program_path=initial_prompt_path,
                evaluator_path=evaluator_path,
                config_path=config_path,
                output_dir=output_dir,
                verbose=True
            )

            progress(0.8, desc="Evaluating best prompt...")

            # Fall back to the initial prompt if no best program was produced
            best_prompt_path = os.path.join(output_dir, "best_program.txt")
            if os.path.exists(best_prompt_path):
                with open(best_prompt_path, "r") as f:
                    best_prompt = f.read()
            else:
                best_prompt = initial_prompt

            final_eval = evaluate_prompt(
                best_prompt, dataset_name, dataset_split, 100,
                api_key, model, input_field, target_field
            )

            final_results = f"""
### Evolved Prompt Evaluation

**Prompt:**
```
{best_prompt}
```

**Results:**
- Accuracy: {final_eval['accuracy']:.2f}%
- Correct: {final_eval['correct']}/{final_eval['total']}
- Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f} percentage points

**Sample Results:**
"""
            for i, result in enumerate(final_eval['results'][:5], 1):
                final_results += f"\n{i}. Input: {result['input']}\n"
                final_results += f"   Target: {result['target']}\n"
                final_results += f"   Prediction: {result['prediction']}\n"
                final_results += "   ✓ Correct\n" if result['correct'] else "   ✗ Incorrect\n"

            summary = f"""
## Optimization Complete!

### Summary
- Initial Accuracy: {initial_eval['accuracy']:.2f}%
- Final Accuracy: {final_eval['accuracy']:.2f}%
- Improvement: {final_eval['accuracy'] - initial_eval['accuracy']:+.2f} percentage points
- Dataset: {dataset_name}
- Model: {model}
- Samples Evaluated: 100
- Iterations: 10
"""

            progress(1.0, desc="Complete!")

            return summary, initial_results, final_results

        except Exception as e:
            return f"Error during evolution: {str(e)}", initial_results, ""

    finally:
        # Clean up the temporary working directory
        try:
            shutil.rmtree(work_dir)
        except Exception:
            pass


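# Gradio UI: configuration form and prompt editor, an optimize button, and
# Markdown panels for the summary, initial results, and evolved results.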
with gr.Blocks(title="OpenEvolve Prompt Optimizer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🧬 OpenEvolve Prompt Optimizer

    Automatically evolve and optimize your prompts using evolutionary algorithms!

    This space uses [OpenEvolve](https://github.com/codelion/openevolve) to iteratively improve prompts
    by testing them on real datasets and evolving better versions.

    ## How it works:
    1. Enter an initial prompt (use `{input}` as a placeholder for dataset inputs)
    2. Select a HuggingFace dataset to test on
    3. Choose a free model from OpenRouter
    4. Click "Optimize Prompt" to evolve better versions
    5. Compare initial vs. evolved performance!
    """)

    with gr.Row():
        with gr.Column():
            gr.Markdown("### Configuration")

            api_key = gr.Textbox(
                label="OpenRouter API Key",
                type="password",
                placeholder="sk-or-v1-...",
                info="Get your free key at https://openrouter.ai/keys"
            )

            model = gr.Dropdown(
                choices=FREE_MODELS,
                value=FREE_MODELS[0],
                label="Select Model",
                info="Free models available on OpenRouter"
            )

            dataset_name = gr.Textbox(
                label="HuggingFace Dataset",
                value="imdb",
                placeholder="e.g., imdb, hotpot_qa, gsm8k",
                info="Any dataset from HuggingFace Hub"
            )

            dataset_split = gr.Textbox(
                label="Dataset Split",
                value="test",
                placeholder="e.g., train, test, validation"
            )

            input_field = gr.Textbox(
                label="Input Field Name",
                value="text",
                placeholder="e.g., text, question, context",
                info="The field containing inputs to process"
            )

            target_field = gr.Textbox(
                label="Target Field Name",
                value="label",
                placeholder="e.g., label, answer, target",
                info="The field containing expected outputs"
            )

            initial_prompt = gr.TextArea(
                label="Initial Prompt",
                value="Analyze the sentiment of the following text and classify it as positive or negative:\n\n{input}\n\nClassification:",
                lines=6,
                info="Use {input} as a placeholder for dataset inputs"
            )

            optimize_btn = gr.Button("🚀 Optimize Prompt", variant="primary", size="lg")

    with gr.Row():
        with gr.Column():
            summary = gr.Markdown(label="Summary")

    with gr.Row():
        with gr.Column():
            initial_results = gr.Markdown(label="Initial Results")
        with gr.Column():
            final_results = gr.Markdown(label="Evolved Results")

    gr.Markdown("""
    ### Example Datasets & Fields:

    | Dataset | Split | Input Field | Target Field | Task |
    |---------|-------|-------------|--------------|------|
    | imdb | test | text | label | Sentiment Analysis |
    | hotpot_qa | validation | question | answer | Question Answering |
    | emotion | test | text | label | Emotion Classification |
    | gsm8k | test | question | answer | Math Reasoning |
    | ag_news | test | text | label | News Classification |

    ### Notes:
    - Evolution runs for 10 iterations with 1 island
    - Each evaluation uses 100 random samples from the dataset
    - The process may take 5-15 minutes depending on the dataset and model
    - Make sure your API key has sufficient credits for the requests
    """)

    optimize_btn.click(
        fn=optimize_prompt,
        inputs=[initial_prompt, dataset_name, dataset_split, model, api_key,
                input_field, target_field],
        outputs=[summary, initial_results, final_results]
    )


if __name__ == "__main__":
    demo.launch()