Nipype Integration#
Bridge between the thesis pipeline and Nipype’s workflow engine.
thesis.core.nipype.executor#
Nipype workflow executor for thesis framework.
This module provides the execution engine for running Nipype workflows within your framework, managing execution context, caching, and result collection.
- class thesis.core.nipype.executor.NipypeExecutor[source]#
Bases:
objectExecutes Nipype workflows within the thesis framework.
This executor bridges your context system with Nipype’s workflow engine, managing: - Workflow base directory and working directories - Plugin selection and configuration - Logging and error handling
Workflow outputs are written by individual nodes to their configured
output_dirpaths underworkflow.base_dir; downstream code reads them back from the filesystem rather than from a returned dict.Example
>>> from thesis.core import ConfigManager, create_context >>> from nipype import Workflow >>> >>> config = ConfigManager().load_config("default", patient_id="patient_001") >>> context = create_context("patient_001", config, Path("./data")) >>> workflow = Workflow(name="example") >>> >>> executor = NipypeExecutor(context, config.nipype) >>> executor.execute(workflow)
- Parameters:
context (
ProcessingContext)config (
Union[Dict[str,Any],NipypeConfig,None])
- __init__(context, config=None)[source]#
Initialize the executor.
- Parameters:
context (
ProcessingContext) – ProcessingContext from your frameworkconfig (
Union[Dict[str,Any],NipypeConfig,None]) – Nipype configuration (dict or NipypeConfig instance)
- execute(workflow, *, event_bus=None, progress=None)[source]#
Execute a Nipype workflow.
Workflow outputs are written to disk under
workflow.base_dir(and whateveroutput_dirpaths individual nodes were configured with); downstream code reads them from the filesystem rather than from a returned dict.- Parameters:
workflow (
Workflow) – nipype.pipeline.engine.Workflow instanceevent_bus (
Optional[EventBus]) – Optional event bus for emitting execution events.progress (
Optional[NodeProgressProtocol]) – Optional workflow progress tracker.
- Return type:
- class thesis.core.nipype.executor.NipypeStatusCallback[source]#
Bases:
objectPicklable callable for Nipype’s plugin
status_callbackhook.Must be a top-level class (not a closure) so that
multiprocessingcan pickle anything that transitively references it whenMultiProcPluginsubmits work to workers. A nested-function closure here causesAttributeError: Can't get local object ...during_CallItempickling, which wedges the scheduler.- Parameters:
progress (
Optional[NodeProgressProtocol])patient_id (
str)
- __init__(progress=None, event_bus=None, patient_id='')[source]#
- Parameters:
progress (
Optional[NodeProgressProtocol])patient_id (
str)
- Return type:
None
- thesis.core.nipype.executor.apply_nipype_execution_config(workflow, config, crash_dir=None)[source]#
Apply NipypeConfig onto a workflow and Nipype’s global config.
Must be called on every entry path before
workflow.run()— including the CLI’s batch parallel path that bypassesNipypeExecutor.Two distinct updates happen here and BOTH are load-bearing:
workflow.config['execution']— consulted by Nipype’s scheduler (stop_on_first_crash, remove_unnecessary_outputs, keep_inputs, etc.).nipype.configglobal singleton — consulted byBaseTraitedSpec._get_sorteddict(interfaces/base/specs.py:307) when computing file-input hashes. Without step 2,hash_methodsilently falls back to Nipype’s package default (timestamp) no matter what the user configured, which is why content-mode trial YAMLs were producing timestamp hashfiles and cache-missing on every pipeline run.
- Raises:
Exception – any error reading or mutating
workflow.configis re-raised so the caller can log it against its own logger (executor vs CLI).- Parameters:
workflow (
Workflow)config (
NipypeConfig)
- Return type:
- thesis.core.nipype.executor.build_nipype_status_callback(*, progress=None, event_bus=None, patient_id='')[source]#
Create a Nipype status callback for node-level progress updates.
- Parameters:
progress (
Optional[NodeProgressProtocol]) – Optional progress bar updated on node start and finish.event_bus (
Optional[EventBus]) – Optional event bus for structured node lifecycle events.patient_id (
str) – Patient identifier attached to emitted events.
- Return type:
- Returns:
Callback compatible with Nipype plugin
status_callbackhooks. Callers may also readfailed_nodes/finished_ok_nodesafter the workflow runs to recover per-node final status.
- thesis.core.nipype.executor.count_workflow_nodes(workflow)[source]#
Count executable nodes in a workflow graph, including nested sub-workflows.
For meta-workflows (e.g.
tract_synthseg) whose top-level graph contains childWorkflowobjects rather thanNodeobjects, this recurses into each nested workflow so that every leaf node with aninterfaceattribute is counted.- Parameters:
workflow (
Workflow) – Nipype workflow whose executable leaf nodes should be counted.- Return type:
- Returns:
Number of executable nodes, or
0if graph introspection fails.
- thesis.core.nipype.executor.run_workflow(workflow, context, config=None, *, event_bus=None, progress=None)[source]#
Convenience helper to execute a Nipype workflow with the thesis context.
- Parameters:
workflow (
Workflow) – nipype.pipeline.engine.Workflow instancecontext (
ProcessingContext) – ProcessingContextconfig (
Union[Dict[str,Any],NipypeConfig,None]) – Optional NipypeConfig or dict overridesprogress (
Optional[NodeProgressProtocol])
- Return type:
Interfaces#
Custom Nipype interfaces for tools not covered by built-in wrappers.
Available interface modules:
freesurfer: FreeSurfer tools without upstream Nipype wrappers such as SynthSeg and SynthStripfsl: patched FSL interfaces such asProbTrackX2andProbTrackX2GPU
- class thesis.core.nipype.interfaces.SynthSeg[source]
Bases:
CommandLineNipype interface for FreeSurfer’s
mri_synthsegsegmentation tool.SynthSeg segments brain MRI scans (and optionally performs cortical parcellation) without requiring a specific acquisition protocol or contrast. It is robust to clinical data and non-standard sequences.
Because this class subclasses
CommandLine, it integrates transparently with Nipype’s workflow engine:Caching: results are hashed and re-used automatically.
Parallelisation: place it in a
MapNodeor connect it to the workflow graph; the MultiProc / SGEGraph plugins handle scheduling.Provenance: command line, inputs, and outputs are recorded.
- Requirements:
mri_synthsegmust be onPATH(provided by FreeSurfer ≥ 7.3).
Examples
Basic segmentation:
from thesis.core.nipype.interfaces.freesurfer import SynthSeg seg = SynthSeg( input_image="sub-01_T1w.nii.gz", output_segmentation="sub-01_synthseg.nii.gz", ) result = seg.run()
With parcellation, volumes CSV, and CPU execution:
seg = SynthSeg( input_image="sub-01_T1w.nii.gz", output_segmentation="sub-01_synthseg.nii.gz", parc=True, vol="sub-01_volumes.csv", cpu=True, threads=4, )
Inside a Nipype workflow:
from nipype import Node, Workflow from thesis.core.nipype.interfaces.freesurfer import SynthSeg seg_node = Node(SynthSeg(), name="synthseg") seg_node.inputs.input_image = "T1w.nii.gz" seg_node.inputs.output_segmentation = "T1w_synthseg.nii.gz" seg_node.inputs.parc = True wf = Workflow(name="seg_workflow") wf.add_nodes([seg_node]) wf.run()
Batch segmentation via MapNode:
from nipype import MapNode seg_mapnode = MapNode( SynthSeg(parc=True, cpu=True, threads=4), iterfield=["input_image", "output_segmentation"], name="synthseg_batch", ) seg_mapnode.inputs.input_image = ["sub-01_T1w.nii.gz", "sub-02_T1w.nii.gz"] seg_mapnode.inputs.output_segmentation = ["sub-01_seg.nii.gz", "sub-02_seg.nii.gz"]
- input_spec
alias of
SynthSegInputSpec
- output_spec
alias of
SynthSegOutputSpec
- class thesis.core.nipype.interfaces.SynthStrip[source]
Bases:
CommandLineNipype CommandLine interface for FreeSurfer’s
mri_synthstripbrain extraction tool.mri_synthstripperforms learning-based skull-stripping without requiring a specific acquisition protocol or contrast. This wrapper drives it through a custom_run_interfacethat bypasses the standard Nipypecmdline()machinery so that GPU execution can be handled via the FreeSurfer Python launcher with automatic CPU fallback.- Requirements:
FREESURFER_HOMEmust be set in the environment. FreeSurfer ≥ 7 is required for themri_synthstripscript to be present at$FREESURFER_HOME/python/scripts/mri_synthstrip.- GPU behaviour:
When
use_gpu=Truethe FreeSurfer Python launcher is invoked with the-gflag and, ifgpu_deviceis defined,CUDA_VISIBLE_DEVICESis set accordingly. If the GPU run exits non-zero the node automatically retries with the same launcher but without-g(CPU fallback). A torch diagnostic probe is logged at WARNING level before the retry to aid debugging.
- Variables:
_cmd (
str) – Fallback binary name ("mri_synthstrip"); used for the CPU non-launcher path only.input_spec –
SynthStripInputSpecoutput_spec –
SynthStripOutputSpec
Example
Basic skull-stripping inside a Nipype workflow:
from nipype import Node from thesis.core.nipype.interfaces.freesurfer import SynthStrip node = Node(SynthStrip(), name="synthstrip") node.inputs.input_image = "sub-01_T1w.nii.gz" node.inputs.output_image = "sub-01_brain.nii.gz" node.inputs.output_mask = "sub-01_mask.nii.gz" result = node.run()
- input_spec
alias of
SynthStripInputSpec
- output_spec
alias of
SynthStripOutputSpec
- class thesis.core.nipype.interfaces.ProbTrackX2[source]
Bases:
ProbTrackX2ProbTrackX2 with a corrected
_run_interfacestderr check.The upstream nipype implementation raises
RuntimeErrorwhenever the command writes anything to stderr — including harmless OS warnings:bash: /path/to/libtinfo.so.6: no version information available
These appear on some HPC clusters due to conda / system library mismatches and are entirely benign. The upstream check therefore causes spurious workflow failures even when
probtrackx2exits with return code 0.This subclass re-raises only for genuine command failures (non-zero return code). All other behaviour — sample symlinking,
targets.txtwriting, output listing — is inherited unchanged from the upstream class.
- class thesis.core.nipype.interfaces.ProbTrackX2GPU[source]
Bases:
ProbTrackX2GPU-accelerated ProbTrackX2 (
probtrackx2_gpu11.0/probtrackx2_gpu).Accepts the same inputs and produces the same outputs as the CPU
ProbTrackX2. The binary is selected at module-import time by_find_gpu_binary()(presence check only).CUDA compatibility is validated at CLI startup via
thesis.core.gpu.check_gpu()— if a compatible GPU is not available,config.hardware.gpu_enabledis set toFalsebefore any workflow is built, so this class will never be instantiated on an incompatible node.GPU serialization is handled at the scheduler level via
n_gpu_procs=1inplugin_args— no process-level lock is needed, so this class adds no_run_interfaceoverride and runsprobtrackx2_gpudirectly via the inheritedProbTrackX2implementation.Use
get_probtrackx2_interface()to select between CPU and GPU transparently based on the config flag.- input_spec
alias of
ProbTrackX2GPUInputSpec
- thesis.core.nipype.interfaces.get_probtrackx2_interface(use_gpu=True)[source]
Return the appropriate
ProbTrackX2interface class.When
use_gpu=Trueand a GPU binary was found at module-import time, returnsProbTrackX2GPU. Otherwise returns CPUProbTrackX2.CUDA availability is not re-checked here — it was already validated (and
use_gpucorrected if necessary) at CLI startup bythesis.core.gpu.check_gpu().- Parameters:
use_gpu (
bool) – Use the GPU variant whenTrueand the binary is present.- Return type:
- Returns:
Uninstantiated class; pass directly to
nipype.Node.
Custom FSL Nipype interfaces.
Provides patched or extended versions of FSL Nipype interfaces to work around upstream bugs or limitations, and adds GPU-accelerated variants where FSL ships separate GPU binaries.
CUDA / GPU validation is performed once at CLI startup by
thesis.core.gpu.check_gpu(), not here. By the time a workflow is
built, config.hardware.gpu_enabled already reflects whether a compatible
GPU setup was found. This module’s only responsibility is to map that flag
to the correct binary name.
- Classes:
ProbTrackX2: CPU ProbTrackX2 with corrected stderr-based error detection. ProbTrackX2GPU: GPU-accelerated ProbTrackX2.
- Functions:
get_probtrackx2_interface: Return the best available ProbTrackX2 class.
- class thesis.core.nipype.interfaces.fsl.ProbTrackX2[source]#
Bases:
ProbTrackX2ProbTrackX2 with a corrected
_run_interfacestderr check.The upstream nipype implementation raises
RuntimeErrorwhenever the command writes anything to stderr — including harmless OS warnings:bash: /path/to/libtinfo.so.6: no version information available
These appear on some HPC clusters due to conda / system library mismatches and are entirely benign. The upstream check therefore causes spurious workflow failures even when
probtrackx2exits with return code 0.This subclass re-raises only for genuine command failures (non-zero return code). All other behaviour — sample symlinking,
targets.txtwriting, output listing — is inherited unchanged from the upstream class.
- class thesis.core.nipype.interfaces.fsl.ProbTrackX2GPU[source]#
Bases:
ProbTrackX2GPU-accelerated ProbTrackX2 (
probtrackx2_gpu11.0/probtrackx2_gpu).Accepts the same inputs and produces the same outputs as the CPU
ProbTrackX2. The binary is selected at module-import time by_find_gpu_binary()(presence check only).CUDA compatibility is validated at CLI startup via
thesis.core.gpu.check_gpu()— if a compatible GPU is not available,config.hardware.gpu_enabledis set toFalsebefore any workflow is built, so this class will never be instantiated on an incompatible node.GPU serialization is handled at the scheduler level via
n_gpu_procs=1inplugin_args— no process-level lock is needed, so this class adds no_run_interfaceoverride and runsprobtrackx2_gpudirectly via the inheritedProbTrackX2implementation.Use
get_probtrackx2_interface()to select between CPU and GPU transparently based on the config flag.- input_spec#
alias of
ProbTrackX2GPUInputSpec
- thesis.core.nipype.interfaces.fsl.get_probtrackx2_interface(use_gpu=True)[source]#
Return the appropriate
ProbTrackX2interface class.When
use_gpu=Trueand a GPU binary was found at module-import time, returnsProbTrackX2GPU. Otherwise returns CPUProbTrackX2.CUDA availability is not re-checked here — it was already validated (and
use_gpucorrected if necessary) at CLI startup bythesis.core.gpu.check_gpu().- Parameters:
use_gpu (
bool) – Use the GPU variant whenTrueand the binary is present.- Return type:
- Returns:
Uninstantiated class; pass directly to
nipype.Node.
Custom FreeSurfer Nipype interfaces.
Provides CommandLine-based Nipype interfaces for FreeSurfer tools that do not have upstream wrappers in nipype.interfaces.freesurfer, enabling them to participate in Nipype workflows with full caching and parallelisation support.
- Classes:
SynthSeg: Nipype interface for mri_synthseg (brain MRI segmentation). SynthStrip: Nipype interface for mri_synthstrip (brain extraction).
- class thesis.core.nipype.interfaces.freesurfer.SynthSeg[source]#
Bases:
CommandLineNipype interface for FreeSurfer’s
mri_synthsegsegmentation tool.SynthSeg segments brain MRI scans (and optionally performs cortical parcellation) without requiring a specific acquisition protocol or contrast. It is robust to clinical data and non-standard sequences.
Because this class subclasses
CommandLine, it integrates transparently with Nipype’s workflow engine:Caching: results are hashed and re-used automatically.
Parallelisation: place it in a
MapNodeor connect it to the workflow graph; the MultiProc / SGEGraph plugins handle scheduling.Provenance: command line, inputs, and outputs are recorded.
- Requirements:
mri_synthsegmust be onPATH(provided by FreeSurfer ≥ 7.3).
Examples
Basic segmentation:
from thesis.core.nipype.interfaces.freesurfer import SynthSeg seg = SynthSeg( input_image="sub-01_T1w.nii.gz", output_segmentation="sub-01_synthseg.nii.gz", ) result = seg.run()
With parcellation, volumes CSV, and CPU execution:
seg = SynthSeg( input_image="sub-01_T1w.nii.gz", output_segmentation="sub-01_synthseg.nii.gz", parc=True, vol="sub-01_volumes.csv", cpu=True, threads=4, )
Inside a Nipype workflow:
from nipype import Node, Workflow from thesis.core.nipype.interfaces.freesurfer import SynthSeg seg_node = Node(SynthSeg(), name="synthseg") seg_node.inputs.input_image = "T1w.nii.gz" seg_node.inputs.output_segmentation = "T1w_synthseg.nii.gz" seg_node.inputs.parc = True wf = Workflow(name="seg_workflow") wf.add_nodes([seg_node]) wf.run()
Batch segmentation via MapNode:
from nipype import MapNode seg_mapnode = MapNode( SynthSeg(parc=True, cpu=True, threads=4), iterfield=["input_image", "output_segmentation"], name="synthseg_batch", ) seg_mapnode.inputs.input_image = ["sub-01_T1w.nii.gz", "sub-02_T1w.nii.gz"] seg_mapnode.inputs.output_segmentation = ["sub-01_seg.nii.gz", "sub-02_seg.nii.gz"]
- input_spec#
alias of
SynthSegInputSpec
- output_spec#
alias of
SynthSegOutputSpec
- class thesis.core.nipype.interfaces.freesurfer.SynthStrip[source]#
Bases:
CommandLineNipype CommandLine interface for FreeSurfer’s
mri_synthstripbrain extraction tool.mri_synthstripperforms learning-based skull-stripping without requiring a specific acquisition protocol or contrast. This wrapper drives it through a custom_run_interfacethat bypasses the standard Nipypecmdline()machinery so that GPU execution can be handled via the FreeSurfer Python launcher with automatic CPU fallback.- Requirements:
FREESURFER_HOMEmust be set in the environment. FreeSurfer ≥ 7 is required for themri_synthstripscript to be present at$FREESURFER_HOME/python/scripts/mri_synthstrip.- GPU behaviour:
When
use_gpu=Truethe FreeSurfer Python launcher is invoked with the-gflag and, ifgpu_deviceis defined,CUDA_VISIBLE_DEVICESis set accordingly. If the GPU run exits non-zero the node automatically retries with the same launcher but without-g(CPU fallback). A torch diagnostic probe is logged at WARNING level before the retry to aid debugging.
- Variables:
_cmd (
str) – Fallback binary name ("mri_synthstrip"); used for the CPU non-launcher path only.input_spec –
SynthStripInputSpecoutput_spec –
SynthStripOutputSpec
Example
Basic skull-stripping inside a Nipype workflow:
from nipype import Node from thesis.core.nipype.interfaces.freesurfer import SynthStrip node = Node(SynthStrip(), name="synthstrip") node.inputs.input_image = "sub-01_T1w.nii.gz" node.inputs.output_image = "sub-01_brain.nii.gz" node.inputs.output_mask = "sub-01_mask.nii.gz" result = node.run()
- input_spec#
alias of
SynthStripInputSpec
- output_spec#
alias of
SynthStripOutputSpec