Last active
January 28, 2026 08:04
-
-
Save sepiabrown/73db4f809db1cd2786dd0f715439a4ff to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Prediction Analyzer Script for MVTec-style Prediction Results | |
| This script analyzes image predictions organized in folder structures, | |
| generates summary CSVs, confusion matrices, and performance metrics. | |
| Usage: | |
| python analyze_predictions.py --predictions-dir /path/to/predictions | |
| """ | |
| import argparse | |
| import csv | |
| import json | |
| import os | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import List, Optional, Tuple | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| # ============================================================================= | |
| # Data Structures | |
| # ============================================================================= | |
@dataclass
class ImageRecord:
    """Represents a single image prediction record.

    One record is produced per discovered image file. Both label fields hold
    folder names (e.g. "Real" / "False") so they can be compared directly
    when tallying the confusion matrix.
    """

    image_path: str  # Relative path with forward slashes ('original/' removed)
    predicted_label: str  # The predicted class (from the top-level folder structure)
    ground_truth_label: str  # The actual class (from a 'good' folder in the path)
@dataclass
class ConfusionMatrix:
    """Stores binary confusion matrix counts.

    Positive class = anomaly/defective, negative class = good/normal.
    Counters default to 0 and are incremented in place while tallying
    records in calculate_confusion_matrix().
    """

    true_positive: int = 0  # predicted positive, actually positive
    true_negative: int = 0  # predicted negative, actually negative
    false_positive: int = 0  # predicted positive, actually negative
    false_negative: int = 0  # predicted negative, actually positive
@dataclass
class Metrics:
    """Calculated performance metrics, each in the range [0.0, 1.0].

    Produced by calculate_metrics(); any ratio whose denominator is zero
    is reported as 0.0 rather than raising.
    """

    accuracy: float  # (TP + TN) / total
    precision: float  # TP / (TP + FP), a.k.a. positive predictive value
    recall: float  # TP / (TP + FN), a.k.a. sensitivity / TPR
    f1_score: float  # harmonic mean of precision and recall
    specificity: float  # TN / (TN + FP), a.k.a. true negative rate
| # ============================================================================= | |
| # Path Utilities | |
| # ============================================================================= | |
def normalize_path(path: str) -> str:
    """Return *path* with every backslash turned into a forward slash.

    Keeps CSV output identical across Windows and POSIX platforms.
    """
    return "/".join(path.split("\\"))
def get_relative_path_without_original(full_path: Path, base_dir: Path) -> str:
    """
    Get relative path from base directory, removing 'original/' components.

    The match is case-insensitive, consistent with find_original_folders()
    (which also accepts folders named e.g. 'Original'); previously a
    case-sensitive check meant such folders were discovered but never
    stripped from the reported path.

    Example:
        full_path: /predictions/Real/bottle/test/good/001.png
        base_dir: /predictions
        returns: Real/bottle/test/good/001.png

    Falls back to the normalized full path when full_path does not live
    under base_dir.
    """
    try:
        rel_path = full_path.relative_to(base_dir)
    except ValueError:
        # full_path is outside base_dir; report it intact rather than failing.
        return normalize_path(str(full_path))
    # Drop every path component named 'original' (case-insensitive).
    kept = [part for part in rel_path.parts if part.lower() != "original"]
    if not kept:
        # Degenerate case: the relative path was nothing but 'original'
        # components; keep it as-is instead of collapsing to '.'.
        return normalize_path(str(rel_path))
    return normalize_path(str(Path(*kept)))
| # ============================================================================= | |
| # Image Discovery | |
| # ============================================================================= | |
def find_images_in_directory(directory: Path) -> List[Path]:
    """
    Recursively collect image files under *directory*, sorted by path.

    Recognized formats (matched case-insensitively by extension):
    .png, .jpg, .jpeg, .bmp, .tiff, .tif
    Returns an empty list when the directory does not exist.
    """
    valid_suffixes = {".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".tif"}
    if not directory.exists():
        return []
    found = [
        Path(root) / name
        for root, _, names in os.walk(directory)
        for name in names
        if Path(name).suffix.lower() in valid_suffixes
    ]
    return sorted(found)
def find_original_folders(base_dir: Path) -> List[Path]:
    """
    Recursively locate every directory named 'original' (case-insensitive)
    under *base_dir*.

    Handles both flat and nested layouts, e.g.:
        {category}/original/
        {category}/test/{defect_type}/original/
    Returns an empty list when base_dir does not exist.
    """
    if not base_dir.exists():
        return []
    return [
        Path(parent) / child
        for parent, children, _ in os.walk(base_dir)
        for child in children
        if child.lower() == "original"
    ]
def discover_prediction_images(
    predictions_dir: Path, positive_folder: str, negative_folder: str
) -> Tuple[List[Path], List[Path]]:
    """
    Discover images in the prediction folder structure.

    Supports both flat and nested structures:
        predictions_dir/
            {positive_folder}/
                {category}/
                    original/                # flat
                    test/{defect}/original/  # nested
            {negative_folder}/
                (same shapes as above)

    Returns:
        Tuple of (positive_images, negative_images)
    """

    def _collect(prediction_dir: Path) -> List[Path]:
        # A missing prediction folder simply contributes no images.
        if not prediction_dir.exists():
            return []
        collected: List[Path] = []
        # Every 'original' folder (at any depth) holds the actual images.
        for original_dir in find_original_folders(prediction_dir):
            collected.extend(find_images_in_directory(original_dir))
        return collected

    return (
        _collect(predictions_dir / positive_folder),
        _collect(predictions_dir / negative_folder),
    )
| # ============================================================================= | |
| # Classification Logic | |
| # ============================================================================= | |
def determine_ground_truth(
    image_path: Path, predictions_dir: Path, positive_folder: str, negative_folder: str
) -> str:
    """
    Determine the ground truth label from the folder structure.

    Any path component named 'good' (case-insensitive) marks a normal image:
      - contains 'good'  -> negative_folder (e.g. "False")
      - no 'good' folder -> positive_folder (e.g. "Real")
    Returns "unknown" when the image is not under predictions_dir.
    """
    try:
        rel_path = image_path.relative_to(predictions_dir)
    except ValueError:
        # Outside the predictions tree; no label can be derived.
        return "unknown"
    is_good = any(part.lower() == "good" for part in rel_path.parts)
    return negative_folder if is_good else positive_folder
def create_image_records(
    positive_images: List[Path],
    negative_images: List[Path],
    predictions_dir: Path,
    positive_folder: str,
    negative_folder: str,
) -> List[ImageRecord]:
    """
    Create ImageRecord objects for all discovered images.

    Args:
        positive_images: Images predicted as positive (anomaly/Real)
        negative_images: Images predicted as negative (normal/False)
        predictions_dir: Base directory for predictions
        positive_folder: Name of positive prediction folder
        negative_folder: Name of negative prediction folder

    Returns:
        List of ImageRecord objects (positive predictions first, matching
        the order of the input lists).
    """
    records: List[ImageRecord] = []
    # Both groups get identical processing; only the predicted label differs,
    # so iterate over (images, label) pairs instead of duplicating the loop.
    for images, predicted_label in (
        (positive_images, positive_folder),
        (negative_images, negative_folder),
    ):
        for img_path in images:
            records.append(
                ImageRecord(
                    image_path=get_relative_path_without_original(
                        img_path, predictions_dir
                    ),
                    predicted_label=predicted_label,
                    # Ground truth comes from the path, not the prediction folder.
                    ground_truth_label=determine_ground_truth(
                        img_path, predictions_dir, positive_folder, negative_folder
                    ),
                )
            )
    return records
| # ============================================================================= | |
| # Confusion Matrix Calculation | |
| # ============================================================================= | |
def filter_test_records(records: List[ImageRecord]) -> List[ImageRecord]:
    """Keep only records whose path contains a 'test' folder (case-insensitive)."""
    selected: List[ImageRecord] = []
    for record in records:
        if "/test/" in record.image_path.lower():
            selected.append(record)
    return selected
def calculate_confusion_matrix(
    records: List[ImageRecord], positive_folder: str, negative_folder: str
) -> ConfusionMatrix:
    """
    Tally a binary confusion matrix from image records.

    Positive class: anomaly/defective images (positive_folder, e.g. "Real").
    Negative class: good/normal images (negative_folder, e.g. "False").
    A record counts as a positive prediction (resp. positive ground truth)
    exactly when the corresponding label equals positive_folder.
    """
    cm = ConfusionMatrix()
    for record in records:
        pred_pos = record.predicted_label == positive_folder
        truth_pos = record.ground_truth_label == positive_folder
        if pred_pos:
            if truth_pos:
                cm.true_positive += 1
            else:
                cm.false_positive += 1
        else:
            if truth_pos:
                cm.false_negative += 1
            else:
                cm.true_negative += 1
    return cm
def calculate_metrics(cm: ConfusionMatrix) -> Metrics:
    """Derive standard binary-classification metrics from a confusion matrix.

    Every ratio falls back to 0.0 when its denominator is zero, so empty or
    one-sided inputs never raise ZeroDivisionError.
    """

    def _ratio(numerator: float, denominator: float) -> float:
        # Safe division shared by all five metrics.
        return numerator / denominator if denominator > 0 else 0.0

    tp, tn = cm.true_positive, cm.true_negative
    fp, fn = cm.false_positive, cm.false_negative

    precision = _ratio(tp, tp + fp)  # PPV
    recall = _ratio(tp, tp + fn)  # sensitivity / TPR
    return Metrics(
        accuracy=_ratio(tp + tn, tp + tn + fp + fn),
        precision=precision,
        recall=recall,
        # F1 = 2PR / (P + R); 0.0 when both precision and recall are 0.
        f1_score=_ratio(2 * precision * recall, precision + recall),
        specificity=_ratio(tn, tn + fp),  # TNR
    )
| # ============================================================================= | |
| # Output Generation | |
| # ============================================================================= | |
def write_csv(records: List[ImageRecord], output_path: Path) -> None:
    """Write one CSV row per record: image_path, predicted_label, ground_truth_label."""
    header = ["image_path", "predicted_label", "ground_truth_label"]
    rows = [
        [rec.image_path, rec.predicted_label, rec.ground_truth_label]
        for rec in records
    ]
    # newline="" lets the csv module control line endings itself.
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    print(f"CSV saved: {output_path}")
def plot_confusion_matrix(
    cm: ConfusionMatrix,
    metrics: Metrics,
    output_path: Path,
    positive_folder: str,
    negative_folder: str,
) -> None:
    """Generate and save confusion matrix visualization.

    Layout (standard format with positive class first):
    - Top-left: TP (True Positive) - Actual Real, Pred Real
    - Top-right: FN (False Negative) - Actual Real, Pred False
    - Bottom-left: FP (False Positive) - Actual False, Pred Real
    - Bottom-right: TN (True Negative) - Actual False, Pred False

    The figure is saved to *output_path* as a 150-dpi image, then closed to
    release matplotlib's figure memory.
    """
    # Create matrix array with TP in top-left
    # Row 0: Actual Positive (Real/Anomaly)
    # Row 1: Actual Negative (False/Good)
    # Col 0: Pred Positive (Real)
    # Col 1: Pred Negative (False)
    matrix = np.array(
        [[cm.true_positive, cm.false_negative], [cm.false_positive, cm.true_negative]]
    )
    # Create figure
    fig, ax = plt.subplots(figsize=(8, 6))
    # Plot heatmap (darker blue = higher count)
    im = ax.imshow(matrix, cmap="Blues")
    # Add colorbar
    cbar = ax.figure.colorbar(im, ax=ax)
    cbar.ax.set_ylabel("Count", rotation=-90, va="bottom")
    # Set labels - use folder names for consistency with the CSV/JSON outputs
    ax.set_xticks([0, 1])
    ax.set_yticks([0, 1])
    ax.set_xticklabels([f"Pred: {positive_folder}", f"Pred: {negative_folder}"])
    ax.set_yticklabels([f"Actual: {positive_folder}", f"Actual: {negative_folder}"])
    # Add count annotations; white text on dark cells for contrast
    for i in range(2):
        for j in range(2):
            value = matrix[i, j]
            text_color = "white" if value > matrix.max() / 2 else "black"
            ax.text(
                j,
                i,
                str(value),
                ha="center",
                va="center",
                color=text_color,
                fontsize=14,
                fontweight="bold",
            )
    # Add title with metrics summary
    title = (
        f"Confusion Matrix\n"
        f"Accuracy: {metrics.accuracy:.3f} | "
        f"Precision: {metrics.precision:.3f} | "
        f"Recall: {metrics.recall:.3f} | "
        f"F1: {metrics.f1_score:.3f}"
    )
    ax.set_title(title, fontsize=10)
    # Quadrant labels in data coordinates. imshow's default orientation puts
    # row 0 at the top, so y=-0.3 sits above the TP/FN row and y=1.3 below
    # the FP/TN row.
    ax.text(
        0, -0.3, "TP", ha="center", transform=ax.transData, fontsize=9, style="italic"
    )
    ax.text(
        1, -0.3, "FN", ha="center", transform=ax.transData, fontsize=9, style="italic"
    )
    ax.text(
        0, 1.3, "FP", ha="center", transform=ax.transData, fontsize=9, style="italic"
    )
    ax.text(
        1, 1.3, "TN", ha="center", transform=ax.transData, fontsize=9, style="italic"
    )
    plt.xlabel("Predicted Label")
    plt.ylabel("Actual Label")
    plt.tight_layout()
    # Save figure
    plt.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f"Confusion matrix plot saved: {output_path}")
def write_metrics_json(
    cm: ConfusionMatrix,
    metrics: Metrics,
    output_path: Path,
    total_images: int,
    test_images: int,
) -> None:
    """Serialize confusion-matrix counts, metrics (rounded to 4 decimal
    places), and image counts to a JSON file."""
    payload = {
        "confusion_matrix": {
            "true_positive": cm.true_positive,
            "true_negative": cm.true_negative,
            "false_positive": cm.false_positive,
            "false_negative": cm.false_negative,
        },
        "metrics": {
            name: round(value, 4)
            for name, value in (
                ("accuracy", metrics.accuracy),
                ("precision", metrics.precision),
                ("recall", metrics.recall),
                ("f1_score", metrics.f1_score),
                ("specificity", metrics.specificity),
            )
        },
        "summary": {
            "total_images": total_images,
            "test_images_used_for_matrix": test_images,
        },
    }
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(payload, indent=2))
    print(f"Metrics JSON saved: {output_path}")
| # ============================================================================= | |
| # Main Entry Point | |
| # ============================================================================= | |
def main():
    """Main entry point for the prediction analyzer.

    Parses CLI arguments, discovers prediction images, then writes a summary
    CSV, a confusion-matrix plot, and a metrics JSON into the output
    directory. Returns a process exit code: 0 on success, 1 when no images
    are found.
    """
    parser = argparse.ArgumentParser(
        description="Analyze MVTec-style prediction results and generate metrics.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python analyze_predictions.py --predictions-dir ./predictions
  python analyze_predictions.py --predictions-dir ./results --output-dir ./analysis
  python analyze_predictions.py --positive-folder Real --negative-folder False
""",
    )
    parser.add_argument(
        "--predictions-dir",
        type=str,
        default=".",
        help="Path to predictions directory (default: current directory)",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default=None,
        help="Path for output files (default: same as predictions-dir)",
    )
    parser.add_argument(
        "--positive-folder",
        type=str,
        default="Real",
        help="Name of positive/anomaly prediction folder (default: Real)",
    )
    parser.add_argument(
        "--negative-folder",
        type=str,
        default="False",
        help="Name of negative/good prediction folder (default: False)",
    )
    args = parser.parse_args()
    # Set up paths; outputs default to the predictions directory itself.
    predictions_dir = Path(args.predictions_dir).resolve()
    output_dir = Path(args.output_dir).resolve() if args.output_dir else predictions_dir
    # Create output directory if needed
    output_dir.mkdir(parents=True, exist_ok=True)
    print(f"Predictions directory: {predictions_dir}")
    print(f"Output directory: {output_dir}")
    print(f"Positive folder: {args.positive_folder}")
    print(f"Negative folder: {args.negative_folder}")
    print("-" * 50)
    # Discover images under {positive_folder}/ and {negative_folder}/
    print("Discovering images...")
    positive_images, negative_images = discover_prediction_images(
        predictions_dir, args.positive_folder, args.negative_folder
    )
    print(f"Found {len(positive_images)} images in {args.positive_folder}/")
    print(f"Found {len(negative_images)} images in {args.negative_folder}/")
    if not positive_images and not negative_images:
        # Nothing to analyze; explain the expected layout and signal failure.
        print("\nNo images found! Please check:")
        print(
            f"  - Directory structure has {args.positive_folder}/ and {args.negative_folder}/ folders"
        )
        print("  - Each category has an 'original/' subfolder containing images")
        return 1
    # Create image records (predicted label from folder, ground truth from path)
    print("\nCreating image records...")
    records = create_image_records(
        positive_images,
        negative_images,
        predictions_dir,
        args.positive_folder,
        args.negative_folder,
    )
    print(f"Total records: {len(records)}")
    # Write CSV summary of every record
    csv_path = output_dir / "predictions_summary.csv"
    write_csv(records, csv_path)
    # Filter for test images and calculate confusion matrix
    print("\nCalculating confusion matrix (test images only)...")
    test_records = filter_test_records(records)
    print(f"Test images: {len(test_records)}")
    if not test_records:
        # Fall back to all records so the analysis still produces output.
        print("\nWarning: No test images found for confusion matrix calculation.")
        print("Confusion matrix will be generated with all images instead.")
        test_records = records
    cm = calculate_confusion_matrix(
        test_records, args.positive_folder, args.negative_folder
    )
    metrics = calculate_metrics(cm)
    # Print confusion matrix counts
    print("\nConfusion Matrix:")
    print(f"  True Positives: {cm.true_positive}")
    print(f"  True Negatives: {cm.true_negative}")
    print(f"  False Positives: {cm.false_positive}")
    print(f"  False Negatives: {cm.false_negative}")
    print("\nMetrics:")
    print(f"  Accuracy: {metrics.accuracy:.4f}")
    print(f"  Precision: {metrics.precision:.4f}")
    print(f"  Recall: {metrics.recall:.4f}")
    print(f"  F1 Score: {metrics.f1_score:.4f}")
    print(f"  Specificity: {metrics.specificity:.4f}")
    # Generate file outputs
    print("\nGenerating outputs...")
    # Confusion matrix plot
    plot_path = output_dir / "confusion_matrix.png"
    plot_confusion_matrix(
        cm, metrics, plot_path, args.positive_folder, args.negative_folder
    )
    # Metrics JSON
    json_path = output_dir / "confusion_matrix.json"
    write_metrics_json(cm, metrics, json_path, len(records), len(test_records))
    print("\n" + "=" * 50)
    print("Analysis complete!")
    print(f"  - {csv_path.name}")
    print(f"  - {plot_path.name}")
    print(f"  - {json_path.name}")
    return 0
if __name__ == "__main__":
    # SystemExit propagates main()'s return value as the process exit code.
    # Avoids the built-in exit(), which is a site-module convenience that is
    # absent when Python runs without site initialization (python -S).
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment