"""
Prepare GD level data for modded-nanogpt training.

Converts tokenized levels to .bin format compatible with the data loader.
Uses multiprocessing for fast tokenization.

Usage (local files):
    python prepare_gd_data.py --input data/gd_raw --output data/gd_levels --tokenizer tokenizer.model

Usage (HuggingFace dataset):
    python prepare_gd_data.py --hf-repo tldne/gd-levels --hf-file levels_deduped.jsonl --output data/gd_levels --tokenizer tokenizer.model
"""

import argparse
import json
import shutil
import tempfile
from functools import partial
from multiprocessing import Pool, cpu_count
from pathlib import Path

import numpy as np
import sentencepiece as spm
from huggingface_hub import hf_hub_download
from tqdm import tqdm


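# Metadata fields emitted ahead of the level string, in this order. This list
# (and the formatting in format_training_sample below) must stay in sync with
# train_tokenizer.py.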
INCLUDE_FIELDS = [
    'level_id',
    'level_name',
    'level_type',
    'binary_version',
    'description_decoded',
    'song_id',
]


def format_training_sample(record: dict) -> str:
    """
    Format a level record as a training string.
    Only includes specific metadata fields + level_string.
    Must match train_tokenizer.py exactly!
    """
    parts = []

    # Derive level_id from the source filename ("Level_<id>.gmd2").
    # removeprefix/removesuffix strip the literal affixes; lstrip/rstrip would
    # strip character sets and mangle ids that end in those characters.
    source_file = record.get('source_file', '')
    if source_file:
        level_id = source_file.removeprefix("Level_").removesuffix(".gmd2")
        parts.append(f"<level_id>{level_id}")

    for key in INCLUDE_FIELDS:
        if key == 'level_id':
            continue
        value = record.get(key)
        if value is not None and value != "":
            parts.append(f"<{key}>{value}")

    if record.get('level_string'):
        parts.append(f"<level_string>{record['level_string']}")

    return "".join(parts)


def write_bin_file(tokens: np.ndarray, output_path: Path) -> int:
    """Write tokens to a .bin file with the modded-nanogpt header format."""
    # 256-entry int32 header: magic number, format version, token count.
    header = np.zeros(256, dtype=np.int32)
    header[0] = 20240520  # magic number checked by the data loader
    header[1] = 1         # format version
    header[2] = len(tokens)

    with output_path.open("wb") as f:
        f.write(header.tobytes())
        f.write(tokens.astype(np.uint16).tobytes())

    return len(tokens)
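

# Sketch only (not called by this script): how a shard written above could be
# read back and sanity-checked, assuming the same header layout as write_bin_file.
def read_bin_file(input_path: Path) -> np.ndarray:
    """Read a .bin shard and validate its header (illustrative helper)."""
    with input_path.open("rb") as f:
        header = np.frombuffer(f.read(256 * 4), dtype=np.int32)
        assert header[0] == 20240520, "unexpected magic number"
        assert header[1] == 1, "unexpected format version"
        tokens = np.frombuffer(f.read(), dtype=np.uint16)
    assert len(tokens) == header[2], "token count does not match header"
    return tokens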


def tokenize_record(record_json: str, tokenizer_path: str) -> np.ndarray | None:
    """Tokenize a single record. Returns None if the record should be skipped."""
    try:
        # Lazily load one SentencePieceProcessor per worker process and cache it
        # on the function object so the model is not reloaded for every record.
        if not hasattr(tokenize_record, '_sp'):
            tokenize_record._sp = spm.SentencePieceProcessor()
            tokenize_record._sp.load(tokenizer_path)
        sp = tokenize_record._sp

        record = json.loads(record_json)
        if not record.get('level_string'):
            return None

        text = format_training_sample(record)
        if not text or len(text) > 5_000_000:
            return None

        tokens = [sp.bos_id()] + sp.encode(text) + [sp.eos_id()]
        if len(tokens) > 10_000_000:
            return None

        # uint16 storage assumes the tokenizer vocabulary fits in 16 bits.
        return np.array(tokens, dtype=np.uint16)
    except Exception:
        return None
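
# Design note: the per-process cache above keeps tokenize_record self-contained
# for Pool.imap. An equivalent approach would load the tokenizer once per worker
# via a Pool initializer and stash it in a module-level global.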


def process_levels_parallel(
    input_dir: Path,
    output_dir: Path,
    tokenizer_path: Path,
    shard_size: int = 100_000_000,
    val_ratio: float = 0.01,
    num_workers: int | None = None,
):
    """Process all levels using multiprocessing and create train/val shards."""
    if num_workers is None:
        num_workers = max(1, cpu_count() - 1)

    print(f"Using {num_workers} workers for tokenization")

    # Load the tokenizer here only to report its special token ids;
    # each worker loads its own copy inside tokenize_record.
    print(f"Loading tokenizer from {tokenizer_path}")
    sp = spm.SentencePieceProcessor()
    sp.load(str(tokenizer_path))
    print(f"BOS ID: {sp.bos_id()}, EOS ID: {sp.eos_id()}")

    # Look for .jsonl/.json files at the top level first, then recursively.
    jsonl_files = list(input_dir.glob("*.jsonl"))
    json_files = list(input_dir.glob("*.json"))
    level_files = jsonl_files + json_files

    if not level_files:
        jsonl_files = list(input_dir.glob("**/*.jsonl"))
        json_files = list(input_dir.glob("**/*.json"))
        level_files = jsonl_files + json_files

    print(f"Found {len(level_files)} input files")

    # Keep records as raw JSON strings; each worker parses its own records
    # inside tokenize_record.
    print("Loading records...")
    all_records = []
    for lf in tqdm(level_files, desc="Reading files"):
        try:
            if lf.suffix == ".jsonl":
                with open(lf, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if line:
                            all_records.append(line)
            else:
                with open(lf, "r", encoding="utf-8") as f:
                    data = json.load(f)
                records = data if isinstance(data, list) else [data]
                for record in records:
                    all_records.append(json.dumps(record))
        except Exception as e:
            print(f"Error reading {lf}: {e}")

    print(f"Loaded {len(all_records):,} records")

    print(f"Tokenizing with {num_workers} workers...")
    tokenize_fn = partial(tokenize_record, tokenizer_path=str(tokenizer_path))

    all_levels = []
    total_tokens = 0

    with Pool(num_workers) as pool:
        results = list(tqdm(
            pool.imap(tokenize_fn, all_records, chunksize=100),
            total=len(all_records),
            desc="Tokenizing"
        ))

    for tokens in results:
        if tokens is not None:
            all_levels.append(tokens)
            total_tokens += len(tokens)

    print(f"Tokenized {len(all_levels):,} levels with {total_tokens:,} total tokens")
    np.random.seed(42)
    indices = np.random.permutation(len(all_levels))

    val_count = max(1, int(len(all_levels) * val_ratio))
    val_indices = indices[:val_count]
    train_indices = indices[val_count:]

    print(f"Train: {len(train_indices):,} levels, Val: {len(val_indices):,} levels")

    output_dir.mkdir(parents=True, exist_ok=True)
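
    # With the default shard_size of 100M tokens stored as uint16, a full shard
    # is roughly 200 MB of token data plus a 1 KiB (256 * int32) header.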

    # Concatenate and shard the training split.
    train_tokens = np.concatenate([all_levels[i] for i in tqdm(train_indices, desc="Concatenating train")])
    num_train_shards = max(1, len(train_tokens) // shard_size)

    print(f"Writing {num_train_shards} train shards...")
    shard_size_actual = len(train_tokens) // num_train_shards
    for i in tqdm(range(num_train_shards), desc="Writing train shards"):
        start = i * shard_size_actual
        end = start + shard_size_actual if i < num_train_shards - 1 else len(train_tokens)
        shard = train_tokens[start:end]

        output_path = output_dir / f"train_{i:04d}.bin"
        write_bin_file(shard, output_path)

    # Concatenate and shard the validation split.
    val_tokens = np.concatenate([all_levels[i] for i in tqdm(val_indices, desc="Concatenating val")])
    num_val_shards = max(1, len(val_tokens) // shard_size)

    print(f"Writing {num_val_shards} val shards...")
    shard_size_actual = len(val_tokens) // num_val_shards
    for i in range(num_val_shards):
        start = i * shard_size_actual
        end = start + shard_size_actual if i < num_val_shards - 1 else len(val_tokens)
        shard = val_tokens[start:end]

        output_path = output_dir / f"val_{i:04d}.bin"
        write_bin_file(shard, output_path)

    print("\n=== Summary ===")
    print(f"Train tokens: {len(train_tokens):,}")
    print(f"Val tokens: {len(val_tokens):,}")
    print(f"Total tokens: {len(train_tokens) + len(val_tokens):,}")
    print(f"Output dir: {output_dir}")


def main():
    parser = argparse.ArgumentParser(description="Prepare GD levels for modded-nanogpt")

    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument("--input", "-i", type=Path, help="Input directory/file with level JSONs/JSONL")
    input_group.add_argument("--hf-repo", type=str, help="HuggingFace repo ID (e.g., tldne/gd-levels)")

    parser.add_argument("--hf-file", type=str, default="levels_deduped.jsonl", help="File to download from HF repo")
    parser.add_argument("--output", "-o", type=Path, required=True, help="Output directory for .bin files")
    parser.add_argument("--tokenizer", "-t", type=Path, required=True, help="Path to tokenizer.model")
    parser.add_argument("--shard-size", type=int, default=100_000_000, help="Tokens per shard")
    parser.add_argument("--val-ratio", type=float, default=0.01, help="Validation split ratio")
    parser.add_argument("--workers", "-w", type=int, default=None, help="Number of workers (default: cpu_count - 1)")

    args = parser.parse_args()

    if args.hf_repo:
        print(f"Downloading {args.hf_file} from {args.hf_repo}...")
        downloaded_path = hf_hub_download(
            repo_id=args.hf_repo,
            filename=args.hf_file,
            repo_type="dataset",
        )
        print(f"Downloaded to: {downloaded_path}")
        # process_levels_parallel expects a directory to glob, so copy the
        # downloaded file into a temporary directory.
        temp_dir = Path(tempfile.mkdtemp())
        shutil.copy(downloaded_path, temp_dir / args.hf_file)
        input_dir = temp_dir
    else:
        input_dir = args.input
        temp_dir = None

    try:
        process_levels_parallel(
            input_dir=input_dir,
            output_dir=args.output,
            tokenizer_path=args.tokenizer,
            shard_size=args.shard_size,
            val_ratio=args.val_ratio,
            num_workers=args.workers,
        )
    finally:
        # Remove the temporary copy of the downloaded file, if one was made.
        if temp_dir:
            shutil.rmtree(temp_dir)


if __name__ == "__main__":
    main()