Source code for thesis.workflows.hcp.nodes.tractography

"""Tractography node builders."""

from pathlib import Path
from typing import Dict, Optional

from nipype import Node
from nipype.interfaces.base import isdefined
from nipype.interfaces.utility import Function

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.nipype.interfaces.fsl import get_probtrackx2_interface
from thesis.core.resources import cpu, gpu
from thesis.core.utils import resolve_path

from ..common import format_patient_path
from ..config.params import TractographyParams
from ..config.paths import HCPPaths
from ..operations import inverse_warp_streamlines_task, write_probtrackx2_params_task


def _scope_outdir_for_hemisphere_task(base_dir: str, hemisphere: str) -> str:
    """Return ``<base_dir>/<hemisphere>`` when hemisphere is left/right, else base_dir.

    Defined at module scope (not as a closure) so Nipype's Function interface
    can pickle it when running under MultiProc.
    """
    from pathlib import Path

    out = Path(base_dir)
    if hemisphere in ("left", "right"):
        out = out / hemisphere
    out.mkdir(parents=True, exist_ok=True)
    return str(out)


[docs] def prepare_outdir_router(base_out_dir: Path, name: str = "outdir_router") -> Node: """Return a Function node that scopes a tractography out_dir per hemisphere. Backend-neutral: used by both the HCP (ProbTrackX2) and MRtrix3 workflows. Under ``--hemisphere both-separately`` the workflow sets ``inputs.iterables = ("hemisphere", ["left", "right"])``; connecting this node between the hemisphere iterable and each output-writing node's ``out_dir``/``output_dir`` input ensures every iteration writes into ``<base>/<hemisphere>/`` instead of colliding on the shared parent directory. For ``hemisphere == "both"`` the base directory is returned unchanged (flat layout). Args: base_out_dir: Absolute path to the un-scoped tractography directory. name: Nipype node name. Returns: Configured Function node with input ``hemisphere`` and output ``out_dir``. """ node = Node( Function( input_names=["base_dir", "hemisphere"], output_names=["out_dir"], function=_scope_outdir_for_hemisphere_task, ), name=name, ) node.inputs.base_dir = str(base_out_dir) node.inputs.hemisphere = "both" return node
[docs] def prepare_probtrackx2_outdir_router(base_out_dir: Path, name: str = "probtrackx2_outdir") -> Node: """Backward-compatible alias of :func:`prepare_outdir_router` for the HCP workflow. Kept so existing HCP wiring and tests that reference the ``probtrackx2_outdir`` node name continue to work unchanged. """ return prepare_outdir_router(base_out_dir, name=name)
[docs] def prepare_probtrackx2_node( tract_params: TractographyParams, hcp_paths: HCPPaths, out_dir: str, use_gpu: bool = False, gpu_runtime_env: Optional[Dict[str, str]] = None, gpu_slot_cost: int = 1, name: str = "probtrackx2", ) -> Node: """ Create and configure FSL ProbTrackX2 node. When ``use_gpu=True`` the function attempts to use the GPU-accelerated binary (``probtrackx2_gpu11.0`` or ``probtrackx2_gpu``). If neither binary is found on ``$FSLDIR/bin`` or ``$PATH``, it automatically falls back to the standard CPU ``probtrackx2`` and logs a warning. Args: tract_params: Dictionary with tractography parameters. hcp_paths: Dictionary with HCP paths. out_dir: Output directory. use_gpu: Prefer GPU-accelerated binary when available. gpu_runtime_env: When ``use_gpu`` is true, environment variables merged into THIS node's command ``environ`` only. Nipype passes that env to the probtrackx2 subprocess (the Neurodesk ``probtrackx2_gpu`` wrapper), whose own ``singularity exec --cleanenv`` then forwards the ``SINGULARITYENV_``/``APPTAINERENV_``-prefixed keys into the FSL container. Scoped to this node alone — the ANTs/SynthSeg nodes never see it. Ignored on CPU runs and when ``None``. gpu_slot_cost: How many of the scheduler's ``n_gpu_procs`` GPU slots this GPU node reserves (MultiProc charges ``min(node.n_procs, n_gpu_procs)`` slots per GPU node). Pass ``n_gpu_procs`` so probtrackx2 claims ALL slots and runs exclusively on the card: it is fast but balloons its buffer to fill GPU memory, so it must not co-locate. Light GPU nodes (e.g. SynthSeg) keep the default cost of 1 and pack up to ``n_gpu_procs`` concurrently around it. Only applied on GPU runs. name: Nipype node name (must be unique within a workflow). Returns: Configured Nipype Node for ProbTrackX2. """ interface_cls = get_probtrackx2_interface(use_gpu=use_gpu) probtrack = Node(interface_cls(), name=name) probtrack.inputs.samples_base_name = str(hcp_paths["samples_base_name"]) thsamples = hcp_paths["thsamples"] phsamples = hcp_paths["phsamples"] fsamples = hcp_paths["fsamples"] # Only set samples statically when all files exist on disk and the lists # have matching lengths. In a meta-workflow (e.g. full_pipeline), BedpostX # runs at runtime and the meta-workflow connects preprocess.bedpostx output # ports directly to this node's sample inputs — a live connection bypasses # the File(exists=True) trait validation. Stale directories may contain # broken symlinks or mismatched counts (e.g. from a double .bedpostX # nesting), so we validate existence before assigning. all_exist = ( thsamples and phsamples and fsamples and len(thsamples) == len(phsamples) == len(fsamples) and all(Path(p).exists() for p in [*thsamples, *phsamples, *fsamples]) ) if all_exist: probtrack.inputs.thsamples = [str(p) for p in thsamples] probtrack.inputs.phsamples = [str(p) for p in phsamples] probtrack.inputs.fsamples = [str(p) for p in fsamples] # Only set mask statically when the file already exists on disk. # In a meta-workflow (e.g. full_pipeline), the mask is created by an # upstream preprocess stage at runtime, and the meta-workflow connects # preprocess.eddy_brain_extraction.<port> directly to this node's # ``mask`` input — a live connection bypasses the File(exists=True) # trait validation. mask_path = Path(hcp_paths["mask_path"]) if mask_path.exists(): probtrack.inputs.mask = str(mask_path) # Ensure out_dir exists as Nipype ProbTrackX2 validates it on assignment out_path = Path(out_dir) out_path.mkdir(parents=True, exist_ok=True) probtrack.inputs.out_dir = str(out_path) probtrack.inputs.n_samples = int(tract_params["n_samples"]) probtrack.inputs.n_steps = int(tract_params["n_steps"]) probtrack.inputs.step_length = float(tract_params["step_length"]) probtrack.inputs.c_thresh = float(tract_params["curvature_threshold"]) probtrack.inputs.force_dir = bool(tract_params["force_dir"]) probtrack.inputs.opd = bool(tract_params["opd"]) probtrack.inputs.loop_check = bool(tract_params["loop_check"]) probtrack.inputs.dist_thresh = float(tract_params["dist_thresh"]) probtrack.inputs.fibst = int(tract_params["fibst"]) probtrack.inputs.rand_fib = int(tract_params["rand_fib"]) probtrack.inputs.mod_euler = bool(tract_params["mod_euler"]) if use_gpu: gpu(probtrack, mem_gb=float(tract_params["mem_gb_gpu"]), est_minutes=90) # Reserve all GPU slots so probtrackx2 runs exclusively on the card. # MultiProc charges min(node.n_procs, n_gpu_procs) GPU slots per GPU node; # setting n_procs = n_gpu_procs drains the pool so no other GPU node (or a # second probtrackx2) co-locates while it runs. probtrackx2 is fast but # sizes its streamline buffer to total GPU memory, so co-location OOMs; # light GPU nodes (SynthSeg) keep n_procs=1 and pack around it. probtrack.n_procs = max(1, int(gpu_slot_cost)) else: cpu(probtrack, mem_gb=float(tract_params["mem_gb_cpu"]), est_minutes=240) # Node-scoped runtime env for the GPU binary only. Nipype merges this into a # copy of os.environ and passes it as the subprocess env, so it reaches the # Neurodesk ``probtrackx2_gpu`` wrapper (whose ``singularity exec --cleanenv`` # forwards SINGULARITYENV_/APPTAINERENV_ keys into the FSL container) and # nothing else. Merge (not clobber) so the interface's own environ defaults # (e.g. FSLOUTPUTTYPE) are preserved. if use_gpu and gpu_runtime_env: existing = dict(probtrack.inputs.environ) if isdefined(probtrack.inputs.environ) else {} existing.update({k: str(val) for k, val in gpu_runtime_env.items()}) probtrack.inputs.environ = existing return probtrack
[docs] def prepare_probtrackx2_params_writer( out_dir: Path, n_samples: int, name: str = "probtrackx2_params_writer", ) -> Node: """Write ``tractography_params.json`` (backend, n_samples) for stats. The stats collector needs ``n_samples`` to compute the per-sample Normalized Connection Strength. We persist a tiny JSON so the collector doesn't have to re-read the YAML config. Args: out_dir: Per-hemisphere tractography output directory. n_samples: ``--nsamples`` passed to ``probtrackx2``. name: Nipype node name. Returns: Configured Function node with output ``params_file``. """ out_dir.mkdir(parents=True, exist_ok=True) node = Node( Function( input_names=["output_dir", "n_samples"], output_names=["params_file"], function=write_probtrackx2_params_task, ), name=name, ) node.inputs.output_dir = str(out_dir) node.inputs.n_samples = int(n_samples) return node
[docs] def prepare_streamline_warper( config: PipelineConfig, context: ProcessingContext, out_dir: Path, name: str = "streamline_warper", ) -> Optional[Node]: """ Create and configure a streamline warping node if transform config exists. Reads ``config.transforms.patient_to_template`` and ``config.transforms.template_reference_image``. Both must be non-empty strings for the node to be created. Args: config: PipelineConfig context: ProcessingContext with patient_id and data_dir attributes out_dir: Output directory for warped files name: Nipype node name (must be unique within a workflow). Returns: Configured Nipype Node wrapping :func:`inverse_warp_streamlines_task`, or ``None`` if the required transform paths are not configured. """ transforms_cfg = getattr(config, "transforms", None) warp_field_raw = getattr(transforms_cfg, "patient_to_template", "") if transforms_cfg else "" ref_image_raw = ( getattr(transforms_cfg, "template_reference_image", "") if transforms_cfg else "" ) if not (warp_field_raw and ref_image_raw): return None patient_id = getattr(context, "patient_id", "") warp_field_raw = format_patient_path(warp_field_raw, patient_id) # Resolve warp field relative to context.data_dir as fallback warp_field = resolve_path(context.data_dir, warp_field_raw) node = Node( Function( input_names=[ "fdt_paths", "warp_field", "reference_image", "output_dir", "_ordering_signal", ], output_names=["warped_files"], function=inverse_warp_streamlines_task, ), name=name, ) node.inputs.warp_field = str(warp_field) node.inputs.reference_image = ref_image_raw # ``output_dir`` and ``fdt_paths`` are intentionally left unset: the HCP # workflow connects them from ``probtrackx2_outdir_router.out_dir`` # (output_dir via ``_warped_dir_from_outdir``) so the warper writes inside # whichever per-hemisphere subdir ProbTrackX2 produced. ``_ordering_signal`` # is connected from ``probtrackx2.fdt_paths`` so the warper runs only after # ProbTrackX2 has populated that directory. Static assignments here would be # overwritten by those edges. return node