Nipype Integration Guide

Overview

This project uses Nipype as the execution engine for workflow graphs while keeping configuration, validation, logging, and CLI orchestration inside the thesis package.

In practice, a workflow module:

  1. Loads validated settings from PipelineConfig

  2. Receives a ProcessingContext with patient-specific paths and metadata

  3. Builds and returns a nipype.Workflow (a raw graph — there is no workflow base class)

  4. Lets NipypeExecutor apply runtime settings and execute the graph

This keeps workflow code focused on graph construction while the framework handles dependency checking, output rendering, summaries, and config merging.

Note

This page focuses on how the framework dispatches and wraps Nipype and how a workflow build hooks into it. For the complete guide to building a workflow with the decorator API, see custom_workflows.md; to port a standalone script, see nipype_to_framework.md.

Minimal Pattern

from pathlib import Path

from nipype import Workflow

from thesis.core import ConfigManager, create_context
from thesis.core.nipype import run_workflow

config = ConfigManager().load_config("default", patient_id="114823", protocol="hcp")
context = create_context("114823", config, Path("./data"))

workflow = Workflow(name="example")
# add nodes / connections here

run_workflow(workflow, context, config.nipype)

CLI equivalent:

thesis run -w hcp -p 114823 -c default

Integration Layers

Framework Layer

  • thesis.cli resolves the workflow entry, loads config, creates the context, and selects output/progress behaviour.

  • ConfigManager performs the five-level merge: default -> hardware -> protocol -> patient -> CLI overrides.

  • ProcessingContext carries resolved paths, patient ID, config, and results.
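The merge order above can be illustrated with a small, self-contained sketch. It is not ConfigManager's implementation: `deep_merge` and `merge_layers` are hypothetical helper names, the layer contents are invented for the example, and it assumes that nested mappings merge key by key while later layers win.

```python
from copy import deepcopy
from functools import reduce
from typing import Any, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> dict[str, Any]:
    """Return a new dict where `override` wins; nested mappings merge recursively."""
    merged = deepcopy(dict(base))
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def merge_layers(*layers: Mapping[str, Any]) -> dict[str, Any]:
    """Fold layers left to right, so later layers take precedence."""
    return reduce(deep_merge, layers, {})


# Illustrative layer contents only (not the project's real defaults).
default = {"nipype": {"plugin": "Linear", "plugin_args": {"n_procs": 1}}}
hardware = {"nipype": {"plugin": "MultiProc", "plugin_args": {"n_procs": 4}}}
protocol = {"hcp": {"t1_image": "T1w/T1w.nii.gz"}}
patient = {"hcp": {"t1_image": "T1w/T1w_acpc.nii.gz"}}
cli_overrides = {"nipype": {"plugin_args": {"n_procs": 2}}}

# default -> hardware -> protocol -> patient -> CLI overrides
merged = merge_layers(default, hardware, protocol, patient, cli_overrides)
```

In this sketch the CLI layer overrides only `n_procs`, while the `plugin` value set by the hardware layer survives because the nested mappings are merged rather than replaced wholesale.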

Workflow Layer

  • Each workflow is a factory function decorated with @workflow(name=…) from thesis.core.decorators. The decorator synthesises a (config, context) -> Workflow adapter and registers a WorkflowEntry in WORKFLOW_REGISTRY at import time. The adapter’s __signature__ is pinned to (config, context) so the CLI can dispatch it with two positional arguments regardless of which kwargs the body actually declares.

  • @requires(...) / @produces(...) attach path declarations (see core.path_declarations); the adapter resolves each declaration against the active ProcessingContext and injects the resulting pathlib.Path (or None/list) as a keyword-only argument named after the declaration key. config/context are forwarded only when the body names them. The body returns a raw nipype.Workflow.

  • @verify(check_fn, ...) attaches optional preflight checks. The historical signature is (config, context) -> list[str]; a verifier may also opt into the resolved declared kwargs via a **kwargs catch-all or a parameter named after a declared key. Every non-optional @requires declaration also generates an implicit existence check. The composite verifier runs the implicit checks first, then the explicit @verify checks, before the workflow graph is built. When a workflow has no required inputs and no @verify checks, the verifier is None.
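The ordering of the composite verifier can be sketched as follows. This is a simplified illustration, not the framework's code: `existence_check` and `compose_verifier` are hypothetical names, and the checks take no arguments here, whereas the real verifier is called with (config, context).

```python
from pathlib import Path
from typing import Callable, Optional

Check = Callable[[], list[str]]


def existence_check(key: str, path: Path) -> Check:
    """Implicit check generated for a non-optional required path."""
    def check() -> list[str]:
        return [] if path.exists() else [f"{key}: required path not found: {path}"]
    return check


def compose_verifier(implicit: list[Check], explicit: list[Check]) -> Optional[Check]:
    """Run implicit checks first, then explicit ones; None when there is nothing to run."""
    checks = [*implicit, *explicit]
    if not checks:
        return None

    def verifier() -> list[str]:
        errors: list[str] = []
        for check in checks:
            errors.extend(check())  # collect every message instead of stopping early
        return errors

    return verifier


# Usage: one implicit existence check plus one explicit custom rule.
missing = Path("definitely/not/here/T1w.nii.gz")
verifier = compose_verifier(
    [existence_check("t1", missing)],
    [lambda: ["custom rule failed"]],
)
errors = verifier()
```

Collecting all messages in one pass means a dry run can report every missing input at once, with the existence errors listed before any custom-rule errors.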

Nipype Layer

  • NipypeExecutor sets base_dir, crash handling, plugin settings, profiler flags, and status callbacks.

  • build_nipype_status_callback() translates node start/finish/failure events into structured CLI progress updates.

  • count_workflow_nodes() supports nested workflows so progress bars remain accurate for meta-workflows such as tract_synthseg.
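Recursive counting for nested graphs can be sketched without Nipype. The classes below are hypothetical stand-ins for Nipype's node and workflow types, and `count_leaf_nodes` is an illustrative name rather than the body of count_workflow_nodes(); the sketch assumes that only leaf nodes count towards progress.

```python
from dataclasses import dataclass, field
from typing import Union


@dataclass
class FakeNode:
    """Stand-in for an executable node."""
    name: str


@dataclass
class FakeWorkflow:
    """Stand-in for a workflow that may contain nodes and other workflows."""
    name: str
    children: list[Union["FakeNode", "FakeWorkflow"]] = field(default_factory=list)


def count_leaf_nodes(item: Union[FakeNode, FakeWorkflow]) -> int:
    """Count executable leaves; workflow containers themselves contribute nothing."""
    if isinstance(item, FakeWorkflow):
        return sum(count_leaf_nodes(child) for child in item.children)
    return 1


# A meta-workflow with two sub-workflows and one top-level node.
synthseg = FakeWorkflow("synthseg", [FakeNode("segment"), FakeNode("qc")])
hcp = FakeWorkflow("hcp", [FakeNode("bedpostx"), FakeNode("rois"), FakeNode("probtrackx2")])
meta = FakeWorkflow("tract_synthseg", [synthseg, FakeNode("resample"), hcp])

total = count_leaf_nodes(meta)
```

The node names are invented for the example. The point is that the total covers leaves at every nesting level, so a progress bar sized from it does not finish early when a sub-workflow starts.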

Current Architecture

CLI
  -> ConfigManager.load_config()
  -> create_context()
  -> workflow entry lookup via WORKFLOW_REGISTRY
  -> entry.verifier(config, context)   # composite @requires + @verify checks
  -> build_workflow(config, context)   # the @workflow adapter
  -> NipypeExecutor / run_workflow()
  -> RunSummary / BatchSummary

The project does not generate workflows from a separate builder or factory layer. Workflow modules construct the Nipype graph directly.
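The dispatch order in the diagram can be sketched with plain callables. `Entry` and `dispatch` are hypothetical stand-ins for the registry entry and the CLI's dispatch logic, and the sketch assumes that a non-empty error list stops the run before the graph is built.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for a registry entry."""
    build: Callable[[Any, Any], Any]
    verifier: Optional[Callable[[Any, Any], list[str]]] = None


def dispatch(registry: dict[str, Entry], name: str, config: Any, context: Any,
             execute: Callable[[Any], Any]) -> Any:
    """Look up, verify, build, then execute; fail before building if checks report errors."""
    entry = registry[name]
    if entry.verifier is not None:
        errors = entry.verifier(config, context)
        if errors:
            raise RuntimeError("Preflight checks failed:\n" + "\n".join(errors))
    graph = entry.build(config, context)
    return execute(graph)


calls: list[str] = []


def demo_verifier(config: Any, context: Any) -> list[str]:
    calls.append("verify")
    return []


def demo_build(config: Any, context: Any) -> str:
    calls.append("build")
    return "graph"


registry = {"demo": Entry(build=demo_build, verifier=demo_verifier)}
result = dispatch(registry, "demo", {}, {}, execute=lambda graph: f"ran {graph}")
```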

Building Workflow Modules

Factory Signature

Every workflow exposes a factory decorated with @workflow(...). Python applies stacked decorators bottom-up, so the inner @requires / @produces decorators run first and attach metadata that the outer @workflow consumes when it builds the adapter.

from pathlib import Path

from nipype import Workflow

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


@workflow(name="my_workflow", description="Example workflow.", protocol="default")
@requires(t1=PatientFile(default="T1w/T1w.nii.gz", config_paths=["hcp.t1_image"]))
@produces(out_dir=OutputDir("my_workflow"))
def build_workflow(
    *,
    t1: Path,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    """Build the workflow graph for one patient."""
    return Workflow(name=f"my_workflow_{context.patient_id}")

Resolved paths are passed in as keyword-only arguments named after the @requires / @produces keys. config and context are forwarded only when the wrapped function names them; the adapter inspects the signature at decoration time.
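A minimal sketch of this adapter pattern, using only the standard library, is shown below. It is not the code in thesis.core.decorators: `REGISTRY`, the `_declarations` attribute, and the resolver lambdas are assumptions made for illustration, and declarations are modelled as plain callables that take the context.

```python
import inspect
from typing import Any, Callable

# Hypothetical stand-in for WORKFLOW_REGISTRY.
REGISTRY: dict[str, Callable[[Any, Any], Any]] = {}


def requires(**declarations: Callable[[Any], Any]):
    """Attach resolver callables to the function; nothing is resolved yet."""
    def decorator(func):
        func._declarations = {**getattr(func, "_declarations", {}), **declarations}
        return func
    return decorator


def workflow(name: str):
    """Build a (config, context) adapter from the metadata left by inner decorators."""
    def decorator(func):
        declarations = getattr(func, "_declarations", {})
        accepted = set(inspect.signature(func).parameters)  # inspected once, at decoration time

        def adapter(config, context):
            # Resolve each declaration against the active context.
            kwargs = {key: resolve(context) for key, resolve in declarations.items()}
            # Forward config/context only when the body names them.
            if "config" in accepted:
                kwargs["config"] = config
            if "context" in accepted:
                kwargs["context"] = context
            return func(**kwargs)

        REGISTRY[name] = adapter
        return adapter
    return decorator


@workflow(name="demo")
@requires(t1=lambda context: f"{context['root']}/T1w/T1w.nii.gz")
def build_demo(*, t1: str, context: dict) -> str:
    # `config` is not named here, so the adapter does not forward it.
    return f"graph for {context['patient_id']} using {t1}"


result = REGISTRY["demo"]({"unused": True}, {"root": "/data/114823", "patient_id": "114823"})
```

Because the adapter always takes exactly (config, context), a caller can dispatch any registered workflow the same way, whatever keyword arguments the decorated body declares.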

Requirement Checks

@requires declarations automatically generate implicit existence checks unless optional=True. For cross-field validation or other custom rules, attach @verify(check_fn):

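from pathlib import Path

from nipype import Workflow

from thesis.core.context import ProcessingContext
# Import locations below are assumed to match the Factory Signature example.
from thesis.core.decorators import requires, verify, workflow
from thesis.core.path_declarations import PatientDir


def _check_diffusion_dir(config, context, **kwargs) -> list[str]:
    # Resolved @requires paths arrive through the **kwargs catch-all.
    diffusion_dir: Path | None = kwargs.get("diffusion_dir")
    if diffusion_dir is None or not (diffusion_dir / "bvals").exists():
        return [f"diffusion_dir is missing the 'bvals' file: {diffusion_dir}"]
    return []


@workflow(name="my_workflow")
@requires(diffusion_dir=PatientDir(default="T1w/Diffusion"))
@verify(_check_diffusion_dir)
def build_workflow(*, diffusion_dir: Path, context: ProcessingContext) -> Workflow:
    ...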

Verifier callables can opt into receiving resolved kwargs by declaring a **kwargs catch-all (as above) or by naming the kwarg directly. The historical (config, context) -> list[str] signature is still honoured.
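One way to support all three verifier shapes is to inspect the callable's signature before invoking it. The helper below is a sketch under that assumption; `call_verifier` is a hypothetical name and the framework may dispatch differently.

```python
import inspect
from typing import Any, Callable


def call_verifier(check: Callable[..., list[str]], config: Any, context: Any,
                  resolved: dict[str, Any]) -> list[str]:
    """Pass only the resolved kwargs that `check` has opted into."""
    params = inspect.signature(check).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        extra = dict(resolved)  # **kwargs catch-all: pass everything
    else:
        extra = {k: v for k, v in resolved.items() if k in params}  # named opt-in only
    return check(config, context, **extra)


def legacy(config, context):
    """Historical signature: receives no resolved kwargs."""
    return []


def named(config, context, diffusion_dir):
    """Opts into a single declared key by naming it."""
    return [] if diffusion_dir else ["diffusion_dir not resolved"]


def catch_all(config, context, **kwargs):
    """Opts into every declared key; returns their names for demonstration."""
    return sorted(kwargs)


resolved = {"diffusion_dir": None, "t1": "T1w/T1w.nii.gz"}
legacy_errors = call_verifier(legacy, {}, {}, resolved)
named_errors = call_verifier(named, {}, {}, resolved)
catch_all_keys = call_verifier(catch_all, {}, {}, resolved)
```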

Execution Model

NipypeExecutor is responsible for runtime behaviour, not graph construction.

It currently handles:

  • workflow working directory resolution from config.nipype.working_dir

  • optional crash dump directory configuration

  • plugin execution via config.nipype.plugin and plugin_args

  • Windows compatibility around Nipype’s Unix-only pwd import path

  • GPU-aware scheduler arguments when hardware.gpu_enabled is true

  • node progress callbacks that feed the structured CLI output system

  • result collection for executed leaf nodes
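As an illustration of the GPU-aware step, the sketch below derives plugin arguments from the two config sections. Which keys NipypeExecutor actually forwards is not stated on this page, so copying `n_gpu_procs` and `n_gpus` from the hardware section is an assumption, and `build_plugin_args` is a hypothetical helper name.

```python
from typing import Any, Mapping


def build_plugin_args(nipype_cfg: Mapping[str, Any],
                      hardware_cfg: Mapping[str, Any]) -> dict[str, Any]:
    """Start from the configured plugin_args and add GPU keys only when GPUs are enabled."""
    args = dict(nipype_cfg.get("plugin_args", {}))
    if hardware_cfg.get("gpu_enabled", False):
        for key in ("n_gpu_procs", "n_gpus"):  # assumed key names, taken from the hardware section
            if key in hardware_cfg:
                args.setdefault(key, hardware_cfg[key])  # explicit plugin_args take precedence
    return args


nipype_cfg = {"plugin": "MultiProc", "plugin_args": {"n_procs": 4, "memory_gb": 8}}

cpu_only = build_plugin_args(nipype_cfg, {"gpu_enabled": False, "n_gpu_procs": 1, "n_gpus": 1})
with_gpu = build_plugin_args(nipype_cfg, {"gpu_enabled": True, "n_gpu_procs": 1, "n_gpus": 1})
```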

Workflow I/O Contracts

Sub-workflows that are meant to be composed into a meta-pipeline expose a stable I/O contract: an inputnode and an outputnode (IdentityInterface boundary nodes) whose field names are published in thesis.core.contracts. A meta-workflow wires contract-to-contract (upstream.outputnode.<field> -> downstream.inputnode.<field>) instead of reaching into a sub-workflow’s internal node names. Internal nodes can then be renamed or restructured without breaking any meta-workflow.

The sub-workflow’s builder publishes its contract with three helpers from thesis.core.contracts:

  • attach_inputnode(wf, fields, defaults=...) — create the inputnode. The defaults map lets a standalone run resolve statically; a meta-workflow edge overrides a field at run time.

  • attach_outputnode(wf, fields) — create the outputnode.

  • fan_out(wf, inputnode, field, targets) — connect one inputnode field to several internal consumer ports.

Field names live in thesis.core.contracts (e.g. ROI_OUTPUT_FIELDS = ("roi_seed", "roi_stop", "roi_avoid", "roi_target")) so both ends of an edge agree on the spelling. A sub-workflow builder typically does:

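from thesis.core.contracts import attach_inputnode, attach_outputnode, fan_out

# `wf`, `reg_node`, and `t1` are defined earlier in the enclosing builder.

# Input contract: the default lets a standalone run resolve statically.
inputnode = attach_inputnode(wf, ["moving_image"], defaults={"moving_image": str(t1)})
fan_out(wf, inputnode, "moving_image", [(reg_node, "moving_image")])

# Output contract: map internal node outputs onto the published field names.
outputnode = attach_outputnode(wf, ["transform", "t1_brain"])
wf.connect(reg_node, "out_transform", outputnode, "transform")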

See src/thesis/workflows/registration/workflow.py and src/thesis/workflows/preprocess/workflow.py for real attach_* call sites.
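The idea behind fan-out can be shown as a pure-data sketch: one declared inputnode field expands into one edge per consumer, and a misspelled field is rejected before any edge is made. `fan_out_edges` is a hypothetical helper that uses node names in place of Nipype nodes; it does not reproduce the signature or behaviour of thesis.core.contracts.fan_out.

```python
from typing import Iterable

# (source node, source field, target node, target port)
Edge = tuple[str, str, str, str]


def fan_out_edges(declared_fields: Iterable[str], field: str,
                  targets: list[tuple[str, str]]) -> list[Edge]:
    """Expand one inputnode field into one edge per (target node, target port)."""
    declared = sorted(declared_fields)
    if field not in declared:
        raise KeyError(f"{field!r} is not a declared inputnode field: {declared}")
    return [("inputnode", field, node_name, port) for node_name, port in targets]


edges = fan_out_edges(
    ["moving_image"],
    "moving_image",
    [("reg_node", "moving_image"), ("qc_node", "in_file")],
)

# A misspelled field name fails loudly instead of silently creating a dangling edge.
try:
    fan_out_edges(["moving_image"], "moving_img", [("reg_node", "moving_image")])
    misspelling_rejected = False
except KeyError:
    misspelling_rejected = True
```

Each tuple has the same shape as the four-argument connect call used elsewhere on this page (source, source field, target, target field); `qc_node` and `in_file` are invented for the example.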

Nested Workflows

The project supports nested Nipype workflows directly, and a meta-workflow wires sub-workflows through the contract nodes described above.

The main example is tract_synthseg, which composes:

  • a SynthSeg sub-workflow that produces a segmentation from T1w input

  • an HCP tractography sub-workflow that can consume the segmentation after resampling it to the T1w grid

Connect contract-to-contract via the published outputnode / inputnode fields — never to a sub-workflow’s internal node names:

# upstream.outputnode.<field> -> downstream.inputnode.<field>
meta.connect(
    preprocess_wf,
    "outputnode.t1_brain",
    registration_wf,
    "inputnode.moving_image",
)

src/thesis/workflows/full_pipeline/_core.py is the worked reference: it wires preprocess, registration, atlas_to_patient, and tract_similarity exclusively through outputnode / inputnode edges.
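A convention like this is easy to check mechanically. The sketch below is a hypothetical lint, not part of the framework: `is_contract_edge` accepts an edge only when it runs from an outputnode field to an inputnode field, and it assumes a single level of nesting, so deeper dotted paths are rejected.

```python
def is_contract_edge(source: str, target: str) -> bool:
    """True when an edge goes from 'outputnode.<field>' to 'inputnode.<field>'."""
    src_node, _, src_field = source.partition(".")
    dst_node, _, dst_field = target.partition(".")
    return (
        src_node == "outputnode"
        and dst_node == "inputnode"
        and bool(src_field)
        and bool(dst_field)
        and "." not in src_field  # single nesting level assumed
        and "." not in dst_field
    )


contract_ok = is_contract_edge("outputnode.t1_brain", "inputnode.moving_image")
internal_leak = is_contract_edge("bet.out_file", "inputnode.moving_image")
missing_field = is_contract_edge("outputnode", "inputnode.moving_image")
```

The `bet.out_file` source is an invented internal node name; it stands for the kind of edge that would break when a sub-workflow renames its internals.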

Custom Interfaces and Adapters

For tools without a suitable upstream interface, define a custom Nipype interface in src/thesis/core/nipype/interfaces/.

Current examples:

  • thesis.core.nipype.interfaces.fsl for project-specific ProbTrackX2 wrappers

  • thesis.core.nipype.interfaces.freesurfer for mri_synthseg

src/thesis/core/nipype/adapters/ is the designated location for higher-level, config-driven adapters that wrap tool families with project conventions.

Multi-Source ROI Handling in HCP

The HCP workflow is a good example of how Nipype graph construction stays explicit.

  • Atlas ROI sources are normalised from config in workflows/hcp/common.py

  • Each atlas source may build extractor, transformer, and validator nodes

  • SynthSeg ROI extraction is optional and can be merged with atlas-derived ROIs

  • ROI bundles are reduced to one canonical endpoint before connecting to ProbTrackX2

This approach keeps the graph readable and avoids hidden builder conventions.

Configuration Notes

Relevant runtime settings include:

nipype:
  working_dir: work/{patient_id}
  plugin: MultiProc
  plugin_args:
    n_procs: 4
    memory_gb: 8

hardware:
  gpu_enabled: false
  n_gpu_procs: 1
  n_gpus: 1

See docs/guides/configuration/ for the full config reference.
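The `{patient_id}` placeholder in working_dir suggests a per-patient template. A minimal sketch of such an expansion follows; it assumes str.format-style substitution and a caller-supplied root directory, neither of which is confirmed here, and `expand_working_dir` is a hypothetical name.

```python
from pathlib import Path


def expand_working_dir(template: str, patient_id: str, root: Path) -> Path:
    """Fill the {patient_id} placeholder and anchor the result under `root`."""
    return root / template.format(patient_id=patient_id)


work_dir = expand_working_dir("work/{patient_id}", "114823", Path("/scratch"))
other_dir = expand_working_dir("work/{patient_id}", "100307", Path("/scratch"))
```

Keeping one working directory per patient means that Nipype's cached node results for different patients never share a folder.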

Troubleshooting

  • Workflow fails before execution: run thesis run ... --dry-run and inspect the verifier errors first (the composite @requires existence checks plus any @verify callables, run via entry.verifier).

  • Wrong files are being resolved: check the merged config with thesis show-config default and confirm path templates after patient expansion.

  • Node progress looks incomplete: nested workflows are counted recursively, but non-leaf workflow containers are not treated as executable nodes.

  • Nipype crash files are missing: ensure nipype.crash_dir is configured or that the working directory is writable.

For broader runtime issues, see docs/guides/troubleshooting.md.