ladybug11's picture
fix error
5b510b1
raw
history blame
23.5 kB
import os
import time
import json
import shutil
import random
import tempfile
import requests
import gradio as gr
from openai import OpenAI
from smolagents import CodeAgent, MCPClient, tool
from huggingface_hub import InferenceClient
from quote_generator_gemini import HybridQuoteGenerator
# -------------------------------------------------
# GLOBAL CLIENTS & CONFIG
# -------------------------------------------------
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
# Hybrid Gemini + OpenAI quote generator
hybrid_quote_generator = HybridQuoteGenerator(
gemini_key=os.getenv("GEMINI_API_KEY"),
openai_client=openai_client,
)
# Optional MCP client (non-fatal if not installed)
try:
mcp_client = MCPClient("https://abidlabs-mcp-tools.hf.space")
mcp_enabled = True
except Exception as e:
print(f"MCP initialization warning: {e}")
mcp_enabled = False
# -------------------------------------------------
# TOOLS
# -------------------------------------------------
@tool
def generate_quote_tool(niche: str, style: str) -> str:
"""
Generate a unique inspirational quote using the HybridQuoteGenerator.
Args:
niche: The category of the quote (e.g. Motivation, Fitness, Mindfulness).
style: The visual style or aesthetic (e.g. Cinematic, Nature, Urban).
Returns:
A single quote string. If an error occurs, returns a human-readable error message.
"""
try:
result = hybrid_quote_generator.generate_quote(
niche=niche,
style=style,
prefer_gemini=True,
)
if result.get("success"):
quote = result["quote"]
source = result.get("source")
if source == "gemini":
stats = result.get("stats", {})
print(
f"✨ Generated with Gemini. Total quotes: "
f"{stats.get('total_quotes_generated', 0)}"
)
else:
print("✨ Generated with OpenAI fallback")
return quote
else:
return f"Error generating quote: {result.get('error', 'Unknown error')}"
except Exception as e:
return f"Error generating quote: {str(e)}"
@tool
def search_pexels_video_tool(style: str, niche: str) -> dict:
"""
Search and fetch a portrait video from Pexels that matches a style and niche.
Args:
style: Visual style (e.g. Cinematic, Nature, Urban, Minimal, Abstract).
niche: Content niche (e.g. Motivation, Business/Entrepreneurship, Fitness).
Returns:
A dictionary with:
- success: Whether a suitable video was found.
- video_url: The direct link to the video file (or None).
- search_query: The query used to search.
- pexels_url: The Pexels page URL (or None).
- error: Optional error message on failure.
"""
search_strategies = {
"Motivation": {
"Cinematic": ["person climbing mountain", "running sunrise", "achievement success"],
"Nature": ["sunrise mountain peak", "ocean waves powerful", "forest light"],
"Urban": ["city skyline dawn", "person running city", "urban success"],
"Minimal": ["minimal motivation", "single person silhouette", "clean inspiring"],
"Abstract": ["light rays hope", "particles rising", "abstract energy"],
},
"Business/Entrepreneurship": {
"Cinematic": ["business cityscape", "office modern", "handshake deal"],
"Nature": ["growth plant", "river flowing", "sunrise new beginning"],
"Urban": ["city business", "office skyline", "modern workspace"],
"Minimal": ["desk minimal", "workspace clean", "simple office"],
"Abstract": ["network connections", "growth chart", "abstract progress"],
},
"Fitness": {
"Cinematic": ["athlete training", "gym workout", "running outdoor"],
"Nature": ["outdoor workout", "mountain hiking", "beach running"],
"Urban": ["city running", "urban fitness", "street workout"],
"Minimal": ["gym minimal", "simple workout", "clean fitness"],
"Abstract": ["energy motion", "strength power", "dynamic movement"],
},
"Mindfulness": {
"Cinematic": ["meditation sunset", "peaceful landscape", "calm water"],
"Nature": ["forest peaceful", "calm lake", "zen garden"],
"Urban": ["city peaceful morning", "quiet street", "urban calm"],
"Minimal": ["minimal zen", "simple meditation", "clean peaceful"],
"Abstract": ["calm waves", "gentle motion", "soft particles"],
},
"Stoicism": {
"Cinematic": ["ancient architecture", "statue philosopher", "timeless landscape"],
"Nature": ["mountain strong", "oak tree", "stone nature"],
"Urban": ["classical building", "statue city", "ancient modern"],
"Minimal": ["stone minimal", "simple strong", "pillar minimal"],
"Abstract": ["marble texture", "stone abstract", "timeless pattern"],
},
"Leadership": {
"Cinematic": ["team meeting", "leader speaking", "group collaboration"],
"Nature": ["eagle flying", "lion pride", "mountain top"],
"Urban": ["office leadership", "boardroom", "city leadership"],
"Minimal": ["chess pieces", "simple leadership", "clean professional"],
"Abstract": ["network leader", "connection points", "guiding light"],
},
"Love & Relationships": {
"Cinematic": ["couple sunset", "romance beautiful", "love cinematic"],
"Nature": ["couple nature", "romantic sunset", "peaceful together"],
"Urban": ["couple city", "romance urban", "love city lights"],
"Minimal": ["hands holding", "simple love", "minimal romance"],
"Abstract": ["hearts flowing", "love particles", "connection abstract"],
},
}
queries = search_strategies.get(niche, {}).get(style, ["aesthetic nature"])
try:
headers = {"Authorization": PEXELS_API_KEY} if PEXELS_API_KEY else {}
query = random.choice(queries)
url = (
f"https://api.pexels.com/videos/search"
f"?query={query}&per_page=15&orientation=portrait"
)
response = requests.get(url, headers=headers)
data = response.json()
if "videos" in data and len(data["videos"]) > 0:
video = random.choice(data["videos"][:10])
video_files = video.get("video_files", [])
portrait_videos = [
vf
for vf in video_files
if vf.get("width", 0) < vf.get("height", 0)
]
if portrait_videos:
selected = random.choice(portrait_videos)
return {
"success": True,
"video_url": selected.get("link"),
"search_query": query,
"pexels_url": video.get("url"),
}
if video_files:
return {
"success": True,
"video_url": video_files[0].get("link"),
"search_query": query,
"pexels_url": video.get("url"),
}
return {
"success": False,
"video_url": None,
"search_query": query,
"pexels_url": None,
"error": "No suitable videos found",
}
except Exception as e:
return {
"success": False,
"video_url": None,
"search_query": "",
"pexels_url": None,
"error": str(e),
}
@tool
def create_quote_video_tool(video_url: str, quote_text: str, output_path: str) -> dict:
"""
Create a quote video by calling a Modal endpoint that overlays text on a background video.
Args:
video_url: Direct URL of the background video (e.g. from Pexels).
quote_text: The quote text to be overlaid on the video.
output_path: Local file path where the resulting video should be saved.
Returns:
A dictionary with:
- success: Whether the generation succeeded.
- output_path: The saved video path on disk (or None).
- message: A human-readable status message.
"""
modal_endpoint = os.getenv("MODAL_ENDPOINT_URL")
if not modal_endpoint:
print("ℹ️ MODAL_ENDPOINT_URL not configured, cannot generate video.")
return {
"success": False,
"output_path": None,
"message": (
"Modal endpoint not configured. Set MODAL_ENDPOINT_URL to use remote "
"video generation (modal deploy modal_video_processing.py)."
),
}
try:
print("πŸš€ Processing on Modal (fast!)...")
response = requests.post(
modal_endpoint,
json={
"video_url": video_url,
"quote_text": quote_text,
},
timeout=120,
)
if response.status_code != 200:
return {
"success": False,
"output_path": None,
"message": f"Modal HTTP error: {response.status_code}",
}
result = response.json()
if not result.get("success"):
return {
"success": False,
"output_path": None,
"message": f"Modal error: {result.get('error', 'Unknown error')}",
}
import base64
video_b64 = result["video"]
video_bytes = base64.b64decode(video_b64)
with open(output_path, "wb") as f:
f.write(video_bytes)
size_mb = result.get("size_mb", len(video_bytes) / 1024 / 1024)
print(f"βœ… Modal processing complete! {size_mb:.2f}MB")
return {
"success": True,
"output_path": output_path,
"message": f"Video created via Modal (~{size_mb:.2f}MB).",
}
except Exception as e:
return {
"success": False,
"output_path": None,
"message": f"Error calling Modal: {str(e)}",
}
# -------------------------------------------------
# AGENT INITIALIZATION
# -------------------------------------------------
def initialize_agent():
"""Initialize the CodeAgent with optional MCP client."""
try:
hf_token = os.getenv("HF_TOKEN")
model = InferenceClient(token=hf_token)
agent = CodeAgent(
tools=[generate_quote_tool, search_pexels_video_tool, create_quote_video_tool],
model=model,
additional_authorized_imports=[
"os",
"time",
"json",
"random",
"tempfile",
"requests",
],
max_steps=15,
)
if mcp_enabled:
agent.mcp_clients = [mcp_client]
return agent, None
except Exception as e:
return None, f"Agent initialization error: {str(e)}"
agent, agent_error = initialize_agent()
# -------------------------------------------------
# PIPELINES
# -------------------------------------------------
def mcp_agent_pipeline(niche: str, style: str, num_variations: int = 1):
"""
MAIN PIPELINE: uses smolagents CodeAgent.run to plan & call tools.
The agent:
- calls generate_quote_tool
- calls search_pexels_video_tool multiple times
- calls create_quote_video_tool for each variation
- returns JSON with status_log + video_paths
"""
base_log = ["πŸ€– **MCP AGENT RUN**"]
if agent_error or agent is None:
base_log.append(f"❌ Agent initialization failed: {agent_error}")
base_log.append("πŸ”„ Falling back to direct tool pipeline...")
status, vids = fallback_pipeline(niche, style, num_variations)
return "\n".join(base_log + [status]), vids
try:
output_dir = "/tmp/quote_videos"
gallery_dir = "/data/gallery_videos"
os.makedirs(output_dir, exist_ok=True)
os.makedirs(gallery_dir, exist_ok=True)
timestamp = int(time.time())
base_prefix = os.path.join(output_dir, f"agent_{timestamp}_v")
user_task = f"""
You are an autonomous Python agent helping creators generate short vertical quote videos.
Niche: {niche}
Style: {style}
Number of variations: {num_variations}
You have these TOOLS available:
1. generate_quote_tool(niche: str, style: str) -> str
- Returns a single quote as plain text.
2. search_pexels_video_tool(style: str, niche: str) -> dict
- Returns a dict with:
- "success": bool
- "video_url": str or None
3. create_quote_video_tool(video_url: str, quote_text: str, output_path: str) -> dict
- Writes a video file to output_path and returns a dict with:
- "success": bool
- "output_path": str or None
Your job:
1. Call generate_quote_tool once to obtain quote_text.
2. For each variation i from 1 to {num_variations}:
- Call search_pexels_video_tool(style, niche).
- If it succeeds, compute output_path exactly as:
"{base_prefix}{{i}}.mp4"
- Call create_quote_video_tool(video_url, quote_text, output_path).
3. Collect only variations where create_quote_video_tool returns success == True and a non-empty output_path.
4. Build a human-readable status_log string summarizing:
- Which tools you called
- How many videos succeeded or failed
5. Return ONLY a valid JSON object of the form:
{{
"status_log": "multi-line human readable description of what you did",
"video_paths": [
"{base_prefix}1.mp4",
"... only paths that actually succeeded ..."
]
}}
CRITICAL:
- Do NOT wrap the JSON in markdown or backticks.
- Do NOT add extra keys.
- Do NOT print anything except the JSON object as your final answer.
"""
agent_result = agent.run(user_task)
try:
parsed = json.loads(agent_result)
except Exception as parse_err:
raise ValueError(
f"Agent output was not valid JSON: {parse_err}\n"
f"Raw agent output (first 500 chars): {agent_result[:500]}"
)
status_log = parsed.get("status_log", "")
video_paths = parsed.get("video_paths", [])
valid_paths = [
p for p in video_paths if isinstance(p, str) and os.path.exists(p)
]
if not valid_paths:
raise ValueError("Agent returned no valid video paths or files not found.")
for idx, path in enumerate(valid_paths):
try:
filename = os.path.basename(path)
gallery_path = os.path.join(
gallery_dir,
f"gallery_{timestamp}_v{idx+1}_{filename}",
)
shutil.copy2(path, gallery_path)
except Exception as e:
print(f"⚠️ Failed to copy to gallery for {path}: {e}")
full_status = "\n".join(base_log + [status_log])
return full_status, valid_paths[:3]
except Exception as e:
fallback_status, fallback_videos = fallback_pipeline(niche, style, num_variations)
combined_status = "\n".join(
base_log
+ [f"⚠️ Agent pipeline error: {str(e)}", "", "πŸ”„ Switched to fallback pipeline:", fallback_status]
)
return combined_status, fallback_videos
def fallback_pipeline(niche: str, style: str, num_variations: int = 1):
"""Fallback pipeline: direct tool calls without agent planning."""
status_log = []
status_log.append("πŸ”„ **FALLBACK MODE (Direct Tool Execution)**\n")
status_log.append("🧠 Generating quote with HybridQuoteGenerator...")
quote = generate_quote_tool(niche, style)
if isinstance(quote, str) and quote.startswith("Error generating quote"):
return "\n".join(status_log) + f"\n❌ {quote}", []
status_log.append(" βœ… Quote generated\n")
status_log.append(f"πŸ” Searching for {num_variations} videos...")
video_results = []
for _ in range(num_variations):
vr = search_pexels_video_tool(style, niche)
if vr.get("success"):
video_results.append(vr)
if not video_results:
status_log.append("❌ No videos found\n")
return "\n".join(status_log), []
status_log.append(f" βœ… Found {len(video_results)} videos\n")
status_log.append("🎬 Creating videos via Modal...")
output_dir = "/tmp/quote_videos"
gallery_dir = "/data/gallery_videos"
os.makedirs(output_dir, exist_ok=True)
os.makedirs(gallery_dir, exist_ok=True)
timestamp = int(time.time())
created_videos = []
for i, vr in enumerate(video_results):
output_filename = f"quote_video_v{i+1}_{timestamp}.mp4"
output_path = os.path.join(output_dir, output_filename)
creation_result = create_quote_video_tool(
video_url=vr["video_url"],
quote_text=quote,
output_path=output_path,
)
if creation_result.get("success"):
created_videos.append(creation_result["output_path"])
gallery_filename = f"gallery_{timestamp}_v{i+1}.mp4"
gallery_path = os.path.join(gallery_dir, gallery_filename)
try:
shutil.copy2(creation_result["output_path"], gallery_path)
except Exception as e:
print(f"⚠️ Gallery copy failed: {e}")
else:
error_msg = creation_result.get("message", "Unknown error")
status_log.append(f" ❌ Video {i+1} error: {error_msg}")
if not created_videos:
status_log.append("❌ Video creation failed\n")
return "\n".join(status_log), []
status_log.append(f" βœ… Created {len(created_videos)} videos!\n")
status_log.append("🎬 **COMPLETE!**")
return "\n".join(status_log), created_videos
# -------------------------------------------------
# GRADIO UI
# -------------------------------------------------
with gr.Blocks(
title="AIQuoteClipGenerator - MCP + Gemini Edition",
theme=gr.themes.Soft(),
) as demo:
gr.Markdown(
"""
# 🎬 AIQuoteClipGenerator
### MCP-Powered with Gemini AI
**Key Features:**
- 🌟 **Gemini AI** with quote-history to avoid repetition
- πŸ€– **smolagents CodeAgent** for planning & tool-use
- πŸ”— **MCP Client Ready** (uses external MCP tools if available)
- πŸŽ₯ **Modal** for fast video rendering
- 🎨 Generate multiple vertical quote video variations
"""
)
with gr.Accordion("πŸ“Έ Example Gallery - Recent Videos", open=True):
gr.Markdown(
"See what others (or you) have generated. Auto-updates after each run."
)
with gr.Row():
gallery_video1 = gr.Video(height=300, show_label=False, interactive=False)
gallery_video2 = gr.Video(height=300, show_label=False, interactive=False)
gallery_video3 = gr.Video(height=300, show_label=False, interactive=False)
with gr.Row():
gallery_video4 = gr.Video(height=300, show_label=False, interactive=False)
gallery_video5 = gr.Video(height=300, show_label=False, interactive=False)
gallery_video6 = gr.Video(height=300, show_label=False, interactive=False)
def load_gallery_videos():
gallery_output_dir = "/data/gallery_videos"
os.makedirs(gallery_output_dir, exist_ok=True)
import glob
existing_videos = sorted(
glob.glob(os.path.join(gallery_output_dir, "*.mp4")),
key=os.path.getmtime,
reverse=True,
)[:6]
videos = [None] * 6
for i, path in enumerate(existing_videos):
if i < 6:
videos[i] = path
return videos
gr.Markdown("---")
gr.Markdown("## 🎯 Generate Your Own Quote Video")
with gr.Row():
with gr.Column():
gr.Markdown("### 🎯 Input")
niche = gr.Dropdown(
choices=[
"Motivation",
"Business/Entrepreneurship",
"Fitness",
"Mindfulness",
"Stoicism",
"Leadership",
"Love & Relationships",
],
label="πŸ“‚ Select Niche",
value="Motivation",
)
style = gr.Dropdown(
choices=["Cinematic", "Nature", "Urban", "Minimal", "Abstract"],
label="🎨 Visual Style",
value="Cinematic",
)
num_variations = gr.Slider(
minimum=1,
maximum=3,
step=1,
value=1,
label="🎬 Number of Video Variations",
info="Generate multiple versions to choose from",
)
generate_btn = gr.Button(
"πŸ€– Run MCP Agent with Gemini", variant="primary", size="lg"
)
with gr.Column():
gr.Markdown("### πŸ“Š MCP Agent Activity Log")
output = gr.Textbox(lines=20, show_label=False)
with gr.Row():
gr.Markdown("### ✨ Your Quote Videos")
with gr.Row():
video1 = gr.Video(label="Video 1", visible=True, height=500)
video2 = gr.Video(label="Video 2", visible=False, height=500)
video3 = gr.Video(label="Video 3", visible=False, height=500)
gr.Markdown(
"""
---
### ✨ Features
- 🌟 **Gemini-powered** quote variety (history-aware)
- 🎨 Multiple aesthetic video variations
- ⚑ **Modal**-accelerated rendering
- πŸ€– **smolagents** CodeAgent for autonomous tool-calling
- πŸ”— Optional MCP integration via MCPClient
### πŸ† Hackathon: MCP 1st Birthday
**Track:** Track 2 - MCP in Action
**Category:** Productivity / Creator Tools
**Stack:** Gradio Β· smolagents Β· Gemini Β· OpenAI Β· Pexels Β· Modal Β· MCP
"""
)
def process_and_display(niche, style, num_variations):
status, videos = mcp_agent_pipeline(
niche=str(niche),
style=str(style),
num_variations=int(num_variations),
)
v1 = videos[0] if len(videos) > 0 else None
v2 = videos[1] if len(videos) > 1 else None
v3 = videos[2] if len(videos) > 2 else None
gallery_vids = load_gallery_videos()
return [status, v1, v2, v3] + gallery_vids
generate_btn.click(
process_and_display,
inputs=[niche, style, num_variations],
outputs=[
output,
video1,
video2,
video3,
gallery_video1,
gallery_video2,
gallery_video3,
gallery_video4,
gallery_video5,
gallery_video6,
],
)
demo.load(
load_gallery_videos,
outputs=[
gallery_video1,
gallery_video2,
gallery_video3,
gallery_video4,
gallery_video5,
gallery_video6,
],
)
if __name__ == "__main__":
demo.launch(allowed_paths=["/data/gallery_videos"])