Source code for thesis.workflows.preprocess.workflow

"""Raw-to-HCP-compatible diffusion preprocessing workflow."""

from __future__ import annotations

import shutil
from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.utility import Function
from nipype.interfaces.utility import Merge as UtilMerge

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.contracts import attach_outputnode
from thesis.core.decorators import requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import PatientFile
from thesis.workflows.preprocess.config import PreprocessConfig, prepare_preprocess_paths
from thesis.workflows.preprocess.nodes import (
    make_ants_apply_transforms_interface,
    prepare_bedpostx_node,
    prepare_brain_extraction_node,
    prepare_convert_to_short_node,
    prepare_create_index_node,
    prepare_dtifit_node,
    prepare_eddy_node,
    prepare_extract_b0_node,
    prepare_gm_wm_csf_converter_node,
    prepare_merge_b0_node,
    prepare_modify_bval_node,
    prepare_n4_bias_correction_node,
    prepare_robust_fov_node,
    prepare_synthseg_node,
    prepare_topup_node,
    prepare_tract_masks_creator_node,
)
from thesis.workflows.preprocess.operations.file_ops import (
    create_acqparams_file,
    create_hcp_output_structure,
)

logger = get_logger(__name__)

__all__ = ["build_workflow", "verify_requirements"]


def _load_preprocess_config(config: PipelineConfig) -> PreprocessConfig:
    """Validate and normalize the workflow-specific preprocess config."""
    raw = getattr(config, "preprocess", {}) or {}
    if isinstance(raw, PreprocessConfig):
        return raw
    return PreprocessConfig.model_validate(raw)


def _copy_file(source: str, destination: str) -> str:
    """Copy a file and return the destination path."""
    import shutil
    from pathlib import Path

    src = Path(source)
    dst = Path(destination)
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(src, dst)
    return str(dst.resolve())


def _passthrough_dwi(dwi: str, bvecs: str, out_data: str) -> tuple:
    """Pass an already-corrected DWI through unchanged (used when run_eddy=False).

    Mirrors the Eddy node's output interface so it is a drop-in source: copies the
    input DWI to the HCP ``data.nii.gz`` and returns it as ``out_corrected``, plus
    the original (unrotated) bvecs as ``out_rotated_bvecs``. Runs inside a Nipype
    Function node, so it must be fully self-contained.

    Args:
        dwi: Path to the already-corrected input DWI.
        bvecs: Path to the input bvecs (no eddy rotation applied).
        out_data: Destination for the HCP ``data.nii.gz``.

    Returns:
        Tuple of ``(out_corrected, out_rotated_bvecs)`` paths.
    """
    import shutil
    from pathlib import Path

    out = Path(out_data)
    out.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(str(dwi), str(out))
    return str(out), str(bvecs)


def _label_output_name(label_path: str, output_dir: str, patient_id: str) -> str:
    """Build a patient-specific warped label output path."""
    label = Path(label_path)
    suffix = ".nii.gz" if label.suffix == ".gz" else label.suffix or ".nii.gz"
    stem = label.name.replace(".nii.gz", "").replace(".nii", "")
    out = Path(output_dir) / "labels" / f"{patient_id}_{stem}{suffix}"
    out.parent.mkdir(parents=True, exist_ok=True)
    return str(out.resolve())


[docs] def verify_requirements( config: PipelineConfig, context: ProcessingContext, *, t1: Path | None = None, t2: Path | None = None, dwi_ap: Path | None = None, dwi_ap_bval: Path | None = None, dwi_ap_bvec: Path | None = None, dwi_pa: Path | None = None, **_: object, ) -> list[str]: """Preflight checks: required inputs exist, required binaries on PATH.""" errors: list[str] = [] preproc = _load_preprocess_config(config) # When called directly (not via the registry's composite verifier) the # path kwargs come through as None — fall back to the helper resolution # so external callers don't need to construct path declarations. if t1 is None and dwi_ap is None: paths = prepare_preprocess_paths(config, context) t1, t2 = paths["t1_image"], paths["t2_image"] dwi_ap, dwi_pa = paths["dwi_ap"], paths["dwi_pa"] dwi_ap_bval, dwi_ap_bvec = paths["dwi_ap_bval"], paths["dwi_ap_bvec"] required: dict[str, Path | None] = { "T1": t1, "DWI AP": dwi_ap, "DWI AP bval": dwi_ap_bval, "DWI AP bvec": dwi_ap_bvec, } if t2 is not None: required["T2"] = t2 if preproc.run_topup and dwi_pa is not None: required["DWI PA"] = dwi_pa for label, path in required.items(): if path is None or not Path(path).exists(): errors.append(f"Missing required preprocess input for {label}: '{path}'") commands: list[str] = [] if preproc.run_eddy: commands.append("eddy") if preproc.run_dtifit: commands.append("dtifit") if preproc.run_robustfov: commands.append("robustfov") if preproc.run_topup: commands.append("topup") if preproc.run_bedpostx: commands.append("bedpostx") if config.preprocessing.brain_extraction_method == "synthstrip": commands.append("mri_synthstrip") if preproc.run_synthseg: commands.append("mri_synthseg") if preproc.run_coregistration: commands.extend(["antsRegistration", "antsApplyTransforms"]) errors.extend( f"Required command not found on PATH: '{cmd}'" for cmd in sorted(c for c in commands if shutil.which(c) is None) ) return errors
[docs] @workflow( name="preprocess", description="Raw DWI/T1/T2 preprocessing to an HCP-compatible output layout.", protocol="preprocess", default_config="preprocess", ) @requires( t1=PatientFile( default="{patient_id}_T1.nii.gz", config_path="preprocess.t1_image", optional=True ), # No ``default=`` on t2 / dwi_pa: the user can set the field to ``null`` # in config to signal "no T2 acquired", and that ``None`` must propagate # so the workflow body skips the T2 branch. t2=PatientFile(config_path="preprocess.t2_image", optional=True), dwi_ap=PatientFile( default="{patient_id}_dmri_AP.nii.gz", config_path="preprocess.dwi_ap", optional=True ), dwi_ap_bval=PatientFile( default="{patient_id}_dmri_AP.bval", config_path="preprocess.dwi_ap_bval", optional=True, ), dwi_ap_bvec=PatientFile( default="{patient_id}_dmri_AP.bvec", config_path="preprocess.dwi_ap_bvec", optional=True, ), dwi_pa=PatientFile(config_path="preprocess.dwi_pa", optional=True), ) @verify(verify_requirements) def build_workflow( *, t1: Path | None, t2: Path | None, dwi_ap: Path | None, dwi_ap_bval: Path | None, dwi_ap_bvec: Path | None, dwi_pa: Path | None, config: PipelineConfig, context: ProcessingContext, ) -> Workflow: """Build the preprocess workflow for one raw patient dataset.""" preproc = _load_preprocess_config(config) synthstrip_use_gpu = bool(getattr(config.hardware, "gpu_enabled", False)) synthstrip_gpu_device = getattr(config.hardware, "gpu_device", None) output_dir = ( Path(context.output_dir).resolve() if context.output_dir is not None else Path(".").resolve() ) dirs = create_hcp_output_structure(output_dir, context.patient_id) pid = context.patient_id def _output(pattern: str) -> Path: return output_dir / pattern.replace("{patient_id}", pid) t1_final = _output(preproc.t1_final) t2_final = _output(preproc.t2_final) if t2 is not None else None wf = Workflow(name=f"preprocess_{pid}") if context.working_dir is not None: wf.base_dir = str(context.working_dir) topup_dir = output_dir / "topup" eddy_dir = output_dir / "eddy" labels_dir = output_dir / "labels" is_synthstrip = config.preprocessing.brain_extraction_method == "synthstrip" # Brain-extraction port names differ between SynthStrip (input_image / # output_image / output_mask) and FSL BET (in_file / out_file / mask_file). # Resolve once here and reuse everywhere a brain-extraction node is wired. be_in_port = "input_image" if is_synthstrip else "in_file" be_out_port = "output_image" if is_synthstrip else "out_file" be_mask_port = "output_mask" if is_synthstrip else "mask_file" def _brain_extract(name: str, frac: float, brain: Path, mask: Path | None = None) -> Node: node = prepare_brain_extraction_node( method=config.preprocessing.brain_extraction_method, frac=frac, name=name, use_gpu=synthstrip_use_gpu, gpu_device=synthstrip_gpu_device, # BET-only tuning knobs (ignored on the synthstrip path). robust=preproc.bet.robust, padding=preproc.bet.padding, radius=preproc.bet.radius, ) if is_synthstrip: node.inputs.output_image = str(brain) if mask is not None: node.inputs.output_mask = str(mask) else: node.inputs.out_file = str(brain) return node topup_dir.mkdir(parents=True, exist_ok=True) eddy_dir.mkdir(parents=True, exist_ok=True) labels_dir.mkdir(parents=True, exist_ok=True) # Write acqparams.txt eagerly (content is config-determined): a Function # node would invalidate the hash on every workflow.py edit and force # TOPUP/Eddy to re-run. create_acqparams_file is idempotent. acqparams_path = str(topup_dir / "acqparams.txt") create_acqparams_file( acqparams_path, preproc.acq_params.bandwidth, preproc.acq_params.phase_encoding_dirs ) modify_bval = prepare_modify_bval_node() modify_bval.inputs.input_bval = str(dwi_ap_bval) modify_bval.inputs.output_bval = str(dirs["diffusion"] / "bvals") create_index = prepare_create_index_node() create_index.inputs.output_index = str(eddy_dir / "index.txt") extract_b0_ap = prepare_extract_b0_node( str(dwi_ap), str(topup_dir / "b0_AP.nii.gz"), name="extract_b0_ap" ) extract_b0_pa = merge_b0 = pair_b0 = topup = topup_mask = topup_b0 = pre_eddy_mask = None if preproc.run_topup and dwi_pa is not None: extract_b0_pa = prepare_extract_b0_node( str(dwi_pa), str(topup_dir / "b0_PA.nii.gz"), name="extract_b0_pa" ) merge_b0 = prepare_merge_b0_node(str(topup_dir / "b0_AP_PA.nii.gz")) # Use UtilMerge (a stable BaseInterface) over a Function node so an # edit to workflow.py doesn't invalidate the TOPUP cache. pair_b0 = Node(UtilMerge(2), name="pair_b0_files") topup = prepare_topup_node(acqparams_path, str(topup_dir / "topup_results")) topup_b0 = prepare_extract_b0_node( str(topup_dir / "topup_results_corrected.nii.gz"), str(topup_dir / "topup_b0.nii.gz"), name="extract_topup_b0", ) topup_mask = _brain_extract( "topup_brain_extraction", preproc.bet.frac_dwi, topup_dir / "topup_b0_brain.nii.gz", topup_dir / "topup_b0_brain_mask.nii.gz", ) else: pre_eddy_mask = _brain_extract( "pre_eddy_brain_extraction", preproc.bet.frac_dwi, eddy_dir / "pre_eddy_b0_brain.nii.gz", eddy_dir / "pre_eddy_b0_brain_mask.nii.gz", ) # ``eddy`` is the diffusion source: every downstream consumer (and the output # contract) reads out_corrected / out_rotated_bvecs from it. When run_eddy is # off (e.g. an already-eddy-corrected clinical DWI), swap in a passthrough # Function node with the same output interface so the rest of the graph is # unchanged; it copies the input DWI to data.nii.gz and keeps the bvecs as-is. if preproc.run_eddy: eddy = prepare_eddy_node( acqparams_path, str(eddy_dir / "index.txt"), str(dirs["diffusion"] / "bvals"), str(dwi_ap_bvec), str(dirs["diffusion"] / "data"), use_cuda=bool(getattr(config.hardware, "gpu_enabled", False)), ) eddy.inputs.in_file = str(dwi_ap) else: eddy = Node( Function( input_names=["dwi", "bvecs", "out_data"], output_names=["out_corrected", "out_rotated_bvecs"], function=_passthrough_dwi, ), name="eddy_passthrough", ) eddy.inputs.dwi = str(dwi_ap) eddy.inputs.bvecs = str(dwi_ap_bvec) eddy.inputs.out_data = str(dirs["diffusion"] / "data.nii.gz") eddy_b0 = prepare_extract_b0_node( str(dirs["diffusion"] / "data.nii.gz"), str(eddy_dir / "data_b0.nii.gz"), name="extract_eddy_b0", ) eddy_brain = _brain_extract( "eddy_brain_extraction", preproc.bet.frac_dwi, dirs["diffusion"] / "data_brain.nii.gz", dirs["diffusion"] / "nodif_brain_mask.nii.gz", ) copy_bvecs = Node( Function( input_names=["source", "destination"], output_names=["destination"], function=_copy_file, ), name="copy_rotated_bvecs", ) copy_bvecs.inputs.destination = str(dirs["diffusion"] / "bvecs") convert_short = prepare_convert_to_short_node( str(dirs["diffusion"] / f"{context.patient_id}_dwi_slicer.nii.gz") ) # dtifit is optional: gated on run_dtifit. Its tensor outputs (FA/MD/…) are # not consumed downstream (tractography uses BedpostX), so when off the node # is simply omitted. Lets single-shell data that can't fit a tensor (e.g. a # b800-only clinical scan, where dtifit aborts with a singular matrix) run. dtifit = None if preproc.run_dtifit: dtifit = prepare_dtifit_node( use_wls=preproc.dtifit.use_wls, save_tensor=preproc.dtifit.save_tensor, compute_sse=True, compute_kurt=preproc.dtifit.compute_kurt, ) dtifit.inputs.base_name = str(dirs["dti_wls"] / f"{context.patient_id}_dti_wls") bedpostx = None if preproc.run_bedpostx: bedpostx = prepare_bedpostx_node( n_fibres=preproc.bedpostx.n_fibres, model=preproc.bedpostx.model, burn_in=preproc.bedpostx.burn_in, n_jumps=preproc.bedpostx.n_jumps, sample_every=preproc.bedpostx.sample_every, # Respect hardware.gpu_enabled (same source as eddy/synthstrip) and # allow preproc.bedpostx.use_gpu to explicitly opt in even when the # hardware flag is off (e.g. on a CPU-only node in a mixed cluster). use_gpu=bool(getattr(config.hardware, "gpu_enabled", False)) or preproc.bedpostx.use_gpu, ) # FSL's bedpostx appends ".bedpostX" to the subject directory it receives, # so passing dirs["diffusion"] (T1w/Diffusion) produces the correct HCP # output at T1w/Diffusion.bedpostX — not T1w/Diffusion.bedpostX.bedpostX. bedpostx.inputs.out_dir = str(dirs["diffusion"]) wf.add_nodes([modify_bval, create_index, extract_b0_ap, eddy, eddy_b0, eddy_brain]) wf.add_nodes([copy_bvecs, convert_short]) if dtifit is not None: wf.add_nodes([dtifit]) if bedpostx is not None: wf.add_nodes([bedpostx]) wf.connect(modify_bval, "output_bval", create_index, "bval_file") wf.connect(eddy, "out_corrected", eddy_b0, "in_file") wf.connect(eddy, "out_corrected", convert_short, "in_file") wf.connect(eddy_b0, "roi_file", eddy_brain, be_in_port) wf.connect(eddy, "out_rotated_bvecs", copy_bvecs, "source") if dtifit is not None: wf.connect(eddy, "out_corrected", dtifit, "dwi") wf.connect(eddy, "out_rotated_bvecs", dtifit, "bvecs") wf.connect(modify_bval, "output_bval", dtifit, "bvals") if bedpostx is not None: wf.connect(eddy, "out_corrected", bedpostx, "dwi") wf.connect(eddy, "out_rotated_bvecs", bedpostx, "bvecs") wf.connect(modify_bval, "output_bval", bedpostx, "bvals") if dtifit is not None: wf.connect(eddy_brain, be_mask_port, dtifit, "mask") if bedpostx is not None: wf.connect(eddy_brain, be_mask_port, bedpostx, "mask") # Eddy-input wiring (acqp mask / index / bval / topup field map). Skipped # entirely when run_eddy is off, since the passthrough node has no such # inputs and the DWI is already corrected. if preproc.run_eddy and ( topup is not None and merge_b0 is not None and extract_b0_pa is not None and topup_b0 is not None ): wf.add_nodes([extract_b0_pa, merge_b0, topup, topup_b0]) if pair_b0 is not None: wf.add_nodes([pair_b0]) if topup_mask is not None: wf.add_nodes([topup_mask]) if pair_b0 is not None: wf.connect(extract_b0_ap, "roi_file", pair_b0, "in1") wf.connect(extract_b0_pa, "roi_file", pair_b0, "in2") wf.connect(pair_b0, "out", merge_b0, "in_files") wf.connect(merge_b0, "merged_file", topup, "in_file") wf.connect(topup, "out_corrected", topup_b0, "in_file") if topup_mask is not None: wf.connect(topup_b0, "roi_file", topup_mask, be_in_port) wf.connect(topup_mask, be_mask_port, eddy, "in_mask") wf.connect(create_index, "output_index", eddy, "in_index") wf.connect(modify_bval, "output_bval", eddy, "in_bval") wf.connect(topup, "out_fieldcoef", eddy, "in_topup_fieldcoef") wf.connect(topup, "out_movpar", eddy, "in_topup_movpar") elif preproc.run_eddy: if pre_eddy_mask is not None: wf.add_nodes([pre_eddy_mask]) wf.connect(extract_b0_ap, "roi_file", pre_eddy_mask, be_in_port) wf.connect(pre_eddy_mask, be_mask_port, eddy, "in_mask") wf.connect(create_index, "output_index", eddy, "in_index") wf.connect(modify_bval, "output_bval", eddy, "in_bval") reg_dir = dirs["registration"] def _structural_pipeline(modality: str, source: Path | None, frac: float, final: Path | None): """Build (robustfov?) → n4 → brain_extract pipeline for T1/T2.""" n4 = prepare_n4_bias_correction_node(name=f"n4_{modality.lower()}") if final is not None: n4.inputs.output_image = str(final) if preproc.run_robustfov: robust = prepare_robust_fov_node(name=f"robustfov_{modality.lower()}") robust.inputs.in_file = str(source) robust.inputs.out_roi = str(reg_dir / f"{pid}_{modality}_robustfov.nii.gz") wf.add_nodes([robust, n4]) wf.connect(robust, "out_roi", n4, "input_image") else: n4.inputs.input_image = str(source) wf.add_nodes([n4]) brain = _brain_extract( f"{modality.lower()}_brain_extraction", frac, reg_dir / f"{pid}_{modality}_brain.nii.gz", reg_dir / f"{pid}_{modality}_brainmask.nii.gz", ) wf.connect(n4, "output_image", brain, be_in_port) return n4, brain n4_t1, t1_brain = _structural_pipeline("T1", t1, preproc.bet.frac_t1, t1_final) n4_t2 = None t2_brain = None if t2 is not None: n4_t2, t2_brain = _structural_pipeline("T2", t2, preproc.bet.frac_t2, t2_final) if preproc.run_synthseg: ss = preproc.synthseg ss_cpu = bool(ss.cpu) or not bool(getattr(config.hardware, "gpu_enabled", True)) ss_threads = int(ss.threads) ss_gpu_device = getattr(config.hardware, "gpu_device", None) ss_dir = dirs["synthseg"] def _build_synthseg(modality: str, name: str) -> Node: node = prepare_synthseg_node(parc=ss.parc, robust=ss.robust, fast=ss.fast, name=name) node.inputs.output_segmentation = str(ss_dir / f"{pid}_synthseg_{modality}.nii.gz") node.inputs.vol = str(ss_dir / f"{pid}_vol_{modality}.csv") node.inputs.qc = str(ss_dir / f"{pid}_qc_{modality}.csv") node.inputs.resample = str(ss_dir / f"{pid}_{modality}_resampled.nii.gz") if ss.crop is not None: node.inputs.crop = int(ss.crop) if ss_cpu: node.inputs.cpu = True node.inputs.threads = ss_threads else: node.inputs.use_gpu = True if ss_gpu_device is not None: node.inputs.environ = {"CUDA_VISIBLE_DEVICES": str(ss_gpu_device)} return node if ss.run_on_t1: synthseg_t1 = _build_synthseg("T1", "synthseg_t1") wf.add_nodes([synthseg_t1]) wf.connect(n4_t1, "output_image", synthseg_t1, "input_image") if ss.run_on_t2 and n4_t2 is not None: synthseg_t2 = _build_synthseg("T2", "synthseg_t2") gm_wm_csf = prepare_gm_wm_csf_converter_node() gm_wm_csf.inputs.output_image = str(ss_dir / f"{pid}_synthseg_GMWMmap.nii.gz") tract_masks = prepare_tract_masks_creator_node() tract_masks.inputs.output_prefix = str(ss_dir / f"{pid}_") wf.add_nodes([synthseg_t2, gm_wm_csf, tract_masks]) wf.connect(n4_t2, "output_image", synthseg_t2, "input_image") wf.connect(synthseg_t2, "output_segmentation", gm_wm_csf, "input_image") wf.connect(synthseg_t2, "output_segmentation", tract_masks, "input_image") if preproc.run_coregistration: threads = int(getattr(config.hardware, "threads", 1)) from thesis.workflows.preprocess.nodes.registration import prepare_ants_registration_node def _reg_node(name: str, transform_type: str, prefix: str) -> Node: node = prepare_ants_registration_node( transform_type=transform_type, num_threads=threads, name=name ) node.inputs.output_transform_prefix = str(reg_dir / f"{pid}_{prefix}_") node.inputs.output_warped_image = str(reg_dir / f"{pid}_{prefix}.nii.gz") return node dwi_to_t1 = _reg_node( "dwi_to_t1_registration", preproc.registration.dwi_to_t1.transform_type, "b0toT1" ) wf.add_nodes([dwi_to_t1]) wf.connect(eddy_brain, be_out_port, dwi_to_t1, "moving_image") wf.connect(t1_brain, be_out_port, dwi_to_t1, "fixed_image") if n4_t2 is not None and t2_brain is not None: t2_to_t1 = _reg_node( "t2_to_t1_registration", preproc.registration.t2_to_t1.transform_type, "T2toT1" ) wf.add_nodes([t2_to_t1]) wf.connect(t2_brain, be_out_port, t2_to_t1, "moving_image") wf.connect(t1_brain, be_out_port, t2_to_t1, "fixed_image") # T1->template registration: lets a patient that was NOT used to build # the cohort template still be registered to it. Built only when a # template fixed image is configured (config.registration.fixed_image). # Moving = T1 brain; fixed = the template image reused from the # registration workflow's resolver (no new top-level path field). t1_to_template = None if config.registration.fixed_image: from thesis.workflows.registration.paths import resolve_fixed_image template_image = resolve_fixed_image(config, context) t1_to_template = _reg_node( "t1_to_template_registration", preproc.registration.t1_to_template.transform_type, "T1toTemplate", ) t1_to_template.inputs.fixed_image = str(template_image) wf.add_nodes([t1_to_template]) wf.connect(t1_brain, be_out_port, t1_to_template, "moving_image") # MNI-label-to-DWI transform applies dwi_to_t1's composite transform # to each label so labels in T1-space land on the DWI grid. # Uses a MapNode so all labels share one Nipype node with per-label # iteration rather than a Python loop building N separate nodes. if preproc.label_transform.transform_mni_labels: from nipype import MapNode label_paths = list(preproc.label_transform.mni_labels_list) if label_paths: lt = preproc.label_transform output_images = [_label_output_name(p, str(output_dir), pid) for p in label_paths] warp_labels = MapNode( # Interpolation is config-driven (default NearestNeighbor for # discrete labels); MultiLabel is no longer hardcoded. make_ants_apply_transforms_interface(interpolation=lt.interpolation), iterfield=["input_image", "output_image"], name="warp_labels", ) warp_labels.inputs.input_image = label_paths warp_labels.inputs.output_image = output_images # Reference grid: when use_hcp_template is set, warp labels onto # the template (HCP MNI) grid; otherwise keep them on the DWI grid # (the b0, current default behavior). if lt.use_hcp_template: from thesis.workflows.registration.paths import resolve_fixed_image warp_labels.inputs.reference_image = str(resolve_fixed_image(config, context)) else: warp_labels.inputs.reference_image = str(eddy_dir / "data_b0.nii.gz") # use_inverse_warp -> invert the single composite transform. warp_labels.inputs.invert_transform_flags = [bool(lt.use_inverse_warp)] wf.add_nodes([warp_labels]) wf.connect(dwi_to_t1, "composite_transform", warp_labels, "transforms") # -- Output contract ---------------------------------------------------- # Stable named outputs for meta-workflow consumers (full_pipeline). The # brain-extraction port branch was resolved once near the top (be_out_port / # be_mask_port) so downstream stages read uniform field names regardless of # bet vs synthstrip. out_fields = ["t1_brain", "dwi_mask", "dwi_corrected", "rotated_bvecs", "modified_bval"] has_t1_to_template = bool(preproc.run_coregistration and config.registration.fixed_image) if preproc.run_coregistration: out_fields.append("t1_to_dwi_transform") if has_t1_to_template: out_fields += ["t1_to_template_transform", "t1_to_template_warped"] if bedpostx is not None: out_fields += ["bedpostx_thsamples", "bedpostx_phsamples", "bedpostx_fsamples"] outputnode = attach_outputnode(wf, out_fields) wf.connect( [ (t1_brain, outputnode, [(be_out_port, "t1_brain")]), (eddy_brain, outputnode, [(be_mask_port, "dwi_mask")]), ( eddy, outputnode, [ ("out_corrected", "dwi_corrected"), ("out_rotated_bvecs", "rotated_bvecs"), ], ), (modify_bval, outputnode, [("output_bval", "modified_bval")]), ] ) # Field carries dwi_to_t1_registration.composite_transform. Named for the # consumer side: hcp/mrtrix3 roi_dwi_resampler + fivett_to_dwi expose a # `t1_to_dwi_transform` input port that uses it to resample T1-space images # onto the DWI grid. The name is uniform across the contract by design. if preproc.run_coregistration: wf.connect(dwi_to_t1, "composite_transform", outputnode, "t1_to_dwi_transform") # T1->template outputs (only when a template fixed image is configured). The # composite transform places this patient's T1 into template space; the # warped image is the patient T1 resampled onto the template grid. These let # consumers register a patient that was not part of the template cohort. if has_t1_to_template and t1_to_template is not None: wf.connect( [ ( t1_to_template, outputnode, [ ("composite_transform", "t1_to_template_transform"), ("warped_image", "t1_to_template_warped"), ], ), ] ) if bedpostx is not None: wf.connect( [ ( bedpostx, outputnode, [ ("merged_thsamples", "bedpostx_thsamples"), ("merged_phsamples", "bedpostx_phsamples"), ("merged_fsamples", "bedpostx_fsamples"), ], ), ] ) return wf