"""ROI merging from multiple atlas sources.
NOTE: Both tasks run inside Nipype ``Function`` nodes, which serialize each
function's source in isolation. A task therefore cannot reference module-level
helpers or sibling tasks at runtime — every helper must be defined *inside* the
function body and every import must be local. The two tasks consequently share
shape (out-dir scoping, mask union, waypoint concat) but each carries its own
nested copies; the only behavioural difference is the seed rule (binary merge
picks the left seed; the list merge unions all seeds).
"""
[docs]
def binary_merge_rois_task(
left_seed: str = "",
left_waypoints_file: str = "",
left_stop: str = "",
left_avoid: str = "",
left_target: str = "",
right_seed: str = "",
right_waypoints_file: str = "",
right_stop: str = "",
right_avoid: str = "",
right_target: str = "",
output_dir: str = "",
hemisphere: str = "both",
) -> tuple:
"""Combine two ROI bundles into one (legacy binary-merge interface).
Used by the MRtrix3 workflow's binary-tree merger and by the HCP
workflow's final_merger that combines atlas-joined output with the
SynthSeg branch. Semantics match the historical contract:
* Seed: pick-first (``left`` wins over ``right``).
* Waypoints: concatenate both files' lines.
* Stop / avoid / target: union via ``fslmaths`` when both are present,
else passthrough whichever exists.
Args:
hemisphere: ``"left"``, ``"right"``, or ``"both"`` (default). Scopes
the output dir to ``<output_dir>/<hemisphere>/`` for the first
two so concurrent left/right iterations don't race on the
same merged-output paths.
Returns:
Tuple ``(seed, waypoints_file, stop_mask, avoid_mask, target_mask)``.
"""
from pathlib import Path
from thesis.workflows.hcp.operations._fsl import run_fsl_command
out_path = Path(output_dir)
if hemisphere in ("left", "right"):
out_path = out_path / hemisphere
out_path.mkdir(parents=True, exist_ok=True)
def union_masks(paths: list, name: str) -> str:
kept = [p for p in paths if p and Path(p).exists()]
if not kept:
return ""
if len(kept) == 1:
return str(kept[0])
merged = str(out_path / f"{name}_merged.nii.gz")
run_fsl_command(["fslmaths", kept[0], "-bin", merged])
for path in kept[1:]:
run_fsl_command(["fslmaths", merged, "-add", path, "-bin", merged])
return merged
# Seed semantics differ from merge_rois_task: pick-first (left wins).
seed = left_seed if left_seed else right_seed
all_waypoints: list = []
for waypoint_file in (left_waypoints_file, right_waypoints_file):
if waypoint_file and Path(waypoint_file).exists():
with open(waypoint_file, "r", encoding="utf-8") as handle:
all_waypoints.extend(line.strip() for line in handle if line.strip())
waypoints_file = ""
if all_waypoints:
waypoints_file = str(out_path / "merged_waypoints.txt")
with open(waypoints_file, "w", encoding="utf-8") as handle:
handle.write("\n".join(all_waypoints) + "\n")
stop_mask = union_masks([left_stop, right_stop], "stop")
avoid_mask = union_masks([left_avoid, right_avoid], "avoid")
target_mask = union_masks([left_target, right_target], "target")
return seed, waypoints_file, stop_mask, avoid_mask, target_mask
[docs]
def merge_rois_task(
seeds: list,
waypoints_files: list,
stop_masks: list,
avoid_masks: list,
target_masks: list,
output_dir: str,
hemisphere: str = "both",
) -> tuple:
"""Merge N ROI bundles (one per atlas source) into one canonical bundle.
Designed to be the join task for an upstream ``MapNode``. Each list argument
is a per-iteration value (one entry per atlas source), and may contain empty
strings for sources that didn't produce that role.
Args:
seeds: Per-source seed mask paths.
waypoints_files: Per-source waypoint list-file paths.
stop_masks: Per-source stop mask paths.
avoid_masks: Per-source avoid mask paths.
target_masks: Per-source target mask paths.
output_dir: Directory for merged outputs.
hemisphere: ``"left"``, ``"right"``, or ``"both"`` (default). Scopes
``<output_dir>/<hemisphere>/`` for left/right so parallel
hemisphere iterations under ``--hemisphere both-separately`` don't
race on identically-named merged outputs.
Returns:
Tuple ``(seed, waypoints_file, stop_mask, avoid_mask, target_mask)``.
"""
from pathlib import Path
from thesis.workflows.hcp.operations._fsl import run_fsl_command
def existing(paths: list) -> list:
return [p for p in paths if p and Path(p).exists()]
out_path = Path(output_dir)
if hemisphere in ("left", "right"):
out_path = out_path / hemisphere
out_path.mkdir(parents=True, exist_ok=True)
def union_masks(paths: list, name: str) -> str:
kept = existing(paths)
if not kept:
return ""
if len(kept) == 1:
return str(kept[0])
merged = str(out_path / f"{name}_merged.nii.gz")
run_fsl_command(["fslmaths", kept[0], "-bin", merged])
for path in kept[1:]:
run_fsl_command(["fslmaths", merged, "-add", path, "-bin", merged])
return merged
seed = union_masks(seeds, "seed")
stop_mask = union_masks(stop_masks, "stop")
avoid_mask = union_masks(avoid_masks, "avoid")
target_mask = union_masks(target_masks, "target")
waypoints_file = ""
all_waypoints: list = []
for waypoint_file in existing(waypoints_files):
with open(waypoint_file, "r", encoding="utf-8") as handle:
all_waypoints.extend(line.strip() for line in handle if line.strip())
if all_waypoints:
waypoints_file = str(out_path / "merged_waypoints.txt")
with open(waypoints_file, "w", encoding="utf-8") as handle:
handle.write("\n".join(all_waypoints) + "\n")
return seed, waypoints_file, stop_mask, avoid_mask, target_mask