"""Composite full-pipeline meta-workflow (backend-parameterized).
Chains preprocess → registration → tract_synthseg → atlas_to_patient →
tract_similarity in one Nipype graph, wiring stages only through their
``inputnode`` / ``outputnode`` contracts (plus the pre-existing ``entry_gate``
/ ``exit_gate`` anchors on transform / tract_similarity). No node-name
introspection of nested sub-workflows.
``config.tractography.method`` selects the tractography backend via a
:class:`~thesis.workflows.full_pipeline._core.BackendDescriptor`; the unified
``tract_synthseg`` meta-workflow reads the same config value to build the
matching backend:
* ``probtrackx2`` / ``fsl`` → ``hcp`` backend (BedpostX enabled)
* ``mrtrix3`` / ``tckgen`` → ``mrtrix3`` backend (BedpostX disabled)
The backend-agnostic edges (preprocess→registration, registration→atlas, the
preprocess→atlas anchor, the FireANTs transform fan-out into atlas) live in
:mod:`thesis.workflows.full_pipeline._core`.
"""
from __future__ import annotations
import shutil
from importlib.util import find_spec
from pathlib import Path
from typing import Callable, List, cast
import nipype.pipeline.engine as pe
# Side-effect imports populate WORKFLOW_REGISTRY before .get() lookups below.
import thesis.workflows.atlas_to_patient.workflow # noqa: F401
import thesis.workflows.registration.workflow # noqa: F401
import thesis.workflows.tract_similarity.workflow # noqa: F401
import thesis.workflows.tract_synthseg.workflow # noqa: F401
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import ExternalFile
from thesis.core.registry import WORKFLOW_REGISTRY
from thesis.workflows.full_pipeline._core import (
BackendDescriptor,
apply_preproc_override,
finalize_tract_similarity_merge,
select_backend,
wire_core_scaffold,
)
from thesis.workflows.mrtrix3.workflow import REQUIRED_BINARIES as _MRTRIX3_BINARIES
from thesis.workflows.preprocess.workflow import _load_preprocess_config
from thesis.workflows.registration.paths import resolve_registration_jobs
# Module logger, keyed on this module's dotted name.
logger = get_logger(__name__)
# Call signature shared by every sub-workflow factory used in this module:
# ``(config, context) -> nipype Workflow``.
_WorkflowFactory = Callable[[PipelineConfig, ProcessingContext], pe.Workflow]
# Public names exported by this module.
__all__ = ["build_workflow", "verify_requirements"]
# Module-level builder references so tests can monkeypatch a single name per
# sub-workflow. ``build_workflow`` calls THESE names (never the registry
# directly) so a patched stub is honoured.
# Each lookup runs once, at import time, so the corresponding workflow must
# already be present in WORKFLOW_REGISTRY when this module is imported. The
# ``cast`` only narrows the type for static checkers; it does nothing at runtime.
_build_preprocess = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("preprocess").factory)
_build_registration = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("registration").factory)
_build_atlas_to_patient = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("atlas_to_patient").factory)
_build_tract_similarity = cast(_WorkflowFactory, WORKFLOW_REGISTRY.get("tract_similarity").factory)
def _build_synthseg_meta(config: PipelineConfig, context: ProcessingContext) -> pe.Workflow:
    """Construct the unified ``tract_synthseg`` meta-workflow.

    ``tract_synthseg`` chooses its tractography backend from
    ``config.tractography.method``, the same value this module uses to pick
    its :class:`BackendDescriptor`. The registry lookup happens on every call
    instead of at import time, which keeps this function the single name that
    tests need to monkeypatch.

    Args:
        config: Pipeline configuration handed to the registered factory.
        context: Processing context handed to the registered factory.

    Returns:
        The workflow produced by the registered ``tract_synthseg`` factory.
    """
    registry_entry = WORKFLOW_REGISTRY.get("tract_synthseg")
    build = cast(_WorkflowFactory, registry_entry.factory)
    return build(config, context)
def verify_requirements(
    config: PipelineConfig, context: ProcessingContext, **_: object
) -> List[str]:
    """Run the preflight checks for the unified full pipeline.

    Raw-input checks are delegated to the registered ``preprocess`` verifier
    (so this verifier inherits preprocess's implicit ``@requires`` existence
    checks); backend-driven tool and config availability checks are then added
    on top. Downstream stages consume runtime intermediates, so their
    file-existence checks are intentionally left out (they would always be
    false positives).

    Args:
        config: Validated pipeline configuration.
        context: Processing context.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        Human-readable error strings, de-duplicated, in first-seen order.
        An empty list means no problem was detected.
    """
    # A dict keeps insertion order and rejects repeats, so it serves as an
    # ordered set of error messages.
    collected: dict[str, None] = {}

    def _add(err: str) -> None:
        collected.setdefault(err, None)

    desc = select_backend(getattr(config.tractography, "method", "probtrackx2"))

    # Mirror build_workflow's structural override so the verifier sees the same
    # effective config (e.g. the MRtrix3 backend never invokes BedpostX, so the
    # preprocess verifier must not demand it on PATH).
    effective = apply_preproc_override(config, {"run_bedpostx": desc.run_bedpostx})
    preprocess_verifier = WORKFLOW_REGISTRY.get("preprocess").verifier
    if preprocess_verifier is not None:
        for error in preprocess_verifier(effective, context):
            _add(error)

    # NOTE: preprocess.run_coregistration is OPTIONAL and therefore not checked
    # here. When it is on, build_workflow wires preprocess's
    # ``t1_to_dwi_transform`` output into the tractography stage; when it is
    # off, build_workflow logs that ROIs are regridded onto the DWI grid.

    # Resolve registration jobs so per-job method overrides are checked (the
    # default single-registration config resolves to one implicit job mirroring
    # the top-level method). Repeated messages are collapsed by _add.
    for reg_job in resolve_registration_jobs(config):
        method = reg_job.method
        if method == "ants":
            if shutil.which("antsRegistration") is None:
                _add("ANTs 'antsRegistration' not found on PATH.")
        elif method == "fireants":
            for package in ("torch", "fireants"):
                if find_spec(package) is None:
                    _add(
                        f"Python package '{package}' is required for "
                        "registration.method='fireants'."
                    )

    # registration.fixed_image is validated declaratively via
    # @requires(template=ExternalFile(...)) on build_workflow, so it is not
    # re-checked here. It is NOT a PatientFile: the template is an out-of-tree
    # (often absolute) cohort asset, and PatientFile anchors to input_dir and
    # rejects anything escaping it.

    # Prefer the binaries the descriptor declares; fall back to the MRtrix3
    # workflow's own list when the descriptor declares none for that backend.
    backend_binaries = desc.required_binaries or (
        tuple(_MRTRIX3_BINARIES) if desc.name == "mrtrix3" else ()
    )
    for binary in backend_binaries:
        if shutil.which(binary) is None:
            _add(f"Required binary '{binary}' not found on PATH.")

    if shutil.which("antsApplyTransforms") is None:
        _add("ANTs 'antsApplyTransforms' not found on PATH.")

    if not config.transforms.jobs:
        _add(
            "No transform jobs defined. Add at least one entry under "
            "'transforms.jobs' in your configuration."
        )

    return list(collected)
@workflow(
    name="full_pipeline",
    description=(
        "Full-pipeline meta-workflow: preprocess -> registration -> "
        "tract_synthseg -> atlas_to_patient -> "
        "tract_similarity, in one Nipype graph wired through inputnode/"
        "outputnode contracts. Backend selected by tractography.method "
        "(probtrackx2 default; mrtrix3 disables BedpostX)."
    ),
    protocol="full_pipeline",
)
@requires(
    template=ExternalFile(config_path="registration.fixed_image"),
)
@verify(verify_requirements)
def build_workflow(
    *, template: Path, config: PipelineConfig, context: ProcessingContext
) -> pe.Workflow:
    """Build the unified full-pipeline meta-workflow for one patient.

    The backend is selected from ``config.tractography.method`` and the
    backend's ``run_bedpostx`` flag is applied to the config before any
    sub-workflow is built, so BedpostX is enabled/disabled structurally.

    Args:
        template: Registration template (``registration.fixed_image``), declared
            via :class:`~thesis.core.path_declarations.ExternalFile` so an
            out-of-tree absolute template path is allowed and existence-checked.
            It is validated by ``@requires`` and not otherwise used here.
        config: Validated pipeline configuration.
        context: Processing context carrying the patient ID, paths, and config.

    Returns:
        Nipype Workflow ready for ``NipypeExecutor`` or ``.run()``.
    """
    del template  # validated by @requires
    pid = context.patient_id

    desc: BackendDescriptor = select_backend(getattr(config.tractography, "method", "probtrackx2"))
    # Rebind ``config`` so every sub-workflow below is built from the
    # backend-adjusted configuration.
    config = apply_preproc_override(config, {"run_bedpostx": desc.run_bedpostx})

    meta = pe.Workflow(name=f"full_pipeline_{pid}")
    if context.working_dir:
        meta.base_dir = str(context.working_dir)

    # -- Build sub-workflows -------------------------------------------------
    # Go through the module-level builder names so monkeypatched stubs apply.
    preprocess_wf = _build_preprocess(config, context)
    registration_wf = _build_registration(config, context)
    synthseg_meta_wf = _build_synthseg_meta(config, context)
    atlas_to_patient_wf = _build_atlas_to_patient(config, context)
    tract_similarity_wf = _build_tract_similarity(config, context)

    # -- Backend-agnostic edges (preprocess→registration, reg→atlas, etc.) --
    scaffold = wire_core_scaffold(
        meta,
        config,
        preprocess_wf=preprocess_wf,
        registration_wf=registration_wf,
        atlas_to_patient_wf=atlas_to_patient_wf,
        tract_similarity_wf=tract_similarity_wf,
    )

    # -- Stage 1 → Stage 3: contract edges (backend-driven) ------------------
    # Each (src, dst) pair maps a preprocess outputnode field onto a
    # tract_synthseg inputnode field.
    for src, dst in desc.tractography_edges:
        meta.connect(preprocess_wf, f"outputnode.{src}", synthseg_meta_wf, f"inputnode.{dst}")

    # The ``t1_to_dwi_transform`` edge is OPTIONAL: preprocess.outputnode only
    # exposes it when run_coregistration is on, so wire it only then. When off,
    # roi_dwi_resampler regrids the ROIs onto the DWI grid (valid only when T1
    # and DWI already share a world).
    if _load_preprocess_config(config).run_coregistration:
        meta.connect(
            preprocess_wf,
            "outputnode.t1_to_dwi_transform",
            synthseg_meta_wf,
            "inputnode.t1_to_dwi_transform",
        )
    else:
        logger.info(
            "full_pipeline ({}): preprocess.run_coregistration is off — atlas ROIs "
            "will be regridded onto the DWI grid (no DWI->T1 transform); ensure T1 "
            "and DWI share a world.",
            pid,
        )

    # NOTE: no fireants warp_field injection into the tractography sub-workflow.
    # atlas_transform resolves its per-source template->patient warp from
    # config.transforms at build time; overriding a MapNode per-source iterfield
    # with a single runtime warp is not supported (pre-existing limitation). The
    # fireants reverse/forward transforms still feed atlas_to_patient.entry_gate
    # via wire_core_scaffold (that gate is a plain IdentityInterface).
    #
    # Because atlas_transform reads those transforms by *path* (not data-wired),
    # it must not run until registration has written them. Order it after
    # registration via the tract_synthseg entry_gate ordering anchor. The meta
    # always exposes entry_gate; only the ProbTrackX2 backend forwards it to its
    # ROI warp (the MRtrix3 backend's gate is inert, so this is a harmless no-op
    # there).
    meta.connect(
        registration_wf,
        "outputnode.transform",
        synthseg_meta_wf,
        "entry_gate._ordering_signal",
    )

    # Runtime T1 brain into atlas_to_patient reference_image, so transform jobs
    # that fall back to the global reference get the live path.
    meta.connect(
        preprocess_wf,
        "outputnode.t1_brain",
        atlas_to_patient_wf,
        "entry_gate.reference_image",
    )

    # -- Stage 3 + 4 → Stage 5 barrier: tract_similarity ---------------------
    # The tractography outputnode.fdt_paths is a single field (a JoinNode-
    # collected list under both-separately), so there is exactly one fdt source.
    finalize_tract_similarity_merge(scaffold, [(synthseg_meta_wf, "outputnode.fdt_paths")])

    logger.info(
        "Built full_pipeline ({}) for {} | reg={} | bedpostx={}",
        desc.name,
        pid,
        scaffold.reg_method,
        desc.run_bedpostx,
    )
    return meta