#!/usr/bin/env python3 from __future__ import annotations import argparse import hashlib import json import os import subprocess import sys import tempfile from pathlib import Path from typing import Dict, List ROOT = Path(__file__).resolve().parent REWRITE_DIR = ROOT / "scripts" / "rewrite" FULL_VECTORS = ROOT / "lifter" / "test" / "test_vectors" / "oracle_vectors_full_handlers.json" DEFAULT_VECTORS = ROOT / "lifter" / "test" / "test_vectors" / "oracle_vectors.json" IR_OUTPUT_DIR = ROOT.parent / "rewrite-regression-work" / "ir_outputs" GOLDEN_HASHES_FILE = ROOT / "lifter" / "test" / "test_vectors" / "golden_ir_hashes.json" SEMANTIC_SCRIPT = REWRITE_DIR / "check_semantic.py" THEMIDA_SCRIPT = REWRITE_DIR / "check_themida_equivalence.py" # C-compiled samples produce toolchain-dependent IR (different addresses across # compiler versions/machines). Exclude them from golden hash determinism checks; # their correctness is validated by semantic tests instead. C_SOURCES_DIR = ROOT / "testcases" / "rewrite_smoke" def _c_compiled_prefixes() -> set: """Return base names of C/C++-compiled test samples (without extension).""" if not C_SOURCES_DIR.is_dir(): return set() result = {p.stem for p in C_SOURCES_DIR.glob("*.c")} result |= {p.stem for p in C_SOURCES_DIR.glob("*.cpp")} return result def _is_golden_eligible(ll_name: str) -> bool: """Return True if an IR file should be tracked in the golden hash set. Excludes all files derived from C-compiled samples (optimized, no_opts, and semantic variants) since they are toolchain-dependent. """ prefixes = _c_compiled_prefixes() base = ll_name.removesuffix(".ll") for suffix in ("", "_no_opts", "_semantic"): candidate = base.removesuffix(suffix) if suffix else base if candidate in prefixes: return False return True def _run(argv: List[str], extra_env: Dict[str, str] | None = None) -> None: env = os.environ.copy() if extra_env: env.update(extra_env) print("+", " ".join(argv)) result = subprocess.run(argv, cwd=ROOT, env=env) if result.returncode != 0: raise SystemExit(result.returncode) def _run_capture( argv: List[str], *, cwd: Path | None = None, extra_env: Dict[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: env = os.environ.copy() if extra_env: env.update(extra_env) cmd = [str(arg) for arg in argv] print("+", " ".join(cmd)) return subprocess.run( cmd, cwd=cwd or ROOT, env=env, text=True, capture_output=True, ) def _assert_failure_contains( result: subprocess.CompletedProcess[str], *, check_name: str, required_substrings: List[str], ) -> None: output = (result.stdout or "") + (result.stderr or "") if result.returncode == 0: raise SystemExit( f"Negative check '{check_name}' unexpectedly succeeded. Output:\n{output}" ) missing = [token for token in required_substrings if token not in output] if missing: raise SystemExit( f"Negative check '{check_name}' failed to emit required markers {missing}. " f"Output:\n{output}" ) print(f"[OK] {check_name}") def _run_cmd(script: Path, args: List[str] | None = None, extra_env: Dict[str, str] | None = None) -> None: _run(["cmd", "/c", str(script), *(args or [])], extra_env=extra_env) def _resolve_repo_path(user_path: Path, label: str) -> Path: resolved_root = ROOT.resolve() resolved_path = (user_path if user_path.is_absolute() else ROOT / user_path).resolve() try: resolved_path.relative_to(resolved_root) except ValueError as exc: raise SystemExit( f"{label} must be inside repository root '{resolved_root}', got '{resolved_path}'" ) from exc return resolved_path def compute_ir_hashes(ir_dir: Path) -> Dict[str, str]: hashes: Dict[str, str] = {} if not ir_dir.is_dir(): return hashes for ll_file in sorted(ir_dir.rglob("*.ll")): content = ll_file.read_text(encoding="utf-8", errors="replace") normalized = "\n".join(line.rstrip() for line in content.splitlines()) + "\n" digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest() rel_key = ll_file.relative_to(ir_dir).as_posix() hashes[rel_key] = digest return dict(sorted(hashes.items())) def check_determinism(ir_dir: Path, golden_file: Path) -> None: hashes = compute_ir_hashes(ir_dir) if not hashes: raise SystemExit( f"Determinism check FAILED — no .ll files found in {ir_dir}" ) if not golden_file.exists(): raise SystemExit( f"Determinism check FAILED — golden hash file is missing: {golden_file}. " "Run `python test.py update-golden` to regenerate it." ) golden = json.loads(golden_file.read_text(encoding="utf-8")) mismatches: List[str] = [] # Only check files tracked in the golden set. C-compiled samples produce # toolchain-dependent IR (different addresses) and are excluded from golden # tracking — their correctness is validated by semantic tests instead. for key in sorted(golden): expected = golden[key] actual = hashes.get(key) if expected != actual: mismatches.append( f" {key}: expected={expected} actual={actual or '(missing)'}" ) if mismatches: print("Determinism check FAILED — mismatched files:") for m in mismatches: print(m) raise SystemExit(1) unchecked = sorted(set(hashes) - set(golden)) checked = len(golden) print(f"Determinism check passed: {checked} golden files match", end="") if unchecked: print(f" ({len(unchecked)} untracked files skipped)") else: print() def update_golden(ir_dir: Path, golden_file: Path) -> None: all_hashes = compute_ir_hashes(ir_dir) if not all_hashes: print("WARNING: no .ll files found in", ir_dir, "— nothing to write") return # Exclude C-compiled samples — their IR is toolchain-dependent. hashes = {k: v for k, v in all_hashes.items() if _is_golden_eligible(k)} golden_file.parent.mkdir(parents=True, exist_ok=True) golden_file.write_text(json.dumps(hashes, indent=2) + "\n", encoding="utf-8") excluded = len(all_hashes) - len(hashes) msg = f"Golden hashes updated: {golden_file} ({len(hashes)} files)" if excluded: msg += f" ({excluded} C-compiled files excluded)" print(msg) def run_baseline() -> None: _run_cmd(REWRITE_DIR / "run.cmd") check_determinism(IR_OUTPUT_DIR, GOLDEN_HASHES_FILE) def run_micro(filter_tokens: List[str], check_flags: bool, regenerate_oracle: bool) -> None: env: Dict[str, str] = {} if not regenerate_oracle: env["SKIP_ORACLE_GENERATION"] = "1" if check_flags: env["MERGEN_TEST_CHECK_FLAGS"] = "1" args: List[str] = [] args.extend(filter_tokens) _run_cmd(REWRITE_DIR / "run_microtests.cmd", args=args, extra_env=env) def run_full(check_flags: bool) -> None: env: Dict[str, str] | None = None if check_flags: env = {"MERGEN_TEST_CHECK_FLAGS": "1"} _run_cmd(REWRITE_DIR / "run_all_handlers.cmd", extra_env=env) def run_flagstress(filter_tokens: List[str]) -> None: _run_cmd(REWRITE_DIR / "run_flagstress.cmd", args=filter_tokens) def run_coverage(vectors_file: Path) -> None: vectors_arg = vectors_file.relative_to(ROOT) _run_cmd(REWRITE_DIR / "collect_instruction_tests.cmd", args=["--vectors-file", str(vectors_arg)]) def run_report(vectors_file: Path, as_json: bool) -> None: args = ["--vectors", str(vectors_file)] if as_json: args.append("--json") _run([sys.executable, str(REWRITE_DIR / "report_coverage.py")] + args) def run_semantic(filters: List[str] | None = None, input_ir: Path | None = None) -> None: args = [sys.executable, str(SEMANTIC_SCRIPT), "--ir-dir", str(IR_OUTPUT_DIR)] if filters: args.extend(["--filter"] + filters) if input_ir is not None: args.extend(["--input-ir", str(input_ir)]) _run(args) def run_vmp(filter_tokens: List[str]) -> None: args = [ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(ROOT / "scripts" / "dev" / "profile_simple_vmp.ps1"), "-Validate", ] if filter_tokens: args.extend(["-Filter", *filter_tokens]) _run(args) def run_themida(filter_tokens: List[str], update: bool) -> None: args = [sys.executable, str(THEMIDA_SCRIPT)] if update: args.append("--update") if filter_tokens: args.extend(filter_tokens) _run(args) def run_negative_checks() -> None: lifter_path = ROOT / "build_iced" / "lifter.exe" if not lifter_path.exists(): raise SystemExit( f"Negative checks require a built lifter at '{lifter_path}'. " "Run `cmd /c scripts\\dev\\build_iced.cmd` first." ) no_args_result = _run_capture(["cmd", "/c", str(lifter_path)]) _assert_failure_contains( no_args_result, check_name="lifter rejects missing positional args", required_substrings=["Usage:"], ) rewrite_workdir = ROOT.parent / "rewrite-regression-work" verify_script = REWRITE_DIR / "verify.ps1" with tempfile.TemporaryDirectory(prefix="mergen-negative-") as temp_dir: temp_root = Path(temp_dir) bad_name_manifest = temp_root / "bad_manifest_name.json" bad_name_manifest.write_text( json.dumps({"samples": [{"name": "..\\\\evil", "patterns": ["ret"]}]}, indent=2), encoding="utf-8", ) bad_name_result = _run_capture( [ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(verify_script), "-WorkDir", str(rewrite_workdir), "-ManifestPath", str(bad_name_manifest), ] ) _assert_failure_contains( bad_name_result, check_name="verify rejects path-traversal manifest sample name", required_substrings=["invalid name", "path traversal"], ) bad_patterns_manifest = temp_root / "bad_manifest_patterns.json" bad_patterns_manifest.write_text( json.dumps({"samples": [{"name": "branch", "patterns": "ret"}]}, indent=2), encoding="utf-8", ) bad_patterns_result = _run_capture( [ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(verify_script), "-WorkDir", str(rewrite_workdir), "-ManifestPath", str(bad_patterns_manifest), ] ) _assert_failure_contains( bad_patterns_result, check_name="verify rejects string patterns descriptors", required_substrings=["patterns must be an array"], ) bad_vectors_path = temp_root / "bad_vectors_skip.json" bad_vectors_path.write_text( json.dumps( {"cases": [{"name": "x", "handler": "add", "skip": "false"}]}, indent=2, ), encoding="utf-8", ) bad_vectors_result = _run_capture( [ sys.executable, str(REWRITE_DIR / "collect_instruction_tests.py"), "--vectors-file", str(bad_vectors_path), "--json", ] ) _assert_failure_contains( bad_vectors_result, check_name="coverage collector rejects non-boolean skip values", required_substrings=["invalid 'skip' value", "expected boolean"], ) outside_repo_vectors_result = _run_capture( [ sys.executable, str(ROOT / "test.py"), "coverage", "--vectors", "C:/Windows/win.ini", ] ) _assert_failure_contains( outside_repo_vectors_result, check_name="test runner rejects vectors paths outside repository", required_substrings=["must be inside repository root"], ) print("Negative checks passed") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Convenience test runner for Mergen rewrite gates" ) sub = parser.add_subparsers(dest="command") sub.add_parser("quick", help="baseline + microtests (skip oracle regen)") sub.add_parser("baseline", help="run scripts/rewrite/run.cmd") sub.add_parser("update-golden", help="run baseline then regenerate golden IR hashes") sub.add_parser("negative", help="run explicit negative/failure contract checks") full = sub.add_parser("full", help="run scripts/rewrite/run_all_handlers.cmd") full.add_argument( "--check-flags", action="store_true", help="enforce strict oracle flag comparisons during full-handler run", ) coverage = sub.add_parser("coverage", help="run handler coverage report") coverage.add_argument( "--full", action="store_true", help="use full-handler vectors (oracle_vectors_full_handlers.json)", ) coverage.add_argument( "--vectors", type=Path, default=None, help="explicit vectors file path", ) micro = sub.add_parser("micro", help="run in-process instruction microtests") micro.add_argument("--check-flags", action="store_true", help="enable strict oracle flag checking") micro.add_argument( "--regen-oracle", action="store_true", help="regenerate oracle vectors before running", ) micro.add_argument("filter", nargs="*", help="optional test name filter tokens") flags = sub.add_parser( "flags", help="generate expanded flag-stress vectors and run strict microtests", ) flags.add_argument("filter", nargs="*", help="optional test name filter tokens") all_cmd = sub.add_parser("all", help="baseline + full-handler + full coverage") all_cmd.add_argument("--no-coverage", action="store_true", help="skip final coverage report") report_cmd = sub.add_parser("report", help="print handler test coverage report") report_cmd.add_argument("--json", action="store_true", help="output as JSON") report_cmd.add_argument("--vectors", type=Path, default=None, help="explicit vectors file") semantic = sub.add_parser("semantic", help="run runtime semantic regression for all samples") semantic.add_argument("--input-ir", type=Path, default=None, help="override IR file (single sample)") semantic.add_argument("filter", nargs="*", help="optional sample name filter tokens") vmp = sub.add_parser( "vmp", help="attempt local VMP target lifts (recommended for big control-flow/semantics changes)", ) vmp.add_argument("filter", nargs="*", help="optional VMP target name filter tokens") themida = sub.add_parser( "themida", help="run Themida devirtualization import-equivalence checks", ) themida.add_argument( "--update", action="store_true", help="regenerate required_imports from reference binaries", ) themida.add_argument("filter", nargs="*", help="optional sample name filter tokens") return parser.parse_args() def main() -> None: args = parse_args() command = args.command or "quick" if command == "baseline": run_baseline() return if command == "update-golden": _run_cmd(REWRITE_DIR / "run.cmd") update_golden(IR_OUTPUT_DIR, GOLDEN_HASHES_FILE) return if command == "negative": run_negative_checks() return if command == "micro": run_micro(args.filter, args.check_flags, args.regen_oracle) return if command == "full": run_full(args.check_flags) return if command == "coverage": if args.vectors is not None: vectors_file = _resolve_repo_path(args.vectors, "Coverage vectors path") elif args.full: vectors_file = FULL_VECTORS else: vectors_file = DEFAULT_VECTORS if not vectors_file.exists(): raise SystemExit(f"Vectors file does not exist: {vectors_file}") run_coverage(vectors_file) return if command == "report": if args.vectors is not None: vectors_file = _resolve_repo_path(args.vectors, "Report vectors path") else: vectors_file = DEFAULT_VECTORS if not vectors_file.exists(): raise SystemExit(f"Vectors file does not exist: {vectors_file}") run_report(vectors_file, args.json) return if command == "semantic": run_semantic(args.filter, args.input_ir) return if command == "vmp": run_vmp(args.filter) return if command == "themida": run_themida(args.filter, args.update) return if command == "flags": run_flagstress(args.filter) return if command == "all": run_baseline() run_semantic() run_full(check_flags=True) if not args.no_coverage: run_coverage(FULL_VECTORS) return if command == "quick": run_baseline() run_semantic() run_micro([], check_flags=True, regenerate_oracle=False) return raise SystemExit(f"Unknown command: {command}") if __name__ == "__main__": main()