FaceSwapAll / app.py
Deepro Bardhan
added ffmeg functionality
655b242
raw
history blame
14 kB
import gradio as gr
import os
import cv2
import numpy as np
import shutil
import subprocess
from SinglePhoto import FaceSwapper
wellcomingMessage = """
<h1>Face Swapping Suite</h1>
<p>All-in-one face swapping: single photo, video, multi-source, and multi-destination!</p>
"""
swapper = FaceSwapper()
def swap_single_photo(src_img, src_idx, dst_img, dst_idx):
log = ""
try:
src_path = "SinglePhoto/data_src.jpg"
dst_path = "SinglePhoto/data_dst.jpg"
output_path = "SinglePhoto/output_swapped.jpg"
os.makedirs(os.path.dirname(src_path), exist_ok=True)
os.makedirs(os.path.dirname(dst_path), exist_ok=True)
os.makedirs(os.path.dirname(output_path), exist_ok=True)
src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(src_path, src_img_bgr)
cv2.imwrite(dst_path, dst_img_bgr)
log += f"Saved source to {src_path}, destination to {dst_path}\n"
result = swapper.swap_faces(src_path, int(src_idx), dst_path, int(dst_idx))
cv2.imwrite(output_path, result)
log += f"Swapped and saved result to {output_path}\n"
try:
if os.path.exists(src_path):
os.remove(src_path)
if os.path.exists(dst_path):
os.remove(dst_path)
log += "Cleaned up temp files.\n"
except Exception as cleanup_error:
log += f"Cleanup error: {cleanup_error}\n"
return output_path, log
except Exception as e:
log += f"Error: {e}\n"
return None, log
def swap_video(src_img, src_idx, video, dst_idx):
log = ""
src_path = "VideoSwapping/data_src.jpg"
dst_video_path = "VideoSwapping/data_dst.mp4"
frames_dir = "VideoSwapping/video_frames"
swapped_dir = "VideoSwapping/swapped_frames"
output_video_path = "VideoSwapping/output_tmp_output_video.mp4"
final_output_path = "VideoSwapping/output_with_audio.mp4"
os.makedirs(os.path.dirname(src_path), exist_ok=True)
os.makedirs(os.path.dirname(dst_video_path), exist_ok=True)
os.makedirs(frames_dir, exist_ok=True)
os.makedirs(swapped_dir, exist_ok=True)
src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(src_path, src_img_bgr)
log += f"Saved source image to {src_path}\n"
if isinstance(video, str) and os.path.exists(video):
shutil.copy(video, dst_video_path)
log += f"Copied video to {dst_video_path}\n"
else:
dst_video_path = video
from VideoSwapping import extract_frames, frames_to_video
frame_paths = extract_frames(dst_video_path, frames_dir)
log += f"Extracted {len(frame_paths)} frames to {frames_dir}\n"
for idx, frame_path in enumerate(frame_paths):
out_path = os.path.join(swapped_dir, f"swapped_{idx:05d}.jpg")
try:
swapped = swapper.swap_faces(src_path, int(src_idx), frame_path, int(dst_idx))
cv2.imwrite(out_path, swapped)
log += f"Swapped frame {idx} and saved to {out_path}\n"
except Exception as e:
cv2.imwrite(out_path, cv2.imread(frame_path))
log += f"Failed to swap frame {idx}: {e}\n"
cap = cv2.VideoCapture(dst_video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
frames_to_video(swapped_dir, output_video_path, fps)
log += f"Combined swapped frames into video {output_video_path}\n"
# Add audio from original video
ok, audio_log = add_audio_to_video(dst_video_path, output_video_path, final_output_path)
if ok:
log += f"Added audio to {final_output_path}\n"
else:
log += f"Audio muxing failed: {audio_log}\n"
final_output_path = output_video_path # fallback to video without audio
try:
if os.path.exists(src_path):
os.remove(src_path)
if os.path.exists(dst_video_path):
os.remove(dst_video_path)
if os.path.exists(frames_dir):
shutil.rmtree(frames_dir)
if os.path.exists(swapped_dir):
shutil.rmtree(swapped_dir)
log += "Cleaned up temp files and folders.\n"
except Exception as cleanup_error:
log += f"Cleanup error: {cleanup_error}\n"
return final_output_path, log
def add_audio_to_video(original_video_path, video_no_audio_path, output_path):
"""
Uses ffmpeg to mux audio from original_video_path into video_no_audio_path.
"""
cmd = [
"ffmpeg",
"-y",
"-i", video_no_audio_path,
"-i", original_video_path,
"-c:v", "copy",
"-c:a", "aac",
"-map", "0:v:0",
"-map", "1:a:0?",
"-shortest",
output_path
]
try:
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True, ""
except subprocess.CalledProcessError as e:
return False, e.stderr.decode()
def swap_multi_src_single_dst(src_imgs, dst_img, dst_idx):
log = ""
results = []
src_dir = "MultiSrcSingleDst/src"
dst_dir = "MultiSrcSingleDst/dst"
output_dir = "MultiSrcSingleDst/output"
os.makedirs(src_dir, exist_ok=True)
os.makedirs(dst_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
if isinstance(dst_img, tuple):
dst_img = dst_img[0]
dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR)
dst_path = os.path.join(dst_dir, "data_dst.jpg")
cv2.imwrite(dst_path, dst_img_bgr)
log += f"Saved destination image to {dst_path}\n"
for i, src_img in enumerate(src_imgs):
if isinstance(src_img, tuple):
src_img = src_img[0]
src_path = os.path.join(src_dir, f"data_src_{i}.jpg")
output_path = os.path.join(output_dir, f"output_swapped_{i}.jpg")
src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(src_path, src_img_bgr)
log += f"Saved source image {i} to {src_path}\n"
try:
result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx))
cv2.imwrite(output_path, result)
results.append(output_path)
log += f"Swapped and saved result to {output_path}\n"
except Exception as e:
results.append(f"Error: {e}")
log += f"Error swapping source {i}: {e}\n"
return results, log
def swap_multi_src_multi_dst(src_imgs, dst_imgs, dst_indices):
log = ""
results = []
src_dir = "MultiSrcMultiDst/src"
dst_dir = "MultiSrcMultiDst/dst"
output_dir = "MultiSrcMultiDst/output"
os.makedirs(src_dir, exist_ok=True)
os.makedirs(dst_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
if isinstance(dst_indices, str):
dst_indices_list = [int(idx.strip()) for idx in dst_indices.split(",") if idx.strip().isdigit()]
else:
dst_indices_list = [int(idx) for idx in dst_indices]
for i, src_img in enumerate(src_imgs):
if isinstance(src_img, tuple):
src_img = src_img[0]
if src_img is None:
results.append(f"Error: Source image at index {i} is None")
log += f"Source image at index {i} is None\n"
continue
src_path = os.path.join(src_dir, f"data_src_{i}.jpg")
if isinstance(src_img, np.ndarray):
src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(src_path, src_img_bgr)
log += f"Saved source image {i} to {src_path}\n"
elif isinstance(src_img, str) and os.path.exists(src_img):
shutil.copy(src_img, src_path)
log += f"Copied source image {i} from {src_img} to {src_path}\n"
else:
results.append(f"Error: Invalid source image at index {i}")
log += f"Invalid source image at index {i}\n"
continue
for j, dst_img in enumerate(dst_imgs):
if isinstance(dst_img, tuple):
dst_img = dst_img[0]
if dst_img is None:
results.append(f"Error: Destination image at index {j} is None")
log += f"Destination image at index {j} is None\n"
continue
dst_path = os.path.join(dst_dir, f"data_dst_{j}.jpg")
output_path = os.path.join(output_dir, f"output_swapped_{i}_{j}.jpg")
if isinstance(dst_img, np.ndarray):
dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(dst_path, dst_img_bgr)
log += f"Saved destination image {j} to {dst_path}\n"
elif isinstance(dst_img, str) and os.path.exists(dst_img):
shutil.copy(dst_img, dst_path)
log += f"Copied destination image {j} from {dst_img} to {dst_path}\n"
else:
results.append(f"Error: Invalid destination image at index {j}")
log += f"Invalid destination image at index {j}\n"
continue
try:
dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1
result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx))
cv2.imwrite(output_path, result)
results.append(output_path)
log += f"Swapped src {i} with dst {j} and saved to {output_path}\n"
except Exception as e:
results.append(f"Error: {e}")
log += f"Error swapping src {i} with dst {j}: {e}\n"
return results, log
def swap_single_src_multi_dst(src_img, dst_imgs, dst_indices):
log = ""
results = []
src_dir = "SingleSrcMultiDst/src"
dst_dir = "SingleSrcMultiDst/dst"
output_dir = "SingleSrcMultiDst/output"
os.makedirs(src_dir, exist_ok=True)
os.makedirs(dst_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
if isinstance(src_img, tuple):
src_img = src_img[0]
src_path = os.path.join(src_dir, "data_src.jpg")
src_img_bgr = cv2.cvtColor(src_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(src_path, src_img_bgr)
log += f"Saved source image to {src_path}\n"
if isinstance(dst_indices, str):
dst_indices_list = [int(idx.strip()) for idx in dst_indices.split(",") if idx.strip().isdigit()]
else:
dst_indices_list = [int(idx) for idx in dst_indices]
for j, dst_img in enumerate(dst_imgs):
if isinstance(dst_img, tuple):
dst_img = dst_img[0]
dst_path = os.path.join(dst_dir, f"data_dst_{j}.jpg")
output_path = os.path.join(output_dir, f"output_swapped_{j}.jpg")
dst_img_bgr = cv2.cvtColor(dst_img, cv2.COLOR_RGB2BGR)
cv2.imwrite(dst_path, dst_img_bgr)
log += f"Saved destination image {j} to {dst_path}\n"
try:
dst_idx = dst_indices_list[j] if j < len(dst_indices_list) else 1
result = swapper.swap_faces(src_path, 1, dst_path, int(dst_idx))
cv2.imwrite(output_path, result)
results.append(output_path)
log += f"Swapped and saved result to {output_path}\n"
except Exception as e:
results.append(f"Error: {e}")
log += f"Error swapping with destination {j}: {e}\n"
return results, log
with gr.Blocks() as demo:
gr.Markdown(wellcomingMessage)
with gr.Tab("Single Photo Swapping"):
gr.Interface(
fn=swap_single_photo,
inputs=[
gr.Image(label="Source Image"),
gr.Number(value=1, label="Source Face Index"),
gr.Image(label="Destination Image"),
gr.Number(value=1, label="Destination Face Index"),
],
outputs=[
gr.Image(label="Swapped Image"),
gr.Textbox(label="Log Output", lines=8, interactive=False)
],
)
with gr.Tab("Video Swapping"):
gr.Interface(
fn=swap_video,
inputs=[
gr.Image(label="Source Image"),
gr.Number(value=1, label="Source Face Index"),
gr.Video(label="Target Video"),
gr.Number(value=1, label="Destination Face Index"),
],
outputs=[
gr.Video(label="Swapped Video"),
gr.Textbox(label="Log Output", lines=8, interactive=False)
],
)
with gr.Tab("SingleSrc MultiDst"):
gr.Interface(
fn=swap_single_src_multi_dst,
inputs=[
gr.Image(label="Source Image"),
gr.Gallery(label="Destination Images", type="numpy", columns=3),
gr.Textbox(label="Destination Face Indices (comma-separated, e.g. 1,1,2)"),
],
outputs=[
gr.Gallery(label="Swapped Images"),
gr.Textbox(label="Log Output", lines=8, interactive=False)
],
)
with gr.Tab("MultiSrc SingleDst"):
gr.Interface(
fn=swap_multi_src_single_dst,
inputs=[
gr.Gallery(label="Source Images", type="numpy", columns=3),
gr.Image(label="Destination Image"),
gr.Number(value=1, label="Destination Face Index"),
],
outputs=[
gr.Gallery(label="Swapped Images"),
gr.Textbox(label="Log Output", lines=8, interactive=False)
],
)
with gr.Tab("MultiSrc MultiDst"):
gr.Interface(
fn=swap_multi_src_multi_dst,
inputs=[
gr.Gallery(label="Source Images", type="numpy", columns=3),
gr.Gallery(label="Destination Images", type="numpy", columns=3),
gr.Textbox(label="Destination Face Indices (comma-separated, e.g. 1,1,2)"),
],
outputs=[
gr.Gallery(label="Swapped Images"),
gr.Textbox(label="Log Output", lines=8, interactive=False)
],
)
if __name__ == "__main__":
demo.launch()