Source code for thesis.workflows.hcp.operations.transformation
"""ROI transformation using ANTs.
NOTE: Runs inside a Nipype Function node; the thesis logger is not available at
runtime. ``print()`` may be used for progress output instead.
The low-level ANTs call and sform/qform fix are provided by the shared
:mod:`thesis.workflows.transforms.operations` module. This module retains its
original public interface for backward compatibility with the HCP workflow.
"""
from __future__ import annotations
[docs]
def transform_rois_task(
    seed: str,
    waypoints_file: str,
    stop_mask: str,
    avoid_mask: str,
    target_mask: str,
    warp_field: str | list[str],
    reference_image: str,
    output_dir: str,
    invert_transform_flags: list[bool] | None = None,
    hemisphere: str = "both",
    _ordering_signal: str = "",
) -> tuple:
    """Apply template-to-patient space transform to ROIs using ANTs ApplyTransforms.

    Returns transformed ROI paths in the same structure as extract_rois_task.
    Empty-string inputs stay empty strings in the result. When no transform is
    configured (``warp_field`` is empty or contains only empty entries) the
    inputs are returned unchanged and nothing is written to disk.

    Args:
        seed: Path to seed ROI mask (or empty string)
        waypoints_file: Text file containing paths to waypoint masks, one per
            line; blank lines are ignored
        stop_mask: Path to stop mask (or empty string)
        avoid_mask: Path to avoid mask (or empty string)
        target_mask: Path to target/destination mask (or empty string)
        warp_field: Path or ordered list of transform paths for
            template-to-patient transformation. Empty entries are dropped.
        reference_image: Reference image in patient space for resampling
        output_dir: Output directory for transformed ROIs
        invert_transform_flags: Optional per-transform inversion flags. When
            it has one flag per ``warp_field`` entry, the flags belonging to
            dropped (empty) entries are dropped with them so the remaining
            flags stay aligned with the remaining transforms. Any other length
            is forwarded unchanged.
        hemisphere: ``"left"``, ``"right"``, or ``"both"`` (default). When the
            HCP workflow runs under ``--hemisphere both-separately``, two
            iterations of this task race for the same output paths. Scoping the
            output dir per hemisphere (``<output_dir>/<hemisphere>/``) prevents
            the collision. ``"both"`` keeps the legacy flat layout for
            single-hemisphere runs.
        _ordering_signal: Unused ordering-only input. Connecting an upstream
            node (e.g. registration completion, via the hcp ``roi_transform_gate``
            contract field) to this MapNode input creates a Nipype dependency
            edge so the ROI warp waits until the template->patient transforms
            exist on disk. The value itself is ignored.

    Returns:
        Tuple of (transformed_seed, transformed_waypoints_file, transformed_stop,
        transformed_avoid, transformed_target)

    Raises:
        FileNotFoundError: If a non-empty ROI mask path (including one listed
            in ``waypoints_file``) does not exist on disk.
    """
    # This function runs inside a Nipype Function node (see module docstring),
    # so imports and helpers are kept local to the function body.
    from pathlib import Path

    raw_transforms = (
        list(warp_field) if isinstance(warp_field, (list, tuple)) else [warp_field]
    )

    # Drop empty transform entries. If the caller supplied one inversion flag
    # per entry, drop the matching flags in lockstep; otherwise the flags would
    # silently shift onto the wrong transforms.
    flags = invert_transform_flags
    if flags is not None and len(flags) == len(raw_transforms):
        kept = [(path, flag) for path, flag in zip(raw_transforms, flags) if path]
        transform_paths = [path for path, _ in kept]
        flags = [flag for _, flag in kept]
    else:
        transform_paths = [path for path in raw_transforms if path]

    # No-op passthrough when no transform is configured for this source.
    if not transform_paths:
        return (seed, waypoints_file, stop_mask, avoid_mask, target_mask)

    # Imported only once a transform is actually needed, so the passthrough
    # above does not depend on the ANTs wrapper module being importable.
    from thesis.workflows.transforms.operations import apply_transform_ants

    # Per-hemisphere subdir so that parallel left/right iterations under
    # --hemisphere both-separately don't race on identically-named outputs
    # (e.g. rois_transformed/<source>/internal-capsule-waypoint_transformed.nii.gz).
    # Mirrors the scoping applied by extract_rois_task.
    out_path = Path(output_dir)
    if hemisphere in ("left", "right"):
        out_path = out_path / hemisphere
    out_path.mkdir(parents=True, exist_ok=True)

    transforms = [Path(p) for p in transform_paths]
    reference = Path(reference_image)

    def transform_roi(input_mask: str, stem: str) -> str:
        """Warp one mask into ``out_path``; an empty input yields an empty string."""
        if not input_mask:
            return ""
        input_path = Path(input_mask)
        if not input_path.exists():
            raise FileNotFoundError(
                f"ROI mask not found: '{input_mask}'. "
                "This may happen when Nipype uses a stale cache but the "
                "extracted ROI files have been deleted. Try clearing the "
                "Nipype working directory or re-running with --force."
            )
        output_path = out_path / f"{stem}_transformed.nii.gz"
        return str(
            apply_transform_ants(
                input_image=input_path,
                output_image=output_path,
                transforms=transforms,
                reference_image=reference,
                interpolation="GenericLabel",
                invert_transform_flags=flags,
            )
        )

    def nii_stem(p: str) -> str:
        """Return the file name of ``p`` without its NIfTI extension."""
        name = Path(p).name
        # Strip exactly the NIfTI extension so dotted names such as
        # "roi.left.nii" keep their full stem instead of collapsing to "roi".
        for ext in (".nii.gz", ".nii"):
            if name.endswith(ext):
                return name[: -len(ext)]
        # Unknown extension: keep the legacy two-suffix stripping.
        return Path(p).with_suffix("").stem

    transformed_seed = transform_roi(seed, nii_stem(seed) if seed else "seed")

    transformed_waypoints_file = ""
    if waypoints_file and Path(waypoints_file).exists():
        with open(waypoints_file, "r", encoding="utf-8") as fh:
            waypoint_paths = [line.strip() for line in fh if line.strip()]
        transformed_wp_paths = [
            warped
            for warped in (transform_roi(wp, nii_stem(wp)) for wp in waypoint_paths)
            if warped
        ]
        if transformed_wp_paths:
            transformed_waypoints_file = str(out_path / "waypoints_transformed.txt")
            with open(transformed_waypoints_file, "w", encoding="utf-8") as fh:
                fh.writelines(wp + "\n" for wp in transformed_wp_paths)
    elif waypoints_file:
        # Best-effort: keep going without waypoints, but say so. The thesis
        # logger is not available in this node, hence print().
        print(
            f"WARNING: waypoints file not found, skipping waypoints: '{waypoints_file}'"
        )

    transformed_stop = transform_roi(
        stop_mask, nii_stem(stop_mask) if stop_mask else "stop"
    )
    transformed_avoid = transform_roi(
        avoid_mask, nii_stem(avoid_mask) if avoid_mask else "avoid"
    )
    transformed_target = transform_roi(
        target_mask, nii_stem(target_mask) if target_mask else "target"
    )

    return (
        transformed_seed,
        transformed_waypoints_file,
        transformed_stop,
        transformed_avoid,
        transformed_target,
    )