Source code for thesis.workflows.hcp.operations.transformation

"""ROI transformation using ANTs.

NOTE: Runs inside a Nipype Function node; the thesis logger is not available at
runtime.  ``print()`` may be used for progress output instead.

The low-level ANTs call and sform/qform fix are provided by the shared
:mod:`thesis.workflows.transforms.operations` module.  This module retains its
original public interface for backward compatibility with the HCP workflow.
"""

from __future__ import annotations


[docs] def transform_rois_task( seed: str, waypoints_file: str, stop_mask: str, avoid_mask: str, target_mask: str, warp_field: str | list[str], reference_image: str, output_dir: str, invert_transform_flags: list[bool] | None = None, hemisphere: str = "both", _ordering_signal: str = "", ) -> tuple: """Apply template-to-patient space transform to ROIs using ANTs ApplyTransforms. Returns transformed ROI paths in the same structure as extract_rois_task. Args: seed: Path to seed ROI mask (or empty string) waypoints_file: Text file containing paths to waypoint masks stop_mask: Path to stop mask (or empty string) avoid_mask: Path to avoid mask (or empty string) target_mask: Path to target/destination mask (or empty string) warp_field: Path or ordered list of transform paths for template-to-patient transformation reference_image: Reference image in patient space for resampling output_dir: Output directory for transformed ROIs invert_transform_flags: Optional per-transform inversion flags. hemisphere: ``"left"``, ``"right"``, or ``"both"`` (default). When the HCP workflow runs under ``--hemisphere both-separately``, two iterations of this task race for the same output paths. Scoping the output dir per hemisphere (``<output_dir>/<hemisphere>/``) prevents the collision. ``"both"`` keeps the legacy flat layout for single-hemisphere runs. _ordering_signal: Unused ordering-only input. Connecting an upstream node (e.g. registration completion, via the hcp ``roi_transform_gate`` contract field) to this MapNode input creates a Nipype dependency edge so the ROI warp waits until the template->patient transforms exist on disk. The value itself is ignored. Returns: Tuple of (transformed_seed, transformed_waypoints_file, transformed_stop, transformed_avoid, transformed_target) """ from pathlib import Path from thesis.workflows.transforms.operations import apply_transform_ants transform_paths = warp_field if isinstance(warp_field, list) else [warp_field] transform_paths = [p for p in transform_paths if p] # No-op passthrough when no transform is configured for this source. if not transform_paths: return (seed, waypoints_file, stop_mask, avoid_mask, target_mask) # Per-hemisphere subdir so that parallel left/right iterations under # --hemisphere both-separately don't race on identically-named outputs # (e.g. rois_transformed/<source>/internal-capsule-waypoint_transformed.nii.gz). # Mirrors the scoping applied by extract_rois_task. out_path = Path(output_dir) if hemisphere in ("left", "right"): out_path = out_path / hemisphere out_path.mkdir(parents=True, exist_ok=True) def transform_roi(input_mask: str, stem: str) -> str: if not input_mask: return "" input_path = Path(input_mask) if not input_path.exists(): raise FileNotFoundError( f"ROI mask not found: '{input_mask}'. " "This may happen when Nipype uses a stale cache but the " "extracted ROI files have been deleted. Try clearing the " "Nipype working directory or re-running with --force." ) output_path = out_path / f"{stem}_transformed.nii.gz" return str( apply_transform_ants( input_image=input_path, output_image=output_path, transforms=[Path(p) for p in transform_paths], reference_image=Path(reference_image), interpolation="GenericLabel", invert_transform_flags=invert_transform_flags, ) ) def nii_stem(p: str) -> str: return Path(p).with_suffix("").stem transformed_seed = transform_roi(seed, nii_stem(seed) if seed else "seed") transformed_waypoints_file = "" if waypoints_file and Path(waypoints_file).exists(): with open(waypoints_file, "r", encoding="utf-8") as fh: waypoint_paths = [line.strip() for line in fh if line.strip()] transformed_wp_paths = [transform_roi(wp, nii_stem(wp)) for wp in waypoint_paths] transformed_wp_paths = [p for p in transformed_wp_paths if p] if transformed_wp_paths: transformed_waypoints_file = str(out_path / "waypoints_transformed.txt") with open(transformed_waypoints_file, "w", encoding="utf-8") as fh: for wp in transformed_wp_paths: fh.write(wp + "\n") transformed_stop = transform_roi(stop_mask, nii_stem(stop_mask) if stop_mask else "stop") transformed_avoid = transform_roi(avoid_mask, nii_stem(avoid_mask) if avoid_mask else "avoid") transformed_target = transform_roi( target_mask, nii_stem(target_mask) if target_mask else "target" ) return ( transformed_seed, transformed_waypoints_file, transformed_stop, transformed_avoid, transformed_target, )