"""
Compute metrics from Gemini enterprise run results.

No special dependencies — just numpy and sklearn.

Usage:
    python compute_metrics.py --results gemini_results.json
    python compute_metrics.py --results gemini_results.json --output metrics.json
"""

import argparse
import json
import re
from pathlib import Path

import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

# ─────────────────────────────────────────────────────────────
# Typology mapping (normalize LLM output → standard names)
# ─────────────────────────────────────────────────────────────
TYPOLOGY_MAP = {
    "fan-out": "fan-out",
    "fan out": "fan-out",
    "fanout": "fan-out",
    "fan-in": "fan-in",
    "fan in": "fan-in",
    "fanin": "fan-in",
    "scatter-gather": "scatter-gather",
    "scatter gather": "scatter-gather",
    "gather-scatter": "gather-scatter",
    "gather scatter": "gather-scatter",
    "cycle": "cycle",
    "circular": "cycle",
    "random": "random",
    "bipartite": "bipartite",
    "stack": "stack",
    "layering": "stack",
    "gather": "fan-in",
    "scatter": "fan-out",
}

# Locates the opening brace of a flat JSON object that has an "illicit" key.
_JSON_START_RE = re.compile(r'\{[^{}]*"illicit"\s*:', re.DOTALL)
_CONCLUSION_RE = re.compile(
    r"Conclusion:\s*(Suspicious|Not Suspicious)", re.IGNORECASE
)
_PATTERN_RE = re.compile(r"Observed Pattern:\s*(.+)", re.IGNORECASE)
_EXPLANATION_RE = re.compile(
    r"Explanation:\s*(.+?)(?:\n-|\Z)", re.IGNORECASE | re.DOTALL
)
_NON_PATTERN_CHARS_RE = re.compile(r"[^a-z\s\-]")

# String spellings of a negative "illicit" value. Any other non-empty string
# keeps the truthiness it would have under plain bool().
_FALSE_STRINGS = frozenset(
    {"false", "no", "0", "", "none", "null", "not suspicious"}
)
_NULL_TYPOLOGIES = frozenset({"null", "none", ""})


def _to_bool(value):
    """Interpret an ``illicit`` value, treating strings like "false" as False."""
    if isinstance(value, str):
        return value.strip().lower() not in _FALSE_STRINGS
    return bool(value)


def _normalize_typology(name):
    """Map a typology name to its standard form, or None if it is absent.

    Non-string values and the spellings "null" / "none" / "" yield None.
    Names missing from TYPOLOGY_MAP are returned lower-cased and stripped.
    """
    if not isinstance(name, str):
        return None
    key = name.strip().lower()
    if key in _NULL_TYPOLOGIES:
        return None
    return TYPOLOGY_MAP.get(key, key)


def _match_typology(raw_pattern):
    """Return the standard typology for the first pattern named in the text.

    The winning key is the one that occurs earliest in ``raw_pattern``; when
    several keys start at the same position the longest one wins, so that
    "gather-scatter" is not reported as plain "gather". Returns None when no
    key of TYPOLOGY_MAP occurs.
    """
    best_rank = None
    typology = None
    for key, val in TYPOLOGY_MAP.items():
        pos = raw_pattern.find(key)
        if pos == -1:
            continue
        rank = (pos, -len(key))
        if best_rank is None or rank < best_rank:
            best_rank, typology = rank, val
    return typology


def _parse_json_response(text):
    """Parse the ZS (JSON) format; return None when no usable object is found."""
    start = _JSON_START_RE.search(text)
    if start is None:
        return None
    try:
        # raw_decode consumes exactly one JSON value, so nested objects and
        # braces inside string values do not cut the object short.
        obj, _ = json.JSONDecoder().raw_decode(text, start.start())
    except json.JSONDecodeError:
        return None
    return {
        "illicit": _to_bool(obj.get("illicit", False)),
        "typology": _normalize_typology(obj.get("typology_program")),
        "rationale": obj.get("rationale") or "",
        "parse_success": True,
    }


def parse_response(text):
    """Parse LLM response into structured fields.

    Supports two formats:

    ICL-AML format (text):
        - Conclusion: Suspicious or Not Suspicious
        - Explanation: (2-3 sentences)
        - Observed Pattern: (e.g., gather-scatter)

    ZS format (JSON):
        {"illicit": true, "typology_program": "fan-in", "confidence": 0.8, ...}

    Returns a dict with keys ``illicit`` (bool), ``typology`` (standard name
    or None), ``rationale`` (str) and ``parse_success`` (bool). JSON is tried
    first; if no decodable object with an "illicit" key is found the text
    format is used, and ``parse_success`` is then True only when a
    "Conclusion:" line was recognised. Empty or non-string input is reported
    as a parse failure.
    """
    if not text or not isinstance(text, str):
        return {"illicit": False, "typology": None, "rationale": "",
                "parse_success": False}

    text = text.strip()

    # Try JSON first (ZS format); fall through to text parsing on failure.
    parsed = _parse_json_response(text)
    if parsed is not None:
        return parsed

    # Text format (ICL-AML)
    illicit = False
    conclusion_match = _CONCLUSION_RE.search(text)
    if conclusion_match:
        illicit = "not" not in conclusion_match.group(1).lower()

    # Parse Observed Pattern
    typology = None
    pattern_match = _PATTERN_RE.search(text)
    if pattern_match:
        raw_pattern = pattern_match.group(1).strip().lower()
        raw_pattern = _NON_PATTERN_CHARS_RE.sub("", raw_pattern).strip()
        typology = _match_typology(raw_pattern)

    # Parse Explanation
    rationale = ""
    exp_match = _EXPLANATION_RE.search(text)
    if exp_match:
        rationale = exp_match.group(1).strip()

    return {
        "illicit": illicit,
        "typology": typology,
        "rationale": rationale,
        "parse_success": conclusion_match is not None,
    }


def _default_output_path(results_path):
    """Derive ``<name>_metrics.json`` next to the results file.

    Only a trailing ".json" on the file name is dropped, so the result can
    never be the results file itself, and directory names that happen to
    contain ".json" are left alone.
    """
    path = Path(results_path)
    name = path.name
    if name.lower().endswith(".json"):
        name = name[: -len(".json")]
    return str(path.with_name(f"{name}_metrics.json"))


def compute(args):
    """Score a results file, print a summary and write the metrics as JSON.

    Args:
        args: Namespace with ``results`` (path of the results JSON) and
            ``output`` (path for the metrics JSON, or None to derive
            ``<results>_metrics.json``).

    Returns:
        The metrics dict that was written, or None when no record could be
        evaluated (nothing is written in that case).

    Records with a truthy "error" are counted as API errors, and records
    whose response cannot be parsed are counted as parse failures; neither
    contributes to the detection or typology scores.
    """
    # Load results
    with open(args.results, encoding="utf-8") as f:
        data = json.load(f)

    results = data["results"]
    model = data.get("model", "unknown")
    token_stats = data.get("token_stats") or {}  # tolerate an explicit null

    print(f"Loaded {len(results)} predictions from {model}")
    if token_stats:
        print(f" Avg input: {token_stats.get('avg_input_tokens', 0):,.0f} tokens")
        print(f" Avg output: {token_stats.get('avg_output_tokens', 0):,.0f} tokens")

    # Parse and collect
    y_true, y_pred = [], []
    typo_true, typo_pred = [], []
    n_parsed, n_failed, n_errors = 0, 0, 0
    parse_failures = []

    for r in results:
        if r.get("error"):
            n_errors += 1
            continue

        parsed = parse_response(r.get("raw_response"))
        if not parsed["parse_success"]:
            n_failed += 1
            parse_failures.append(r.get("case_id"))
            continue

        n_parsed += 1
        label = r["label"]
        y_true.append(label)
        y_pred.append(1 if parsed["illicit"] else 0)

        # Typology (only for ground-truth illicit cases). The ground truth is
        # normalised the same way as the prediction so that case or alias
        # differences do not count as misses.
        typology_gt = r.get("typology_gt")
        if label == 1 and typology_gt:
            typo_true.append(_normalize_typology(typology_gt) or typology_gt)
            typo_pred.append(parsed["typology"] or "none")

    if not y_true:
        print("ERROR: No valid predictions to evaluate.")
        return None

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # Detection metrics
    det_f1 = f1_score(y_true, y_pred, pos_label=1, zero_division=0)
    det_p = precision_score(y_true, y_pred, pos_label=1, zero_division=0)
    det_r = recall_score(y_true, y_pred, pos_label=1, zero_division=0)
    det_acc = accuracy_score(y_true, y_pred)

    # Typology metrics
    if typo_true:
        typo_f1 = f1_score(typo_true, typo_pred, average="macro", zero_division=0)
        typo_acc = accuracy_score(typo_true, typo_pred)
    else:
        typo_f1 = typo_acc = 0.0

    # Valid ratio
    total_attempted = n_parsed + n_failed + n_errors
    valid_ratio = n_parsed / total_attempted * 100 if total_attempted else 0

    # Print
    print(f"\n{'='*55}")
    print(f" {model} — Results")
    print(f"{'='*55}")
    print(f" Total: {total_attempted}")
    print(f" Parsed: {n_parsed} ({valid_ratio:.1f}%)")
    print(f" Failed: {n_failed} (parse error)")
    print(f" Errors: {n_errors} (API error)")
    print()
    print(f" Det-F1: {det_f1*100:.1f}%")
    print(f" Det-P: {det_p*100:.1f}%")
    print(f" Det-R: {det_r*100:.1f}%")
    print(f" Det-Acc: {det_acc*100:.1f}%")
    print()
    print(f" Typ-F1: {typo_f1*100:.1f}%")
    print(f" Typ-Acc: {typo_acc*100:.1f}%")
    if token_stats:
        print()
        print(f" Avg Input: {token_stats.get('avg_input_tokens', 0):,.0f}")
        print(f" Avg Output: {token_stats.get('avg_output_tokens', 0):,.0f}")
    print(f"{'='*55}")

    if parse_failures[:5]:
        print(f"\n First 5 parse failures: {parse_failures[:5]}")

    # Save
    metrics = {
        "model": model,
        "n_predictions": n_parsed,
        "n_failed": n_failed,
        "n_errors": n_errors,
        "valid_ratio": valid_ratio,
        "detection_f1": float(det_f1),
        "detection_precision": float(det_p),
        "detection_recall": float(det_r),
        "detection_accuracy": float(det_acc),
        "typology_macro_f1": float(typo_f1),
        "typology_accuracy": float(typo_acc),
        "avg_input_tokens": token_stats.get("avg_input_tokens", 0),
        "avg_output_tokens": token_stats.get("avg_output_tokens", 0),
    }

    output_file = args.output or _default_output_path(args.results)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(metrics, f, indent=2)
    print(f"\nMetrics saved to {output_file}")
    return metrics


def main():
    """Parse the command line and run the metrics computation."""
    parser = argparse.ArgumentParser(description="Compute metrics from Gemini results")
    parser.add_argument("--results", required=True, help="Path to results JSON file")
    parser.add_argument("--output", default=None,
                        help="Output metrics file (default: *_metrics.json)")
    args = parser.parse_args()
    compute(args)


if __name__ == "__main__":
    main()