Nipype Integration#

Bridge between the thesis pipeline and Nipype’s workflow engine.

thesis.core.nipype.executor#

Nipype workflow executor for thesis framework.

This module provides the execution engine for running Nipype workflows within your framework, managing execution context, caching, and result collection.

class thesis.core.nipype.executor.NipypeExecutor[source]#

Bases: object

Executes Nipype workflows within the thesis framework.

This executor bridges your context system with Nipype’s workflow engine, managing: - Workflow base directory and working directories - Plugin selection and configuration - Logging and error handling

Workflow outputs are written by individual nodes to their configured output_dir paths under workflow.base_dir; downstream code reads them back from the filesystem rather than from a returned dict.

Example

>>> from thesis.core import ConfigManager, create_context
>>> from nipype import Workflow
>>>
>>> config = ConfigManager().load_config("default", patient_id="patient_001")
>>> context = create_context("patient_001", config, Path("./data"))
>>> workflow = Workflow(name="example")
>>>
>>> executor = NipypeExecutor(context, config.nipype)
>>> executor.execute(workflow)
Parameters:
__init__(context, config=None)[source]#

Initialize the executor.

Parameters:
execute(workflow, *, event_bus=None, progress=None)[source]#

Execute a Nipype workflow.

Workflow outputs are written to disk under workflow.base_dir (and whatever output_dir paths individual nodes were configured with); downstream code reads them from the filesystem rather than from a returned dict.

Parameters:
  • workflow (Workflow) – nipype.pipeline.engine.Workflow instance

  • event_bus (Optional[EventBus]) – Optional event bus for emitting execution events.

  • progress (Optional[NodeProgressProtocol]) – Optional workflow progress tracker.

Return type:

None

get_working_directory()[source]#

Get the workflow working directory.

Return type:

Path

Returns:

Path to the workflow working directory

cleanup(remove_intermediate=None)[source]#

Clean up workflow intermediate files.

Parameters:

remove_intermediate (Optional[bool]) – Whether to remove intermediate outputs. If None, uses config setting.

Return type:

None

class thesis.core.nipype.executor.NipypeStatusCallback[source]#

Bases: object

Picklable callable for Nipype’s plugin status_callback hook.

Must be a top-level class (not a closure) so that multiprocessing can pickle anything that transitively references it when MultiProcPlugin submits work to workers. A nested-function closure here causes AttributeError: Can't get local object ... during _CallItem pickling, which wedges the scheduler.

Parameters:
__init__(progress=None, event_bus=None, patient_id='')[source]#
Parameters:
Return type:

None

property failed_nodes: set[str]#

Fullnames of nodes whose terminal status was exception/failed/crash/error.

property finished_ok_nodes: set[str]#

Fullnames of nodes whose terminal status was end/done/finish/finished/cached.

thesis.core.nipype.executor.apply_nipype_execution_config(workflow, config, crash_dir=None)[source]#

Apply NipypeConfig onto a workflow and Nipype’s global config.

Must be called on every entry path before workflow.run() — including the CLI’s batch parallel path that bypasses NipypeExecutor.

Two distinct updates happen here and BOTH are load-bearing:

  1. workflow.config['execution'] — consulted by Nipype’s scheduler (stop_on_first_crash, remove_unnecessary_outputs, keep_inputs, etc.).

  2. nipype.config global singleton — consulted by BaseTraitedSpec._get_sorteddict (interfaces/base/specs.py:307) when computing file-input hashes. Without step 2, hash_method silently falls back to Nipype’s package default (timestamp) no matter what the user configured, which is why content-mode trial YAMLs were producing timestamp hashfiles and cache-missing on every pipeline run.

Raises:

Exception – any error reading or mutating workflow.config is re-raised so the caller can log it against its own logger (executor vs CLI).

Parameters:
Return type:

None

thesis.core.nipype.executor.build_nipype_status_callback(*, progress=None, event_bus=None, patient_id='')[source]#

Create a Nipype status callback for node-level progress updates.

Parameters:
  • progress (Optional[NodeProgressProtocol]) – Optional progress bar updated on node start and finish.

  • event_bus (Optional[EventBus]) – Optional event bus for structured node lifecycle events.

  • patient_id (str) – Patient identifier attached to emitted events.

Return type:

NipypeStatusCallback

Returns:

Callback compatible with Nipype plugin status_callback hooks. Callers may also read failed_nodes / finished_ok_nodes after the workflow runs to recover per-node final status.

thesis.core.nipype.executor.count_workflow_nodes(workflow)[source]#

Count executable nodes in a workflow graph, including nested sub-workflows.

For meta-workflows (e.g. tract_synthseg) whose top-level graph contains child Workflow objects rather than Node objects, this recurses into each nested workflow so that every leaf node with an interface attribute is counted.

Parameters:

workflow (Workflow) – Nipype workflow whose executable leaf nodes should be counted.

Return type:

int

Returns:

Number of executable nodes, or 0 if graph introspection fails.

thesis.core.nipype.executor.run_workflow(workflow, context, config=None, *, event_bus=None, progress=None)[source]#

Convenience helper to execute a Nipype workflow with the thesis context.

Parameters:
Return type:

None

Interfaces#

Custom Nipype interfaces for tools not covered by built-in wrappers.

Available interface modules:

  • freesurfer: FreeSurfer tools without upstream Nipype wrappers such as SynthSeg and SynthStrip

  • fsl: patched FSL interfaces such as ProbTrackX2 and ProbTrackX2GPU

class thesis.core.nipype.interfaces.SynthSeg[source]

Bases: CommandLine

Nipype interface for FreeSurfer’s mri_synthseg segmentation tool.

SynthSeg segments brain MRI scans (and optionally performs cortical parcellation) without requiring a specific acquisition protocol or contrast. It is robust to clinical data and non-standard sequences.

Because this class subclasses CommandLine, it integrates transparently with Nipype’s workflow engine:

  • Caching: results are hashed and re-used automatically.

  • Parallelisation: place it in a MapNode or connect it to the workflow graph; the MultiProc / SGEGraph plugins handle scheduling.

  • Provenance: command line, inputs, and outputs are recorded.

Requirements:

mri_synthseg must be on PATH (provided by FreeSurfer ≥ 7.3).

Examples

Basic segmentation:

from thesis.core.nipype.interfaces.freesurfer import SynthSeg
seg = SynthSeg(
    input_image="sub-01_T1w.nii.gz",
    output_segmentation="sub-01_synthseg.nii.gz",
)
result = seg.run()

With parcellation, volumes CSV, and CPU execution:

seg = SynthSeg(
    input_image="sub-01_T1w.nii.gz",
    output_segmentation="sub-01_synthseg.nii.gz",
    parc=True,
    vol="sub-01_volumes.csv",
    cpu=True,
    threads=4,
)

Inside a Nipype workflow:

from nipype import Node, Workflow
from thesis.core.nipype.interfaces.freesurfer import SynthSeg

seg_node = Node(SynthSeg(), name="synthseg")
seg_node.inputs.input_image = "T1w.nii.gz"
seg_node.inputs.output_segmentation = "T1w_synthseg.nii.gz"
seg_node.inputs.parc = True

wf = Workflow(name="seg_workflow")
wf.add_nodes([seg_node])
wf.run()

Batch segmentation via MapNode:

from nipype import MapNode
seg_mapnode = MapNode(
    SynthSeg(parc=True, cpu=True, threads=4),
    iterfield=["input_image", "output_segmentation"],
    name="synthseg_batch",
)
seg_mapnode.inputs.input_image = ["sub-01_T1w.nii.gz", "sub-02_T1w.nii.gz"]
seg_mapnode.inputs.output_segmentation = ["sub-01_seg.nii.gz", "sub-02_seg.nii.gz"]
input_spec

alias of SynthSegInputSpec

output_spec

alias of SynthSegOutputSpec

class thesis.core.nipype.interfaces.SynthStrip[source]

Bases: CommandLine

Nipype CommandLine interface for FreeSurfer’s mri_synthstrip brain extraction tool.

mri_synthstrip performs learning-based skull-stripping without requiring a specific acquisition protocol or contrast. This wrapper drives it through a custom _run_interface that bypasses the standard Nipype cmdline() machinery so that GPU execution can be handled via the FreeSurfer Python launcher with automatic CPU fallback.

Requirements:

FREESURFER_HOME must be set in the environment. FreeSurfer ≥ 7 is required for the mri_synthstrip script to be present at $FREESURFER_HOME/python/scripts/mri_synthstrip.

GPU behaviour:

When use_gpu=True the FreeSurfer Python launcher is invoked with the -g flag and, if gpu_device is defined, CUDA_VISIBLE_DEVICES is set accordingly. If the GPU run exits non-zero the node automatically retries with the same launcher but without -g (CPU fallback). A torch diagnostic probe is logged at WARNING level before the retry to aid debugging.

Variables:
  • _cmd (str) – Fallback binary name ("mri_synthstrip"); used for the CPU non-launcher path only.

  • input_specSynthStripInputSpec

  • output_specSynthStripOutputSpec

Example

Basic skull-stripping inside a Nipype workflow:

from nipype import Node
from thesis.core.nipype.interfaces.freesurfer import SynthStrip

node = Node(SynthStrip(), name="synthstrip")
node.inputs.input_image  = "sub-01_T1w.nii.gz"
node.inputs.output_image = "sub-01_brain.nii.gz"
node.inputs.output_mask  = "sub-01_mask.nii.gz"
result = node.run()
input_spec

alias of SynthStripInputSpec

output_spec

alias of SynthStripOutputSpec

class thesis.core.nipype.interfaces.ProbTrackX2[source]

Bases: ProbTrackX2

ProbTrackX2 with a corrected _run_interface stderr check.

The upstream nipype implementation raises RuntimeError whenever the command writes anything to stderr — including harmless OS warnings:

bash: /path/to/libtinfo.so.6: no version information available

These appear on some HPC clusters due to conda / system library mismatches and are entirely benign. The upstream check therefore causes spurious workflow failures even when probtrackx2 exits with return code 0.

This subclass re-raises only for genuine command failures (non-zero return code). All other behaviour — sample symlinking, targets.txt writing, output listing — is inherited unchanged from the upstream class.

class thesis.core.nipype.interfaces.ProbTrackX2GPU[source]

Bases: ProbTrackX2

GPU-accelerated ProbTrackX2 (probtrackx2_gpu11.0 / probtrackx2_gpu).

Accepts the same inputs and produces the same outputs as the CPU ProbTrackX2. The binary is selected at module-import time by _find_gpu_binary() (presence check only).

CUDA compatibility is validated at CLI startup via thesis.core.gpu.check_gpu() — if a compatible GPU is not available, config.hardware.gpu_enabled is set to False before any workflow is built, so this class will never be instantiated on an incompatible node.

GPU serialization is handled at the scheduler level via n_gpu_procs=1 in plugin_args — no process-level lock is needed, so this class adds no _run_interface override and runs probtrackx2_gpu directly via the inherited ProbTrackX2 implementation.

Use get_probtrackx2_interface() to select between CPU and GPU transparently based on the config flag.

input_spec

alias of ProbTrackX2GPUInputSpec

thesis.core.nipype.interfaces.get_probtrackx2_interface(use_gpu=True)[source]

Return the appropriate ProbTrackX2 interface class.

When use_gpu=True and a GPU binary was found at module-import time, returns ProbTrackX2GPU. Otherwise returns CPU ProbTrackX2.

CUDA availability is not re-checked here — it was already validated (and use_gpu corrected if necessary) at CLI startup by thesis.core.gpu.check_gpu().

Parameters:

use_gpu (bool) – Use the GPU variant when True and the binary is present.

Return type:

Type[ProbTrackX2]

Returns:

Uninstantiated class; pass directly to nipype.Node.

Custom FSL Nipype interfaces.

Provides patched or extended versions of FSL Nipype interfaces to work around upstream bugs or limitations, and adds GPU-accelerated variants where FSL ships separate GPU binaries.

CUDA / GPU validation is performed once at CLI startup by thesis.core.gpu.check_gpu(), not here. By the time a workflow is built, config.hardware.gpu_enabled already reflects whether a compatible GPU setup was found. This module’s only responsibility is to map that flag to the correct binary name.

Classes:

ProbTrackX2: CPU ProbTrackX2 with corrected stderr-based error detection. ProbTrackX2GPU: GPU-accelerated ProbTrackX2.

Functions:

get_probtrackx2_interface: Return the best available ProbTrackX2 class.

class thesis.core.nipype.interfaces.fsl.ProbTrackX2[source]#

Bases: ProbTrackX2

ProbTrackX2 with a corrected _run_interface stderr check.

The upstream nipype implementation raises RuntimeError whenever the command writes anything to stderr — including harmless OS warnings:

bash: /path/to/libtinfo.so.6: no version information available

These appear on some HPC clusters due to conda / system library mismatches and are entirely benign. The upstream check therefore causes spurious workflow failures even when probtrackx2 exits with return code 0.

This subclass re-raises only for genuine command failures (non-zero return code). All other behaviour — sample symlinking, targets.txt writing, output listing — is inherited unchanged from the upstream class.

class thesis.core.nipype.interfaces.fsl.ProbTrackX2GPU[source]#

Bases: ProbTrackX2

GPU-accelerated ProbTrackX2 (probtrackx2_gpu11.0 / probtrackx2_gpu).

Accepts the same inputs and produces the same outputs as the CPU ProbTrackX2. The binary is selected at module-import time by _find_gpu_binary() (presence check only).

CUDA compatibility is validated at CLI startup via thesis.core.gpu.check_gpu() — if a compatible GPU is not available, config.hardware.gpu_enabled is set to False before any workflow is built, so this class will never be instantiated on an incompatible node.

GPU serialization is handled at the scheduler level via n_gpu_procs=1 in plugin_args — no process-level lock is needed, so this class adds no _run_interface override and runs probtrackx2_gpu directly via the inherited ProbTrackX2 implementation.

Use get_probtrackx2_interface() to select between CPU and GPU transparently based on the config flag.

input_spec#

alias of ProbTrackX2GPUInputSpec

thesis.core.nipype.interfaces.fsl.get_probtrackx2_interface(use_gpu=True)[source]#

Return the appropriate ProbTrackX2 interface class.

When use_gpu=True and a GPU binary was found at module-import time, returns ProbTrackX2GPU. Otherwise returns CPU ProbTrackX2.

CUDA availability is not re-checked here — it was already validated (and use_gpu corrected if necessary) at CLI startup by thesis.core.gpu.check_gpu().

Parameters:

use_gpu (bool) – Use the GPU variant when True and the binary is present.

Return type:

Type[ProbTrackX2]

Returns:

Uninstantiated class; pass directly to nipype.Node.

Custom FreeSurfer Nipype interfaces.

Provides CommandLine-based Nipype interfaces for FreeSurfer tools that do not have upstream wrappers in nipype.interfaces.freesurfer, enabling them to participate in Nipype workflows with full caching and parallelisation support.

Classes:

SynthSeg: Nipype interface for mri_synthseg (brain MRI segmentation). SynthStrip: Nipype interface for mri_synthstrip (brain extraction).

class thesis.core.nipype.interfaces.freesurfer.SynthSeg[source]#

Bases: CommandLine

Nipype interface for FreeSurfer’s mri_synthseg segmentation tool.

SynthSeg segments brain MRI scans (and optionally performs cortical parcellation) without requiring a specific acquisition protocol or contrast. It is robust to clinical data and non-standard sequences.

Because this class subclasses CommandLine, it integrates transparently with Nipype’s workflow engine:

  • Caching: results are hashed and re-used automatically.

  • Parallelisation: place it in a MapNode or connect it to the workflow graph; the MultiProc / SGEGraph plugins handle scheduling.

  • Provenance: command line, inputs, and outputs are recorded.

Requirements:

mri_synthseg must be on PATH (provided by FreeSurfer ≥ 7.3).

Examples

Basic segmentation:

from thesis.core.nipype.interfaces.freesurfer import SynthSeg
seg = SynthSeg(
    input_image="sub-01_T1w.nii.gz",
    output_segmentation="sub-01_synthseg.nii.gz",
)
result = seg.run()

With parcellation, volumes CSV, and CPU execution:

seg = SynthSeg(
    input_image="sub-01_T1w.nii.gz",
    output_segmentation="sub-01_synthseg.nii.gz",
    parc=True,
    vol="sub-01_volumes.csv",
    cpu=True,
    threads=4,
)

Inside a Nipype workflow:

from nipype import Node, Workflow
from thesis.core.nipype.interfaces.freesurfer import SynthSeg

seg_node = Node(SynthSeg(), name="synthseg")
seg_node.inputs.input_image = "T1w.nii.gz"
seg_node.inputs.output_segmentation = "T1w_synthseg.nii.gz"
seg_node.inputs.parc = True

wf = Workflow(name="seg_workflow")
wf.add_nodes([seg_node])
wf.run()

Batch segmentation via MapNode:

from nipype import MapNode
seg_mapnode = MapNode(
    SynthSeg(parc=True, cpu=True, threads=4),
    iterfield=["input_image", "output_segmentation"],
    name="synthseg_batch",
)
seg_mapnode.inputs.input_image = ["sub-01_T1w.nii.gz", "sub-02_T1w.nii.gz"]
seg_mapnode.inputs.output_segmentation = ["sub-01_seg.nii.gz", "sub-02_seg.nii.gz"]
input_spec#

alias of SynthSegInputSpec

output_spec#

alias of SynthSegOutputSpec

class thesis.core.nipype.interfaces.freesurfer.SynthStrip[source]#

Bases: CommandLine

Nipype CommandLine interface for FreeSurfer’s mri_synthstrip brain extraction tool.

mri_synthstrip performs learning-based skull-stripping without requiring a specific acquisition protocol or contrast. This wrapper drives it through a custom _run_interface that bypasses the standard Nipype cmdline() machinery so that GPU execution can be handled via the FreeSurfer Python launcher with automatic CPU fallback.

Requirements:

FREESURFER_HOME must be set in the environment. FreeSurfer ≥ 7 is required for the mri_synthstrip script to be present at $FREESURFER_HOME/python/scripts/mri_synthstrip.

GPU behaviour:

When use_gpu=True the FreeSurfer Python launcher is invoked with the -g flag and, if gpu_device is defined, CUDA_VISIBLE_DEVICES is set accordingly. If the GPU run exits non-zero the node automatically retries with the same launcher but without -g (CPU fallback). A torch diagnostic probe is logged at WARNING level before the retry to aid debugging.

Variables:
  • _cmd (str) – Fallback binary name ("mri_synthstrip"); used for the CPU non-launcher path only.

  • input_specSynthStripInputSpec

  • output_specSynthStripOutputSpec

Example

Basic skull-stripping inside a Nipype workflow:

from nipype import Node
from thesis.core.nipype.interfaces.freesurfer import SynthStrip

node = Node(SynthStrip(), name="synthstrip")
node.inputs.input_image  = "sub-01_T1w.nii.gz"
node.inputs.output_image = "sub-01_brain.nii.gz"
node.inputs.output_mask  = "sub-01_mask.nii.gz"
result = node.run()
input_spec#

alias of SynthStripInputSpec

output_spec#

alias of SynthStripOutputSpec