Source code for thesis.workflows.full_pipeline.workflow

"""Composite full-pipeline meta-workflow (backend-parameterized).

Chains preprocess → registration → tract_synthseg → atlas_to_patient →
tract_similarity in one Nipype graph, wiring stages only through their
``inputnode`` / ``outputnode`` contracts (plus the pre-existing ``entry_gate``
/ ``exit_gate`` anchors on transform / tract_similarity). No node-name
introspection of nested sub-workflows.

``config.tractography.method`` selects the tractography backend via a
:class:`~thesis.workflows.full_pipeline._core.BackendDescriptor`; the unified
``tract_synthseg`` meta-workflow reads the same config value to build the
matching backend:

* ``probtrackx2`` / ``fsl`` → ``hcp`` backend (BedpostX enabled)
* ``mrtrix3`` / ``tckgen`` → ``mrtrix3`` backend (BedpostX disabled)

The backend-agnostic edges (preprocess→registration, registration→atlas, the
preprocess→atlas anchor, the FireANTs transform fan-out into atlas) live in
:mod:`thesis.workflows.full_pipeline._core`.
"""

from __future__ import annotations

import shutil
from importlib.util import find_spec
from pathlib import Path
from typing import Callable, List, cast

import nipype.pipeline.engine as pe

# Side-effect imports populate WORKFLOW_REGISTRY before .get() lookups below.
import thesis.workflows.atlas_to_patient.workflow  # noqa: F401
import thesis.workflows.registration.workflow  # noqa: F401
import thesis.workflows.tract_similarity.workflow  # noqa: F401
import thesis.workflows.tract_synthseg.workflow  # noqa: F401
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import ExternalFile
from thesis.core.registry import WORKFLOW_REGISTRY
from thesis.workflows.full_pipeline._core import (
    BackendDescriptor,
    apply_preproc_override,
    finalize_tract_similarity_merge,
    select_backend,
    wire_core_scaffold,
)
from thesis.workflows.mrtrix3.workflow import REQUIRED_BINARIES as _MRTRIX3_BINARIES
from thesis.workflows.preprocess.workflow import _load_preprocess_config
from thesis.workflows.registration.paths import resolve_registration_jobs

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Call signature shared by the sub-workflow factories used in this module:
# they take the pipeline config and the processing context and return a
# Nipype workflow.
_WorkflowFactory = Callable[[PipelineConfig, ProcessingContext], pe.Workflow]

# Public API of this module.
__all__ = ["build_workflow", "verify_requirements"]


# Module-level builder references so tests can monkeypatch a single name per
# sub-workflow.  ``build_workflow`` calls THESE names (never the registry
# directly) so a patched stub is honoured.
#
# These lookups run at import time, so the modules that register each
# workflow must already have been imported above.  ``cast`` only informs the
# type checker; it performs no runtime validation of the registered factory.
_build_preprocess = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("preprocess").factory)
_build_registration = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("registration").factory)
_build_atlas_to_patient = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("atlas_to_patient").factory)
_build_tract_similarity = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("tract_similarity").factory)


def _build_synthseg_meta(config: PipelineConfig, context: ProcessingContext) -> pe.Workflow:
    """Construct the unified ``tract_synthseg`` meta-workflow.

    ``tract_synthseg`` picks its tractography backend itself from
    ``config.tractography.method`` -- the same value this module uses to
    choose its :class:`BackendDescriptor`.

    The registry is consulted on every call instead of once at import time,
    which lets tests monkeypatch this one function.

    Args:
        config: Validated pipeline configuration.
        context: Processing context.

    Returns:
        The workflow produced by the registered ``tract_synthseg`` factory.
    """
    registered = WORKFLOW_REGISTRY.get("tract_synthseg")
    build = cast(_WorkflowFactory, registered.factory)
    return build(config, context)


def verify_requirements(
    config: PipelineConfig, context: ProcessingContext, **_: object
) -> List[str]:
    """Run the preflight checks for the unified full pipeline.

    Raw-input checks are delegated to the verifier registered for
    ``preprocess`` (which brings along preprocess's implicit ``@requires``
    existence checks). On top of that come the tool / config availability
    checks driven by the selected backend and registration method.

    File-existence checks of downstream stages are deliberately left out:
    those stages consume runtime intermediates, so such checks would always
    be false positives.

    Args:
        config: Validated pipeline configuration.
        context: Processing context.
        **_: Extra keyword arguments; accepted and ignored.

    Returns:
        Human-readable error strings, deduplicated, in first-seen order.
    """
    # An insertion-ordered dict acts as an ordered set: the first occurrence
    # of a message fixes its position and repeats are dropped.
    collected: dict[str, None] = {}

    def _add(err: str) -> None:
        collected.setdefault(err, None)

    method = getattr(config.tractography, "method", "probtrackx2")
    desc = select_backend(method)

    # Apply the same structural override as build_workflow so the preprocess
    # verifier judges the effective config (e.g. the MRtrix3 backend never
    # invokes BedpostX, so it must not be demanded on PATH).
    effective = apply_preproc_override(config, {"run_bedpostx": desc.run_bedpostx})
    preprocess_verifier = WORKFLOW_REGISTRY.get("preprocess").verifier
    if preprocess_verifier is not None:
        for error in preprocess_verifier(effective, context):
            _add(error)

    # NOTE: preprocess.run_coregistration is OPTIONAL, so nothing is required
    # for it here; build_workflow wires the coregistration transform into the
    # tractography stage only when it is on, and logs the regridding fallback
    # when it is off.

    # Package probes for the fireants method, each paired with its message.
    fireants_packages = (
        ("torch", "Python package 'torch' is required for registration.method='fireants'."),
        (
            "fireants",
            "Python package 'fireants' is required for registration.method='fireants'.",
        ),
    )

    # Walk the resolved registration jobs so per-job method overrides are
    # checked too (a default single-registration config resolves to one
    # implicit job mirroring the top-level method). Repeats are absorbed by
    # _add.
    for reg_job in resolve_registration_jobs(config):
        reg_method = reg_job.method
        if reg_method == "ants":
            if shutil.which("antsRegistration") is None:
                _add("ANTs 'antsRegistration' not found on PATH.")
        elif reg_method == "fireants":
            for package, message in fireants_packages:
                if find_spec(package) is None:
                    _add(message)

    # registration.fixed_image is not checked here: it is declared through
    # @requires(template=ExternalFile(...)) and its implicit existence check
    # runs in the composite verifier. It is an ExternalFile rather than a
    # PatientFile because the template is an out-of-tree (often absolute)
    # cohort asset, and PatientFile anchors to input_dir and rejects anything
    # escaping it.

    # Binaries declared by the backend win; an empty declaration falls back
    # to the MRtrix3 list for that backend and to nothing otherwise.
    backend_binaries = desc.required_binaries
    if not backend_binaries:
        backend_binaries = tuple(_MRTRIX3_BINARIES) if desc.name == "mrtrix3" else tuple()

    for binary in backend_binaries:
        if shutil.which(binary) is None:
            _add(f"Required binary '{binary}' not found on PATH.")

    if shutil.which("antsApplyTransforms") is None:
        _add("ANTs 'antsApplyTransforms' not found on PATH.")

    if not config.transforms.jobs:
        _add(
            "No transform jobs defined. Add at least one entry under "
            "'transforms.jobs' in your configuration."
        )

    return list(collected)
@workflow(
    name="full_pipeline",
    description=(
        "Full-pipeline meta-workflow: preprocess -> registration -> "
        "tract_synthseg -> atlas_to_patient -> "
        "tract_similarity, in one Nipype graph wired through inputnode/"
        "outputnode contracts. Backend selected by tractography.method "
        "(probtrackx2 default; mrtrix3 disables BedpostX)."
    ),
    protocol="full_pipeline",
)
@requires(
    template=ExternalFile(config_path="registration.fixed_image"),
)
@verify(verify_requirements)
def build_workflow(
    *, template: Path, config: PipelineConfig, context: ProcessingContext
) -> pe.Workflow:
    """Assemble the unified full-pipeline meta-workflow for one patient.

    ``config.tractography.method`` determines the backend, and BedpostX is
    switched on or off structurally to match it.

    Args:
        template: Registration template (``registration.fixed_image``). It is
            declared through
            :class:`~thesis.core.path_declarations.ExternalFile` so that an
            out-of-tree absolute path is accepted and existence-checked; the
            value itself is not used in the body.
        config: Validated pipeline configuration.
        context: Processing context carrying the patient ID, paths, and
            config.

    Returns:
        Nipype Workflow ready for ``NipypeExecutor`` or ``.run()``.
    """
    del template  # only declared so @requires validates it

    pid = context.patient_id
    method = getattr(config.tractography, "method", "probtrackx2")
    backend: BackendDescriptor = select_backend(method)
    config = apply_preproc_override(config, {"run_bedpostx": backend.run_bedpostx})

    graph = pe.Workflow(name=f"full_pipeline_{pid}")
    work_dir = context.working_dir
    if work_dir:
        graph.base_dir = str(work_dir)

    # -- Sub-workflows, built through the patchable module-level names -------
    preprocess_wf = _build_preprocess(config, context)
    registration_wf = _build_registration(config, context)
    synthseg_meta_wf = _build_synthseg_meta(config, context)
    atlas_to_patient_wf = _build_atlas_to_patient(config, context)
    tract_similarity_wf = _build_tract_similarity(config, context)

    # -- Backend-agnostic edges (preprocess->registration, reg->atlas, etc.) -
    scaffold = wire_core_scaffold(
        graph,
        config,
        preprocess_wf=preprocess_wf,
        registration_wf=registration_wf,
        atlas_to_patient_wf=atlas_to_patient_wf,
        tract_similarity_wf=tract_similarity_wf,
    )

    # -- Stage 1 -> Stage 3: contract edges supplied by the backend ----------
    for src, dst in backend.tractography_edges:
        graph.connect(
            preprocess_wf,
            f"outputnode.{src}",
            synthseg_meta_wf,
            f"inputnode.{dst}",
        )

    # The coregistration transform edge is OPTIONAL: preprocess.outputnode
    # only exposes t1_to_dwi_transform when run_coregistration is on, so the
    # edge is added only then. Otherwise roi_dwi_resampler regrids the ROIs
    # onto the DWI grid, which is valid only when T1 and DWI already share a
    # world.
    coregistration_on = _load_preprocess_config(config).run_coregistration
    if not coregistration_on:
        logger.info(
            "full_pipeline ({}): preprocess.run_coregistration is off — atlas ROIs "
            "will be regridded onto the DWI grid (no DWI->T1 transform); ensure T1 "
            "and DWI share a world.",
            pid,
        )
    else:
        graph.connect(
            preprocess_wf,
            "outputnode.t1_to_dwi_transform",
            synthseg_meta_wf,
            "inputnode.t1_to_dwi_transform",
        )

    # NOTE: no fireants warp_field is injected into the tractography
    # sub-workflow. atlas_transform resolves its per-source template->patient
    # warp from config.transforms at build time; overriding a MapNode
    # per-source iterfield with a single runtime warp is not supported
    # (pre-existing limitation). The fireants reverse/forward transforms still
    # reach atlas_to_patient.entry_gate through wire_core_scaffold (that gate
    # is a plain IdentityInterface).
    #
    # Unconditional edges, connected in this order:
    #   1. Ordering anchor. atlas_transform reads those transforms by *path*
    #      (not data-wired), so it must not run before registration has
    #      written them. The meta always exposes entry_gate; only the
    #      ProbTrackX2 backend forwards it to its ROI warp (the MRtrix3
    #      backend's gate is inert, so the edge is a harmless no-op there).
    #   2. Runtime T1 brain into atlas_to_patient's reference_image, so
    #      transform jobs falling back to the global reference get the live
    #      path.
    unconditional_edges = (
        (
            registration_wf,
            "outputnode.transform",
            synthseg_meta_wf,
            "entry_gate._ordering_signal",
        ),
        (
            preprocess_wf,
            "outputnode.t1_brain",
            atlas_to_patient_wf,
            "entry_gate.reference_image",
        ),
    )
    for source_wf, source_field, target_wf, target_field in unconditional_edges:
        graph.connect(source_wf, source_field, target_wf, target_field)

    # -- Stage 3 + 4 -> Stage 5 barrier: tract_similarity --------------------
    # The tractography outputnode.fdt_paths is a single field (a JoinNode-
    # collected list under both-separately), so there is exactly one fdt
    # source to merge.
    fdt_sources = [(synthseg_meta_wf, "outputnode.fdt_paths")]
    finalize_tract_similarity_merge(scaffold, fdt_sources)

    logger.info(
        "Built full_pipeline ({}) for {} | reg={} | bedpostx={}",
        backend.name,
        pid,
        scaffold.reg_method,
        backend.run_bedpostx,
    )
    return graph