# circle-ci-viz / app.py
import json
import os
import traceback
from functools import lru_cache
from typing import List, Optional, Tuple

import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

DATASET_ID = os.environ.get(
"CIRCLECI_RESULTS_DATASET_ID",
"transformers-community/circleci-test-results",
)
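# Files in the dataset are laid out as `pr-{PR}/sha-{COMMIT}/failure_summary.json`.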
MAX_ROWS = 200

# Get token from environment variable. A token is only needed when the dataset
# is private or gated; public datasets can be read anonymously.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
API = HfApi(token=HF_TOKEN)


@lru_cache(maxsize=128)
def _list_collection_files(pr_number: str) -> Tuple[str, ...]:
"""
Return the `failure_summary.json` paths stored for a specific PR.
"""
prefix = f"pr-{pr_number}"
print(f"DEBUG: Looking for files with prefix: {prefix}")
print(f"DEBUG: Dataset ID: {DATASET_ID}")
try:
# List all files in the repo and filter by prefix
entries = API.list_repo_tree(
repo_id=DATASET_ID,
repo_type="dataset",
revision="main",
recursive=True,
)
print("DEBUG: Successfully called list_repo_tree")
except HfHubHTTPError as error:
print(f"ERROR: Failed to list repo tree: {error}")
return tuple()
    except Exception as error:
        print(f"ERROR: Unexpected error in list_repo_tree: {error}")
        traceback.print_exc()
        return tuple()

    files = []
    matching_paths = []
    matching_types = []
    all_entries = []
try:
for entry in entries:
all_entries.append(entry)
entry_type = getattr(entry, "type", type(entry).__name__)
entry_path = getattr(entry, "path", str(entry))
# Debug: show all entries
if len(all_entries) <= 10:
print(f"DEBUG: Entry {len(all_entries)}: {entry_path} (type: {entry_type})")
            # Filter by prefix, recording the entry type alongside each path
            if entry_path.startswith(prefix):
                matching_paths.append(entry_path)
                matching_types.append(entry_type)
# Look for failure_summary.json files
if entry_path.startswith(prefix) and entry_path.endswith("failure_summary.json"):
if "file" in entry_type.lower() or entry_type == "RepoFile":
files.append(entry_path)
print(f"DEBUG: Found matching file: {entry_path}")
print(f"DEBUG: Total entries processed: {len(all_entries)}")
print(f"DEBUG: Entries with prefix '{prefix}': {len(matching_paths)}")
print(f"DEBUG: failure_summary.json files found: {len(files)}")
        if matching_paths and len(files) == 0:
            print(f"DEBUG: Sample matching paths (first 5): {matching_paths[:5]}")
            # matching_paths holds plain strings, so inspect the entry types
            # recorded above to check whether the listing returned only folders
            folder_count = sum(1 for t in matching_types if "folder" in t.lower())
            print(f"DEBUG: Folders in matching paths: {folder_count}")
    except Exception as error:
        print(f"ERROR: Error processing entries: {error}")
        traceback.print_exc()
        return tuple()
return tuple(files)


def _load_payload(path: str) -> Optional[dict]:
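    """Download one `failure_summary.json` from the dataset and parse it as JSON."""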
try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
            token=HF_TOKEN,
        )
except Exception as error:
print(f"Failed to download {path}: {error}")
return None
try:
with open(local_path) as fp:
return json.load(fp)
except Exception as error:
print(f"Failed to load JSON for {path}: {error}")
return None


def _extract_commit_from_path(path: str) -> str:
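    """Pull the commit SHA out of a `pr-{PR}/sha-{COMMIT}/...` path."""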
parts = path.split("/")
if len(parts) >= 2 and parts[1].startswith("sha-"):
return parts[1][len("sha-") :]
return "unknown"


def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
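    """Load records for a PR, filtered by repository substring and commit-SHA prefix, newest first (capped at MAX_ROWS)."""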
repo = repo.strip().lower()
pr = pr.strip()
sha = sha.strip().lower()
if not pr:
return []
file_paths = _list_collection_files(pr)
records: List[dict] = []
for file_path in file_paths:
commit = _extract_commit_from_path(file_path)
if sha and not commit.lower().startswith(sha):
continue
payload = _load_payload(file_path)
if payload is None:
continue
metadata = payload.get("metadata") or {}
repository = (metadata.get("repository") or "").lower()
if repo and repo not in repository:
continue
payload["__source_path"] = file_path
payload["__commit"] = commit
records.append(payload)

    def _sort_key(record: dict) -> str:
        metadata = record.get("metadata") or {}
        return metadata.get("collected_at") or ""

    records.sort(key=_sort_key, reverse=True)
return records[:MAX_ROWS]


def query(repo: str, pr: str, sha: str) -> Tuple[List[List[str]], str, str]:
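    """Gradio handler: return (table rows, latest payload as pretty-printed JSON, status message)."""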
repo = repo.strip()
pr = pr.strip()
sha = sha.strip()
print(f"DEBUG: Query called with repo='{repo}', pr='{pr}', sha='{sha}'")
if not pr:
return [], json.dumps({"error": "PR number is required."}, indent=2), "Provide a PR number to search."
records = _filter_records(repo, pr, sha)
print(f"DEBUG: _filter_records returned {len(records)} records")
if not records:
return [], json.dumps({"error": "No records found."}, indent=2), f"No records found for PR {pr}."
table_rows = []
for record in records:
metadata = record.get("metadata") or {}
table_rows.append(
[
metadata.get("collected_at", ""),
metadata.get("repository", ""),
metadata.get("branch", ""),
metadata.get("pull_request_number", ""),
(metadata.get("commit_sha") or "")[:12],
metadata.get("workflow_id", ""),
                str(len(record.get("failures") or [])),
]
)
latest_payload = json.dumps(records[0], indent=2)
status = f"Showing {len(records)} record(s) for PR {pr}."
print(f"DEBUG: Returning {len(table_rows)} table rows")
return table_rows, latest_payload, status


def refresh_dataset() -> str:
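    """Drop the cached file listing so the next search re-reads the dataset."""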
_list_collection_files.cache_clear()
return "Cleared cached manifest. Data will be reloaded on next search."


with gr.Blocks(head="""
<script>
document.addEventListener('DOMContentLoaded', function() {
// Parse URL parameters
const params = new URLSearchParams(window.location.search);
const repo = params.get('repo') || '';
const pr = params.get('pr') || '';
const sha = params.get('sha') || '';
if (repo || pr || sha) {
// Wait for Gradio to initialize
setTimeout(() => {
// Find and populate the input fields
const inputs = document.querySelectorAll('input[type="text"]');
if (inputs.length >= 3) {
if (repo) inputs[0].value = repo;
if (pr) inputs[1].value = pr;
if (sha) inputs[2].value = sha;
// Trigger input events to update Gradio's state
inputs.forEach(input => {
input.dispatchEvent(new Event('input', { bubbles: true }));
});
// Auto-click search if PR is provided
if (pr) {
setTimeout(() => {
const buttons = document.querySelectorAll('button');
const searchBtn = Array.from(buttons).find(btn => btn.textContent.includes('Search'));
if (searchBtn) searchBtn.click();
}, 500);
}
}
}, 1000);
}
});
</script>
""") as demo:
gr.Markdown(
"""
# CircleCI Test Collection Helper
Use the filters below to inspect CircleCI test aggregation records for the Transformers repository (or any
repository that uploads data to the `transformers-community/circleci-test-results` dataset).
Files are named `failure_summary.json` and organized as `pr-{PR}/sha-{COMMIT}/failure_summary.json`.
"""
)
with gr.Row():
repo_box = gr.Textbox(label="Repository", placeholder="huggingface/transformers")
pr_box = gr.Textbox(label="PR number (required)")
sha_box = gr.Textbox(label="Commit SHA (prefix accepted)")
with gr.Row():
search_btn = gr.Button("Search")
refresh_btn = gr.Button("Clear cache")
table = gr.Dataframe(
headers=[
"Collected at",
"Repository",
"Branch",
"PR",
"Commit",
"Workflow ID",
"Failures",
],
wrap=True,
)
json_view = gr.Code(label="Latest entry details", language="json")
status = gr.Markdown("")
search_btn.click(query, inputs=[repo_box, pr_box, sha_box], outputs=[table, json_view, status])
refresh_btn.click(refresh_dataset, outputs=status)


if __name__ == "__main__":
demo.queue(max_size=20).launch(ssr_mode=False)
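
# Deep-linking note: the head script above pre-fills the three filter boxes from
# URL query parameters and auto-searches when `pr` is present, e.g. (hypothetical
# values): https://<space-url>/?repo=huggingface/transformers&pr=12345&sha=0123abcd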