| | from __future__ import annotations |
| |
|
| | import os |
| | import platform |
| | import shlex |
| | import subprocess |
| | from typing import Annotated |
| |
|
| | import gradio as gr |
| |
|
| | from app import _log_call_end, _log_call_start, _truncate_for_log |
| | from ._docstrings import autodoc |
| | from .File_System import _resolve_path, ROOT_DIR, _display_path |
| | import shutil |
| |
|
| |
|
| | def _detect_shell(prefer_powershell: bool = True) -> tuple[list[str], str]: |
| | """ |
| | Pick an appropriate shell for the host OS. |
| | - Windows: use PowerShell by default, fall back to cmd.exe. |
| | - POSIX: use /bin/bash if available, else /bin/sh. |
| | Returns (shell_cmd_prefix, shell_name) where shell_cmd_prefix is the command list to launch the shell. |
| | """ |
| | system = platform.system().lower() |
| | if system == "windows": |
| | if prefer_powershell: |
| | pwsh = shutil.which("pwsh") |
| | candidates = [pwsh, shutil.which("powershell"), shutil.which("powershell.exe")] |
| | for cand in candidates: |
| | if cand: |
| | return [cand, "-NoLogo", "-NoProfile", "-Command"], "powershell" |
| | |
| | comspec = os.environ.get("ComSpec", r"C:\\Windows\\System32\\cmd.exe") |
| | return [comspec, "/C"], "cmd" |
| | |
| | bash = shutil.which("bash") |
| | if bash: |
| | return [bash, "-lc"], "bash" |
| | sh = os.environ.get("SHELL", "/bin/sh") |
| | return [sh, "-lc"], "sh" |
| |
|
| |
|
| | |
| | _DETECTED_SHELL_PREFIX, _DETECTED_SHELL_NAME = _detect_shell() |
| |
|
| |
|
| | |
| | TOOL_SUMMARY = ( |
| | "Execute a shell command within a safe working directory under the tool root ('/'). " |
| | "Paths must be relative to '/'. " |
| | "Set workdir to '.' to use the root. " |
| | "Absolute paths are disabled." |
| | f"Detected shell: {_DETECTED_SHELL_NAME}." |
| | ) |
| |
|
| |
|
| | def _run_command(command: str, cwd: str, timeout: int) -> tuple[str, str, int]: |
| | shell_prefix, shell_name = _detect_shell() |
| | full_cmd = shell_prefix + [command] |
| | try: |
| | proc = subprocess.run( |
| | full_cmd, |
| | cwd=cwd, |
| | stdout=subprocess.PIPE, |
| | stderr=subprocess.PIPE, |
| | text=True, |
| | encoding="utf-8", |
| | errors="replace", |
| | timeout=timeout if timeout and timeout > 0 else None, |
| | ) |
| | return proc.stdout, proc.stderr, proc.returncode |
| | except subprocess.TimeoutExpired as exc: |
| | return exc.stdout or "", (exc.stderr or "") + "\n[timeout]", 124 |
| | except Exception as exc: |
| | return "", f"Execution failed: {exc}", 1 |
| |
|
| |
|
| | @autodoc(summary=TOOL_SUMMARY) |
| | def Shell_Command( |
| | command: Annotated[str, "Shell command to execute. Accepts multi-part pipelines as a single string."], |
| | workdir: Annotated[str, "Working directory (relative to root unless UNSAFE_ALLOW_ABS_PATHS=1)."] = ".", |
| | timeout: Annotated[int, "Timeout in seconds (0 = no timeout, be careful on public hosting)."] = 60, |
| | ) -> str: |
| | _log_call_start("Shell_Command", command=command, workdir=workdir, timeout=timeout) |
| | if not command or not command.strip(): |
| | result = "No command provided." |
| | _log_call_end("Shell_Command", _truncate_for_log(result)) |
| | return result |
| |
|
| | abs_cwd, err = _resolve_path(workdir) |
| | if err: |
| | _log_call_end("Shell_Command", _truncate_for_log(err)) |
| | return err |
| | if not os.path.exists(abs_cwd): |
| | result = f"Working directory not found: {abs_cwd}" |
| | _log_call_end("Shell_Command", _truncate_for_log(result)) |
| | return result |
| |
|
| | |
| | _, shell_name = _detect_shell() |
| | stdout, stderr, code = _run_command(command, cwd=abs_cwd, timeout=timeout) |
| | display_cwd = _display_path(abs_cwd) |
| | header = ( |
| | f"Command: {command}\n" |
| | f"CWD: {display_cwd}\n" |
| | f"Root: /\n" |
| | f"Shell: {shell_name}\n" |
| | f"Exit code: {code}\n" |
| | f"--- STDOUT ---\n" |
| | ) |
| | output = header + (stdout or "<empty>") + "\n--- STDERR ---\n" + (stderr or "<empty>") |
| | _log_call_end("Shell_Command", _truncate_for_log(f"exit={code} stdout={len(stdout)} stderr={len(stderr)}")) |
| | return output |
| |
|
| |
|
| | def build_interface() -> gr.Interface: |
| | return gr.Interface( |
| | fn=Shell_Command, |
| | inputs=[ |
| | gr.Textbox(label="Command", placeholder="echo hello || dir", lines=2), |
| | gr.Textbox(label="Workdir", value=".", max_lines=1), |
| | gr.Slider(minimum=0, maximum=600, step=5, value=60, label="Timeout (seconds)"), |
| | ], |
| | outputs=gr.Textbox(label="Output", lines=20), |
| | title="Shell Command", |
| | description=( |
| | "<div style=\"text-align:center; overflow:hidden;\">" |
| | "Run a shell command under the same safe root as File System. " |
| | "Absolute paths are disabled, use relative paths. " |
| | f"Detected shell: {_DETECTED_SHELL_NAME}. " |
| | "</div>" |
| | ), |
| | api_description=TOOL_SUMMARY, |
| | flagging_mode="never", |
| | submit_btn="Run", |
| | ) |
| |
|
| |
|
| | __all__ = ["Shell_Command", "build_interface"] |
| |
|