import json import os import random from gradio import ChatMessage from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, ToolMessage from langgraph.graph import StateGraph, START, END from langgraph.graph.message import add_messages from langgraph.prebuilt import ToolNode from support.log_manager import logger from support.my_tools import captain_agent_tools, boss_agent_tools from support.tools.boss_agent_tools import create_fake_tool_call from support.tools.captain_agent_tools import create_agent_fake_tool_call from support.prompts import captain_agent_system_prompt, player_agent_system_prompt, boss_agent_system_prompt from typing import Annotated, List, Literal, Optional from typing_extensions import TypedDict class State(TypedDict): messages: Annotated[list, add_messages] chat_history: List[str] round_messages: Annotated[list, add_messages] original_board: dict board: dict players: List # Player type current_team: Literal['red', 'blue'] current_role: str turn: int last_user_message: str guesses: List[str] clue: Optional[str] clue_number: Optional[int] next_team: Literal['red', 'blue'] history_guessed_words: List[str] human_clue: str human_clue_number: int teams_reviewed: List[str] end_round: bool winner_and_score: tuple red_boss_is_called_counter: int blue_boss_is_called_counter: int red_captain_is_called_counter: int blue_captain_is_called_counter: int class MyGraph: def __init__(self): self.red_team = self._create_red_team_graph() self.blue_team = self._create_blue_team_graph() self.graph = self._create_graph() self.players = [] self.board = None self.guessed_words = [] self.current_team = "" self.chat_history = [] self.winners = [] self.IS_HUMAN_PLAYING = None def _create_red_team_graph(self): """Compile red team subgraph""" builder = StateGraph(State) # Core team nodes builder.add_node("red_boss", self.red_boss) builder.add_node("red_captain", self.red_captain) builder.add_node("red_agent_1", self.red_agent_1) builder.add_node("red_agent_2", self.red_agent_2) builder.add_node("red_boss_is_called", self.red_boss_is_called) builder.add_node("red_captain_is_called", self.red_captain_is_called) # Tool nodes choose_word_tool = ToolNode(tools=[boss_agent_tools[0]]) final_choice = ToolNode(tools=[captain_agent_tools[0]]) transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]]) transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]]) builder.add_node("choose_word_tool", choose_word_tool) builder.add_node("final_choice", final_choice) builder.add_node("transfer_to_agent_1", transfer_to_agent_1) builder.add_node("transfer_to_agent_2", transfer_to_agent_2) builder.add_node("update_turn", self.update_turn) # Team flow builder.add_edge(START, "red_boss") builder.add_conditional_edges( "red_boss", self.boss_choice, { "choose_word_tool": "choose_word_tool", "red_boss_is_called": "red_boss_is_called", }, ) builder.add_edge("red_boss_is_called", "red_boss") builder.add_edge("choose_word_tool", "red_captain") builder.add_conditional_edges( "red_captain", self.should_continue, { "final_choice": "final_choice", "transfer_to_agent_1": "transfer_to_agent_1", "transfer_to_agent_2": "transfer_to_agent_2", "red_captain_is_called": "red_captain_is_called", }, ) builder.add_edge("red_captain_is_called", "red_captain") builder.add_edge("transfer_to_agent_1", "red_agent_1") builder.add_edge("transfer_to_agent_2", "red_agent_2") builder.add_edge("red_agent_1", "red_captain") builder.add_edge("red_agent_2", "red_captain") builder.add_edge("final_choice", "update_turn") builder.add_edge("update_turn", END) return builder.compile() def _create_blue_team_graph(self): """Compile blue team subgraph""" builder = StateGraph(State) # Core team nodes builder.add_node("blue_boss", self.blue_boss) builder.add_node("blue_captain", self.blue_captain) builder.add_node("blue_agent_1", self.blue_agent_1) builder.add_node("blue_agent_2", self.blue_agent_2) builder.add_node("blue_boss_is_called", self.blue_boss_is_called) builder.add_node("blue_captain_is_called", self.blue_captain_is_called) # Tool nodes choose_word_tool = ToolNode(tools=[boss_agent_tools[0]]) final_choice = ToolNode(tools=[captain_agent_tools[0]]) transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]]) transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]]) builder.add_node("choose_word_tool", choose_word_tool) builder.add_node("final_choice", final_choice) builder.add_node("transfer_to_agent_1", transfer_to_agent_1) builder.add_node("transfer_to_agent_2", transfer_to_agent_2) builder.add_node("update_turn", self.update_turn) # Team flow builder.add_edge(START, "blue_boss") builder.add_conditional_edges( "blue_boss", self.boss_choice, { "choose_word_tool": "choose_word_tool", "blue_boss_is_called": "blue_boss_is_called", }, ) builder.add_edge("blue_boss_is_called", "blue_boss") builder.add_edge("choose_word_tool", "blue_captain") builder.add_conditional_edges( "blue_captain", self.should_continue, { "final_choice": "final_choice", "transfer_to_agent_1": "transfer_to_agent_1", "transfer_to_agent_2": "transfer_to_agent_2", "blue_captain_is_called": "blue_captain_is_called", }, ) builder.add_edge("blue_captain_is_called", "blue_captain") builder.add_edge("transfer_to_agent_1", "blue_agent_1") builder.add_edge("transfer_to_agent_2", "blue_agent_2") builder.add_edge("blue_agent_1", "blue_captain") builder.add_edge("blue_agent_2", "blue_captain") builder.add_edge("final_choice", "update_turn") builder.add_edge("update_turn", END) return builder.compile() def _create_graph(self) -> StateGraph: """Create and compile the graph.""" builder = StateGraph(State) red_graph = self._create_red_team_graph() blue_graph = self._create_blue_team_graph() builder.add_node("judge", self.judge) # builder.add_node("red_team", lambda s: self.call_team(s, "red")) # builder.add_node("blue_team", lambda s: self.call_team(s, "blue")) builder.add_node("red_team", red_graph) builder.add_node("blue_team", blue_graph) builder.add_edge(START, "judge") builder.add_conditional_edges( "judge", self.route_after_judge, ["red_team", "blue_team", END], ) builder.add_edge("red_team", "judge") builder.add_edge("blue_team", "judge") graph = builder.compile() # Optional visualization if not os.path.exists("graph.png"): try: img = graph.get_graph(xray=True).draw_mermaid_png() with open("graph.png", "wb") as f: f.write(img) except Exception as e: logger.error(f"[GRAPH IMAGE ERROR]: {e}") return graph # --- Agent Node Implementations --- async def red_boss(self, state: State): """Red team boss gives a clue""" logger.info("[RED BOSS] MOMENT ") boss = next((p for p in state["players"] if p.team == "red" and p.role == "boss"), None) if not boss: return state board = state["board"] team_name = state["current_team"].upper() formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name) chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) self.current_team = state["current_team"] new_message = [ { "role": "system", "content": formatted_boss_system_prompt }, { "role": "user", "content": f""" Keep in mind the history of the game so far:\n [HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Here is the current board:\n Red words: {", ".join(board['red'])}\n Blue words: {", ".join(board['blue'])}\n Neutral words: {", ".join(board['neutral'])}\n Killer word: {board['killer']}\n\n Based on this board, provide a clue and a number of words that relate to that clue. """ } ] if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain": clue_ = state['human_clue'] clue_number = state['human_clue_number'] answer = create_fake_tool_call(clue_, clue_number) else: if boss.model_name == "claude-sonnet-4-5-20250929": llm_with_tools = boss.model.bind_tools(boss_agent_tools) else: llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord") answer = await llm_with_tools.ainvoke(new_message) logger.info(f"[RED BOSS ANSWER]: {answer}") chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer) return { "messages": [answer], "current_role": "captain", "chat_history": state.get("chat_history", []) + [chat_entry], "clue": clue, "clue_number": clue_number, } async def red_captain(self, state: State): """Red team captain coordinates guessing""" logger.info("°°°"*50) logger.info(f"[RED CAPTAIN] State clue: {state['clue']}") captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] if not captain: return state board = state["board"] available_words = ( board["red"] + board["blue"] + board["neutral"] + [board["killer"]] ) random.shuffle(available_words) logger.info(f"AVAILABLE WORDS: {available_words}") chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) team_name = state["current_team"].upper() formatted_captain_system_prompt = captain_agent_system_prompt.format( team_name, agents[0].name, agents[1].name ) new_message = [ { "role": "system", "content": formatted_captain_system_prompt }, { "role": "user", "content": f""" Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Here is the list of words on the board:\n{', '.join(available_words)}\n\n This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess. """ } ] llm_with_tools = captain.model.bind_tools( captain_agent_tools, ) if state['red_captain_is_called_counter'] > 4: logger.info("Creating fake answer to stop loop") answer = create_agent_fake_tool_call() else: logger.info(f"red_captain_is_called_counter: {state['red_captain_is_called_counter']}") answer = await llm_with_tools.ainvoke(new_message) logger.info(f"[RED CAPTAIN ANSWER]: {answer}") logger.info("°°°"*50) chat_entry, _, _, guesses = self._format_chat_entry(captain, answer) return { "messages": [answer], "chat_history": state.get("chat_history", []) + [chat_entry], "guesses": guesses } async def red_agent_1(self, state: State): """Red team agent 1 discusses the clue""" logger.info("---"*50) logger.info("[RED AGENT 1]") logger.info("MESSAGES") logger.info(state['messages']) logger.info("---"*20) chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] if not agents: return state agent = agents[0] board = state["board"] available_words = ( board["red"] + board["blue"] + board["neutral"] + [board["killer"]] ) random.shuffle(available_words) team_name = state["current_team"].upper() formatted_player_system_prompt = player_agent_system_prompt.format( team_name, captain.name, agents[1].name ) new_message = [ { "role": "system", "content": formatted_player_system_prompt }, { "role": "user", "content": f""" [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Available words: {', '.join(available_words)} """ } ] answer = await agent.model.ainvoke(new_message) chat_entry, _, _, _ = self._format_chat_entry(agent, answer) logger.info(['RED AGENT 1 ANSWER']) logger.info(answer) return { "messages": [answer], "chat_history": state.get("chat_history", []) + [chat_entry] } async def red_agent_2(self, state: State): """Red team agent 2 discusses the clue""" logger.info(f"[RED AGENT 2] State clue: {state['clue']}") chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] if len(agents) < 2: return state agent = agents[1] board = state["board"] available_words = ( board["red"] + board["blue"] + board["neutral"] + [board["killer"]] ) random.shuffle(available_words) team_name = state["current_team"].upper() formatted_player_system_prompt = player_agent_system_prompt.format( team_name, captain.name, agents[0].name ) new_message = [ { "role": "system", "content": formatted_player_system_prompt }, { "role": "user", "content": f""" [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Available words: {', '.join(available_words)} """ } ] answer = await agent.model.ainvoke(new_message) chat_entry, _, _, _ = self._format_chat_entry(agent, answer) logger.info(['RED AGENT 2 ANSWER']) logger.info(answer) return { "messages": [answer], "chat_history": state.get("chat_history", []) + [chat_entry] } async def blue_boss(self, state: State): """Blue team boss gives a clue""" logger.info("[BLUE BOSS MOMENT]") chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) boss = next((p for p in state["players"] if p.team == "blue" and p.role == "boss"), None) if not boss: return state board = state["board"] team_name = state["current_team"].upper() formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name) self.current_team = state["current_team"] new_message = [ { "role": "system", "content": formatted_boss_system_prompt }, { "role": "user", "content": f""" Keep in mind the history of the game so far:\n [HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Here is the current board:\n Red words: {", ".join(board['red'])}\n Blue words: {", ".join(board['blue'])}\n Neutral words: {", ".join(board['neutral'])}\n Killer word: {board['killer']}\n\n Based on this board, provide a clue and a number of words that relate to that clue. """ } ] if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain": clue_ = state['human_clue'] clue_number = state['human_clue_number'] answer = create_fake_tool_call(clue_, clue_number) else: if boss.model_name == "claude-sonnet-4-5-20250929": llm_with_tools = boss.model.bind_tools(boss_agent_tools) else: llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord") answer = await llm_with_tools.ainvoke(new_message) logger.info(f"[BLUE BOSS ANSWER] : {answer}") chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer) return { "messages": [answer], "current_role": "captain", "chat_history": state.get("chat_history", []) + [chat_entry], "clue": clue, "clue_number": clue_number, } async def blue_captain(self, state: State): """Blue team captain coordinates guessing""" logger.info("°°°"*50) logger.info(f"[BLUE CAPTAIN] State clue: {state['clue']}") captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] if not captain: return state board = state["board"] chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) available_words = ( board["red"] + board["blue"] + board["neutral"] + [board["killer"]] ) random.shuffle(available_words) team_name = state["current_team"].upper() formatted_captain_system_prompt = captain_agent_system_prompt.format( team_name, agents[0].name, agents[1].name ) new_message = [ { "role": "system", "content": formatted_captain_system_prompt }, { "role": "user", "content": f""" Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Here is the list of words on the board:\n{', '.join(available_words)}\n\n This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess. """ } ] llm_with_tools = captain.model.bind_tools( captain_agent_tools, ) if state['blue_captain_is_called_counter'] > 4: logger.info("Creating fake answer to stop loop") answer = create_agent_fake_tool_call() else: logger.info(f"blue_captain_is_called_counter: {state['blue_captain_is_called_counter']}") answer = await llm_with_tools.ainvoke(new_message) logger.info(f"[BLUE CAPTAIN ANSWER] : {answer}") logger.info("°°°"*50) chat_entry, _, _, guesses = self._format_chat_entry(captain, answer) return { "messages": [answer], "chat_history": state.get("chat_history", []) + [chat_entry], "guesses": guesses } async def blue_agent_1(self, state: State): """Blue team agent 1 discusses the clue""" logger.info("---"*50) logger.info("[BLUE AGENT 1]") chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] if not agents: return state agent = agents[0] board = state["board"] available_words = ( board["red"] + board["blue"] + board["neutral"] + [board["killer"]] ) random.shuffle(available_words) team_name = state["current_team"].upper() formatted_player_system_prompt = player_agent_system_prompt.format( team_name, captain.name, agents[1].name ) new_message = [ { "role": "system", "content": formatted_player_system_prompt }, { "role": "user", "content": f""" [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Available words: {', '.join(available_words)} """ } ] answer = await agent.model.ainvoke(new_message) logger.info(['BLUE AGENT 1 ANSWER']) logger.info(answer) chat_entry, _, _, _ = self._format_chat_entry(agent, answer) return { "messages": [answer], "chat_history": state.get("chat_history", []) + [chat_entry] } async def blue_agent_2(self, state: State): """Blue team agent 2 discusses the clue""" logger.info("---"*50) logger.info("[BLUE AGENT 2]") chat_history = state["chat_history"] # formatted_history = "\n".join([ # entry.get("content", "") # for entry in chat_history # ]) formatted_history, current_round_messages = self._split_chat_history(chat_history) captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] if not agents: return state agent = agents[1] board = state["board"] available_words = ( board["red"] + board["blue"] + board["neutral"] + [board["killer"]] ) random.shuffle(available_words) team_name = state["current_team"].upper() formatted_player_system_prompt = player_agent_system_prompt.format( team_name, captain.name, agents[0].name ) new_message = [ { "role": "system", "content": formatted_player_system_prompt }, { "role": "user", "content": f""" [HISTORY]\n{formatted_history}\n[END HISTORY]\n\n [CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n Available words: {', '.join(available_words)} """ } ] answer = await agent.model.ainvoke(new_message) logger.info(['BLUE AGENT 2 ANSWER']) logger.info(answer) chat_entry, _, _, _ = self._format_chat_entry(agent, answer) return { "messages": [answer], "chat_history": state.get("chat_history", []) + [chat_entry] } def update_turn(self, state: State): """Update turn counter""" turn = state.get("turn", 1) return { "turn": turn+1 } def red_boss_is_called(self, state: State): """Update turn counter""" _counter = state.get("red_boss_is_called_counter", 1) return { "red_boss_is_called_counter": _counter+1 } def blue_boss_is_called(self, state: State): """Update turn counter""" _counter = state.get("blue_boss_is_called_counter", 1) return { "blue_boss_is_called_counter": _counter+1 } def red_captain_is_called(self, state: State): """Update turn counter""" _counter = state.get("red_captain_is_called_counter", 1) return { "red_captain_is_called_counter": _counter+1 } def blue_captain_is_called(self, state: State): """Update turn counter""" _counter = state.get("blue_captain_is_called_counter", 1) return { "blue_captain_is_called_counter": _counter+1 } def judge(self, state: State): """Evaluate guesses, update board, check win/lose conditions.""" # Helper function to check win conditions def check_win_condition(): """Returns (is_game_over, winner, win_message) tuple""" red_remaining = len(board.get("red", [])) blue_remaining = len(board.get("blue", [])) if red_remaining == 0: return True, "red", (red_remaining, blue_remaining) if blue_remaining == 0: return True, "blue", (red_remaining, blue_remaining) return False, None, None # Helper function to create multiple messages def create_multi_message_state(messages_content_list, next_team=None, switch_role=False, winner_and_score=None): """Creates state with multiple messages""" messages = [] chat_entries = [] for content, title in messages_content_list: message = AIMessage( content=content, metadata={"title": title, "sender": "judge"} ) messages.append(message) if title == "⚖️ Judge Decision": info = "chat_history" else: info = None chat_entry, _, _, _ = self._format_chat_entry(None, message, info=info) chat_entries.append(chat_entry) logger.info("****" * 50) filtered_chat = self._filter_important_messages(state.get("chat_history", [])) logger.info("**"*50) logger.info("FILTERED CHAT") logger.info(filtered_chat) logger.info("**"*50) end_state = { "messages": messages, "chat_history": filtered_chat + chat_entries, "board": board, "guesses": [], "history_guessed_words": history_guessed_words, "teams_reviewed": teams_reviewed, "end_round": end_round, "winner_and_score": winner_and_score } if next_team: end_state.update({ "current_team": next_team, "current_role": "boss" if switch_role else state.get("current_role"), "turn": state.get("turn", 1) + 1, "next_team": next_team, }) return end_state # Helper function to create turn end messages def create_turn_end_messages(results_list, next_team): """Creates separate messages for results and turn transition""" team_emoji = "🔵" if next_team == "blue" else "🔴" results_text = "\n".join(results_list) red_remaining = len(board.get("red", [])) blue_remaining = len(board.get("blue", [])) # Message 1: Results results_message = results_text # Message 2: Turn transition transition_message = ( f"🔄 **TURN COMPLETE**\n\n" f"{team_emoji} **{next_team.upper()} TEAM'S TURN** now begins!\n\n" f"**Remaining Words:**\n" f"🔴 Red: {red_remaining}\n" f"🔵 Blue: {blue_remaining}" ) return [ (results_message, "⚖️ Judge Decision"), (transition_message, "🔄 Turn Transition") ] # Helper function to create round end messages def create_round_end_messages(results_list, next_team): """Creates separate messages for results and round transition""" team_emoji = "🔵" if next_team == "blue" else "🔴" results_text = "\n".join(results_list) red_remaining = len(board.get("red", [])) blue_remaining = len(board.get("blue", [])) # Message 1: Results results_message = results_text # Message 2: Round transition transition_message = ( f"🎯 **ROUND COMPLETE!**\n\n" f"Both teams have played their turn.\n\n" f"{team_emoji} **{next_team.upper()} TEAM** starts the next round!\n\n" f"**Score Update:**\n" f"🔴 Red Team: {red_remaining} words remaining\n" f"🔵 Blue Team: {blue_remaining} words remaining\n\n" f"Let's keep going! 💪" ) return [ (results_message, "⚖️ Judge Decision"), (transition_message, "🎯 Round Complete") ] # Helper function to create game over messages def create_game_over_messages(results_list, winner, scores, reason="normal"): """Creates separate messages for results and game over""" results_text = "\n".join(results_list) red_remaining, blue_remaining = scores winner_emoji = "🔴" if winner == "red" else "🔵" # Message 1: Results results_message = results_text # Message 2: Game over if reason == "killer": loser = "red" if winner == "blue" else "blue" game_over_message = ( f"🏆 **GAME OVER!**\n\n" f"💀 {loser.upper()} team hit the KILLER WORD!\n\n" f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n" f"**Final Score:**\n" f"🔴 Red: {red_remaining} words remaining\n" f"🔵 Blue: {blue_remaining} words remaining\n\n" f"Better luck next time! 😅" ) else: game_over_message = ( f"🏆 **GAME OVER!**\n\n" f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n" f"All {winner} words have been found!\n\n" f"**Final Score:**\n" f"🔴 Red: {red_remaining} words remaining\n" f"🔵 Blue: {blue_remaining} words remaining\n\n" f"Congratulations to the {winner.title()} Team! 🥳" ) return [ (results_message, "⚖️ Judge Decision"), (game_over_message, "🏆 Game Over") ] logger.info("IS HUMAN PLAYING") logger.info(self.IS_HUMAN_PLAYING) logger.info("****" * 50) logger.info("[JUDGE]") logger.info("CHAT HISTORY") logger.info(state['chat_history']) # Initialization: first turn - GAME START MESSAGE if state.get("turn", 0) == 0: logger.info("[JUDGE INITIALIZING GAME STATE]") team_emoji = "🔴" if state["current_team"] == "red" else "🔵" # Game start message game_start_content = ( f"🎮 **CODENAMES GAME STARTED!**\n\n" f"{team_emoji} **{state['current_team'].upper()} TEAM** goes first!\n\n" f"**Game Rules:**\n" f"- Teams alternate turns to guess words\n" f"- Match words to your team's color\n" f"- Avoid the opponent's words, neutral words, and the killer word ☠️\n" f"- First team to find all their words wins!\n\n" f"Good luck! 🍀" ) message = AIMessage( content=game_start_content, metadata={"title": "🎮 Game Start", "sender": "judge"} ) logger.info(f"[JUDGE MESSAGE]: {message}") chat_entry, _, _, _ = self._format_chat_entry(None, message, info="Game start") filtered_chat = self._filter_important_messages(state.get("chat_history", [])) return { "messages": [message], "turn": 1, "chat_history": filtered_chat + [chat_entry], } # Check if there are guesses to process guesses = state.get("guesses", []) if not guesses: logger.info("[JUDGE] No guesses made.") return state # Initialize variables board = state["board"] self.board = board current_team = state["current_team"] other_team = "red" if current_team == "blue" else "blue" history_guessed_words = state.get("history_guessed_words", []) teams_reviewed = state.get('teams_reviewed', []) teams_reviewed.append(current_team) if len(set(teams_reviewed)) == 2: end_round = True teams_reviewed = [] else: end_round = False results = ["📋 **Turn Results:**"] logger.info(f"[JUDGE] Team {current_team.upper()} guesses: {guesses}") # Process each guess for word in guesses: history_guessed_words.append(word) self.guessed_words = history_guessed_words logger.info(f"[JUDGE] Evaluating word: {word}") if word == "STOP_TURN": results.append(f"🚧 **{word}** The {current_team.upper()} team stops the turn.") break # ✅ Correct team word elif word in board.get(current_team, []): board[current_team].remove(word) self.board = board results.append(f"✅ **{word}** - Correct! ({current_team.upper()} team word)") # ❌ Other team's word — stop immediately and check win condition elif word in board.get(other_team, []): board[other_team].remove(word) self.board = board results.append(f"❌ **{word}** - Oh no! You selected a {other_team.upper()} team word!") # Check if this caused the other team to win is_game_over, winner, scores = check_win_condition() if is_game_over: messages_list = create_game_over_messages(results, winner, scores) return create_multi_message_state(messages_list, winner_and_score=(winner, scores)) # Check if round is complete if end_round: messages_list = create_round_end_messages(results, other_team) else: messages_list = create_turn_end_messages(results, other_team) return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) # ⚪ Neutral card — stop guessing for this turn elif word in board.get("neutral", []): board["neutral"].remove(word) self.board = board results.append(f"⚪ **{word}** - Neutral word. Turn ends!") # Check if round is complete if end_round: messages_list = create_round_end_messages(results, other_team) else: messages_list = create_turn_end_messages(results, other_team) return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) # ☠️ Killer word — game over elif word == board.get("killer"): results.append(f"☠️ **{word}** - KILLER WORD! 💀") # loser = current_team winner = other_team board["killer"] = None self.board = board red_remaining = len(board.get("red", [])) blue_remaining = len(board.get("blue", [])) scores = (red_remaining, blue_remaining) messages_list = create_game_over_messages( results, winner, scores, reason="killer" ) return create_multi_message_state(messages_list, winner_and_score=(winner, scores, "killer")) # ✅ All guesses processed successfully - check win conditions is_game_over, winner, scores = check_win_condition() if is_game_over: messages_list = create_game_over_messages(results, winner, scores) return create_multi_message_state(messages_list, winner_and_score=(winner, scores)) # Check if round is complete before normal turn switch if end_round: messages_list = create_round_end_messages(results, other_team) else: messages_list = create_turn_end_messages(results, other_team) return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) def route_after_judge(self, state: State) -> str: """Route to the appropriate team or end the game.""" logger.info("***" * 50) logger.info(f"HUMAN PLAYING? {self.IS_HUMAN_PLAYING}") logger.info("***" * 50) logger.info("\n\n") logger.info("***" * 50) logger.info("[ROUTING AFTER JUDGE]") board = state.get("board", {}) logger.info("BOARD ROUTING") logger.info(board) self.board = board self.chat_history = state['chat_history'] self.winners = state['winner_and_score'] # Check if the game is over if board["killer"] is None: logger.info("KILLER HIT, END GAME") logger.info("***" * 50) return END if not all(board.get(key) for key in ("red", "blue")): logger.info("END GAME") logger.info("***" * 50) return END # Human vs. Non-human handling # if self.IS_HUMAN_PLAYING: if state['end_round']: logger.info("ROUND ENDS") logger.info("***" * 50) return END next_team = state.get("next_team") or state.get("current_team", "red") return f"{next_team}_team" def boss_choice(self, state: State) -> str: """Determine whether to continue to tools or return a final answer.""" logger.info("###"*50) logger.info("[BOSS CHOICE]") messages = state["messages"] last_message = messages[-1] if hasattr(last_message, "tool_calls") and last_message.tool_calls: logger.info("[TOOL CALL]") tool_name = last_message.tool_calls[0]["name"] logger.info(tool_name) if tool_name == "ChooseWord": return "choose_word_tool" else: logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL BOSS") current_team = state['current_team'] logger.info(f"[CURRENT TEAM: {current_team}]") return f"{current_team}_boss_is_called" def should_continue(self, state: State) -> str: """Determine whether to continue to tools or return a final answer.""" logger.info("###"*50) logger.info("[SHOULD CONTINUE]") messages = state["messages"] last_message = messages[-1] if hasattr(last_message, "tool_calls") and last_message.tool_calls: logger.info("[TOOL CALL]") tool_name = last_message.tool_calls[0]["name"] logger.info(tool_name) if tool_name == "Call_Agent_1": return "transfer_to_agent_1" elif tool_name == "Call_Agent_2": return "transfer_to_agent_2" elif tool_name == "TeamFinalChoice": return "final_choice" else: logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL CAPITANO") current_team = state['current_team'] logger.info(f"[CURRENT TEAM: {current_team}]") return f"{current_team}_captain_is_called" def _convert_to_string(self, value): """Helper to convert any value to string""" if isinstance(value, str): return value elif isinstance(value, (dict, list)): return str(value) else: return str(value) def _filter_important_messages(self, chat_history): """ Filter chat history to keep only important messages: - Boss choices (ChooseWord) - Captain choices (TeamFinalChoice) - All Judge messages """ return [ entry for entry in chat_history if entry.get("tool_name") in ["ChooseWord", "TeamFinalChoice"] or entry.get("sender_type") == "judge" and entry.get("info") == "chat_history" ] def _format_chat_entry(self, player, message, info=None): """Helper to create a structured chat history entry""" tool_name = None clue = None clue_number = None guesses = [] # Extract content based on message type if hasattr(message, 'tool_calls') and message.tool_calls: tool_call = message.tool_calls[0] tool_name = tool_call.get('name', '') args = tool_call.get('args', {}) if tool_name == 'Call_Agent_1': content = f"Called Agent 1: {self._convert_to_string(args.get('message', 'N/A'))}" elif tool_name == 'Call_Agent_2': content = f"Called Agent 2: {self._convert_to_string(args.get('message', 'N/A'))}" elif tool_name == 'ChooseWord': clue = self._convert_to_string(args.get('clue', 'N/A')) clue_number = args.get('clue_number', 'N/A') content = f"Clue: '{clue.upper()}' for {clue_number} word(s)" elif tool_name == 'TeamFinalChoice': guesses = args.get('guesses', []) content = f"Final choices: {', '.join(guesses)}" else: content = f"Used tool {tool_name}: {args}" elif hasattr(message, 'content') and message.content: content = message.content elif hasattr(message, 'text') and message.text: content = message.text else: content = str(message) # Return structured entry if player is None: return { "sender_type": "judge", "team": "", "role": "", "name": "JUDGE", "tool_name": None, "content": content, "info": info, }, clue, clue_number, guesses return { "sender_type": "player", "team": player.team, "role": player.role, "name": player.name, "tool_name": tool_name, "content": content, "info": info, }, clue, clue_number, guesses def _split_chat_history(self, chat_history): """ Splits chat_history into two parts: - formatted_history: all entries up to and including the last one with info == "chat_history" - current_round: all entries after that Both are formatted strings, where each line is: [name (team role)]: content """ last_index = None # Find the last entry with info == "chat_history" for i, entry in enumerate(chat_history): if entry.get("info") == "chat_history": last_index = i # Helper to format a single entry def format_entry(entry): name = entry.get("name", "Unknown") team = entry.get("team", "N/A") role = entry.get("role", "N/A") content = entry.get("content", "") return f"[{name} ({team} {role})]: {content}" # Build formatted strings if last_index is None: formatted_history = "" current_round = "\n".join(format_entry(e) for e in chat_history) else: formatted_history = "\n".join( format_entry(e) for e in chat_history[:last_index + 1] ) current_round = "\n".join( format_entry(e) for e in chat_history[last_index + 1:] ) return formatted_history, current_round async def stream_graph(self, input_message, messages, players, board, dropdown_clue_number, guessed_words, chat_history, is_human_playing, turn_): """Stream the graph execution.""" logger.info(f"Human Clue: {input_message}") logger.info(f"Human Clue number: {dropdown_clue_number}") logger.info(f"Board: {board}") logger.info(f"Starting team: {board['starting_team']}") logger.info(f"Turn: {turn_}") self.IS_HUMAN_PLAYING = is_human_playing self.board = board inputs = { "messages": [HumanMessage(content=input_message)] if input_message else [], "original_board": board, "board": board, "players": players, "current_team": board.get("starting_team", "red"), "current_role": "boss", "turn": turn_, "last_user_message": input_message or "", "guesses": [], "clue": None, "clue_number": None, "round_messages": [], "chat_history": chat_history, "human_clue": input_message, "human_clue_number": dropdown_clue_number, "teams_reviewed": [], "history_guessed_words": guessed_words, "end_round": False, "winner_and_score": None, "red_boss_is_called_counter": 0, "blue_boss_is_called_counter": 0, "red_captain_is_called_counter": 0, "blue_captain_is_called_counter": 0, } final_msg = "" messages = list(messages) if messages else [] previous_message_is_reasoning = False latest_lang_node = "" current_sender = "" # Track current sender last_message_sender = None # Track the sender of the last message last_reasoning_index = None # Track reasoning step index for OpenAI async for chunk in self.graph.astream( inputs, {"recursion_limit": 100}, stream_mode=["messages", "updates"], subgraphs=True, ): if chunk[1] == "messages": msg = chunk[2][0] lang_node = chunk[2][1]['langgraph_node'] if latest_lang_node != lang_node: latest_lang_node = lang_node current_sender = lang_node # Update sender final_msg = "" last_message_sender = None # Reset when node changes last_reasoning_index = None # Reset reasoning index else: agent_update = chunk[2] msg_from = next(iter(agent_update.keys())) if msg_from in [ 'red_boss', 'red_captain', 'red_agent_1', 'red_agent_2', 'blue_boss', 'blue_captain', 'blue_agent_1', 'blue_agent_2', 'judge', 'final_choice' ]: current_sender = msg_from # Update sender msg = None else: try: msg = next(iter(agent_update.values()))['messages'][-1] except Exception as e: logger.error(f"[EXCEPTION]: {e}") logger.error(f"[CHUNK]: {chunk}") logger.error(f"[Agent update]: {agent_update}") msg = None # logger.info(f"[MSG]: {msg}") if isinstance(msg, AIMessage) and msg.tool_calls: final_msg = "" last_message_sender = None # Reset after tool calls last_reasoning_index = None # Reset reasoning index yield messages, self.guessed_words, self.board, self.chat_history, self.winners elif isinstance(msg, ToolMessage): logger.info(f"[TOOL MESSAGE: {msg}]") tool_name = msg.name if tool_name == "TeamFinalChoice": try: command_data = json.loads(msg.content) update_data = command_data.get("update", {}) guesses = update_data.get("guesses") new_message = ChatMessage( role="assistant", content="I made my final choices: " + ", ".join(guesses), metadata={ "title": "🧠 Guesses", "sender": f"{self.current_team}_captain" } ) except json.JSONDecodeError as e: logger.error(f"Error parsing tool message: {e}") new_message = ChatMessage( role="assistant", content=msg.content, metadata={ "title": "🧠 Guesses", "sender": f"{self.current_team}_captain" } ) elif tool_name == "ChooseWord": try: command_data = json.loads(msg.content) update_data = command_data.get("update", {}) clue = update_data.get("clue") clue_number = update_data.get("clue_number") logger.info(f"Clue: {clue}, Clue Number: {clue_number}") new_message = ChatMessage( role="assistant", content=f"{clue}, {clue_number}", metadata={ "title": "🕵️‍♂️ Clue", "sender": f"{self.current_team}_boss" } ) except json.JSONDecodeError as e: logger.error(f"Error parsing tool message: {e}") new_message = ChatMessage( role="assistant", content=msg.content, metadata={ "title": "🕵️‍♂️ Clue", "sender": f"{self.current_team}_boss" } ) elif tool_name == "Call_Agent_1" or tool_name == "Call_Agent_2": if tool_name == "Call_Agent_1": title_ = "💭 Asking opinion of Agent 1" else: title_ = "💭 Asking opinion of Agent 2" try: command_data = json.loads(msg.content) update_data = command_data.get("update", {}) message = update_data.get("message") new_message = ChatMessage( role="assistant", content=message, metadata={ "title": title_, "sender": f"{self.current_team}_captain" } ) except json.JSONDecodeError as e: logger.error(f"Error parsing tool message: {e}") new_message = ChatMessage( role="assistant", content=msg.content, metadata={ "title": title_, "sender": f"{self.current_team}_captain" } ) else: new_message = ChatMessage( role="assistant", content=msg.content, metadata={ "title": f"""🛠️ {tool_name}""", "sender": current_sender } ) if not messages or messages[-1] != new_message: messages.append(new_message) last_message_sender = new_message.metadata.get("sender") # else: # logger.info("*******"*50) # logger.info("SKIP") final_msg = "" yield messages, self.guessed_words, self.board, self.chat_history, self.winners elif isinstance(msg, AIMessageChunk): # Handle OpenAI format (both reasoning and text) model_provider = msg.response_metadata.get('model_provider') if (model_provider in ["openai", "google_genai", "anthropic"] and isinstance(msg.content, list)): for item in msg.content: if not isinstance(item, dict): continue item_type = item.get('type') # Handle openai reasoning content if item_type == 'reasoning': reasoning_text = "" summary = item.get('summary', []) for summary_item in summary: if isinstance(summary_item, dict) and summary_item.get('type') == 'summary_text': text = summary_item.get('text', '') item_index = summary_item.get('index') if text: reasoning_text += text if reasoning_text: # Check if reasoning index changed (new step) if last_reasoning_index is not None and item_index != last_reasoning_index: reasoning_text = "\n\n" + reasoning_text last_reasoning_index = item_index # Create or update reasoning message if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: final_msg = reasoning_text messages.append(ChatMessage( role="assistant", content=final_msg, metadata={ "title": "🧠 Thinking...", "sender": current_sender } )) last_message_sender = current_sender else: final_msg += reasoning_text if len(messages) > 0: messages[-1].content = final_msg previous_message_is_reasoning = True yield messages, self.guessed_words, self.board, self.chat_history, self.winners # Handle Google and Anthropic thinking content elif item_type == 'thinking': # reasoning_text = "" reasoning_text = item.get('thinking', []) if reasoning_text: # Create or update reasoning message if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: final_msg = reasoning_text messages.append(ChatMessage( role="assistant", content=final_msg, metadata={ "title": "🧠 Thinking...", "sender": current_sender } )) last_message_sender = current_sender else: final_msg += reasoning_text if len(messages) > 0: messages[-1].content = final_msg previous_message_is_reasoning = True yield messages, self.guessed_words, self.board, self.chat_history, self.winners # Handle text content (regular response after reasoning) elif item_type == 'text': text_content = item.get('text', '') if text_content: # If we were in reasoning mode, start fresh if previous_message_is_reasoning: final_msg = "" previous_message_is_reasoning = False last_message_sender = None last_reasoning_index = None # Check if we need a new message (empty OR different sender) if final_msg == "" or last_message_sender != current_sender: final_msg = text_content messages.append(ChatMessage( role="assistant", content=final_msg, metadata={"sender": current_sender} )) last_message_sender = current_sender else: # Same sender, continue streaming to last message final_msg += text_content messages[-1].content = final_msg yield messages, self.guessed_words, self.board, self.chat_history, self.winners elif msg.additional_kwargs.get('reasoning_content'): reasoning_text = msg.additional_kwargs.get("reasoning_content") if reasoning_text: if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: final_msg = reasoning_text messages.append(ChatMessage( role="assistant", content=final_msg, metadata={ "title": "🧠 Thinking...", "sender": current_sender } )) last_message_sender = current_sender else: final_msg += reasoning_text if len(messages) > 0: messages[-1].content = final_msg previous_message_is_reasoning = True elif msg.content and isinstance(msg.content, str): if previous_message_is_reasoning: final_msg = "" previous_message_is_reasoning = False last_message_sender = None # Reset after reasoning # Check if we need a new message (empty OR different sender) if final_msg == "" or last_message_sender != current_sender: final_msg = msg.content # Start fresh for new messages # if current_sender not in ['red_captain', 'blue_captain']: messages.append(ChatMessage( role="assistant", content=final_msg, metadata={"sender": current_sender} )) last_message_sender = current_sender # Track sender else: # Same sender, continue streaming to last message final_msg += msg.content messages[-1].content = final_msg yield messages, self.guessed_words, self.board, self.chat_history, self.winners else: reasoning = msg.additional_kwargs.get("reasoning_content") if reasoning: # Check if we need a new message for reasoning if final_msg == "" or last_message_sender != current_sender: final_msg = reasoning messages.append(ChatMessage( role="assistant", content=final_msg, metadata={ "title": """🧠 Thinking...""", "sender": current_sender } )) last_message_sender = current_sender else: if len(messages) > 0: final_msg += reasoning messages[-1].content = final_msg previous_message_is_reasoning = True # elif () # skip streaming tool call chunks # if final_msg == "" or last_message_sender != current_sender: # final_msg = msg.tool_call_chunks[0]['args'] # messages.append(ChatMessage( # role="assistant", # content=final_msg, # metadata={"sender": current_sender} # )) # last_message_sender = current_sender # Track sender # else: # # Same sender, continue streaming to last message # final_msg += msg.tool_call_chunks[0]['args'] # messages[-1].content = final_msg yield messages, self.guessed_words, self.board, self.chat_history, self.winners elif isinstance(msg, AIMessage): if msg.content and msg.content.strip(): logger.info("***"*50) logger.info(f"msg.content: {msg.content}") logger.info(f"msg: {msg}") try: if msg.metadata.get("sender"): new_sender = msg.metadata.get("sender") current_sender = new_sender except Exception as e: logger.error(f"[EXCEPTION]: {e} - {msg}") if msg.metadata.get("title"): new_title = msg.metadata.get("title") messages.append(ChatMessage( role="assistant", content=msg.content, metadata={ "sender": new_sender, "title": new_title } )) last_message_sender = current_sender # Track sender yield messages, self.guessed_words, self.board, self.chat_history, self.winners yield messages, self.guessed_words, self.board, self.chat_history, self.winners # async def create_graph(): # boss_tools = await load_boss_tools() # captain_tools = await load_captain_tools() # return MyGraph(boss_tools, captain_tools)