"""Raw-to-HCP-compatible diffusion preprocessing workflow."""
from __future__ import annotations
import shutil
from pathlib import Path
from nipype import Node, Workflow
from nipype.interfaces.utility import Function
from nipype.interfaces.utility import Merge as UtilMerge
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.contracts import attach_outputnode
from thesis.core.decorators import requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import PatientFile
from thesis.workflows.preprocess.config import PreprocessConfig, prepare_preprocess_paths
from thesis.workflows.preprocess.nodes import (
make_ants_apply_transforms_interface,
prepare_bedpostx_node,
prepare_brain_extraction_node,
prepare_convert_to_short_node,
prepare_create_index_node,
prepare_dtifit_node,
prepare_eddy_node,
prepare_extract_b0_node,
prepare_gm_wm_csf_converter_node,
prepare_merge_b0_node,
prepare_modify_bval_node,
prepare_n4_bias_correction_node,
prepare_robust_fov_node,
prepare_synthseg_node,
prepare_topup_node,
prepare_tract_masks_creator_node,
)
from thesis.workflows.preprocess.operations.file_ops import (
create_acqparams_file,
create_hcp_output_structure,
)
# Module-level logger obtained from the project's logging helper, keyed by this
# module's import name.
logger = get_logger(__name__)
# Public API of this module: the workflow builder and its preflight verifier.
# Everything else (underscore-prefixed helpers) is internal.
__all__ = ["build_workflow", "verify_requirements"]
def _load_preprocess_config(config: PipelineConfig) -> PreprocessConfig:
    """Return the ``preprocess`` section of *config* as a ``PreprocessConfig``.

    A section that is already a ``PreprocessConfig`` instance is handed back
    untouched. Anything else -- including a missing or falsy section, which is
    treated as an empty mapping -- is passed through
    ``PreprocessConfig.model_validate``.

    Args:
        config: Pipeline configuration whose ``preprocess`` attribute is read.

    Returns:
        The validated workflow-specific configuration object.
    """
    section = getattr(config, "preprocess", {}) or {}
    already_typed = isinstance(section, PreprocessConfig)
    return section if already_typed else PreprocessConfig.model_validate(section)
def _copy_file(source: str, destination: str) -> str:
"""Copy a file and return the destination path."""
import shutil
from pathlib import Path
src = Path(source)
dst = Path(destination)
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
return str(dst.resolve())
def _passthrough_dwi(dwi: str, bvecs: str, out_data: str) -> tuple:
"""Pass an already-corrected DWI through unchanged (used when run_eddy=False).
Mirrors the Eddy node's output interface so it is a drop-in source: copies the
input DWI to the HCP ``data.nii.gz`` and returns it as ``out_corrected``, plus
the original (unrotated) bvecs as ``out_rotated_bvecs``. Runs inside a Nipype
Function node, so it must be fully self-contained.
Args:
dwi: Path to the already-corrected input DWI.
bvecs: Path to the input bvecs (no eddy rotation applied).
out_data: Destination for the HCP ``data.nii.gz``.
Returns:
Tuple of ``(out_corrected, out_rotated_bvecs)`` paths.
"""
import shutil
from pathlib import Path
out = Path(out_data)
out.parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(str(dwi), str(out))
return str(out), str(bvecs)
def _label_output_name(label_path: str, output_dir: str, patient_id: str) -> str:
"""Build a patient-specific warped label output path."""
label = Path(label_path)
suffix = ".nii.gz" if label.suffix == ".gz" else label.suffix or ".nii.gz"
stem = label.name.replace(".nii.gz", "").replace(".nii", "")
out = Path(output_dir) / "labels" / f"{patient_id}_{stem}{suffix}"
out.parent.mkdir(parents=True, exist_ok=True)
return str(out.resolve())
[docs]
def verify_requirements(
    config: PipelineConfig,
    context: ProcessingContext,
    *,
    t1: Path | None = None,
    t2: Path | None = None,
    dwi_ap: Path | None = None,
    dwi_ap_bval: Path | None = None,
    dwi_ap_bvec: Path | None = None,
    dwi_pa: Path | None = None,
    **_: object,
) -> list[str]:
    """Run the preflight checks for the preprocess workflow.

    Two things are verified: that every required input file exists, and that
    every external command needed by the enabled stages can be found on PATH.

    Args:
        config: Pipeline configuration; its ``preprocess`` section selects the
            stages and ``config.preprocessing.brain_extraction_method`` selects
            the brain-extraction tool.
        context: Processing context, used only when paths must be resolved here.
        t1: T1 image (required).
        t2: T2 image; checked only when given.
        dwi_ap: AP diffusion image (required).
        dwi_ap_bval: bval file of the AP acquisition (required).
        dwi_ap_bvec: bvec file of the AP acquisition (required).
        dwi_pa: PA diffusion image; checked only when given and TOPUP is enabled.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        Human-readable error strings; an empty list means all checks passed.
    """
    preproc = _load_preprocess_config(config)

    # Direct callers (outside the registry's composite verifier) pass no path
    # kwargs at all; in that case resolve the paths with the workflow helper so
    # they do not have to build path declarations themselves.
    if t1 is None and dwi_ap is None:
        resolved = prepare_preprocess_paths(config, context)
        t1 = resolved["t1_image"]
        t2 = resolved["t2_image"]
        dwi_ap = resolved["dwi_ap"]
        dwi_pa = resolved["dwi_pa"]
        dwi_ap_bval = resolved["dwi_ap_bval"]
        dwi_ap_bvec = resolved["dwi_ap_bvec"]

    # (label, path) pairs in the order their errors are reported.
    inputs: list[tuple[str, Path | None]] = [
        ("T1", t1),
        ("DWI AP", dwi_ap),
        ("DWI AP bval", dwi_ap_bval),
        ("DWI AP bvec", dwi_ap_bvec),
    ]
    if t2 is not None:
        inputs.append(("T2", t2))
    if preproc.run_topup and dwi_pa is not None:
        inputs.append(("DWI PA", dwi_pa))

    problems: list[str] = [
        f"Missing required preprocess input for {label}: '{candidate}'"
        for label, candidate in inputs
        if candidate is None or not Path(candidate).exists()
    ]

    # Each enabled stage contributes the executables it needs.
    tool_gates = (
        (preproc.run_eddy, ("eddy",)),
        (preproc.run_dtifit, ("dtifit",)),
        (preproc.run_robustfov, ("robustfov",)),
        (preproc.run_topup, ("topup",)),
        (preproc.run_bedpostx, ("bedpostx",)),
        (config.preprocessing.brain_extraction_method == "synthstrip", ("mri_synthstrip",)),
        (preproc.run_synthseg, ("mri_synthseg",)),
        (preproc.run_coregistration, ("antsRegistration", "antsApplyTransforms")),
    )
    wanted = [tool for enabled, tools in tool_gates if enabled for tool in tools]
    missing = sorted(tool for tool in wanted if shutil.which(tool) is None)
    problems.extend(f"Required command not found on PATH: '{tool}'" for tool in missing)
    return problems
[docs]
@workflow(
    name="preprocess",
    description="Raw DWI/T1/T2 preprocessing to an HCP-compatible output layout.",
    protocol="preprocess",
    default_config="preprocess",
)
@requires(
    t1=PatientFile(
        default="{patient_id}_T1.nii.gz", config_path="preprocess.t1_image", optional=True
    ),
    # No ``default=`` on t2 / dwi_pa: the user can set the field to ``null``
    # in config to signal "no T2 acquired", and that ``None`` must propagate
    # so the workflow body skips the T2 branch.
    t2=PatientFile(config_path="preprocess.t2_image", optional=True),
    dwi_ap=PatientFile(
        default="{patient_id}_dmri_AP.nii.gz", config_path="preprocess.dwi_ap", optional=True
    ),
    dwi_ap_bval=PatientFile(
        default="{patient_id}_dmri_AP.bval",
        config_path="preprocess.dwi_ap_bval",
        optional=True,
    ),
    dwi_ap_bvec=PatientFile(
        default="{patient_id}_dmri_AP.bvec",
        config_path="preprocess.dwi_ap_bvec",
        optional=True,
    ),
    dwi_pa=PatientFile(config_path="preprocess.dwi_pa", optional=True),
)
@verify(verify_requirements)
def build_workflow(
    *,
    t1: Path | None,
    t2: Path | None,
    dwi_ap: Path | None,
    dwi_ap_bval: Path | None,
    dwi_ap_bvec: Path | None,
    dwi_pa: Path | None,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    """Build the preprocess workflow for one raw patient dataset.

    The graph is assembled in this order: bval/index preparation, optional
    TOPUP, Eddy (or a passthrough node when ``run_eddy`` is off), post-eddy b0
    extraction and brain masking, optional dtifit and BedpostX, the T1/T2
    structural chain (optional robustfov -> N4 -> brain extraction), optional
    SynthSeg, optional ANTs coregistration (DWI->T1, T2->T1, T1->template,
    label warping) and finally the output node.

    Side effects at build time (before any node runs): the HCP output structure
    and the ``topup``/``eddy``/``labels`` directories are created under the
    output directory, and ``topup/acqparams.txt`` is written.

    Args:
        t1: T1 image; source of the T1 structural chain.
        t2: T2 image, or ``None`` to skip every T2 branch.
        dwi_ap: AP diffusion image (Eddy / passthrough input).
        dwi_ap_bval: bval file of the AP acquisition.
        dwi_ap_bvec: bvec file of the AP acquisition.
        dwi_pa: PA diffusion image, or ``None``; TOPUP is only built when this
            is given and ``run_topup`` is enabled.
        config: Pipeline configuration (``preprocess``, ``preprocessing``,
            ``hardware`` and ``registration`` sections are read).
        context: Processing context supplying ``patient_id``, ``output_dir``
            and ``working_dir``.

    Returns:
        A Nipype ``Workflow`` named ``preprocess_<patient_id>`` with an output
        node exposing ``t1_brain``, ``dwi_mask``, ``dwi_corrected``,
        ``rotated_bvecs``, ``modified_bval`` and, depending on the enabled
        stages, ``t1_to_dwi_transform``, ``t1_to_template_transform``,
        ``t1_to_template_warped`` and the three ``bedpostx_*`` sample fields.
    """
    preproc = _load_preprocess_config(config)
    # Single source of truth for GPU usage. Every GPU-capable stage (SynthStrip,
    # Eddy, BedpostX, SynthSeg) reads the same flag with the same default, so a
    # hardware section without ``gpu_enabled`` consistently means "CPU".
    gpu_enabled = bool(getattr(config.hardware, "gpu_enabled", False))
    gpu_device = getattr(config.hardware, "gpu_device", None)
    output_dir = (
        Path(context.output_dir).resolve()
        if context.output_dir is not None
        else Path(".").resolve()
    )
    dirs = create_hcp_output_structure(output_dir, context.patient_id)
    pid = context.patient_id

    def _output(pattern: str) -> Path:
        """Expand ``{patient_id}`` in *pattern* and anchor it at the output dir."""
        return output_dir / pattern.replace("{patient_id}", pid)

    t1_final = _output(preproc.t1_final)
    t2_final = _output(preproc.t2_final) if t2 is not None else None
    wf = Workflow(name=f"preprocess_{pid}")
    if context.working_dir is not None:
        wf.base_dir = str(context.working_dir)
    topup_dir = output_dir / "topup"
    eddy_dir = output_dir / "eddy"
    labels_dir = output_dir / "labels"
    is_synthstrip = config.preprocessing.brain_extraction_method == "synthstrip"
    # Brain-extraction port names differ between SynthStrip (input_image /
    # output_image / output_mask) and FSL BET (in_file / out_file / mask_file).
    # Resolve once here and reuse everywhere a brain-extraction node is wired.
    be_in_port = "input_image" if is_synthstrip else "in_file"
    be_out_port = "output_image" if is_synthstrip else "out_file"
    be_mask_port = "output_mask" if is_synthstrip else "mask_file"

    def _brain_extract(name: str, frac: float, brain: Path, mask: Path | None = None) -> Node:
        """Create a brain-extraction node writing *brain* (and *mask* for SynthStrip)."""
        node = prepare_brain_extraction_node(
            method=config.preprocessing.brain_extraction_method,
            frac=frac,
            name=name,
            use_gpu=gpu_enabled,
            gpu_device=gpu_device,
            # BET-only tuning knobs (ignored on the synthstrip path).
            robust=preproc.bet.robust,
            padding=preproc.bet.padding,
            radius=preproc.bet.radius,
        )
        if is_synthstrip:
            node.inputs.output_image = str(brain)
            if mask is not None:
                node.inputs.output_mask = str(mask)
        else:
            # On the BET path only the brain image path is set explicitly.
            node.inputs.out_file = str(brain)
        return node

    topup_dir.mkdir(parents=True, exist_ok=True)
    eddy_dir.mkdir(parents=True, exist_ok=True)
    labels_dir.mkdir(parents=True, exist_ok=True)
    # Write acqparams.txt eagerly (content is config-determined): a Function
    # node would invalidate the hash on every workflow.py edit and force
    # TOPUP/Eddy to re-run. create_acqparams_file is idempotent.
    acqparams_path = str(topup_dir / "acqparams.txt")
    create_acqparams_file(
        acqparams_path, preproc.acq_params.bandwidth, preproc.acq_params.phase_encoding_dirs
    )
    modify_bval = prepare_modify_bval_node()
    modify_bval.inputs.input_bval = str(dwi_ap_bval)
    modify_bval.inputs.output_bval = str(dirs["diffusion"] / "bvals")
    create_index = prepare_create_index_node()
    create_index.inputs.output_index = str(eddy_dir / "index.txt")
    extract_b0_ap = prepare_extract_b0_node(
        str(dwi_ap), str(topup_dir / "b0_AP.nii.gz"), name="extract_b0_ap"
    )
    extract_b0_pa = merge_b0 = pair_b0 = topup = topup_mask = topup_b0 = pre_eddy_mask = None
    if preproc.run_topup and dwi_pa is not None:
        extract_b0_pa = prepare_extract_b0_node(
            str(dwi_pa), str(topup_dir / "b0_PA.nii.gz"), name="extract_b0_pa"
        )
        merge_b0 = prepare_merge_b0_node(str(topup_dir / "b0_AP_PA.nii.gz"))
        # Use UtilMerge (a stable BaseInterface) over a Function node so an
        # edit to workflow.py doesn't invalidate the TOPUP cache.
        pair_b0 = Node(UtilMerge(2), name="pair_b0_files")
        topup = prepare_topup_node(acqparams_path, str(topup_dir / "topup_results"))
        topup_b0 = prepare_extract_b0_node(
            str(topup_dir / "topup_results_corrected.nii.gz"),
            str(topup_dir / "topup_b0.nii.gz"),
            name="extract_topup_b0",
        )
        topup_mask = _brain_extract(
            "topup_brain_extraction",
            preproc.bet.frac_dwi,
            topup_dir / "topup_b0_brain.nii.gz",
            topup_dir / "topup_b0_brain_mask.nii.gz",
        )
    else:
        if preproc.run_topup:
            # TOPUP was requested but cannot be built without the dwi_pa input;
            # say so instead of silently switching to the no-TOPUP path.
            logger.warning(
                f"preprocess_{pid}: run_topup is enabled but no dwi_pa input was "
                "provided; TOPUP is skipped and the AP b0 is used for the pre-eddy mask."
            )
        pre_eddy_mask = _brain_extract(
            "pre_eddy_brain_extraction",
            preproc.bet.frac_dwi,
            eddy_dir / "pre_eddy_b0_brain.nii.gz",
            eddy_dir / "pre_eddy_b0_brain_mask.nii.gz",
        )
    # ``eddy`` is the diffusion source: every downstream consumer (and the output
    # contract) reads out_corrected / out_rotated_bvecs from it. When run_eddy is
    # off (e.g. an already-eddy-corrected clinical DWI), swap in a passthrough
    # Function node with the same output interface so the rest of the graph is
    # unchanged; it copies the input DWI to data.nii.gz and keeps the bvecs as-is.
    if preproc.run_eddy:
        eddy = prepare_eddy_node(
            acqparams_path,
            str(eddy_dir / "index.txt"),
            str(dirs["diffusion"] / "bvals"),
            str(dwi_ap_bvec),
            str(dirs["diffusion"] / "data"),
            use_cuda=gpu_enabled,
        )
        eddy.inputs.in_file = str(dwi_ap)
    else:
        eddy = Node(
            Function(
                input_names=["dwi", "bvecs", "out_data"],
                output_names=["out_corrected", "out_rotated_bvecs"],
                function=_passthrough_dwi,
            ),
            name="eddy_passthrough",
        )
        eddy.inputs.dwi = str(dwi_ap)
        eddy.inputs.bvecs = str(dwi_ap_bvec)
        eddy.inputs.out_data = str(dirs["diffusion"] / "data.nii.gz")
    eddy_b0 = prepare_extract_b0_node(
        str(dirs["diffusion"] / "data.nii.gz"),
        str(eddy_dir / "data_b0.nii.gz"),
        name="extract_eddy_b0",
    )
    eddy_brain = _brain_extract(
        "eddy_brain_extraction",
        preproc.bet.frac_dwi,
        dirs["diffusion"] / "data_brain.nii.gz",
        dirs["diffusion"] / "nodif_brain_mask.nii.gz",
    )
    copy_bvecs = Node(
        Function(
            input_names=["source", "destination"],
            output_names=["destination"],
            function=_copy_file,
        ),
        name="copy_rotated_bvecs",
    )
    copy_bvecs.inputs.destination = str(dirs["diffusion"] / "bvecs")
    convert_short = prepare_convert_to_short_node(
        str(dirs["diffusion"] / f"{pid}_dwi_slicer.nii.gz")
    )
    # dtifit is optional: gated on run_dtifit. Its tensor outputs (FA/MD/…) are
    # not consumed downstream (tractography uses BedpostX), so when off the node
    # is simply omitted. Lets single-shell data that can't fit a tensor (e.g. a
    # b800-only clinical scan, where dtifit aborts with a singular matrix) run.
    dtifit = None
    if preproc.run_dtifit:
        dtifit = prepare_dtifit_node(
            use_wls=preproc.dtifit.use_wls,
            save_tensor=preproc.dtifit.save_tensor,
            compute_sse=True,
            compute_kurt=preproc.dtifit.compute_kurt,
        )
        dtifit.inputs.base_name = str(dirs["dti_wls"] / f"{pid}_dti_wls")
    bedpostx = None
    if preproc.run_bedpostx:
        bedpostx = prepare_bedpostx_node(
            n_fibres=preproc.bedpostx.n_fibres,
            model=preproc.bedpostx.model,
            burn_in=preproc.bedpostx.burn_in,
            n_jumps=preproc.bedpostx.n_jumps,
            sample_every=preproc.bedpostx.sample_every,
            # Respect hardware.gpu_enabled (same source as eddy/synthstrip) and
            # allow preproc.bedpostx.use_gpu to explicitly opt in even when the
            # hardware flag is off (e.g. on a CPU-only node in a mixed cluster).
            use_gpu=gpu_enabled or preproc.bedpostx.use_gpu,
        )
        # FSL's bedpostx appends ".bedpostX" to the subject directory it receives,
        # so passing dirs["diffusion"] (T1w/Diffusion) produces the correct HCP
        # output at T1w/Diffusion.bedpostX — not T1w/Diffusion.bedpostX.bedpostX.
        bedpostx.inputs.out_dir = str(dirs["diffusion"])

    # NOTE: throughout this function nodes are registered with add_nodes()
    # *before* the first connect() that references them; keep that order when
    # editing, since add_nodes() is expected to reject already-present nodes.
    wf.add_nodes([modify_bval, create_index, extract_b0_ap, eddy, eddy_b0, eddy_brain])
    wf.add_nodes([copy_bvecs, convert_short])
    if dtifit is not None:
        wf.add_nodes([dtifit])
    if bedpostx is not None:
        wf.add_nodes([bedpostx])
    wf.connect(modify_bval, "output_bval", create_index, "bval_file")
    wf.connect(eddy, "out_corrected", eddy_b0, "in_file")
    wf.connect(eddy, "out_corrected", convert_short, "in_file")
    wf.connect(eddy_b0, "roi_file", eddy_brain, be_in_port)
    wf.connect(eddy, "out_rotated_bvecs", copy_bvecs, "source")
    # dtifit and BedpostX take the same four inputs from the same sources.
    for model_node in (dtifit, bedpostx):
        if model_node is None:
            continue
        wf.connect(eddy, "out_corrected", model_node, "dwi")
        wf.connect(eddy, "out_rotated_bvecs", model_node, "bvecs")
        wf.connect(modify_bval, "output_bval", model_node, "bvals")
        wf.connect(eddy_brain, be_mask_port, model_node, "mask")
    # Eddy-input wiring (acqp mask / index / bval / topup field map). Skipped
    # entirely when run_eddy is off, since the passthrough node has no such
    # inputs and the DWI is already corrected.
    if preproc.run_eddy and (
        topup is not None
        and merge_b0 is not None
        and extract_b0_pa is not None
        and topup_b0 is not None
    ):
        wf.add_nodes([extract_b0_pa, merge_b0, topup, topup_b0])
        if pair_b0 is not None:
            wf.add_nodes([pair_b0])
        if topup_mask is not None:
            wf.add_nodes([topup_mask])
        if pair_b0 is not None:
            # AP b0 first, PA b0 second: the pair feeds the b0 merge for TOPUP.
            wf.connect(extract_b0_ap, "roi_file", pair_b0, "in1")
            wf.connect(extract_b0_pa, "roi_file", pair_b0, "in2")
            wf.connect(pair_b0, "out", merge_b0, "in_files")
        wf.connect(merge_b0, "merged_file", topup, "in_file")
        wf.connect(topup, "out_corrected", topup_b0, "in_file")
        if topup_mask is not None:
            wf.connect(topup_b0, "roi_file", topup_mask, be_in_port)
            wf.connect(topup_mask, be_mask_port, eddy, "in_mask")
        wf.connect(create_index, "output_index", eddy, "in_index")
        wf.connect(modify_bval, "output_bval", eddy, "in_bval")
        wf.connect(topup, "out_fieldcoef", eddy, "in_topup_fieldcoef")
        wf.connect(topup, "out_movpar", eddy, "in_topup_movpar")
    elif preproc.run_eddy:
        # No TOPUP: mask the uncorrected AP b0 and feed Eddy without a field map.
        if pre_eddy_mask is not None:
            wf.add_nodes([pre_eddy_mask])
            wf.connect(extract_b0_ap, "roi_file", pre_eddy_mask, be_in_port)
            wf.connect(pre_eddy_mask, be_mask_port, eddy, "in_mask")
        wf.connect(create_index, "output_index", eddy, "in_index")
        wf.connect(modify_bval, "output_bval", eddy, "in_bval")
    reg_dir = dirs["registration"]

    def _structural_pipeline(
        modality: str, source: Path | None, frac: float, final: Path | None
    ) -> tuple[Node, Node]:
        """Build (robustfov?) → n4 → brain_extract pipeline for T1/T2.

        Returns the N4 node and the brain-extraction node, both already wired
        into ``wf``.
        """
        n4 = prepare_n4_bias_correction_node(name=f"n4_{modality.lower()}")
        if final is not None:
            n4.inputs.output_image = str(final)
        if preproc.run_robustfov:
            robust = prepare_robust_fov_node(name=f"robustfov_{modality.lower()}")
            robust.inputs.in_file = str(source)
            robust.inputs.out_roi = str(reg_dir / f"{pid}_{modality}_robustfov.nii.gz")
            wf.add_nodes([robust, n4])
            wf.connect(robust, "out_roi", n4, "input_image")
        else:
            n4.inputs.input_image = str(source)
            wf.add_nodes([n4])
        brain = _brain_extract(
            f"{modality.lower()}_brain_extraction",
            frac,
            reg_dir / f"{pid}_{modality}_brain.nii.gz",
            reg_dir / f"{pid}_{modality}_brainmask.nii.gz",
        )
        wf.connect(n4, "output_image", brain, be_in_port)
        return n4, brain

    n4_t1, t1_brain = _structural_pipeline("T1", t1, preproc.bet.frac_t1, t1_final)
    n4_t2 = None
    t2_brain = None
    if t2 is not None:
        n4_t2, t2_brain = _structural_pipeline("T2", t2, preproc.bet.frac_t2, t2_final)
    if preproc.run_synthseg:
        ss = preproc.synthseg
        # CPU when explicitly requested or when the shared hardware flag says
        # no GPU (same default as SynthStrip / Eddy / BedpostX above).
        ss_cpu = bool(ss.cpu) or not gpu_enabled
        ss_threads = int(ss.threads)
        ss_gpu_device = gpu_device
        ss_dir = dirs["synthseg"]

        def _build_synthseg(modality: str, name: str) -> Node:
            """Create a SynthSeg node with per-modality output file names."""
            node = prepare_synthseg_node(parc=ss.parc, robust=ss.robust, fast=ss.fast, name=name)
            node.inputs.output_segmentation = str(ss_dir / f"{pid}_synthseg_{modality}.nii.gz")
            node.inputs.vol = str(ss_dir / f"{pid}_vol_{modality}.csv")
            node.inputs.qc = str(ss_dir / f"{pid}_qc_{modality}.csv")
            node.inputs.resample = str(ss_dir / f"{pid}_{modality}_resampled.nii.gz")
            if ss.crop is not None:
                node.inputs.crop = int(ss.crop)
            if ss_cpu:
                node.inputs.cpu = True
                node.inputs.threads = ss_threads
            else:
                node.inputs.use_gpu = True
                if ss_gpu_device is not None:
                    node.inputs.environ = {"CUDA_VISIBLE_DEVICES": str(ss_gpu_device)}
            return node

        if ss.run_on_t1:
            synthseg_t1 = _build_synthseg("T1", "synthseg_t1")
            wf.add_nodes([synthseg_t1])
            wf.connect(n4_t1, "output_image", synthseg_t1, "input_image")
        if ss.run_on_t2 and n4_t2 is not None:
            synthseg_t2 = _build_synthseg("T2", "synthseg_t2")
            gm_wm_csf = prepare_gm_wm_csf_converter_node()
            gm_wm_csf.inputs.output_image = str(ss_dir / f"{pid}_synthseg_GMWMmap.nii.gz")
            tract_masks = prepare_tract_masks_creator_node()
            tract_masks.inputs.output_prefix = str(ss_dir / f"{pid}_")
            wf.add_nodes([synthseg_t2, gm_wm_csf, tract_masks])
            wf.connect(n4_t2, "output_image", synthseg_t2, "input_image")
            wf.connect(synthseg_t2, "output_segmentation", gm_wm_csf, "input_image")
            wf.connect(synthseg_t2, "output_segmentation", tract_masks, "input_image")
    if preproc.run_coregistration:
        threads = int(getattr(config.hardware, "threads", 1))
        from thesis.workflows.preprocess.nodes.registration import prepare_ants_registration_node

        def _reg_node(name: str, transform_type: str, prefix: str) -> Node:
            """Create an ANTs registration node writing ``<pid>_<prefix>*`` files."""
            node = prepare_ants_registration_node(
                transform_type=transform_type, num_threads=threads, name=name
            )
            node.inputs.output_transform_prefix = str(reg_dir / f"{pid}_{prefix}_")
            node.inputs.output_warped_image = str(reg_dir / f"{pid}_{prefix}.nii.gz")
            return node

        dwi_to_t1 = _reg_node(
            "dwi_to_t1_registration", preproc.registration.dwi_to_t1.transform_type, "b0toT1"
        )
        wf.add_nodes([dwi_to_t1])
        wf.connect(eddy_brain, be_out_port, dwi_to_t1, "moving_image")
        wf.connect(t1_brain, be_out_port, dwi_to_t1, "fixed_image")
        if n4_t2 is not None and t2_brain is not None:
            t2_to_t1 = _reg_node(
                "t2_to_t1_registration", preproc.registration.t2_to_t1.transform_type, "T2toT1"
            )
            wf.add_nodes([t2_to_t1])
            wf.connect(t2_brain, be_out_port, t2_to_t1, "moving_image")
            wf.connect(t1_brain, be_out_port, t2_to_t1, "fixed_image")
        # T1->template registration: lets a patient that was NOT used to build
        # the cohort template still be registered to it. Built only when a
        # template fixed image is configured (config.registration.fixed_image).
        # Moving = T1 brain; fixed = the template image reused from the
        # registration workflow's resolver (no new top-level path field).
        t1_to_template = None
        if config.registration.fixed_image:
            from thesis.workflows.registration.paths import resolve_fixed_image

            template_image = resolve_fixed_image(config, context)
            t1_to_template = _reg_node(
                "t1_to_template_registration",
                preproc.registration.t1_to_template.transform_type,
                "T1toTemplate",
            )
            t1_to_template.inputs.fixed_image = str(template_image)
            wf.add_nodes([t1_to_template])
            wf.connect(t1_brain, be_out_port, t1_to_template, "moving_image")
        # MNI-label-to-DWI transform applies dwi_to_t1's composite transform
        # to each label so labels in T1-space land on the DWI grid.
        # Uses a MapNode so all labels share one Nipype node with per-label
        # iteration rather than a Python loop building N separate nodes.
        if preproc.label_transform.transform_mni_labels:
            from nipype import MapNode

            label_paths = list(preproc.label_transform.mni_labels_list)
            if label_paths:
                lt = preproc.label_transform
                output_images = [_label_output_name(p, str(output_dir), pid) for p in label_paths]
                warp_labels = MapNode(
                    # Interpolation is config-driven (default NearestNeighbor for
                    # discrete labels); MultiLabel is no longer hardcoded.
                    make_ants_apply_transforms_interface(interpolation=lt.interpolation),
                    iterfield=["input_image", "output_image"],
                    name="warp_labels",
                )
                warp_labels.inputs.input_image = label_paths
                warp_labels.inputs.output_image = output_images
                # Reference grid: when use_hcp_template is set, warp labels onto
                # the template (HCP MNI) grid; otherwise keep them on the DWI grid
                # (the b0, current default behavior).
                if lt.use_hcp_template:
                    from thesis.workflows.registration.paths import resolve_fixed_image

                    warp_labels.inputs.reference_image = str(resolve_fixed_image(config, context))
                else:
                    warp_labels.inputs.reference_image = str(eddy_dir / "data_b0.nii.gz")
                # use_inverse_warp -> invert the single composite transform.
                warp_labels.inputs.invert_transform_flags = [bool(lt.use_inverse_warp)]
                wf.add_nodes([warp_labels])
                wf.connect(dwi_to_t1, "composite_transform", warp_labels, "transforms")
    # -- Output contract ----------------------------------------------------
    # Stable named outputs for meta-workflow consumers (full_pipeline). The
    # brain-extraction port branch was resolved once near the top (be_out_port /
    # be_mask_port) so downstream stages read uniform field names regardless of
    # bet vs synthstrip.
    out_fields = ["t1_brain", "dwi_mask", "dwi_corrected", "rotated_bvecs", "modified_bval"]
    has_t1_to_template = bool(preproc.run_coregistration and config.registration.fixed_image)
    if preproc.run_coregistration:
        out_fields.append("t1_to_dwi_transform")
    if has_t1_to_template:
        out_fields += ["t1_to_template_transform", "t1_to_template_warped"]
    if bedpostx is not None:
        out_fields += ["bedpostx_thsamples", "bedpostx_phsamples", "bedpostx_fsamples"]
    outputnode = attach_outputnode(wf, out_fields)
    wf.connect(
        [
            (t1_brain, outputnode, [(be_out_port, "t1_brain")]),
            (eddy_brain, outputnode, [(be_mask_port, "dwi_mask")]),
            (
                eddy,
                outputnode,
                [
                    ("out_corrected", "dwi_corrected"),
                    ("out_rotated_bvecs", "rotated_bvecs"),
                ],
            ),
            (modify_bval, outputnode, [("output_bval", "modified_bval")]),
        ]
    )
    # Field carries dwi_to_t1_registration.composite_transform. Named for the
    # consumer side: hcp/mrtrix3 roi_dwi_resampler + fivett_to_dwi expose a
    # `t1_to_dwi_transform` input port that uses it to resample T1-space images
    # onto the DWI grid. The name is uniform across the contract by design.
    if preproc.run_coregistration:
        wf.connect(dwi_to_t1, "composite_transform", outputnode, "t1_to_dwi_transform")
    # T1->template outputs (only when a template fixed image is configured). The
    # composite transform places this patient's T1 into template space; the
    # warped image is the patient T1 resampled onto the template grid. These let
    # consumers register a patient that was not part of the template cohort.
    # (``has_t1_to_template`` short-circuits first, so ``t1_to_template`` is only
    # read when the coregistration branch above actually defined it.)
    if has_t1_to_template and t1_to_template is not None:
        wf.connect(
            [
                (
                    t1_to_template,
                    outputnode,
                    [
                        ("composite_transform", "t1_to_template_transform"),
                        ("warped_image", "t1_to_template_warped"),
                    ],
                ),
            ]
        )
    if bedpostx is not None:
        wf.connect(
            [
                (
                    bedpostx,
                    outputnode,
                    [
                        ("merged_thsamples", "bedpostx_thsamples"),
                        ("merged_phsamples", "bedpostx_phsamples"),
                        ("merged_fsamples", "bedpostx_fsamples"),
                    ],
                ),
            ]
        )
    return wf