Source code for thesis.workflows.atlas.qc_metrics

"""Reusable pure QC metrics for tractography atlas evaluation.

These helpers operate on in-memory subject masks or tractography volumes and do
not depend on workflow orchestration or reference-atlas comparisons.
"""

from __future__ import annotations

from typing import Optional

import numpy as np

from thesis.core.logging import get_logger

logger = get_logger(__name__)

__all__ = [
    "build_occupancy_map",
    "threshold_core_mask",
    "dice_score",
    "compute_subject_dice_against_core",
    "compute_leave_one_out_dice",
    "compute_map_coefficient_of_variation",
    "summarize_dice_scores",
]


def _validate_subject_stack(subject_masks: np.ndarray) -> np.ndarray:
    """Validate and coerce a subject-first voxel stack.

    Args:
        subject_masks: Array with shape ``(n_subjects, X, Y, Z[, ...])``.

    Returns:
        Boolean array with the same shape.

    Raises:
        ValueError: If fewer than two dimensions are provided or no subjects exist.
    """
    stack = np.asarray(subject_masks, dtype=bool)
    if stack.ndim < 2:
        raise ValueError("subject_masks must have shape (n_subjects, ...)")
    if stack.shape[0] < 1:
        raise ValueError("subject_masks must contain at least one subject")
    return stack


def _validate_threshold(threshold: float, *, name: str = "threshold") -> None:
    """Validate an inclusive probability-like threshold.

    Args:
        threshold: Threshold expected in the inclusive range [0.0, 1.0].
        name: Field name used in validation messages.

    Raises:
        ValueError: If the threshold is outside [0.0, 1.0].
    """
    if not 0.0 <= threshold <= 1.0:
        raise ValueError(f"{name} must be between 0.0 and 1.0 inclusive")


[docs] def build_occupancy_map(subject_masks: np.ndarray) -> np.ndarray: """Compute voxelwise occupancy fraction across subjects. Args: subject_masks: Boolean-like array of shape ``(n_subjects, X, Y, Z[, ...])``. Returns: Float occupancy map in the range [0.0, 1.0]. Raises: ValueError: If the input stack is empty or has invalid dimensionality. """ stack = _validate_subject_stack(subject_masks) result = np.mean(stack, axis=0, dtype=np.float32) return np.asarray(result)
[docs] def threshold_core_mask(occupancy_map: np.ndarray, threshold: float) -> np.ndarray: """Create a group core mask by thresholding occupancy. Args: occupancy_map: Voxelwise occupancy map with values typically in [0.0, 1.0]. threshold: Inclusive occupancy threshold for membership in the core mask. Returns: Boolean core mask. Raises: ValueError: If the threshold is outside [0.0, 1.0]. """ _validate_threshold(threshold) occupancy = np.asarray(occupancy_map, dtype=np.float32) return occupancy >= threshold
[docs] def dice_score(mask_a: np.ndarray, mask_b: np.ndarray, empty_value: float = 1.0) -> float: """Compute the Dice similarity coefficient between two masks. Args: mask_a: First boolean-like mask. mask_b: Second boolean-like mask with identical shape. empty_value: Value returned when both masks are empty. Returns: Dice coefficient in the range [0.0, 1.0]. Raises: ValueError: If mask shapes do not match. """ first = np.asarray(mask_a, dtype=bool) second = np.asarray(mask_b, dtype=bool) if first.shape != second.shape: raise ValueError("mask_a and mask_b must have identical shapes") first_sum = int(np.count_nonzero(first)) second_sum = int(np.count_nonzero(second)) total = first_sum + second_sum if total == 0: return empty_value intersection = int(np.count_nonzero(first & second)) return float((2.0 * intersection) / total)
[docs] def compute_subject_dice_against_core( subject_masks: np.ndarray, core_threshold: float, core_mask: Optional[np.ndarray] = None, ) -> np.ndarray: """Compute per-subject Dice scores against a group core mask. Args: subject_masks: Boolean-like array of shape ``(n_subjects, X, Y, Z[, ...])``. core_threshold: Inclusive occupancy threshold used to define the group core. Ignored when ``core_mask`` is supplied. core_mask: Optional precomputed group core mask of shape ``subject_masks.shape[1:]``. When provided, the occupancy map and core mask are not rebuilt, avoiding a duplicate full-stack reduction. Returns: One Dice score per subject. Raises: ValueError: If the input stack is invalid or the threshold is out of range. """ stack = _validate_subject_stack(subject_masks) if core_mask is None: core_mask = threshold_core_mask(build_occupancy_map(stack), core_threshold) return np.asarray( [dice_score(subject_mask, core_mask) for subject_mask in stack], dtype=np.float32, )
[docs] def compute_leave_one_out_dice( subject_masks: np.ndarray, core_threshold: float, minimum_subjects: int = 2, ) -> np.ndarray: """Compute leave-one-out Dice scores against cohort cores. For each subject, the comparison core is rebuilt from all remaining subjects. Args: subject_masks: Boolean-like array of shape ``(n_subjects, X, Y, Z[, ...])``. core_threshold: Inclusive occupancy threshold used to define each leave-one-out core. minimum_subjects: Minimum cohort size required for leave-one-out evaluation. Returns: One leave-one-out Dice score per subject. Raises: ValueError: If the input stack is invalid, the threshold is out of range, or there are fewer than ``minimum_subjects`` subjects. """ stack = _validate_subject_stack(subject_masks) if stack.shape[0] < minimum_subjects: raise ValueError( f"leave-one-out Dice requires at least {minimum_subjects} subjects; " f"got {stack.shape[0]}" ) scores = [] for index in range(stack.shape[0]): leave_out_stack = np.delete(stack, index, axis=0) core_mask = threshold_core_mask(build_occupancy_map(leave_out_stack), core_threshold) scores.append(dice_score(stack[index], core_mask)) return np.asarray(scores, dtype=np.float32)
[docs] def compute_map_coefficient_of_variation( value_maps: np.ndarray, mean_threshold: float = 0.0, ) -> np.ndarray: """Compute voxelwise coefficient of variation across subjects. Args: value_maps: Numeric array of shape ``(n_subjects, X, Y, Z[, ...])``. mean_threshold: Minimum mean required before reporting non-zero variation. Returns: Float voxelwise coefficient of variation map. Raises: ValueError: If the input stack is empty, has invalid dimensionality, or if ``mean_threshold`` is negative. """ if mean_threshold < 0.0: raise ValueError("mean_threshold must be non-negative") values = np.asarray(value_maps, dtype=np.float32) if values.ndim < 2: raise ValueError("value_maps must have shape (n_subjects, ...)") if values.shape[0] < 1: raise ValueError("value_maps must contain at least one subject") mean_map = np.mean(values, axis=0, dtype=np.float32) std_map = np.std(values, axis=0, dtype=np.float32) valid_mask = mean_map > mean_threshold safe_mean = np.where(valid_mask, mean_map, 1.0) return np.where(valid_mask, std_map / safe_mean, 0.0).astype(np.float32)
[docs] def summarize_dice_scores( dice_scores: np.ndarray, leave_one_out_scores: Optional[np.ndarray] = None, ) -> dict[str, float]: """Summarize Dice-based QC scores for lightweight reporting. Args: dice_scores: Per-subject Dice scores against the group core. leave_one_out_scores: Optional leave-one-out Dice scores. Returns: Dictionary with summary statistics suitable for logs or reports. Raises: ValueError: If any provided score array is empty. """ direct_scores = np.asarray(dice_scores, dtype=np.float32) if direct_scores.size == 0: raise ValueError("dice_scores must not be empty") summary: dict[str, float] = { "n_subjects": float(direct_scores.size), "dice_mean": float(np.mean(direct_scores)), "dice_std": float(np.std(direct_scores)), "dice_min": float(np.min(direct_scores)), "dice_max": float(np.max(direct_scores)), } if leave_one_out_scores is not None: loo_scores = np.asarray(leave_one_out_scores, dtype=np.float32) if loo_scores.size == 0: raise ValueError("leave_one_out_scores must not be empty when provided") summary.update( { "leave_one_out_mean": float(np.mean(loo_scores)), "leave_one_out_std": float(np.std(loo_scores)), "leave_one_out_min": float(np.min(loo_scores)), "leave_one_out_max": float(np.max(loo_scores)), } ) logger.debug("Summarized atlas QC Dice scores for {} subjects", direct_scores.size) return summary