"""Workflow I/O contract helpers.
Small factory helpers that attach standardized ``inputnode`` / ``outputnode``
``IdentityInterface`` boundary nodes to a Nipype workflow. They let a
meta-workflow wire to a stable named contract instead of reaching into a
sub-workflow's internal node names.
The pattern: a sub-workflow's builder calls :func:`attach_inputnode` with the
runtime-overridable input fields (defaulted from config so standalone runs
resolve statically), then connects ``inputnode.<field>`` to each internal
consumer via :func:`fan_out`. It calls :func:`attach_outputnode` for stable
outputs and connects producers into it. A meta-workflow then connects only
``upstream.outputnode.<f> -> downstream.inputnode.<f>``.
See ``docs/superpowers/specs/2026-05-26-full-pipeline-contracts-design.md``.
"""
from __future__ import annotations
from typing import Any, Iterable, Mapping, Sequence
import nipype.pipeline.engine as pe
from nipype import Node
from nipype.interfaces.utility import IdentityInterface
# Public API of this module: the ROI field-name constants first, then the
# helper functions.
__all__ = [
    "ROI_OUTPUT_FIELDS",
    "MRTRIX3_ROI_OUTPUT_FIELDS",
    "HCP_ROI_OUTPUT_FIELDS",
    "attach_inputnode",
    "attach_outputnode",
    "fan_out",
]
#: Stable contract names for the ROI terminus outputs that a tractography
#: workflow publishes on its ``outputnode``.
#:
#: Each backend re-exposes its final ROI source (seed / stop / avoid / target
#: masks) under these names, so a meta-workflow (e.g. the ``*_synthseg`` ROI
#: validation) can connect to them directly rather than searching
#: ``wf._graph`` for an internal node whose name may have changed.
ROI_OUTPUT_FIELDS: tuple[str, str, str, str] = (
    "roi_seed",
    "roi_stop",
    "roi_avoid",
    "roi_target",
)

#: MRtrix3 alias of :data:`ROI_OUTPUT_FIELDS` (same tuple object; exists only
#: so call sites read clearly).
MRTRIX3_ROI_OUTPUT_FIELDS: tuple[str, str, str, str] = ROI_OUTPUT_FIELDS

#: HCP alias of :data:`ROI_OUTPUT_FIELDS` (same tuple object; exists only so
#: call sites read clearly).
HCP_ROI_OUTPUT_FIELDS: tuple[str, str, str, str] = ROI_OUTPUT_FIELDS
[docs]
def attach_outputnode(
    wf: pe.Workflow,
    fields: Sequence[str],
    *,
    name: str = "outputnode",
) -> Node:
    """Create an ``IdentityInterface`` output contract node on *wf*.

    The node is built from *fields*, registered on the workflow with
    ``wf.add_nodes`` and returned so the caller can connect producers into it.

    Args:
        wf: Workflow to attach the node to.
        fields: Output field names exposed by the contract. A single bare
            string is treated as one field name rather than being split into
            its characters.
        name: Node name (default ``"outputnode"``).

    Returns:
        The created :class:`nipype.Node`.
    """
    # ``str`` satisfies ``Sequence[str]``, so ``list("roi_seed")`` would
    # silently produce one field per character. Wrap a lone string instead.
    field_names = [fields] if isinstance(fields, str) else list(fields)
    node = Node(IdentityInterface(fields=field_names), name=name)
    wf.add_nodes([node])
    return node
[docs]
def fan_out(
    wf: pe.Workflow,
    inputnode: Node,
    field: str,
    targets: Iterable[tuple[Node, str]],
) -> None:
    """Wire ``inputnode.<field>`` into every ``(node, port)`` pair of *targets*.

    This replaces the per-hemisphere / per-source fan-out that the
    meta-workflow used to perform with ``startswith()`` loops. Because the
    builder already holds the target node handles, no graph introspection is
    required.

    Args:
        wf: Workflow owning both ends.
        inputnode: The input contract node.
        field: Source field on *inputnode*.
        targets: Iterable of ``(consumer_node, consumer_port)`` pairs.
    """
    # One ``connect`` call per target, issued in iteration order.
    for target in targets:
        consumer, consumer_port = target
        wf.connect(inputnode, field, consumer, consumer_port)