Source code for thesis.workflows.preprocess.nodes.distortion

"""Distortion correction node builders for preprocessing workflows."""

import os
import shutil
from pathlib import Path

from nipype import Node
from nipype.interfaces.fsl import BET, TOPUP, Eddy, ExtractROI, ImageMaths, Merge

from thesis.core.logging import get_logger
from thesis.core.nipype.interfaces import SynthStrip

# Unversioned eddy CUDA binary name (FSL ≥ 6.0.5).
# Versioned variants (e.g. eddy_cuda11.0) are discovered dynamically.
_EDDY_CUDA_PREFIX = "eddy_cuda"

# Binary names that Nipype's Eddy._run_interface knows how to select natively
# when use_cuda=True (checked in priority order).
_NIPYPE_EDDY_CUDA_NAMES = ("eddy_cuda10.2", "eddy_cuda")

logger = get_logger(__name__)

__all__ = [
    "prepare_extract_b0_node",
    "prepare_merge_b0_node",
    "prepare_topup_node",
    "prepare_brain_extraction_node",
    "prepare_eddy_node",
    "prepare_convert_to_short_node",
]


[docs] def prepare_extract_b0_node(input_dwi: str, output_path: str, name: str = "extract_b0") -> Node: """Create FSL ExtractROI node to extract first b0 volume. Args: input_dwi: Path to input 4D DWI NIfTI file output_path: Path for output b0 volume name: Nipype node name (must be unique within a workflow) Returns: Configured Nipype Node for ExtractROI Example: >>> node = prepare_extract_b0_node( ... input_dwi="dwi.nii.gz", ... output_path="b0.nii.gz" ... ) """ # Ensure output directory exists out_dir = Path(output_path).parent out_dir.mkdir(parents=True, exist_ok=True) extract = Node(ExtractROI(), name=name) if Path(input_dwi).exists(): extract.inputs.in_file = str(input_dwi) extract.inputs.roi_file = str(output_path) extract.inputs.t_min = 0 extract.inputs.t_size = 1 logger.debug(f"Configured ExtractROI node '{name}' to extract first volume from {input_dwi}") return extract
[docs] def prepare_merge_b0_node(output_path: str, name: str = "merge_b0") -> Node: """Create FSL Merge node to combine AP and PA b0 volumes. The caller is responsible for wiring ``in_files`` via a workflow connection (use ``nipype.interfaces.utility.Merge`` upstream to assemble the list). Never set ``in_files`` statically here: a conditional assignment that fires only when the files already exist produces a different hash on first run vs. subsequent runs, causing Nipype to re-execute the node even when no real inputs changed. Args: output_path: Path for output merged b0 file name: Nipype node name (must be unique within a workflow) Returns: Configured Nipype Node for Merge Example: >>> node = prepare_merge_b0_node(output_path="b0_merged.nii.gz") """ # Ensure output directory exists out_dir = Path(output_path).parent out_dir.mkdir(parents=True, exist_ok=True) merge = Node(Merge(), name=name) merge.inputs.dimension = "t" merge.inputs.merged_file = str(output_path) logger.debug(f"Configured Merge node '{name}' to combine AP/PA b0 volumes") return merge
[docs] def prepare_topup_node(acqparams_file: str, output_base: str, name: str = "topup") -> Node: """Create FSL TOPUP node for susceptibility distortion correction. Args: acqparams_file: Path to acquisition parameters file (--datain) output_base: Base name for output files (without extension) name: Nipype node name (must be unique within a workflow) Returns: Configured Nipype Node for TOPUP Example: >>> node = prepare_topup_node( ... acqparams_file="acqparams.txt", ... output_base="topup_results" ... ) Note: Uses standard FSL b02b0.cnf configuration file. The acquisition parameters file should contain one line per input volume with format: phase_encode_direction readout_time (e.g., "0 1 0 0.05" for PA). """ # Ensure output directory exists out_dir = Path(output_base).parent out_dir.mkdir(parents=True, exist_ok=True) topup = Node(TOPUP(), name=name) if Path(acqparams_file).exists(): topup.inputs.encoding_file = str(acqparams_file) topup.inputs.config = "b02b0.cnf" # Standard FSL config for b0 to b0 registration topup.inputs.out_base = str(output_base) logger.debug(f"Configured TOPUP node '{name}' with acqparams: {acqparams_file}") return topup
[docs] def prepare_brain_extraction_node( method: str = "synthstrip", frac: float = 0.3, name: str = "brain_extract", use_gpu: bool = False, gpu_device: int | None = None, robust: bool = False, padding: bool = False, radius: int | None = None, ) -> Node: """Create brain extraction node using SynthStrip or BET. Args: method: Brain extraction method ('synthstrip' or 'bet') frac: Fractional intensity threshold for BET (0-1, default 0.3) name: Nipype node name (must be unique within a workflow) use_gpu: Whether SynthStrip should request GPU execution. gpu_device: Optional CUDA device index for SynthStrip. robust: BET-only. Use robust brain-center estimation (FSL ``-R``). Ignored when method is 'synthstrip'. padding: BET-only. Apply slice padding to improve BET on small FOVs (FSL ``-Z``). Ignored when method is 'synthstrip'. radius: BET-only. Head radius in mm (FSL ``-r``); when ``None``, BET estimates it. Ignored when method is 'synthstrip'. Returns: Configured Nipype Node for brain extraction Raises: ValueError: If method is not 'synthstrip' or 'bet' Example: >>> node = prepare_brain_extraction_node(method="synthstrip") >>> node = prepare_brain_extraction_node(method="bet", frac=0.4) Note: SynthStrip (FreeSurfer) typically produces more accurate results than BET, especially for clinical data with pathology. The ``robust`` / ``padding`` / ``radius`` tuning options apply only to the BET path; SynthStrip ignores them. """ if method == "synthstrip": node = Node(SynthStrip(), name=name) node.inputs.use_gpu = use_gpu if gpu_device is not None: node.inputs.gpu_device = gpu_device logger.debug(f"Configured SynthStrip brain extraction node '{name}'") return node elif method == "bet": node = Node(BET(), name=name) node.inputs.mask = True node.inputs.frac = float(frac) # BET tuning options (config-driven). Nipype BET maps robust -> -R, # padding -> -Z, radius -> -r <mm>. ``robust`` and ``padding`` are # mutually exclusive in the FSL BET interface (xor), so only one may be # set; prefer robust (more impactful) and warn if both were requested. # Only set a trait when its option is on so unused ones stay undefined. if robust and padding: logger.warning( "BET 'robust' and 'padding' are mutually exclusive; using robust " "and ignoring padding for node '%s'.", name, ) if robust: node.inputs.robust = True elif padding: node.inputs.padding = True if radius is not None: node.inputs.radius = int(radius) logger.debug( f"Configured BET brain extraction node '{name}' with frac={frac}, " f"robust={robust}, padding={padding}, radius={radius}" ) return node else: raise ValueError(f"Unknown brain extraction method: {method}. Use 'synthstrip' or 'bet'.")
def _find_eddy_cuda() -> str | None: """Return an eddy CUDA binary name, or ``None`` if none is found. Searches ``$FSLDIR/bin`` for any file whose name starts with ``eddy_cuda`` (covering both the unversioned ``eddy_cuda`` and versioned variants such as ``eddy_cuda11.0`` or ``eddy_cuda10.2``). Also checks ``$PATH`` for the same prefix. The binary with the highest version suffix is preferred; the unversioned name wins when no versioned variant is found. """ import re fsl_bin = Path(os.environ.get("FSLDIR", "")) / "bin" candidates: list[str] = [] # Scan $FSLDIR/bin for eddy_cuda* files. if fsl_bin.is_dir(): for entry in fsl_bin.iterdir(): if entry.name.startswith(_EDDY_CUDA_PREFIX) and entry.is_file(): candidates.append(entry.name) # Also check PATH for the unversioned name and any versioned variant # that shutil.which can resolve (covers non-FSL installs). if shutil.which(_EDDY_CUDA_PREFIX) is not None: candidates.append(_EDDY_CUDA_PREFIX) if not candidates: return None # Sort so versioned names (eddy_cuda11.0 > eddy_cuda10.2) come first; # fall back to lexicographic order which puts higher version numbers last, # so reverse-sort puts the highest version first. def _version_key(name: str) -> tuple[int, int]: m = re.search(r"(\d+)\.(\d+)$", name) return (int(m.group(1)), int(m.group(2))) if m else (0, 0) candidates = sorted(set(candidates), key=_version_key, reverse=True) return candidates[0]
[docs] def prepare_eddy_node( acqparams_file: str, index_file: str, bvals: str, bvecs: str, output_base: str, use_cuda: bool = True, name: str = "eddy", ) -> Node: """Create FSL Eddy node for eddy current and motion correction. Args: acqparams_file: Path to acquisition parameters file (--acqp) index_file: Path to index file mapping volumes to acqparams rows (--index) bvals: Path to b-values file bvecs: Path to b-vectors file output_base: Base name for output files use_cuda: If True, attempt to use GPU-accelerated eddy_cuda name: Nipype node name (must be unique within a workflow) Returns: Configured Nipype Node for Eddy Example: >>> node = prepare_eddy_node( ... acqparams_file="acqparams.txt", ... index_file="index.txt", ... bvals="bvals", ... bvecs="bvecs", ... output_base="eddy_corrected", ... use_cuda=True ... ) Note: When use_cuda=True, attempts eddy_cuda10.2 first, then falls back to standard eddy if CUDA is not available. Enables outlier replacement (repol=True) and assumes single-shell or multi-shell data (is_shelled=True). """ # Ensure output directory exists out_dir = Path(output_base).parent out_dir.mkdir(parents=True, exist_ok=True) eddy = Node(Eddy(), name=name) # Required inputs if Path(acqparams_file).exists(): eddy.inputs.in_acqp = str(acqparams_file) if Path(index_file).exists(): eddy.inputs.in_index = str(index_file) if Path(bvals).exists(): eddy.inputs.in_bval = str(bvals) eddy.inputs.in_bvec = str(bvecs) eddy.inputs.out_base = str(output_base) # Correction options eddy.inputs.repol = True # Replace outliers eddy.inputs.is_shelled = True # Data organized in shells # Use CUDA binary only if the binary actually exists on this host. # Three-tier selection: # 1. use_cuda=True — Nipype's own mechanism; works when eddy_cuda10.2 or # eddy_cuda (unversioned) is in PATH. # 2. interface._cmd — Direct override for versioned binaries that Nipype # does not know about (e.g. eddy_cuda11.0). # 3. CPU fallback — Warn and run without GPU when nothing is found. if use_cuda: if any(shutil.which(n) for n in _NIPYPE_EDDY_CUDA_NAMES): eddy.inputs.use_cuda = True logger.debug(f"Configured Eddy node '{name}' with CUDA acceleration (Nipype native)") else: cuda_bin = _find_eddy_cuda() if cuda_bin: eddy.interface._cmd = cuda_bin logger.debug(f"Configured Eddy node '{name}' with CUDA acceleration ({cuda_bin})") else: logger.warning("eddy_cuda not found on PATH or $FSLDIR/bin — running Eddy on CPU") logger.debug(f"Configured Eddy node '{name}' with CPU (eddy_cuda absent)") else: logger.debug(f"Configured Eddy node '{name}' with CPU") return eddy
[docs] def prepare_convert_to_short_node(output_path: str, name: str = "convert_short") -> Node: """Create FSL ImageMaths node to convert to short datatype. Converts image data to 16-bit signed integer (short) format for compatibility with 3D Slicer and reduced file size. Args: output_path: Path for output converted file name: Nipype node name (must be unique within a workflow) Returns: Configured Nipype Node for ImageMaths Example: >>> node = prepare_convert_to_short_node( ... output_path="dwi_short.nii.gz" ... ) Note: This conversion is often necessary for visualization in 3D Slicer, which has limitations with certain data types. """ # Ensure output directory exists out_dir = Path(output_path).parent out_dir.mkdir(parents=True, exist_ok=True) convert = Node(ImageMaths(), name=name) convert.inputs.out_file = str(output_path) convert.inputs.out_data_type = "short" logger.debug(f"Configured ImageMaths node '{name}' to convert to short datatype") return convert