"""Distortion correction node builders for preprocessing workflows."""
import os
import shutil
from pathlib import Path
from nipype import Node
from nipype.interfaces.fsl import BET, TOPUP, Eddy, ExtractROI, ImageMaths, Merge
from thesis.core.logging import get_logger
from thesis.core.nipype.interfaces import SynthStrip
# Unversioned eddy CUDA binary name (FSL ≥ 6.0.5).
# Versioned variants (e.g. eddy_cuda11.0) are discovered dynamically.
_EDDY_CUDA_PREFIX = "eddy_cuda"
# Binary names that Nipype's Eddy._run_interface knows how to select natively
# when use_cuda=True (checked in priority order).
_NIPYPE_EDDY_CUDA_NAMES = ("eddy_cuda10.2", "eddy_cuda")
logger = get_logger(__name__)
__all__ = [
"prepare_extract_b0_node",
"prepare_merge_b0_node",
"prepare_topup_node",
"prepare_brain_extraction_node",
"prepare_eddy_node",
"prepare_convert_to_short_node",
]
[docs]
def prepare_merge_b0_node(output_path: str, name: str = "merge_b0") -> Node:
"""Create FSL Merge node to combine AP and PA b0 volumes.
The caller is responsible for wiring ``in_files`` via a workflow
connection (use ``nipype.interfaces.utility.Merge`` upstream to
assemble the list). Never set ``in_files`` statically here: a
conditional assignment that fires only when the files already exist
produces a different hash on first run vs. subsequent runs, causing
Nipype to re-execute the node even when no real inputs changed.
Args:
output_path: Path for output merged b0 file
name: Nipype node name (must be unique within a workflow)
Returns:
Configured Nipype Node for Merge
Example:
>>> node = prepare_merge_b0_node(output_path="b0_merged.nii.gz")
"""
# Ensure output directory exists
out_dir = Path(output_path).parent
out_dir.mkdir(parents=True, exist_ok=True)
merge = Node(Merge(), name=name)
merge.inputs.dimension = "t"
merge.inputs.merged_file = str(output_path)
logger.debug(f"Configured Merge node '{name}' to combine AP/PA b0 volumes")
return merge
[docs]
def prepare_topup_node(acqparams_file: str, output_base: str, name: str = "topup") -> Node:
"""Create FSL TOPUP node for susceptibility distortion correction.
Args:
acqparams_file: Path to acquisition parameters file (--datain)
output_base: Base name for output files (without extension)
name: Nipype node name (must be unique within a workflow)
Returns:
Configured Nipype Node for TOPUP
Example:
>>> node = prepare_topup_node(
... acqparams_file="acqparams.txt",
... output_base="topup_results"
... )
Note:
Uses standard FSL b02b0.cnf configuration file. The acquisition
parameters file should contain one line per input volume with format:
phase_encode_direction readout_time (e.g., "0 1 0 0.05" for PA).
"""
# Ensure output directory exists
out_dir = Path(output_base).parent
out_dir.mkdir(parents=True, exist_ok=True)
topup = Node(TOPUP(), name=name)
if Path(acqparams_file).exists():
topup.inputs.encoding_file = str(acqparams_file)
topup.inputs.config = "b02b0.cnf" # Standard FSL config for b0 to b0 registration
topup.inputs.out_base = str(output_base)
logger.debug(f"Configured TOPUP node '{name}' with acqparams: {acqparams_file}")
return topup
def _find_eddy_cuda() -> str | None:
"""Return an eddy CUDA binary name, or ``None`` if none is found.
Searches ``$FSLDIR/bin`` for any file whose name starts with
``eddy_cuda`` (covering both the unversioned ``eddy_cuda`` and
versioned variants such as ``eddy_cuda11.0`` or ``eddy_cuda10.2``).
Also checks ``$PATH`` for the same prefix. The binary with the
highest version suffix is preferred; the unversioned name wins when
no versioned variant is found.
"""
import re
fsl_bin = Path(os.environ.get("FSLDIR", "")) / "bin"
candidates: list[str] = []
# Scan $FSLDIR/bin for eddy_cuda* files.
if fsl_bin.is_dir():
for entry in fsl_bin.iterdir():
if entry.name.startswith(_EDDY_CUDA_PREFIX) and entry.is_file():
candidates.append(entry.name)
# Also check PATH for the unversioned name and any versioned variant
# that shutil.which can resolve (covers non-FSL installs).
if shutil.which(_EDDY_CUDA_PREFIX) is not None:
candidates.append(_EDDY_CUDA_PREFIX)
if not candidates:
return None
# Sort so versioned names (eddy_cuda11.0 > eddy_cuda10.2) come first;
# fall back to lexicographic order which puts higher version numbers last,
# so reverse-sort puts the highest version first.
def _version_key(name: str) -> tuple[int, int]:
m = re.search(r"(\d+)\.(\d+)$", name)
return (int(m.group(1)), int(m.group(2))) if m else (0, 0)
candidates = sorted(set(candidates), key=_version_key, reverse=True)
return candidates[0]
[docs]
def prepare_eddy_node(
acqparams_file: str,
index_file: str,
bvals: str,
bvecs: str,
output_base: str,
use_cuda: bool = True,
name: str = "eddy",
) -> Node:
"""Create FSL Eddy node for eddy current and motion correction.
Args:
acqparams_file: Path to acquisition parameters file (--acqp)
index_file: Path to index file mapping volumes to acqparams rows (--index)
bvals: Path to b-values file
bvecs: Path to b-vectors file
output_base: Base name for output files
use_cuda: If True, attempt to use GPU-accelerated eddy_cuda
name: Nipype node name (must be unique within a workflow)
Returns:
Configured Nipype Node for Eddy
Example:
>>> node = prepare_eddy_node(
... acqparams_file="acqparams.txt",
... index_file="index.txt",
... bvals="bvals",
... bvecs="bvecs",
... output_base="eddy_corrected",
... use_cuda=True
... )
Note:
When use_cuda=True, attempts eddy_cuda10.2 first, then falls back
to standard eddy if CUDA is not available. Enables outlier
replacement (repol=True) and assumes single-shell or multi-shell
data (is_shelled=True).
"""
# Ensure output directory exists
out_dir = Path(output_base).parent
out_dir.mkdir(parents=True, exist_ok=True)
eddy = Node(Eddy(), name=name)
# Required inputs
if Path(acqparams_file).exists():
eddy.inputs.in_acqp = str(acqparams_file)
if Path(index_file).exists():
eddy.inputs.in_index = str(index_file)
if Path(bvals).exists():
eddy.inputs.in_bval = str(bvals)
eddy.inputs.in_bvec = str(bvecs)
eddy.inputs.out_base = str(output_base)
# Correction options
eddy.inputs.repol = True # Replace outliers
eddy.inputs.is_shelled = True # Data organized in shells
# Use CUDA binary only if the binary actually exists on this host.
# Three-tier selection:
# 1. use_cuda=True — Nipype's own mechanism; works when eddy_cuda10.2 or
# eddy_cuda (unversioned) is in PATH.
# 2. interface._cmd — Direct override for versioned binaries that Nipype
# does not know about (e.g. eddy_cuda11.0).
# 3. CPU fallback — Warn and run without GPU when nothing is found.
if use_cuda:
if any(shutil.which(n) for n in _NIPYPE_EDDY_CUDA_NAMES):
eddy.inputs.use_cuda = True
logger.debug(f"Configured Eddy node '{name}' with CUDA acceleration (Nipype native)")
else:
cuda_bin = _find_eddy_cuda()
if cuda_bin:
eddy.interface._cmd = cuda_bin
logger.debug(f"Configured Eddy node '{name}' with CUDA acceleration ({cuda_bin})")
else:
logger.warning("eddy_cuda not found on PATH or $FSLDIR/bin — running Eddy on CPU")
logger.debug(f"Configured Eddy node '{name}' with CPU (eddy_cuda absent)")
else:
logger.debug(f"Configured Eddy node '{name}' with CPU")
return eddy
[docs]
def prepare_convert_to_short_node(output_path: str, name: str = "convert_short") -> Node:
"""Create FSL ImageMaths node to convert to short datatype.
Converts image data to 16-bit signed integer (short) format for
compatibility with 3D Slicer and reduced file size.
Args:
output_path: Path for output converted file
name: Nipype node name (must be unique within a workflow)
Returns:
Configured Nipype Node for ImageMaths
Example:
>>> node = prepare_convert_to_short_node(
... output_path="dwi_short.nii.gz"
... )
Note:
This conversion is often necessary for visualization in 3D Slicer,
which has limitations with certain data types.
"""
# Ensure output directory exists
out_dir = Path(output_path).parent
out_dir.mkdir(parents=True, exist_ok=True)
convert = Node(ImageMaths(), name=name)
convert.inputs.out_file = str(output_path)
convert.inputs.out_data_type = "short"
logger.debug(f"Configured ImageMaths node '{name}' to convert to short datatype")
return convert