# Spaces: Build error
import os
import json
import time
import shutil
import tempfile

import cv2
import gradio as gr
import librosa
import numpy as np
import scipy.io.wavfile as wavfile
import torch
import whisper
from deepface import DeepFace
from moviepy.editor import VideoFileClip
from transformers import (
    AutoProcessor,
    BarkModel,
    Wav2Vec2ForSequenceClassification,
    Wav2Vec2Processor,
)

# NOTE: extract_candidate_details, extract_job_details, build_interview_prompt,
# eval_question_quality, evaluate_answer, generate_reference_answer and groq_llm
# are used below but not defined in this file; they are assumed to come from
# another module of the project.
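# Since the Space reports a build error, a frequent cause is a dependency missing from
# requirements.txt. A minimal sketch covering the imports above (package names only;
# exact version pins and the choice of opencv-python vs opencv-python-headless are
# assumptions, not taken from the original project):
#   gradio
#   torch
#   transformers
#   openai-whisper
#   deepface
#   opencv-python-headless
#   moviepy
#   librosa
#   scipy
#   numpy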
# Bark TTS
model_bark = BarkModel.from_pretrained("suno/bark")
processor_bark = AutoProcessor.from_pretrained("suno/bark")
model_bark.to("cuda" if torch.cuda.is_available() else "cpu")
bark_voice_preset = "v2/en_speaker_6"

def bark_tts(text):
    inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset)
    inputs = {k: v.to(model_bark.device) for k, v in inputs.items()}
    speech_values = model_bark.generate(**inputs)
    speech = speech_values.cpu().numpy().squeeze()
    speech = (speech * 32767).astype(np.int16)
    temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    # Bark generates audio at the model's native sample rate (24 kHz); write the WAV with
    # that rate instead of a hard-coded 22050 to avoid pitch/speed distortion.
    wavfile.write(temp_wav.name, model_bark.generation_config.sample_rate, speech)
    return temp_wav.name
# Whisper STT
whisper_model = whisper.load_model("base")

def whisper_stt(audio_path):
    if not audio_path or not os.path.exists(audio_path):
        return ""
    result = whisper_model.transcribe(audio_path)
    return result["text"]
# DeepFace (video face emotion)
def ensure_mp4(video_input):
    # video_input may be a file path (the usual Gradio case) or a file-like object
    if isinstance(video_input, str):
        input_path = video_input
    else:
        # File-like object (rare for Gradio video, but handle it)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as temp_in:
            temp_in.write(video_input.read())
            input_path = temp_in.name
    # If the file is already mp4, return it as-is
    if input_path.endswith(".mp4"):
        return input_path
    # Otherwise convert to mp4 with moviepy so OpenCV can read it
    mp4_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    try:
        clip = VideoFileClip(input_path)
        clip.write_videofile(mp4_path, codec="libx264", audio=False, verbose=False, logger=None)
        clip.close()
    except Exception as e:
        print("Video conversion failed:", e)
        # Fallback: just copy the original file
        shutil.copy(input_path, mp4_path)
    return mp4_path
def analyze_video_emotions(video_input, sample_rate=15):
    # Convert the input into an mp4 file OpenCV can process
    mp4_path = ensure_mp4(video_input)
    if not mp4_path or not os.path.exists(mp4_path):
        return "neutral"
    cap = cv2.VideoCapture(mp4_path)
    frame_count = 0
    emotion_counts = {}
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Only analyze every sample_rate-th frame to keep DeepFace calls cheap
        if frame_count % sample_rate == 0:
            try:
                result = DeepFace.analyze(frame, actions=['emotion'], enforce_detection=False)
                dominant = result[0]["dominant_emotion"] if isinstance(result, list) else result["dominant_emotion"]
                emotion_counts[dominant] = emotion_counts.get(dominant, 0) + 1
            except Exception:
                pass
        frame_count += 1
    cap.release()
    if not emotion_counts:
        return "neutral"
    # Return the most frequent dominant emotion across the sampled frames
    return max(emotion_counts.items(), key=lambda x: x[1])[0]
# Wav2Vec2 (voice emotion)
wav2vec_model_name = "HaniaRuby/speech-emotion-recognition-wav2vec2"
wav2vec_processor = Wav2Vec2Processor.from_pretrained(wav2vec_model_name)
wav2vec_model = Wav2Vec2ForSequenceClassification.from_pretrained(wav2vec_model_name)
wav2vec_model.eval()
voice_label_map = {
    0: 'angry', 1: 'disgust', 2: 'fear', 3: 'happy',
    4: 'neutral', 5: 'sad', 6: 'surprise'
}
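# The hard-coded index -> label map above is an assumption about this particular
# checkpoint; if the model ships its own labels, wav2vec_model.config.id2label is the
# authoritative mapping and can be checked with:
#   print(wav2vec_model.config.id2label)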
def analyze_audio_emotion(audio_path):
    if not audio_path or not os.path.exists(audio_path):
        return "neutral"
    speech, sr = librosa.load(audio_path, sr=16000)
    inputs = wav2vec_processor(speech, sampling_rate=16000, return_tensors="pt")
    with torch.no_grad():
        logits = wav2vec_model(**inputs).logits
    probs = torch.nn.functional.softmax(logits, dim=-1)
    predicted_id = torch.argmax(probs, dim=-1).item()
    return voice_label_map.get(predicted_id, "neutral")
# --- Effective confidence calculation ---
def interpret_confidence(voice_label, face_label, answer_score_label, k=0.2):
    # "surprise" matches the label produced by DeepFace and the voice model
    # (the original key "surprised" was never hit and always fell back to 0.5).
    emotion_map = {"happy": 0.9, "neutral": 0.6, "surprise": 0.7, "sad": 0.4, "angry": 0.3,
                   "disgust": 0.2, "fear": 0.3, "no_face": 0.5, "unknown": 0.5}
    answer_score_map = {"excellent": 1.0, "good": 0.8, "medium": 0.6, "poor": 0.3}
    voice_score = emotion_map.get(voice_label, 0.5)
    face_score = emotion_map.get(face_label, 0.5)
    answer_score = answer_score_map.get(answer_score_label, 0.5)
    avg_emotion = (voice_score + face_score) / 2
    control_bonus = max(0, answer_score - avg_emotion) * k
    eff_conf = 0.5 * answer_score + 0.22 * voice_score + 0.18 * face_score + 0.1 * control_bonus
    return {"effective_confidence": round(eff_conf, 3), "answer_score": round(answer_score, 2),
            "voice_score": round(voice_score, 2), "face_score": round(face_score, 2),
            "control_bonus": round(control_bonus, 3)}
seniority_mapping = {
    "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5
}
# --- 2. Gradio App ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    user_data = gr.State({})
    interview_state = gr.State({})
    missing_fields_state = gr.State([])

    # --- UI Layout ---
    with gr.Column(visible=True) as user_info_section:
        gr.Markdown("## Candidate Information")
        cv_file = gr.File(label="Upload CV")
        job_desc = gr.Textbox(label="Job Description")
        start_btn = gr.Button("Continue", interactive=False)

    with gr.Column(visible=False) as missing_section:
        gr.Markdown("## Missing Information")
        name_in = gr.Textbox(label="Name", visible=False)
        role_in = gr.Textbox(label="Job Role", visible=False)
        seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False)
        skills_in = gr.Textbox(label="Skills", visible=False)
        submit_btn = gr.Button("Submit", interactive=False)

    with gr.Column(visible=False) as interview_pre_section:
        pre_interview_greeting_md = gr.Markdown()
        start_interview_final_btn = gr.Button("Start Interview")

    with gr.Column(visible=False) as interview_section:
        gr.Markdown("## Interview in Progress")
        question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True)
        question_text = gr.Markdown()
        user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer")
        user_video_input = gr.Video(sources=["webcam"], label="2. Record Video Answer")
        stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)")
        confirm_btn = gr.Button("Confirm Answer")
        evaluation_display = gr.Markdown()
        emotion_display = gr.Markdown()
        interview_summary = gr.Markdown(visible=False)
    # --- UI Logic ---
    def validate_start_btn(cv_file, job_desc):
        return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip())))

    cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn)
    job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn)
    def process_and_route_initial(cv_file, job_desc):
        details = extract_candidate_details(cv_file.name)
        job_info = extract_job_details(job_desc)
        data = {
            "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"),
            "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", [])
        }
        missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v]
        if missing:
            return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update()
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
        # Also show interview_pre_section here; otherwise the greeting Markdown stays hidden
        # because its parent column is still invisible.
        return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)

    start_btn.click(
        process_and_route_initial,
        [cv_file, job_desc],
        [user_data, missing_fields_state, user_info_section, missing_section, interview_pre_section, pre_interview_greeting_md]
    )
    def show_missing(missing):
        if missing is None:
            missing = []
        return (gr.update(visible="name" in missing), gr.update(visible="job_role" in missing),
                gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing))

    missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in])

    def validate_fields(name, role, seniority, skills, missing):
        if not missing:
            return gr.update(interactive=False)
        all_filled = all([
            not ("name" in missing) or bool(name.strip()),
            not ("job_role" in missing) or bool(role.strip()),
            not ("seniority" in missing) or bool(seniority),
            not ("skills" in missing) or bool(skills.strip()),
        ])
        return gr.update(interactive=all_filled)

    for inp in [name_in, role_in, seniority_in, skills_in]:
        inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn)

    def complete_manual(data, name, role, seniority, skills):
        if data["name"].lower() == "unknown":
            data["name"] = name
        if data["job_role"].lower() == "unknown":
            data["job_role"] = role
        if data["seniority"].lower() == "unknown":
            data["seniority"] = seniority
        if not data["skills"]:
            data["skills"] = [s.strip() for s in skills.split(",")]
        greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
        return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting)

    submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in],
                     [user_data, missing_section, interview_pre_section, pre_interview_greeting_md])
    def start_interview(data):
        # --- Advanced state with full logging ---
        state = {
            "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [],
            "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [],
            "conversation_history": [],
            "difficulty_adjustment": None,
            "question_idx": 0, "max_questions": 3, "q_start_time": time.time(),
            "log": []
        }
        # --- Optionally: context retrieval here (currently just blank) ---
        context = ""
        prompt = build_interview_prompt(
            conversation_history=[], user_response="", context=context, job_role=data["job_role"],
            skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None,
            voice_label="neutral", face_label="neutral"
        )
        first_q = groq_llm.predict(prompt)
        # Evaluate the question for quality
        q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None)
        state["questions"].append(first_q)
        state["question_evaluations"].append(q_eval)
        state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})
        audio_path = bark_tts(first_q)
        # Log the question
        state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()})
        return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}"

    start_interview_final_btn.click(start_interview, [user_data],
                                    [interview_state, interview_pre_section, interview_section, question_audio, question_text])
    def transcribe(audio_path):
        return whisper_stt(audio_path)

    user_audio_input.change(transcribe, user_audio_input, stt_transcript)
    def process_answer(transcript, audio_path, video_path, state, data):
        if not transcript and not video_path:
            return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
        elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
        state["timings"].append(elapsed)
        state["answers"].append(transcript)
        state["conversation_history"].append({'role': 'Candidate', 'content': transcript})

        # --- 1. Emotion analysis ---
        voice_label = analyze_audio_emotion(audio_path)
        face_label = analyze_video_emotions(video_path)
        state["voice_labels"].append(voice_label)
        state["face_labels"].append(face_label)

        # --- 2. Evaluate the previous question and the answer ---
        last_q = state["questions"][-1]
        q_eval = state["question_evaluations"][-1]  # Already in state
        ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"])
        answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None)
        state["answer_evaluations"].append(answer_eval)
        answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"

        # --- 3. Adaptive difficulty ---
        if answer_score == "excellent":
            state["difficulty_adjustment"] = "harder"
        elif answer_score in ("medium", "poor"):
            state["difficulty_adjustment"] = "easier"
        else:
            state["difficulty_adjustment"] = None

        # --- 4. Effective confidence ---
        eff_conf = interpret_confidence(voice_label, face_label, answer_score)
        state["effective_confidences"].append(eff_conf)

        # --- Log ---
        state["log"].append({
            "type": "answer",
            "question": last_q,
            "answer": transcript,
            "answer_eval": answer_eval,
            "ref_answer": ref_answer,
            "face_label": face_label,
            "voice_label": voice_label,
            "effective_confidence": eff_conf,
            "timing": elapsed,
            "timestamp": time.time()
        })

        # --- Next question or end of interview ---
        qidx = state["question_idx"] + 1
        if qidx >= state["max_questions"]:
            # Save the log as JSON (optional)
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            log_file = f"interview_log_{timestamp}.json"
            with open(log_file, "w", encoding="utf-8") as f:
                json.dump(state["log"], f, indent=2, ensure_ascii=False)
            # Report
            summary = "# Interview Summary\n"
            for i, q in enumerate(state["questions"]):
                summary += (f"\n### Q{i + 1}: {q}\n"
                            f"- *Answer*: {state['answers'][i]}\n"
                            f"- *Q Eval*: {state['question_evaluations'][i]}\n"
                            f"- *A Eval*: {state['answer_evaluations'][i]}\n"
                            f"- *Face Emotion*: {state['face_labels'][i]}, *Voice Emotion*: {state['voice_labels'][i]}\n"
                            f"- *Effective Confidence*: {state['effective_confidences'][i]['effective_confidence']}\n"
                            f"- *Time*: {state['timings'][i]}s\n")
            summary += f"\n\n⏺ Full log saved as {log_file}."
            return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None),
                    gr.update(value=None), gr.update(value=None),
                    gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"))
        else:
            # --- Build the next prompt using adaptive difficulty ---
            state["question_idx"] = qidx
            state["q_start_time"] = time.time()
            context = ""  # Context retrieval could be added here
            prompt = build_interview_prompt(
                conversation_history=state["conversation_history"],
                user_response=transcript,
                context=context,
                job_role=data["job_role"],
                skills=data["skills"],
                seniority=data["seniority"],
                difficulty_adjustment=state["difficulty_adjustment"],
                face_label=face_label,
                voice_label=voice_label,
                effective_confidence=eff_conf
            )
            next_q = groq_llm.predict(prompt)
            # Evaluate question quality
            q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None)
            state["questions"].append(next_q)
            state["question_evaluations"].append(q_eval)
            state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
            state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})
            audio_path = bark_tts(next_q)
            # Evaluation text (note: evaluation_display is defined in the UI but is not
            # among this handler's outputs, so eval_md is currently unused)
            eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}"
            return (
                state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}",
                gr.update(value=None), gr.update(value=None),
                gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}"),
            )
    confirm_btn.click(
        process_answer,
        [stt_transcript, user_audio_input, user_video_input, interview_state, user_data],
        [interview_state, interview_summary, question_audio, question_text, user_audio_input, user_video_input, emotion_display]
    ).then(
        lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, user_video_input]
    )

demo.launch()