# Source code for thesis.core.contracts

"""Workflow I/O contract helpers.

Small factory helpers that attach standardized ``inputnode`` / ``outputnode``
``IdentityInterface`` boundary nodes to a Nipype workflow. They let a
meta-workflow wire to a stable named contract instead of reaching into a
sub-workflow's internal node names.

The pattern: a sub-workflow's builder calls :func:`attach_inputnode` with the
runtime-overridable input fields (defaulted from config so standalone runs
resolve statically), then connects ``inputnode.<field>`` to each internal
consumer via :func:`fan_out`. It calls :func:`attach_outputnode` for stable
outputs and connects producers into it. A meta-workflow then connects only
``upstream.outputnode.<f> -> downstream.inputnode.<f>``.

See ``docs/superpowers/specs/2026-05-26-full-pipeline-contracts-design.md``.
"""

from __future__ import annotations

from typing import Any, Iterable, Mapping, Sequence

import nipype.pipeline.engine as pe
from nipype import Node
from nipype.interfaces.utility import IdentityInterface

__all__ = [
    "ROI_OUTPUT_FIELDS",
    "MRTRIX3_ROI_OUTPUT_FIELDS",
    "HCP_ROI_OUTPUT_FIELDS",
    "attach_inputnode",
    "attach_outputnode",
    "fan_out",
]

#: ROI terminus output fields published on a tractography workflow ``outputnode``.
#:
#: Both backends re-expose their final ROI source (seed / stop / avoid / target
#: masks) under these stable contract names so a meta-workflow (e.g. the
#: ``*_synthseg`` ROI validation) can wire to them directly instead of scanning
#: ``wf._graph`` for a (possibly renamed) internal node name.
#:
#: Stored as a tuple so the shared contract cannot be mutated in place by a
#: caller.
ROI_OUTPUT_FIELDS: tuple[str, str, str, str] = (
    "roi_seed",
    "roi_stop",
    "roi_avoid",
    "roi_target",
)

#: Backend-specific aliases (identical fields; kept for call-site clarity).
#: Both names are bound to the very same tuple object as ``ROI_OUTPUT_FIELDS``,
#: not to copies, so the three names always agree.
MRTRIX3_ROI_OUTPUT_FIELDS: tuple[str, str, str, str] = ROI_OUTPUT_FIELDS
HCP_ROI_OUTPUT_FIELDS: tuple[str, str, str, str] = ROI_OUTPUT_FIELDS


def attach_inputnode(
    wf: pe.Workflow,
    fields: Sequence[str],
    defaults: Mapping[str, Any] | None = None,
    *,
    name: str = "inputnode",
) -> Node:
    """Create an ``IdentityInterface`` input contract node on *wf*.

    Args:
        wf: Workflow to attach the node to.
        fields: Input field names exposed by the contract. The iterable is
            materialized exactly once, so a one-shot iterable behaves the
            same as a list or tuple.
        defaults: Optional ``field -> value`` map applied as build-time
            ``node.inputs.<field>``. ``None`` values are skipped so the trait
            stays ``Undefined`` (a meta-workflow edge supplies it at run
            time). Keys not declared in *fields* raise :exc:`ValueError`.
        name: Node name (default ``"inputnode"``).

    Returns:
        The created :class:`nipype.Node`.

    Raises:
        ValueError: If *defaults* contains a key that is not in *fields*.
            The check runs before the node is built, so nothing is created
            or added to *wf* when it fails.
    """
    # Snapshot the names once: the original re-iterated ``fields`` for the
    # node, the validation set and the error message, which silently broke
    # for iterators (second pass saw an empty sequence).
    field_names = list(fields)

    # Validate first so a bad contract fails fast, before any nipype object
    # is constructed.
    if defaults is not None:
        unknown = set(defaults) - set(field_names)
        if unknown:
            raise ValueError(
                f"attach_inputnode: defaults keys {sorted(unknown)} are not "
                f"declared in fields {field_names}"
            )

    node = Node(IdentityInterface(fields=field_names), name=name)

    if defaults is not None:
        for key, value in defaults.items():
            # ``None`` means "no build-time default": leave the trait unset.
            if value is not None:
                setattr(node.inputs, key, value)

    wf.add_nodes([node])
    return node
def attach_outputnode(
    wf: pe.Workflow,
    fields: Sequence[str],
    *,
    name: str = "outputnode",
) -> Node:
    """Attach an ``IdentityInterface`` output contract node to *wf*.

    Args:
        wf: Workflow that receives the new node.
        fields: Names of the output fields the contract publishes.
        name: Name given to the node (default ``"outputnode"``).

    Returns:
        The newly created :class:`nipype.Node`, already added to *wf*.
    """
    # Copy the names into a fresh list before handing them to the interface.
    contract = IdentityInterface(fields=list(fields))
    outputnode = Node(contract, name=name)
    wf.add_nodes([outputnode])
    return outputnode
def fan_out(
    wf: pe.Workflow,
    inputnode: Node,
    field: str,
    targets: Iterable[tuple[Node, str]],
) -> None:
    """Wire ``inputnode.<field>`` into every ``(node, port)`` of *targets*.

    This replaces the per-hemisphere / per-source fan-out that previously
    lived in the meta-workflow as ``startswith()`` loops. Because the builder
    already holds the target node handles, no graph introspection is needed.

    Args:
        wf: Workflow that owns both ends of each connection.
        inputnode: The input contract node acting as the source.
        field: Name of the source field on *inputnode*.
        targets: Iterable of ``(consumer_node, consumer_port)`` pairs.
    """
    # One ``connect`` call per target, issued in iteration order.
    for target in targets:
        consumer, consumer_port = target
        wf.connect(inputnode, field, consumer, consumer_port)