"""QC workflow operations and helpers.
Contains the post-workflow QC hook, ROI discovery logic, and background
image resolution that support both the ``thesis run -w qc`` workflow
and the automatic post-HCP QC generation.
"""
from __future__ import annotations
import sys
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.logging import get_logger
from thesis.core.output import EventLevel, get_event_bus
logger = get_logger(__name__)
__all__ = [
"discover_roi_files",
"resolve_background_image",
"run_post_workflow_qc",
"run_qc_for_patient",
]
[docs]
def discover_roi_files(tractography_dir: Union[str, Path]) -> Dict[str, Path]:
"""Discover ROI NIfTI masks from patient tractography output.
Searches ``rois_merged/``, ``rois_transformed/``, and ``rois/``
(in that order), returning the first non-empty set found. Descends
one level into per-atlas-source subdirectories (e.g. ``rois/main/``)
so naming-convention runs with ``atlas_sources[].name != "atlas"``
are picked up.
Args:
tractography_dir: The ``<output_dir>/tractography/probtrackx2/`` directory.
Returns:
Mapping of ROI name (derived from filename, prefixed with source
subdir when present) to :class:`Path`.
"""
from thesis.workflows.qc.statistics import discover_roi_dir_files
_, nii_files = discover_roi_dir_files(Path(tractography_dir))
rois: Dict[str, Path] = {}
for f in nii_files:
name = f.name.replace(".nii.gz", "").replace(".nii", "")
rois[name] = f
return rois
[docs]
def resolve_background_image(
config: PipelineConfig,
context: ProcessingContext,
) -> Optional[Path]:
"""Resolve the T1w background image for a patient.
Uses the same resolution logic as the HCP workflow
(``resolve_t1_path``) so paths that work for ``thesis run``
also work for QC.
Args:
config: Loaded pipeline config.
context: Processing context (carries patient_id, input_dir, etc.).
Returns:
Path to the T1w image, or ``None`` if it cannot be resolved.
"""
try:
from thesis.workflows.hcp.common import resolve_t1_path
t1 = resolve_t1_path(config, context)
if t1.exists():
return t1
except Exception:
pass
# Fallback 1: the registration sub-workflow writes <pid>_T1_brain.nii.gz
# under <output_dir>/registration/. When running qc standalone after a
# full_pipeline run, this is the natural QC background even though
# hcp.t1_image is unset.
output_dir = getattr(context, "output_dir", None)
if output_dir is not None:
reg_t1 = Path(output_dir) / "registration" / f"{context.patient_id}_T1_brain.nii.gz"
if reg_t1.exists():
return reg_t1
# Fallback 2: try hcp.t1_image with manual substitution.
raw = getattr(config.hcp, "t1_image", None)
if not raw:
return None
if "{patient_id}" in raw:
raw = raw.format(patient_id=context.patient_id)
path = Path(raw)
if path.is_absolute() and path.exists():
return path
for base in [context.input_dir, context.data_dir, output_dir]:
if base is not None:
candidate: Path = Path(base) / raw
if candidate.exists():
return candidate
return path if path.exists() else None
def _resolve_patient_path(
context: ProcessingContext,
raw_value: Optional[object],
predicate: Callable[[Path], bool],
) -> Optional[Path]:
"""Resolve a per-patient input path against ``raw_value``.
Formats ``{patient_id}`` placeholders, then accepts the path either as
an existing absolute path or as a path relative to ``context.input_dir``.
``predicate`` selects the validity check (e.g. :meth:`Path.exists` for
files, :meth:`Path.is_dir` for directories).
Args:
context: Processing context (provides ``patient_id`` / ``input_dir``).
raw_value: Raw config value (may be ``None``); coerced to ``str``.
predicate: Callable returning ``True`` when a candidate path is valid.
Returns:
The resolved :class:`Path`, or ``None`` when it cannot be resolved.
"""
if not raw_value:
return None
try:
from thesis.workflows.hcp.common import format_patient_path
formatted = format_patient_path(str(raw_value), context.patient_id)
except (ImportError, AttributeError, ValueError, KeyError):
return None
path = Path(formatted)
if path.is_absolute() and predicate(path):
return path
if context.input_dir:
candidate = Path(context.input_dir) / formatted
if predicate(candidate):
return candidate
return None
def _find_brain_mask(
config: PipelineConfig,
context: ProcessingContext,
) -> Optional[Path]:
"""Resolve the brain mask path for a patient."""
hcp = getattr(config, "hcp", None)
return _resolve_patient_path(context, getattr(hcp, "brain_mask", None), Path.exists)
def _find_warp_field(
config: PipelineConfig,
context: ProcessingContext,
) -> Optional[Path]:
"""Resolve the warp field (template-to-patient) path."""
transforms = getattr(config, "transforms", None)
warp = getattr(transforms, "template_to_patient_warp", None)
return _resolve_patient_path(context, warp, Path.exists)
def _find_bedpostx_dir(
config: PipelineConfig,
context: ProcessingContext,
) -> Optional[Path]:
"""Resolve the BedpostX output directory."""
hcp = getattr(config, "hcp", None)
return _resolve_patient_path(context, getattr(hcp, "bedpostx_dir", None), Path.is_dir)
[docs]
def run_post_workflow_qc(
cfg: PipelineConfig,
ctx_obj: ProcessingContext,
) -> None:
"""Generate QC overlays after workflow completion if configured.
This is a best-effort step — QC failures are logged as warnings
but do not cause the pipeline run to fail.
Intended to be called by the CLI after a successful ``thesis run``
when ``qc.generate_overlays`` is ``True``.
Args:
cfg: Loaded pipeline configuration.
ctx_obj: Processing context for the patient run.
"""
qc_cfg = getattr(cfg, "qc", None)
if qc_cfg is None or not getattr(qc_cfg, "generate_overlays", False):
return
patient_id = ctx_obj.patient_id
patient_output = Path(ctx_obj.output_dir) # type: ignore[arg-type]
bus = get_event_bus()
tractography_relpath = getattr(
getattr(cfg, "atlas", None), "tractography_relpath", "tractography/probtrackx2"
)
tractography_dir = patient_output / tractography_relpath
roi_files = discover_roi_files(tractography_dir)
if not roi_files:
logger.debug("No ROI files found for QC overlays, skipping")
return
bg_image = resolve_background_image(cfg, ctx_obj)
if bg_image is None or not bg_image.exists():
logger.debug("Background image not found for QC overlays, skipping")
return
try:
from thesis.workflows.qc.roi_overlay import generate_roi_overlays
roi_qc_dir = patient_output / "qc" / "roi_overlays"
pngs = generate_roi_overlays(
roi_paths=roi_files,
background_image=bg_image,
output_dir=roi_qc_dir,
)
bus.emit(
f"Generated {len(pngs)} QC overlay(s) for {patient_id}",
level=EventLevel.INFO,
category="qc",
patient_id=patient_id,
)
except Exception as exc:
logger.warning("QC overlay generation failed for {}: {}", patient_id, exc)
bus.emit(
f"QC overlay generation failed for {patient_id}: {exc}",
level=EventLevel.WARNING,
category="qc",
patient_id=patient_id,
)
[docs]
def run_qc_for_patient(
config: PipelineConfig,
context: ProcessingContext,
background: Optional[Path] = None,
track_density: bool = True,
stats: bool = True,
) -> List[str]:
"""Run full QC suite for a single patient.
This is the core logic used by both the ``qc`` workflow's Nipype
Function node and any future CLI invocation.
Args:
config: Loaded pipeline configuration.
context: Processing context.
background: Override background image. If ``None``, resolved
from config.
track_density: Generate track density overlay figures.
stats: Print tractography statistics.
Returns:
List of output file paths (PNGs) generated.
"""
patient_out = Path(context.output_dir) # type: ignore[arg-type]
tractography_relpath = getattr(
getattr(config, "atlas", None), "tractography_relpath", "tractography/probtrackx2"
)
base_tract_dir = patient_out / tractography_relpath
# Detect hemisphere subdirectories (left/, right/) for both-separately runs
hemisphere_dirs: List[Path] = []
for hemi in ("left", "right"):
hemi_dir = base_tract_dir / hemi
if hemi_dir.is_dir():
hemisphere_dirs.append(hemi_dir)
if hemisphere_dirs:
# Run QC for each hemisphere subdirectory independently
all_generated: List[str] = []
for hemi_dir in hemisphere_dirs:
hemi_label = hemi_dir.name
logger.info("Running QC for hemisphere: {}", hemi_label)
hemi_generated = _run_qc_for_tractography_dir(
config=config,
context=context,
patient_out=patient_out,
tractography_dir=hemi_dir,
background=background,
track_density=track_density,
stats=stats,
label=hemi_label,
)
all_generated.extend(hemi_generated)
return all_generated
# Standard single-directory case
if not base_tract_dir.is_dir():
raise FileNotFoundError(
f"Patient tractography output not found: {base_tract_dir}. "
"Run the matching tractography workflow first."
)
return _run_qc_for_tractography_dir(
config=config,
context=context,
patient_out=patient_out,
tractography_dir=base_tract_dir,
background=background,
track_density=track_density,
stats=stats,
)
def _run_qc_for_tractography_dir(
config: PipelineConfig,
context: ProcessingContext,
patient_out: Path,
tractography_dir: Path,
background: Optional[Path] = None,
track_density: bool = True,
stats: bool = True,
label: str = "",
) -> List[str]:
"""Run QC suite for a single tractography output directory.
Args:
config: Loaded pipeline configuration.
context: Processing context.
patient_out: Patient output root directory.
tractography_dir: Tractography output directory to analyse.
background: Override background image.
track_density: Generate track density overlay figures.
stats: Print tractography statistics.
label: Optional label for QC output sub-directories (e.g. ``"left"``).
Returns:
List of output file paths (PNGs) generated.
"""
from thesis.workflows.qc.roi_overlay import generate_roi_overlays
from thesis.workflows.qc.statistics import (
collect_tractography_stats,
format_stats_table,
)
from thesis.workflows.qc.track_density import generate_track_density_figures
# Discover ROIs
roi_files = discover_roi_files(tractography_dir)
if not roi_files:
raise FileNotFoundError(
f"No ROI mask files found in {tractography_dir}. "
f"Expected NIfTI masks in rois_merged/, rois_transformed/, or rois/."
)
# Resolve background image
bg_image = background
if bg_image is None:
bg_image = resolve_background_image(config, context)
if bg_image is None or not bg_image.exists():
raise FileNotFoundError(
"Could not resolve background image from config (hcp.t1_image)."
)
# Build QC output subdirectory suffix
qc_suffix = f"_{label}" if label else ""
generated: List[str] = []
if label:
sys.stderr.write(f"\n=== QC: hemisphere '{label}' ===\n")
sys.stderr.flush()
# ROI overlays
roi_qc_dir = patient_out / "qc" / f"roi_overlays{qc_suffix}"
logger.info(
"Generating ROI overlays for {}{}", context.patient_id, f" ({label})" if label else ""
)
roi_pngs = generate_roi_overlays(
roi_paths=roi_files,
background_image=bg_image,
output_dir=roi_qc_dir,
)
generated.extend(str(p) for p in roi_pngs)
# Track density figures
if track_density:
td_qc_dir = patient_out / "qc" / f"normtracks{qc_suffix}"
thresholds = list(config.qc.track_density_thresholds)
# Subject-space fdt_paths
subject_fdt = tractography_dir / "fdt_paths.nii.gz"
if subject_fdt.exists():
logger.info("Generating subject-space track density figures")
try:
td_pngs = generate_track_density_figures(
fdt_paths=subject_fdt,
template_image=bg_image,
output_dir=td_qc_dir,
thresholds=thresholds,
prefix=f"subject_track_density{qc_suffix}",
)
generated.extend(str(p) for p in td_pngs)
except Exception as e:
logger.warning("Subject-space track density failed: {}", e)
# Template-space fdt_paths
template_fdt = tractography_dir / "warped_streamlines" / "fdt_paths.nii.gz"
if template_fdt.exists():
template_bg = config.transforms.template_reference_image
if template_bg and Path(template_bg).exists():
logger.info("Generating template-space track density figures")
try:
td_pngs = generate_track_density_figures(
fdt_paths=template_fdt,
template_image=template_bg,
output_dir=td_qc_dir,
thresholds=thresholds,
prefix=f"template_track_density{qc_suffix}",
)
generated.extend(str(p) for p in td_pngs)
except Exception as e:
logger.warning("Template-space track density failed: {}", e)
# Per-target connectivity maps
try:
from thesis.workflows.qc.checks import generate_connectivity_map_figures
conn_qc_dir = patient_out / "qc"
# Subject-space
conn_pngs = generate_connectivity_map_figures(
tractography_dir, bg_image, conn_qc_dir, space=f"subject{qc_suffix}"
)
generated.extend(str(p) for p in conn_pngs)
# Template-space
warped_dir = tractography_dir / "warped_streamlines"
if warped_dir.is_dir():
template_bg = config.transforms.template_reference_image
if template_bg and Path(template_bg).exists():
conn_t_pngs = generate_connectivity_map_figures(
warped_dir, template_bg, conn_qc_dir, space=f"template{qc_suffix}"
)
generated.extend(str(p) for p in conn_t_pngs)
except Exception as e:
logger.warning("Connectivity map QC failed: {}", e)
# SynthSeg segmentation overlay (only run once, not per hemisphere)
if not label:
try:
from thesis.workflows.qc.checks import generate_synthseg_overlay
ss_pngs = generate_synthseg_overlay(patient_out, bg_image, patient_out / "qc")
generated.extend(str(p) for p in ss_pngs)
except Exception as e:
logger.warning("SynthSeg overlay failed: {}", e)
# ROI transform comparison
try:
from thesis.workflows.qc.checks import generate_roi_transform_comparison
comp_pngs, comp_data = generate_roi_transform_comparison(
tractography_dir, bg_image, patient_out / "qc"
)
generated.extend(str(p) for p in comp_pngs)
if comp_data:
header = f"\nROI Transform Comparison{f' ({label})' if label else ''}:\n"
sys.stderr.write(header)
for c in comp_data:
flag = " [!]" if c["volume_ratio"] < 0.5 or c["volume_ratio"] > 1.5 else ""
sys.stderr.write(
f" {c['roi']:<25} vol_ratio={c['volume_ratio']:.3f} "
f"shift={c['centroid_shift_mm']:.1f}mm{flag}\n"
)
sys.stderr.flush()
except Exception as e:
logger.warning("ROI transform comparison failed: {}", e)
# Waypoints validation
try:
from thesis.workflows.qc.checks import validate_waypoints_file
wp_result = validate_waypoints_file(tractography_dir)
if wp_result.get("missing"):
sys.stderr.write(f"\n[WARN] {len(wp_result['missing'])} missing waypoint path(s):\n")
for m in wp_result["missing"][:5]:
sys.stderr.write(f" - {m}\n")
sys.stderr.flush()
if wp_result.get("dimension_mismatches"):
sys.stderr.write(
f"\n[WARN] {len(wp_result['dimension_mismatches'])} dimension mismatch(es)\n"
)
sys.stderr.flush()
except Exception as e:
logger.warning("Waypoints validation failed: {}", e)
# Brain mask overlay (only run once, not per hemisphere)
if not label:
try:
from thesis.workflows.qc.checks import generate_brain_mask_overlay
brain_mask = _find_brain_mask(config, context)
if brain_mask and brain_mask.exists():
bm_pngs = generate_brain_mask_overlay(brain_mask, bg_image, patient_out / "qc")
generated.extend(str(p) for p in bm_pngs)
except Exception as e:
logger.warning("Brain mask overlay failed: {}", e)
# Template-space waytotal overlay
if track_density:
try:
from thesis.workflows.qc.checks import generate_waytotal_overlay
template_bg = config.transforms.template_reference_image
if template_bg and Path(template_bg).exists():
wt_pngs = generate_waytotal_overlay(
tractography_dir, template_bg, patient_out / "qc"
)
generated.extend(str(p) for p in wt_pngs)
except Exception as e:
logger.warning("Waytotal overlay failed: {}", e)
# Warp field Jacobian (only run once, not per hemisphere)
if not label:
try:
from thesis.workflows.qc.checks import compute_jacobian_stats
warp_field = _find_warp_field(config, context)
if warp_field and warp_field.exists():
jac_stats = compute_jacobian_stats(warp_field, output_dir=patient_out / "qc")
if jac_stats.get("negative_voxels", 0) > 0:
sys.stderr.write(
f"\n[WARN] Warp field has {jac_stats['negative_voxels']} "
f"negative Jacobian voxels ({jac_stats['negative_fraction']:.4%})\n"
)
sys.stderr.flush()
except Exception as e:
logger.warning("Jacobian analysis failed: {}", e)
# BedpostX fibre quality (only run once, not per hemisphere)
if not label:
try:
from thesis.workflows.qc.checks import generate_bedpostx_overlay
bpx_dir = _find_bedpostx_dir(config, context)
if bpx_dir and bpx_dir.is_dir():
bpx_pngs, bpx_stats = generate_bedpostx_overlay(
bpx_dir, bg_image, patient_out / "qc"
)
generated.extend(str(p) for p in bpx_pngs)
if bpx_stats:
sys.stderr.write(
f"\nBedpostX f1: mean={bpx_stats['mean_f1']:.3f}, "
f"low_f1_fraction={bpx_stats['low_f1_fraction']:.3f}\n"
)
sys.stderr.flush()
except Exception as e:
logger.warning("BedpostX overlay failed: {}", e)
# SynthSeg QC score check (only run once, not per hemisphere)
if not label:
try:
from thesis.workflows.qc.checks import parse_synthseg_qc_csv
threshold = config.qc.synthseg_qc_threshold
ss_qc = parse_synthseg_qc_csv(patient_out, threshold=threshold)
if ss_qc and not ss_qc.get("passed", True):
sys.stderr.write(
f"\n[WARN] SynthSeg QC score {ss_qc['mean_score']:.3f} "
f"below threshold {threshold}\n"
)
sys.stderr.flush()
except Exception as e:
logger.warning("SynthSeg QC parsing failed: {}", e)
# SynthSeg volumes sanity check (only run once, not per hemisphere)
if not label:
try:
from thesis.workflows.qc.checks import parse_synthseg_volumes_csv
vol_result = parse_synthseg_volumes_csv(patient_out)
if vol_result.get("warnings"):
sys.stderr.write("\n[WARN] SynthSeg volume anomalies:\n")
for w in vol_result["warnings"]:
sys.stderr.write(f" - {w}\n")
sys.stderr.flush()
except Exception as e:
logger.warning("SynthSeg volumes check failed: {}", e)
# Statistics
if stats:
try:
patient_stats = collect_tractography_stats(patient_out, patient_id=context.patient_id)
table = format_stats_table([patient_stats], include_roi_counts=True)
# Print in Function node context (loguru unavailable in Nipype workers)
sys.stderr.write(f"\n{table}\n")
sys.stderr.flush()
except Exception as e:
logger.warning("Statistics collection failed: {}", e)
return generated