Source code for thesis.workflows.qc.workflow
"""QC visualisation and statistics workflow.
Produces ROI overlay PNGs, track density figures, and tractography
statistics for a patient whose HCP workflow has already completed.
Usage::
thesis run -w qc -p 114823 -c default
"""
from __future__ import annotations
from pathlib import Path
from typing import List
import nipype.pipeline.engine as pe
from nipype.interfaces.utility import Function
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import OutputDir
logger = get_logger(__name__)
_QC_INPUTS = ["patient_output", "patient_id", "config_dict", "track_density", "stats"]
def _qc_task(
patient_output: str,
patient_id: str,
config_dict: dict,
track_density: bool,
stats: bool,
) -> list:
"""Nipype Function node body: run QC for one patient.
Imports are local because the thesis logger is unavailable inside
Nipype Function workers.
"""
import os
os.environ.setdefault("MPLBACKEND", "Agg")
from thesis.core.config import PipelineConfig
from thesis.core.context import create_context
from thesis.workflows.qc.operations import run_qc_for_patient
cfg = PipelineConfig.model_validate(config_dict)
ctx = create_context(patient_id=patient_id, config=cfg)
ctx.output_dir = patient_output # type: ignore[assignment]
return run_qc_for_patient(config=cfg, context=ctx, track_density=track_density, stats=stats)
[docs]
def verify_requirements(config: PipelineConfig, context: ProcessingContext) -> List[str]:
"""Verify the patient has tractography output from a prior backend run."""
if context.output_dir is None:
return ["output_dir is not set in the processing context"]
tractography_relpath = getattr(
getattr(config, "atlas", None), "tractography_relpath", "tractography/probtrackx2"
)
tractography_dir = Path(context.output_dir) / tractography_relpath
if not tractography_dir.is_dir():
return [
f"Tractography output not found: {tractography_dir}. "
"Run the matching tractography workflow first."
]
return []
[docs]
@workflow(
name="qc",
description=(
"QC visualisation workflow: ROI overlays, track density figures, "
"and tractography statistics. Requires a prior HCP workflow run."
),
)
@produces(qc_dir=OutputDir("qc"))
@verify(verify_requirements)
def build_workflow(
*, qc_dir: Path, config: PipelineConfig, context: ProcessingContext
) -> pe.Workflow:
"""Build a single-Function-node QC workflow.
The Function node deserialises the config, builds a fresh context, and
delegates to ``run_qc_for_patient`` which has its own path-discovery
fallbacks for T1 and tractography outputs.
"""
del qc_dir # resolved purely for its mkdir side-effect; run_qc_for_patient
# builds its own per-figure subdirectories under <output_dir>/qc.
wf = pe.Workflow(name=f"qc_{context.patient_id}")
if context.working_dir:
wf.base_dir = str(context.working_dir)
qc_node = pe.Node(
Function(input_names=_QC_INPUTS, output_names=["generated_files"], function=_qc_task),
name="qc_generate",
)
qc_node.inputs.patient_output = str(context.output_dir)
qc_node.inputs.patient_id = context.patient_id
qc_node.inputs.config_dict = config.model_dump()
qc_node.inputs.track_density = True
qc_node.inputs.stats = True
wf.add_nodes([qc_node])
logger.info("Built QC workflow for {} ({} output dir)", context.patient_id, context.output_dir)
return wf