# Source code for thesis.workflows.qc.workflow

"""QC visualisation and statistics workflow.

Produces ROI overlay PNGs, track density figures, and tractography
statistics for a patient whose HCP workflow has already completed.

Usage::

    thesis run -w qc -p 114823 -c default
"""

from __future__ import annotations

from pathlib import Path
from typing import List

import nipype.pipeline.engine as pe
from nipype.interfaces.utility import Function

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import OutputDir

# Module-level logger; build_workflow uses it, _qc_task does not reference it.
logger = get_logger(__name__)

# Input names declared on the QC Function node; they match the parameter
# names of _qc_task one-to-one.
_QC_INPUTS = [
    "patient_output",
    "patient_id",
    "config_dict",
    "track_density",
    "stats",
]


def _qc_task(
    patient_output: str,
    patient_id: str,
    config_dict: dict,
    track_density: bool,
    stats: bool,
) -> list:
    """Run QC for a single patient (body of the Nipype Function node).

    The pipeline configuration is rebuilt from ``config_dict``, a fresh
    context is created for ``patient_id`` and pointed at ``patient_output``,
    and the work is delegated to ``run_qc_for_patient``; its result is
    returned unchanged.

    Everything is imported locally because the thesis logger is unavailable
    inside Nipype Function workers.
    """
    import os

    # Only pick a Matplotlib backend when the environment has not already
    # chosen one; this happens before the thesis imports below.
    if "MPLBACKEND" not in os.environ:
        os.environ["MPLBACKEND"] = "Agg"

    from thesis.core.config import PipelineConfig
    from thesis.core.context import create_context
    from thesis.workflows.qc.operations import run_qc_for_patient

    pipeline_config = PipelineConfig.model_validate(config_dict)
    patient_context = create_context(patient_id=patient_id, config=pipeline_config)
    # The node input arrives as a plain string, hence the type-checker override.
    patient_context.output_dir = patient_output  # type: ignore[assignment]

    generated_files = run_qc_for_patient(
        config=pipeline_config,
        context=patient_context,
        track_density=track_density,
        stats=stats,
    )
    return generated_files


def verify_requirements(config: PipelineConfig, context: ProcessingContext) -> List[str]:
    """Verify the patient has tractography output from a prior backend run.

    Args:
        config: Pipeline configuration. When it has an ``atlas`` attribute with
            a non-``None`` ``tractography_relpath``, that path (relative to the
            patient output directory) is checked; otherwise the default
            ``tractography/probtrackx2`` is used.
        context: Processing context; only ``output_dir`` is read.

    Returns:
        A list of human-readable problems. The list is empty when the
        tractography directory exists.
    """
    if context.output_dir is None:
        return ["output_dir is not set in the processing context"]

    atlas_config = getattr(config, "atlas", None)
    tractography_relpath = getattr(atlas_config, "tractography_relpath", None)
    if tractography_relpath is None:
        # Covers a missing ``atlas`` section, a missing attribute, and an
        # attribute explicitly set to None; joining None onto a Path would
        # raise TypeError instead of reporting a requirement problem.
        tractography_relpath = "tractography/probtrackx2"

    tractography_dir = Path(context.output_dir) / tractography_relpath
    if not tractography_dir.is_dir():
        return [
            f"Tractography output not found: {tractography_dir}. "
            "Run the matching tractography workflow first."
        ]
    return []


@workflow(
    name="qc",
    description=(
        "QC visualisation workflow: ROI overlays, track density figures, "
        "and tractography statistics. Requires a prior HCP workflow run."
    ),
)
@produces(qc_dir=OutputDir("qc"))
@verify(verify_requirements)
def build_workflow(
    *, qc_dir: Path, config: PipelineConfig, context: ProcessingContext
) -> pe.Workflow:
    """Build a single-Function-node QC workflow.

    The Function node deserialises the config, builds a fresh context, and
    delegates to ``run_qc_for_patient`` which has its own path-discovery
    fallbacks for T1 and tractography outputs.

    Args:
        qc_dir: Output directory declared via ``@produces``; it is not used in
            the body (see the comment on ``del qc_dir``).
        config: Pipeline configuration; passed to the node as ``model_dump()``.
        context: Processing context supplying ``patient_id``, ``output_dir``
            and, optionally, ``working_dir``.

    Returns:
        A workflow named ``qc_<patient_id>`` containing one node,
        ``qc_generate``, with ``track_density`` and ``stats`` both enabled.
    """
    del qc_dir  # resolved purely for its mkdir side-effect; run_qc_for_patient
    # builds its own per-figure subdirectories under <output_dir>/qc.

    wf = pe.Workflow(name=f"qc_{context.patient_id}")
    if context.working_dir:
        wf.base_dir = str(context.working_dir)

    qc_node = pe.Node(
        Function(input_names=_QC_INPUTS, output_names=["generated_files"], function=_qc_task),
        name="qc_generate",
    )
    # Node inputs are plain values (str / dict / bool); _qc_task rebuilds the
    # config and context objects from them.
    qc_node.inputs.patient_output = str(context.output_dir)
    qc_node.inputs.patient_id = context.patient_id
    qc_node.inputs.config_dict = config.model_dump()
    qc_node.inputs.track_density = True
    qc_node.inputs.stats = True

    wf.add_nodes([qc_node])

    logger.info("Built QC workflow for {} ({} output dir)", context.patient_id, context.output_dir)
    return wf