|
|
import gradio as gr |
|
|
import os |
|
|
import requests |
|
|
import random |
|
|
import tempfile |
|
|
from openai import OpenAI |
|
|
from smolagents import CodeAgent, MCPClient, tool |
|
|
from huggingface_hub import InferenceClient |
|
|
from moviepy.editor import VideoFileClip, ImageClip, CompositeVideoClip, AudioFileClip |
|
|
from PIL import Image, ImageDraw, ImageFont |
|
|
import textwrap |
|
|
import numpy as np |
|
|
from elevenlabs import ElevenLabs, VoiceSettings |
|
|
|
|
|
|
|
|
from quote_generator_gemini import HybridQuoteGenerator |
|
|
|
|
|
|
|
|
# --- API clients and shared module-level state (configured via env vars) ---

# OpenAI client: used as the fallback quote generator.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pexels API key for stock background-video search.
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")

# ElevenLabs client for text-to-speech voice commentary.
elevenlabs_client = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY"))

# Quote generator that prefers Gemini and falls back to OpenAI.
hybrid_quote_generator = HybridQuoteGenerator(
    gemini_key=os.getenv("GEMINI_API_KEY"),
    openai_client=openai_client
)

# Best-effort MCP client setup; the app keeps working if this fails
# (mcp_enabled gates every later use of mcp_client).
try:
    mcp_client = MCPClient("https://abidlabs-mcp-tools.hf.space")
    mcp_enabled = True
except Exception as e:
    print(f"MCP initialization warning: {e}")
    mcp_enabled = False
|
|
|
|
|
|
|
|
@tool
def generate_quote_tool(niche: str, style: str) -> str:
    """
    Generate a powerful inspirational quote using Gemini AI with variety tracking.
    Falls back to OpenAI if Gemini is unavailable.

    Args:
        niche: The category of quote (Motivation, Business, Fitness, etc.)
        style: The visual style (Cinematic, Nature, Urban, Minimal, Abstract)

    Returns:
        A powerful, unique quote string, or a string beginning with
        "Error generating quote:" on failure (callers detect errors by
        checking for "Error" in the returned text).
    """
    try:
        result = hybrid_quote_generator.generate_quote(niche, style, prefer_gemini=True)

        if result["success"]:
            quote = result["quote"]
            source = result["source"]

            # Log which backend produced the quote; Gemini keeps running stats.
            if source == "gemini":
                stats = result.get("stats", {})
                print(f"β¨ Generated with Gemini (Total: {stats.get('total_quotes_generated', 0)})")
            else:
                # fixed: removed no-op f-string prefix (no placeholders)
                print("β¨ Generated with OpenAI (fallback)")

            return quote

        # Generator reported failure without raising an exception.
        error_msg = result.get("error", "Unknown error")
        return f"Error generating quote: {error_msg}"

    except Exception as e:
        # Errors are returned as strings so the agent pipeline can surface them.
        return f"Error generating quote: {str(e)}"
|
|
|
|
|
@tool
def search_pexels_video_tool(style: str, niche: str) -> dict:
    """
    Search and fetch a matching video from Pexels based on style and niche.

    Args:
        style: Visual style (Cinematic, Nature, Urban, Minimal, Abstract)
        niche: Content niche (Motivation, Business, Fitness, etc.)

    Returns:
        Dictionary with video_url, search_query, pexels_url and a "success"
        flag; on failure an "error" key describes what went wrong.
    """
    # Curated search queries per (niche, style) pair; a random one is picked
    # each call so repeated runs produce different backgrounds.
    search_strategies = {
        "Motivation": {
            "Cinematic": ["person climbing mountain", "running sunrise", "achievement success"],
            "Nature": ["sunrise mountain peak", "ocean waves powerful", "forest light"],
            "Urban": ["city skyline dawn", "person running city", "urban success"],
            "Minimal": ["minimal motivation", "single person silhouette", "clean inspiring"],
            "Abstract": ["light rays hope", "particles rising", "abstract energy"]
        },
        "Business/Entrepreneurship": {
            "Cinematic": ["business cityscape", "office modern", "handshake deal"],
            "Nature": ["growth plant", "river flowing", "sunrise new beginning"],
            "Urban": ["city business", "office skyline", "modern workspace"],
            "Minimal": ["desk minimal", "workspace clean", "simple office"],
            "Abstract": ["network connections", "growth chart", "abstract progress"]
        },
        "Fitness": {
            "Cinematic": ["athlete training", "gym workout", "running outdoor"],
            "Nature": ["outdoor workout", "mountain hiking", "beach running"],
            "Urban": ["city running", "urban fitness", "street workout"],
            "Minimal": ["gym minimal", "simple workout", "clean fitness"],
            "Abstract": ["energy motion", "strength power", "dynamic movement"]
        },
        "Mindfulness": {
            "Cinematic": ["meditation sunset", "peaceful landscape", "calm water"],
            "Nature": ["forest peaceful", "calm lake", "zen garden"],
            "Urban": ["city peaceful morning", "quiet street", "urban calm"],
            "Minimal": ["minimal zen", "simple meditation", "clean peaceful"],
            "Abstract": ["calm waves", "gentle motion", "soft particles"]
        },
        "Stoicism": {
            "Cinematic": ["ancient architecture", "statue philosopher", "timeless landscape"],
            "Nature": ["mountain strong", "oak tree", "stone nature"],
            "Urban": ["classical building", "statue city", "ancient modern"],
            "Minimal": ["stone minimal", "simple strong", "pillar minimal"],
            "Abstract": ["marble texture", "stone abstract", "timeless pattern"]
        },
        "Leadership": {
            "Cinematic": ["team meeting", "leader speaking", "group collaboration"],
            "Nature": ["eagle flying", "lion pride", "mountain top"],
            "Urban": ["office leadership", "boardroom", "city leadership"],
            "Minimal": ["chess pieces", "simple leadership", "clean professional"],
            "Abstract": ["network leader", "connection points", "guiding light"]
        },
        "Love & Relationships": {
            "Cinematic": ["couple sunset", "romance beautiful", "love cinematic"],
            "Nature": ["couple nature", "romantic sunset", "peaceful together"],
            "Urban": ["couple city", "romance urban", "love city lights"],
            "Minimal": ["hands holding", "simple love", "minimal romance"],
            "Abstract": ["hearts flowing", "love particles", "connection abstract"]
        }
    }

    # Unknown combinations fall back to a generic query.
    queries = search_strategies.get(niche, {}).get(style, ["aesthetic nature"])

    try:
        headers = {"Authorization": PEXELS_API_KEY}

        query = random.choice(queries)

        url = f"https://api.pexels.com/videos/search?query={query}&per_page=15&orientation=portrait"
        # fixed: added a timeout (was unbounded and could hang the pipeline)
        # and a status check before JSON decoding.
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        data = response.json()

        if "videos" in data and len(data["videos"]) > 0:
            # Pick from the top results for variety.
            video = random.choice(data["videos"][:10])
            video_files = video.get("video_files", [])

            # Prefer a portrait rendition (width < height) for vertical video.
            portrait_videos = [vf for vf in video_files if vf.get("width", 0) < vf.get("height", 0)]

            if portrait_videos:
                selected = random.choice(portrait_videos)
                return {
                    "video_url": selected.get("link"),
                    "search_query": query,
                    "pexels_url": video.get("url"),
                    "success": True
                }

            # No portrait file: fall back to the first rendition.
            if video_files:
                return {
                    "video_url": video_files[0].get("link"),
                    "search_query": query,
                    "pexels_url": video.get("url"),
                    "success": True
                }

        return {
            "video_url": None,
            "search_query": query,
            "pexels_url": None,
            "success": False,
            "error": "No suitable videos found"
        }

    except Exception as e:
        # Network/HTTP/JSON errors all surface as a failed result dict.
        return {
            "video_url": None,
            "search_query": "",
            "pexels_url": None,
            "success": False,
            "error": str(e)
        }
|
|
|
|
|
@tool
def generate_voice_commentary_tool(quote_text: str, niche: str, output_path: str) -> dict:
    """
    Generate insightful voice commentary explaining the deeper meaning of the quote.
    Uses Gemini to create thoughtful explanation, then ElevenLabs to voice it.
    This adds VALUE - not just reading what's already on screen.

    Args:
        quote_text: The quote to explain
        niche: The niche/category for context
        output_path: Path where to save the audio file

    Returns:
        Dictionary with success status, output path, and the explanation text
    """
    try:
        # Imported lazily so the module loads even without the Gemini SDK.
        import google.generativeai as genai

        # Prompt asks for a short mentor-style voice-over that adds insight
        # beyond the on-screen text; the example anchors the desired tone.
        explanation_prompt = f"""Given this {niche} quote:

"{quote_text}"

Write a brief, insightful voice-over commentary that explains the deeper meaning or practical wisdom.

Requirements:
- 2-3 sentences maximum
- Around 25-35 words total
- Spoken naturally (like a wise mentor)
- Add insight that isn't obvious from reading
- Make it thought-provoking
- Don't start with "This quote..." - dive into the insight

Example:
Quote: "Between stimulus and response there is a space."
Good: "In that pause lies your freedom. That's where you choose who you become, not who your habits make you."

Return ONLY the commentary, nothing else."""

        genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
        model = genai.GenerativeModel('gemini-1.5-flash')

        # Low token cap keeps the commentary short (25-35 words requested).
        response = model.generate_content(
            explanation_prompt,
            generation_config={
                "temperature": 0.7,
                "max_output_tokens": 100
            }
        )

        # Strip surrounding quote characters the model sometimes adds.
        explanation = response.text.strip().strip('"').strip("'")
        print(f"π Commentary: {explanation}")

        # Synthesize speech with ElevenLabs (fixed voice id; multilingual model).
        audio = elevenlabs_client.text_to_speech.convert(
            text=explanation,
            voice_id="pNInz6obpgDQGcFmaJgB",
            model_id="eleven_multilingual_v2",
            voice_settings=VoiceSettings(
                stability=0.6,
                similarity_boost=0.8,
                style=0.6,
                use_speaker_boost=True
            )
        )

        # The convert() result is an iterable of audio chunks; stream to disk.
        with open(output_path, 'wb') as f:
            for chunk in audio:
                f.write(chunk)

        return {
            "success": True,
            "output_path": output_path,
            "explanation": explanation,
            "message": "Voice commentary created!"
        }

    except Exception as e:
        # Any Gemini/ElevenLabs/IO failure is reported in the result dict.
        return {
            "success": False,
            "output_path": None,
            "explanation": None,
            "message": f"Error creating commentary: {str(e)}"
        }
|
|
|
|
|
@tool
def create_quote_video_tool(video_url: str, quote_text: str, output_path: str, audio_path: str = None) -> dict:
    """
    Create a final quote video by overlaying text on the background video.
    Uses Modal for fast processing (4-8x faster); local processing is disabled.
    Optionally adds voice narration audio.

    Args:
        video_url: URL of the background video from Pexels
        quote_text: The quote text to overlay
        output_path: Path where to save the final video
        audio_path: Optional path to audio file for voice narration

    Returns:
        Dictionary with success status and output path
    """
    modal_endpoint = os.getenv("MODAL_ENDPOINT_URL")

    if modal_endpoint:
        try:
            import requests
            import base64

            print("π Processing on Modal (fast!)...")

            # Ship the optional narration audio to Modal as base64.
            audio_b64 = None
            if audio_path and os.path.exists(audio_path):
                with open(audio_path, 'rb') as f:
                    audio_bytes = f.read()
                audio_b64 = base64.b64encode(audio_bytes).decode()
                print(f" π€ Including voice commentary audio ({len(audio_bytes)} bytes)")

            response = requests.post(
                modal_endpoint,
                json={
                    "video_url": video_url,
                    "quote_text": quote_text,
                    "audio_b64": audio_b64
                },
                timeout=120
            )

            if response.status_code == 200:
                result = response.json()

                if result.get("success"):
                    # Modal returns the finished video as base64; write it out.
                    video_bytes = base64.b64decode(result["video"])
                    with open(output_path, 'wb') as f:
                        f.write(video_bytes)

                    print(f"✅ Modal processing complete! {result['size_mb']:.2f}MB")

                    return {
                        "success": True,
                        "output_path": output_path,
                        "message": f"Video created via Modal in ~20s ({result['size_mb']:.2f}MB)"
                    }
                else:
                    print(f"β οΈ Modal returned error: {result.get('error', 'Unknown')}")
            else:
                print(f"β οΈ Modal HTTP error: {response.status_code}")

            print("β οΈ Modal failed, falling back to local processing...")

        except requests.Timeout:
            print("β οΈ Modal timeout after 120s, falling back to local...")
        except Exception as e:
            print(f"β οΈ Modal error: {e}, falling back to local processing...")
    else:
        print("βΉοΈ MODAL_ENDPOINT_URL not configured, using local processing")

    # Local (moviepy/PIL) processing is intentionally disabled because it can
    # hang on HF Spaces. NOTE(review): the previous local implementation sat
    # below this unconditional return as unreachable dead code and has been
    # removed; restore it from version control if local rendering is ever
    # re-enabled.
    print("π§ Processing locally (may be slow)...")
    print("β οΈ WARNING: Local processing can hang on HF Spaces!")
    print("β οΈ Consider setting up Modal for 4-8x faster processing")

    return {
        "success": False,
        "output_path": None,
        "message": "Local processing disabled - please configure Modal for video generation. Deploy Modal with: modal deploy modal_video_processing.py"
    }
|
|
|
|
|
|
|
|
def initialize_agent():
    """Build the smolagents CodeAgent wired to the four custom tools.

    Returns:
        A ``(agent, error)`` pair: ``(agent, None)`` on success, or
        ``(None, message)`` describing the initialization failure.
    """
    try:
        # HF Inference endpoint acts as the agent's LLM backend.
        hf_model = InferenceClient(token=os.getenv("HF_TOKEN"))

        custom_tools = [
            generate_quote_tool,
            search_pexels_video_tool,
            generate_voice_commentary_tool,
            create_quote_video_tool,
        ]

        code_agent = CodeAgent(
            tools=custom_tools,
            model=hf_model,
            additional_authorized_imports=["requests", "openai", "random", "tempfile", "os", "google.generativeai"],
            max_steps=15
        )

        # Attach the MCP client only when module-level setup succeeded.
        if mcp_enabled:
            code_agent.mcp_clients = [mcp_client]

        return code_agent, None
    except Exception as e:
        return None, f"Agent initialization error: {str(e)}"
|
|
|
|
|
|
|
|
agent, agent_error = initialize_agent() |
|
|
|
|
|
def mcp_agent_pipeline(niche, style, num_variations=1):
    """
    MCP-POWERED AUTONOMOUS AGENT PIPELINE
    Uses smolagents with proper MCP server integration
    Generates multiple video variations with Gemini-powered quotes

    Args:
        niche: Content niche (e.g. "Motivation")
        style: Visual style (e.g. "Cinematic")
        num_variations: How many video variations to create

    Returns:
        Tuple of (status_log_text, list_of_created_video_paths)
    """
    status_log = []
    status_log.append("π€ **MCP AGENT STARTING**\n")

    # If the agent could not be built, run the tools directly instead.
    if agent_error:
        status_log.append(f"β Agent initialization failed: {agent_error}")
        status_log.append("\nπ Falling back to direct tool execution...\n")
        return fallback_pipeline(niche, style, num_variations)

    try:
        status_log.append("π **TASK RECEIVED:**")
        status_log.append(f" β Generate {niche} quote with {style} aesthetic")
        status_log.append(f" β Create {num_variations} video variations")
        status_log.append("")

        # Step 1: quote generation (Gemini with OpenAI fallback).
        status_log.append("π§ **GEMINI AI: generate_quote_tool**")
        quote = generate_quote_tool(niche, style)

        # Tools report failures as strings containing "Error".
        if "Error" in quote:
            return "\n".join(status_log) + f"\nβ Failed: {quote}", []

        status_log.append(f" ✅ Generated: \"{quote[:100]}...\"" if len(quote) > 100 else f" ✅ Generated: \"{quote}\"\n")

        # Step 2: find one background video per requested variation.
        status_log.append(f"π **MCP TOOL: search_pexels_video_tool (x{num_variations})**")
        status_log.append(f" β³ Finding {num_variations} different videos...")

        video_results = []
        for i in range(num_variations):
            video_result = search_pexels_video_tool(style, niche)
            if video_result["success"]:
                video_results.append(video_result)
                status_log.append(f" ✅ Video {i+1}: {video_result['search_query']}")

        if not video_results:
            return "\n".join(status_log) + "\nβ No videos found", []

        status_log.append("")

        # Step 3: render every variation concurrently (one thread each).
        status_log.append(f"π¬ **MCP TOOL: create_quote_video_tool (x{len(video_results)})**")
        status_log.append(f" β³ Creating {len(video_results)} video variations in parallel...")

        output_dir = "/tmp/quote_videos"
        gallery_dir = "/data/gallery_videos"
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(gallery_dir, exist_ok=True)

        import time
        timestamp = int(time.time())

        import threading
        import queue

        results_queue = queue.Queue()

        def create_single_video(index, video_result):
            # Worker: render one variation and report (index, result, path).
            output_filename = f"quote_video_v{index+1}_{timestamp}.mp4"
            output_path = os.path.join(output_dir, output_filename)

            creation_result = create_quote_video_tool(
                video_result["video_url"],
                quote,
                output_path,
                None
            )

            results_queue.put((index, creation_result, output_path))

        threads = []
        for i, video_result in enumerate(video_results):
            thread = threading.Thread(target=create_single_video, args=(i, video_result))
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

        # Drain the queue and restore submission order.
        created_videos = []
        all_results = []
        while not results_queue.empty():
            all_results.append(results_queue.get())

        all_results.sort(key=lambda x: x[0])

        for index, creation_result, output_path in all_results:
            if creation_result["success"]:
                created_videos.append(output_path)
                status_log.append(f" ✅ Variation {index+1} created!")

                # Best-effort copy into the shared gallery; failures are ignored.
                import shutil
                gallery_filename = f"gallery_{timestamp}_v{index+1}.mp4"
                gallery_path = os.path.join(gallery_dir, gallery_filename)
                try:
                    shutil.copy2(output_path, gallery_path)
                except OSError:
                    pass
            else:
                error_msg = creation_result.get("message", "Unknown error")
                # fixed: previously reported the wrong variation number by
                # using the stale loop variable `i` instead of `index`.
                status_log.append(f" β οΈ Variation {index+1} failed: {error_msg}")

        if not created_videos:
            status_log.append("\nβ All video creations failed")
            return "\n".join(status_log), []

        status_log.append("")

        status_log.append("π **AI INTEGRATIONS:**")
        status_log.append(" ✅ Gemini API - Quote generation with variety tracking")
        status_log.append(" ✅ Pexels API - Video search")
        status_log.append(" ✅ Modal Compute - Fast video processing")
        if mcp_enabled:
            status_log.append(" ✅ MCP Server - abidlabs-mcp-tools.hf.space")
        status_log.append("")

        status_log.append("β¨ **PIPELINE COMPLETE!**")
        status_log.append(f" π¬ Created {len(created_videos)} unique video variations")
        status_log.append(" π₯ Choose your favorite and download!")

        final_status = "\n".join(status_log)
        return final_status, created_videos

    except Exception as e:
        status_log.append(f"\nβ Pipeline error: {str(e)}")
        return "\n".join(status_log), []
|
|
|
|
|
def fallback_pipeline(niche, style, num_variations=1):
    """Fallback pipeline if MCP agent fails.

    Runs the same tools as mcp_agent_pipeline, but sequentially and without
    the agent or threading.

    Args:
        niche: Content niche (e.g. "Motivation")
        style: Visual style (e.g. "Cinematic")
        num_variations: How many videos to create

    Returns:
        Tuple of (status_log_text, list_of_created_video_paths)
    """
    status_log = []
    status_log.append("π **FALLBACK MODE (Direct Tool Execution)**\n")

    # Step 1: quote generation.
    status_log.append("π§ Generating quote with Gemini...")
    quote = generate_quote_tool(niche, style)

    # Tools report failures as strings containing "Error".
    if "Error" in quote:
        return "\n".join(status_log) + f"\nβ {quote}", []

    status_log.append(" ✅ Quote generated\n")

    # Step 2: find one background video per requested variation.
    status_log.append(f"π Searching for {num_variations} videos...")
    video_results = []
    for i in range(num_variations):
        video_result = search_pexels_video_tool(style, niche)
        if video_result["success"]:
            video_results.append(video_result)

    if not video_results:
        return "\n".join(status_log) + "\nβ No videos found", []

    status_log.append(f" ✅ Found {len(video_results)} videos\n")

    # Step 3: render each video sequentially.
    status_log.append("π¬ Creating videos...")
    output_dir = "/tmp/quote_videos"
    gallery_dir = "/data/gallery_videos"
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(gallery_dir, exist_ok=True)

    import time
    timestamp = int(time.time())
    created_videos = []

    for i, video_result in enumerate(video_results):
        output_filename = f"quote_video_v{i+1}_{timestamp}.mp4"
        output_path = os.path.join(output_dir, output_filename)

        creation_result = create_quote_video_tool(
            video_result["video_url"],
            quote,
            output_path,
            None
        )

        if creation_result["success"]:
            created_videos.append(creation_result["output_path"])

            # Best-effort copy into the shared gallery; failures are ignored.
            import shutil
            gallery_filename = f"gallery_{timestamp}_v{i+1}.mp4"
            gallery_path = os.path.join(gallery_dir, gallery_filename)
            try:
                shutil.copy2(creation_result["output_path"], gallery_path)
            except OSError:  # fixed: was a bare except hiding all errors
                pass
        else:
            error_msg = creation_result.get("message", "Unknown error")
            status_log.append(f" β Video {i+1} error: {error_msg}")

    if not created_videos:
        return "\n".join(status_log) + "\nβ Video creation failed", []

    status_log.append(f" ✅ Created {len(created_videos)} videos!\n")
    status_log.append("π¬ **COMPLETE!**")

    return "\n".join(status_log), created_videos
|
|
|
|
|
|
|
|
# --- Gradio UI: header, example gallery, input controls, and result panes ---
with gr.Blocks(title="AIQuoteClipGenerator - MCP + Gemini Edition", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# π¬ AIQuoteClipGenerator
### MCP-Powered with Gemini AI

**Key Features:**
- π **Gemini AI:** No more repetitive quotes! Smart variety tracking
- π **MCP Server:** smolagents framework integration
- π οΈ **4 Custom MCP Tools:** Quote + Video search + Video creation
- π€ **Agent Reasoning:** Autonomous task execution
- β‘ **Modal Processing:** 4-8x faster video creation
- π¨ **Multiple Variations:** Get different video styles

**Prize Eligibility:**
- β
 Gemini API Integration ($10K Creative category)
- β
 Modal Innovation Award ($2.5K)
- β
 OpenAI Fallback ($1K credits)
""")

    # Gallery of the six most recent videos, refreshed after each generation.
    with gr.Accordion("πΈ Example Gallery - Recent Videos", open=True):
        gr.Markdown("See what others have created! Updates automatically after generation.")

        with gr.Row():
            gallery_video1 = gr.Video(label="", height=300, show_label=False, interactive=False)
            gallery_video2 = gr.Video(label="", height=300, show_label=False, interactive=False)
            gallery_video3 = gr.Video(label="", height=300, show_label=False, interactive=False)

        with gr.Row():
            gallery_video4 = gr.Video(label="", height=300, show_label=False, interactive=False)
            gallery_video5 = gr.Video(label="", height=300, show_label=False, interactive=False)
            gallery_video6 = gr.Video(label="", height=300, show_label=False, interactive=False)

    def load_gallery_videos():
        """Return up to six most-recent gallery video paths (padded with None)."""
        gallery_output_dir = "/data/gallery_videos"
        os.makedirs(gallery_output_dir, exist_ok=True)

        import glob
        # Newest first by modification time, capped at six entries.
        existing_videos = sorted(glob.glob(f"{gallery_output_dir}/*.mp4"),
                                 key=os.path.getmtime, reverse=True)[:6]

        # Always return exactly six slots so outputs line up with components.
        videos = [None] * 6
        for i, video_path in enumerate(existing_videos):
            if i < 6:
                videos[i] = video_path

        return videos

    gr.Markdown("---")
    gr.Markdown("## π― Generate Your Own Quote Video")

    # Input column (niche / style / variation count) next to the live log.
    with gr.Row():
        with gr.Column():
            gr.Markdown("### π― Input")
            niche = gr.Dropdown(
                choices=[
                    "Motivation",
                    "Business/Entrepreneurship",
                    "Fitness",
                    "Mindfulness",
                    "Stoicism",
                    "Leadership",
                    "Love & Relationships"
                ],
                label="π Select Niche",
                value="Motivation"
            )

            style = gr.Dropdown(
                choices=[
                    "Cinematic",
                    "Nature",
                    "Urban",
                    "Minimal",
                    "Abstract"
                ],
                label="π¨ Visual Style",
                value="Cinematic"
            )

            num_variations = gr.Slider(
                minimum=1,
                maximum=3,
                value=1,
                step=1,
                label="π¬ Number of Video Variations",
                info="Generate multiple versions to choose from"
            )

            generate_btn = gr.Button("π€ Run MCP Agent with Gemini", variant="primary", size="lg")

        with gr.Column():
            gr.Markdown("### π MCP Agent Activity Log")
            output = gr.Textbox(label="Agent Status", lines=20, show_label=False)

    with gr.Row():
        gr.Markdown("### β¨ Your Quote Videos")

    # Result slots: only as many as were generated get a video; others stay empty.
    with gr.Row():
        video1 = gr.Video(label="Video 1", visible=True, height=500)
        video2 = gr.Video(label="Video 2", visible=False, height=500)
        video3 = gr.Video(label="Video 3", visible=False, height=500)

    gr.Markdown("""
---
### β¨ Features
- π **Gemini AI** - Eliminates repetitive quotes with smart history tracking
- π¨ **Multiple Variations** - Get 1-3 different videos to choose from
- β‘ **Modal Processing** - 4-8x faster with serverless compute
- π― **4 MCP Tools** - Quote (Gemini), Video Search, Voice, Video Creation

### π Hackathon: MCP 1st Birthday
**Track:** Track 2 - MCP in Action
**Category:** Productivity Tools
**Built with:** Gradio + smolagents + Gemini + OpenAI + Pexels + Modal + ElevenLabs + MCP

**Prize Targets:**
- Google Gemini Creative Award ($10K)
- Modal Innovation Award ($2.5K)
- OpenAI API Integration ($1K credits)
- ElevenLabs Voice Award (~$2K + AirPods)
""")

    def process_and_display(niche, style, num_variations):
        """Run the pipeline and map its results onto the UI output components."""
        status, videos = mcp_agent_pipeline(niche, style, num_variations)

        # Up to three generated videos map onto the three result slots.
        v1 = videos[0] if len(videos) > 0 else None
        v2 = videos[1] if len(videos) > 1 else None
        v3 = videos[2] if len(videos) > 2 else None

        # Refresh the shared gallery with the newest clips.
        gallery_vids = load_gallery_videos()

        return [status, v1, v2, v3] + gallery_vids

    generate_btn.click(
        process_and_display,
        inputs=[niche, style, num_variations],
        outputs=[
            output, video1, video2, video3,
            gallery_video1, gallery_video2, gallery_video3,
            gallery_video4, gallery_video5, gallery_video6
        ]
    )

    # Populate the gallery once on page load.
    demo.load(
        load_gallery_videos,
        outputs=[
            gallery_video1, gallery_video2, gallery_video3,
            gallery_video4, gallery_video5, gallery_video6
        ]
    )

if __name__ == "__main__":
    # allowed_paths lets Gradio serve videos stored outside its temp dir.
    demo.launch(allowed_paths=["/data/gallery_videos"])
|
|
|