Source code for thesis.workflows.qc.operations

"""QC workflow operations and helpers.

Contains the post-workflow QC hook, ROI discovery logic, and background
image resolution that support both the ``thesis run -w qc`` workflow
and the automatic post-HCP QC generation.
"""

from __future__ import annotations

import sys
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.logging import get_logger
from thesis.core.output import EventLevel, get_event_bus

logger = get_logger(__name__)

__all__ = [
    "discover_roi_files",
    "resolve_background_image",
    "run_post_workflow_qc",
    "run_qc_for_patient",
]


[docs] def discover_roi_files(tractography_dir: Union[str, Path]) -> Dict[str, Path]: """Discover ROI NIfTI masks from patient tractography output. Searches ``rois_merged/``, ``rois_transformed/``, and ``rois/`` (in that order), returning the first non-empty set found. Descends one level into per-atlas-source subdirectories (e.g. ``rois/main/``) so naming-convention runs with ``atlas_sources[].name != "atlas"`` are picked up. Args: tractography_dir: The ``<output_dir>/tractography/probtrackx2/`` directory. Returns: Mapping of ROI name (derived from filename, prefixed with source subdir when present) to :class:`Path`. """ from thesis.workflows.qc.statistics import discover_roi_dir_files _, nii_files = discover_roi_dir_files(Path(tractography_dir)) rois: Dict[str, Path] = {} for f in nii_files: name = f.name.replace(".nii.gz", "").replace(".nii", "") rois[name] = f return rois
[docs] def resolve_background_image( config: PipelineConfig, context: ProcessingContext, ) -> Optional[Path]: """Resolve the T1w background image for a patient. Uses the same resolution logic as the HCP workflow (``resolve_t1_path``) so paths that work for ``thesis run`` also work for QC. Args: config: Loaded pipeline config. context: Processing context (carries patient_id, input_dir, etc.). Returns: Path to the T1w image, or ``None`` if it cannot be resolved. """ try: from thesis.workflows.hcp.common import resolve_t1_path t1 = resolve_t1_path(config, context) if t1.exists(): return t1 except Exception: pass # Fallback 1: the registration sub-workflow writes <pid>_T1_brain.nii.gz # under <output_dir>/registration/. When running qc standalone after a # full_pipeline run, this is the natural QC background even though # hcp.t1_image is unset. output_dir = getattr(context, "output_dir", None) if output_dir is not None: reg_t1 = Path(output_dir) / "registration" / f"{context.patient_id}_T1_brain.nii.gz" if reg_t1.exists(): return reg_t1 # Fallback 2: try hcp.t1_image with manual substitution. raw = getattr(config.hcp, "t1_image", None) if not raw: return None if "{patient_id}" in raw: raw = raw.format(patient_id=context.patient_id) path = Path(raw) if path.is_absolute() and path.exists(): return path for base in [context.input_dir, context.data_dir, output_dir]: if base is not None: candidate: Path = Path(base) / raw if candidate.exists(): return candidate return path if path.exists() else None
def _resolve_patient_path( context: ProcessingContext, raw_value: Optional[object], predicate: Callable[[Path], bool], ) -> Optional[Path]: """Resolve a per-patient input path against ``raw_value``. Formats ``{patient_id}`` placeholders, then accepts the path either as an existing absolute path or as a path relative to ``context.input_dir``. ``predicate`` selects the validity check (e.g. :meth:`Path.exists` for files, :meth:`Path.is_dir` for directories). Args: context: Processing context (provides ``patient_id`` / ``input_dir``). raw_value: Raw config value (may be ``None``); coerced to ``str``. predicate: Callable returning ``True`` when a candidate path is valid. Returns: The resolved :class:`Path`, or ``None`` when it cannot be resolved. """ if not raw_value: return None try: from thesis.workflows.hcp.common import format_patient_path formatted = format_patient_path(str(raw_value), context.patient_id) except (ImportError, AttributeError, ValueError, KeyError): return None path = Path(formatted) if path.is_absolute() and predicate(path): return path if context.input_dir: candidate = Path(context.input_dir) / formatted if predicate(candidate): return candidate return None def _find_brain_mask( config: PipelineConfig, context: ProcessingContext, ) -> Optional[Path]: """Resolve the brain mask path for a patient.""" hcp = getattr(config, "hcp", None) return _resolve_patient_path(context, getattr(hcp, "brain_mask", None), Path.exists) def _find_warp_field( config: PipelineConfig, context: ProcessingContext, ) -> Optional[Path]: """Resolve the warp field (template-to-patient) path.""" transforms = getattr(config, "transforms", None) warp = getattr(transforms, "template_to_patient_warp", None) return _resolve_patient_path(context, warp, Path.exists) def _find_bedpostx_dir( config: PipelineConfig, context: ProcessingContext, ) -> Optional[Path]: """Resolve the BedpostX output directory.""" hcp = getattr(config, "hcp", None) return _resolve_patient_path(context, getattr(hcp, "bedpostx_dir", None), Path.is_dir)
[docs] def run_post_workflow_qc( cfg: PipelineConfig, ctx_obj: ProcessingContext, ) -> None: """Generate QC overlays after workflow completion if configured. This is a best-effort step — QC failures are logged as warnings but do not cause the pipeline run to fail. Intended to be called by the CLI after a successful ``thesis run`` when ``qc.generate_overlays`` is ``True``. Args: cfg: Loaded pipeline configuration. ctx_obj: Processing context for the patient run. """ qc_cfg = getattr(cfg, "qc", None) if qc_cfg is None or not getattr(qc_cfg, "generate_overlays", False): return patient_id = ctx_obj.patient_id patient_output = Path(ctx_obj.output_dir) # type: ignore[arg-type] bus = get_event_bus() tractography_relpath = getattr( getattr(cfg, "atlas", None), "tractography_relpath", "tractography/probtrackx2" ) tractography_dir = patient_output / tractography_relpath roi_files = discover_roi_files(tractography_dir) if not roi_files: logger.debug("No ROI files found for QC overlays, skipping") return bg_image = resolve_background_image(cfg, ctx_obj) if bg_image is None or not bg_image.exists(): logger.debug("Background image not found for QC overlays, skipping") return try: from thesis.workflows.qc.roi_overlay import generate_roi_overlays roi_qc_dir = patient_output / "qc" / "roi_overlays" pngs = generate_roi_overlays( roi_paths=roi_files, background_image=bg_image, output_dir=roi_qc_dir, ) bus.emit( f"Generated {len(pngs)} QC overlay(s) for {patient_id}", level=EventLevel.INFO, category="qc", patient_id=patient_id, ) except Exception as exc: logger.warning("QC overlay generation failed for {}: {}", patient_id, exc) bus.emit( f"QC overlay generation failed for {patient_id}: {exc}", level=EventLevel.WARNING, category="qc", patient_id=patient_id, )
[docs] def run_qc_for_patient( config: PipelineConfig, context: ProcessingContext, background: Optional[Path] = None, track_density: bool = True, stats: bool = True, ) -> List[str]: """Run full QC suite for a single patient. This is the core logic used by both the ``qc`` workflow's Nipype Function node and any future CLI invocation. Args: config: Loaded pipeline configuration. context: Processing context. background: Override background image. If ``None``, resolved from config. track_density: Generate track density overlay figures. stats: Print tractography statistics. Returns: List of output file paths (PNGs) generated. """ patient_out = Path(context.output_dir) # type: ignore[arg-type] tractography_relpath = getattr( getattr(config, "atlas", None), "tractography_relpath", "tractography/probtrackx2" ) base_tract_dir = patient_out / tractography_relpath # Detect hemisphere subdirectories (left/, right/) for both-separately runs hemisphere_dirs: List[Path] = [] for hemi in ("left", "right"): hemi_dir = base_tract_dir / hemi if hemi_dir.is_dir(): hemisphere_dirs.append(hemi_dir) if hemisphere_dirs: # Run QC for each hemisphere subdirectory independently all_generated: List[str] = [] for hemi_dir in hemisphere_dirs: hemi_label = hemi_dir.name logger.info("Running QC for hemisphere: {}", hemi_label) hemi_generated = _run_qc_for_tractography_dir( config=config, context=context, patient_out=patient_out, tractography_dir=hemi_dir, background=background, track_density=track_density, stats=stats, label=hemi_label, ) all_generated.extend(hemi_generated) return all_generated # Standard single-directory case if not base_tract_dir.is_dir(): raise FileNotFoundError( f"Patient tractography output not found: {base_tract_dir}. " "Run the matching tractography workflow first." ) return _run_qc_for_tractography_dir( config=config, context=context, patient_out=patient_out, tractography_dir=base_tract_dir, background=background, track_density=track_density, stats=stats, )
def _run_qc_for_tractography_dir( config: PipelineConfig, context: ProcessingContext, patient_out: Path, tractography_dir: Path, background: Optional[Path] = None, track_density: bool = True, stats: bool = True, label: str = "", ) -> List[str]: """Run QC suite for a single tractography output directory. Args: config: Loaded pipeline configuration. context: Processing context. patient_out: Patient output root directory. tractography_dir: Tractography output directory to analyse. background: Override background image. track_density: Generate track density overlay figures. stats: Print tractography statistics. label: Optional label for QC output sub-directories (e.g. ``"left"``). Returns: List of output file paths (PNGs) generated. """ from thesis.workflows.qc.roi_overlay import generate_roi_overlays from thesis.workflows.qc.statistics import ( collect_tractography_stats, format_stats_table, ) from thesis.workflows.qc.track_density import generate_track_density_figures # Discover ROIs roi_files = discover_roi_files(tractography_dir) if not roi_files: raise FileNotFoundError( f"No ROI mask files found in {tractography_dir}. " f"Expected NIfTI masks in rois_merged/, rois_transformed/, or rois/." ) # Resolve background image bg_image = background if bg_image is None: bg_image = resolve_background_image(config, context) if bg_image is None or not bg_image.exists(): raise FileNotFoundError( "Could not resolve background image from config (hcp.t1_image)." ) # Build QC output subdirectory suffix qc_suffix = f"_{label}" if label else "" generated: List[str] = [] if label: sys.stderr.write(f"\n=== QC: hemisphere '{label}' ===\n") sys.stderr.flush() # ROI overlays roi_qc_dir = patient_out / "qc" / f"roi_overlays{qc_suffix}" logger.info( "Generating ROI overlays for {}{}", context.patient_id, f" ({label})" if label else "" ) roi_pngs = generate_roi_overlays( roi_paths=roi_files, background_image=bg_image, output_dir=roi_qc_dir, ) generated.extend(str(p) for p in roi_pngs) # Track density figures if track_density: td_qc_dir = patient_out / "qc" / f"normtracks{qc_suffix}" thresholds = list(config.qc.track_density_thresholds) # Subject-space fdt_paths subject_fdt = tractography_dir / "fdt_paths.nii.gz" if subject_fdt.exists(): logger.info("Generating subject-space track density figures") try: td_pngs = generate_track_density_figures( fdt_paths=subject_fdt, template_image=bg_image, output_dir=td_qc_dir, thresholds=thresholds, prefix=f"subject_track_density{qc_suffix}", ) generated.extend(str(p) for p in td_pngs) except Exception as e: logger.warning("Subject-space track density failed: {}", e) # Template-space fdt_paths template_fdt = tractography_dir / "warped_streamlines" / "fdt_paths.nii.gz" if template_fdt.exists(): template_bg = config.transforms.template_reference_image if template_bg and Path(template_bg).exists(): logger.info("Generating template-space track density figures") try: td_pngs = generate_track_density_figures( fdt_paths=template_fdt, template_image=template_bg, output_dir=td_qc_dir, thresholds=thresholds, prefix=f"template_track_density{qc_suffix}", ) generated.extend(str(p) for p in td_pngs) except Exception as e: logger.warning("Template-space track density failed: {}", e) # Per-target connectivity maps try: from thesis.workflows.qc.checks import generate_connectivity_map_figures conn_qc_dir = patient_out / "qc" # Subject-space conn_pngs = generate_connectivity_map_figures( tractography_dir, bg_image, conn_qc_dir, space=f"subject{qc_suffix}" ) generated.extend(str(p) for p in conn_pngs) # Template-space warped_dir = tractography_dir / "warped_streamlines" if warped_dir.is_dir(): template_bg = config.transforms.template_reference_image if template_bg and Path(template_bg).exists(): conn_t_pngs = generate_connectivity_map_figures( warped_dir, template_bg, conn_qc_dir, space=f"template{qc_suffix}" ) generated.extend(str(p) for p in conn_t_pngs) except Exception as e: logger.warning("Connectivity map QC failed: {}", e) # SynthSeg segmentation overlay (only run once, not per hemisphere) if not label: try: from thesis.workflows.qc.checks import generate_synthseg_overlay ss_pngs = generate_synthseg_overlay(patient_out, bg_image, patient_out / "qc") generated.extend(str(p) for p in ss_pngs) except Exception as e: logger.warning("SynthSeg overlay failed: {}", e) # ROI transform comparison try: from thesis.workflows.qc.checks import generate_roi_transform_comparison comp_pngs, comp_data = generate_roi_transform_comparison( tractography_dir, bg_image, patient_out / "qc" ) generated.extend(str(p) for p in comp_pngs) if comp_data: header = f"\nROI Transform Comparison{f' ({label})' if label else ''}:\n" sys.stderr.write(header) for c in comp_data: flag = " [!]" if c["volume_ratio"] < 0.5 or c["volume_ratio"] > 1.5 else "" sys.stderr.write( f" {c['roi']:<25} vol_ratio={c['volume_ratio']:.3f} " f"shift={c['centroid_shift_mm']:.1f}mm{flag}\n" ) sys.stderr.flush() except Exception as e: logger.warning("ROI transform comparison failed: {}", e) # Waypoints validation try: from thesis.workflows.qc.checks import validate_waypoints_file wp_result = validate_waypoints_file(tractography_dir) if wp_result.get("missing"): sys.stderr.write(f"\n[WARN] {len(wp_result['missing'])} missing waypoint path(s):\n") for m in wp_result["missing"][:5]: sys.stderr.write(f" - {m}\n") sys.stderr.flush() if wp_result.get("dimension_mismatches"): sys.stderr.write( f"\n[WARN] {len(wp_result['dimension_mismatches'])} dimension mismatch(es)\n" ) sys.stderr.flush() except Exception as e: logger.warning("Waypoints validation failed: {}", e) # Brain mask overlay (only run once, not per hemisphere) if not label: try: from thesis.workflows.qc.checks import generate_brain_mask_overlay brain_mask = _find_brain_mask(config, context) if brain_mask and brain_mask.exists(): bm_pngs = generate_brain_mask_overlay(brain_mask, bg_image, patient_out / "qc") generated.extend(str(p) for p in bm_pngs) except Exception as e: logger.warning("Brain mask overlay failed: {}", e) # Template-space waytotal overlay if track_density: try: from thesis.workflows.qc.checks import generate_waytotal_overlay template_bg = config.transforms.template_reference_image if template_bg and Path(template_bg).exists(): wt_pngs = generate_waytotal_overlay( tractography_dir, template_bg, patient_out / "qc" ) generated.extend(str(p) for p in wt_pngs) except Exception as e: logger.warning("Waytotal overlay failed: {}", e) # Warp field Jacobian (only run once, not per hemisphere) if not label: try: from thesis.workflows.qc.checks import compute_jacobian_stats warp_field = _find_warp_field(config, context) if warp_field and warp_field.exists(): jac_stats = compute_jacobian_stats(warp_field, output_dir=patient_out / "qc") if jac_stats.get("negative_voxels", 0) > 0: sys.stderr.write( f"\n[WARN] Warp field has {jac_stats['negative_voxels']} " f"negative Jacobian voxels ({jac_stats['negative_fraction']:.4%})\n" ) sys.stderr.flush() except Exception as e: logger.warning("Jacobian analysis failed: {}", e) # BedpostX fibre quality (only run once, not per hemisphere) if not label: try: from thesis.workflows.qc.checks import generate_bedpostx_overlay bpx_dir = _find_bedpostx_dir(config, context) if bpx_dir and bpx_dir.is_dir(): bpx_pngs, bpx_stats = generate_bedpostx_overlay( bpx_dir, bg_image, patient_out / "qc" ) generated.extend(str(p) for p in bpx_pngs) if bpx_stats: sys.stderr.write( f"\nBedpostX f1: mean={bpx_stats['mean_f1']:.3f}, " f"low_f1_fraction={bpx_stats['low_f1_fraction']:.3f}\n" ) sys.stderr.flush() except Exception as e: logger.warning("BedpostX overlay failed: {}", e) # SynthSeg QC score check (only run once, not per hemisphere) if not label: try: from thesis.workflows.qc.checks import parse_synthseg_qc_csv threshold = config.qc.synthseg_qc_threshold ss_qc = parse_synthseg_qc_csv(patient_out, threshold=threshold) if ss_qc and not ss_qc.get("passed", True): sys.stderr.write( f"\n[WARN] SynthSeg QC score {ss_qc['mean_score']:.3f} " f"below threshold {threshold}\n" ) sys.stderr.flush() except Exception as e: logger.warning("SynthSeg QC parsing failed: {}", e) # SynthSeg volumes sanity check (only run once, not per hemisphere) if not label: try: from thesis.workflows.qc.checks import parse_synthseg_volumes_csv vol_result = parse_synthseg_volumes_csv(patient_out) if vol_result.get("warnings"): sys.stderr.write("\n[WARN] SynthSeg volume anomalies:\n") for w in vol_result["warnings"]: sys.stderr.write(f" - {w}\n") sys.stderr.flush() except Exception as e: logger.warning("SynthSeg volumes check failed: {}", e) # Statistics if stats: try: patient_stats = collect_tractography_stats(patient_out, patient_id=context.patient_id) table = format_stats_table([patient_stats], include_roi_counts=True) # Print in Function node context (loguru unavailable in Nipype workers) sys.stderr.write(f"\n{table}\n") sys.stderr.flush() except Exception as e: logger.warning("Statistics collection failed: {}", e) return generated