Adapting an arbitrary Nipype script to the framework#

If you already have a working — or half-working — standalone Nipype script, you do not need to rewrite it from scratch to run it inside the framework. This tutorial walks through converting a real, messy research script into a single @workflow-decorated function that the CLI can discover, configure, validate, and execute.

The “before” example is a genuine lab script, scripts/nipype_workflow_okt2025.py (the dbsdti pipeline: TOPUP → Eddy → DTIFit → BedpostX, plus N4/BET/ANTs registration of T1/T2/CT/MNI). It carries every anti-pattern you are likely to meet in the wild:

  • hardcoded absolute paths (/local/data1/terhe81/DTI_LDF/, /home/terhe81/SynthSeg-master/...);

  • a patient_no = 'DTI_LDF002' baked into the source;

  • output directories built by hand and a DataSink;

  • a sys.modules / importlib.util hack to load SynthSeg;

  • a sibling helper module (from alt_bX import ...) that only exists in the author’s directory;

  • run settings as module globals (bw = 1685, pe = 96, n_fib = 3, mode = 'reg');

  • eager .run() calls mixed with a declarative Workflow;

  • a trailing for-loop of ApplyTransforms referencing undefined names.

It even has a syntax error at lines 351–352 (a missing comma after fixed_image=... in the regMNI node), so the script as shipped does not parse. That is a useful teaching point: the framework’s import-time registration and @verify preflight catch problems before a multi-hour run, whereas a standalone script fails late — or never imports at all.

The idiomatic targets to compare against are src/thesis/workflows/minimal.py (the smallest decorated workflow) and src/thesis/workflows/preprocess/workflow.py (the framework’s own TOPUP/Eddy/BedpostX/DTIFit workflow — the direct counterpart to this script). See also Custom workflows for the out-of-tree script mechanics.

Tip

For a compact, runnable companion to this tutorial, compare examples/plain_nipype_workflow.py (an ordinary, un-annotated Nipype script) with examples/annotated_nipype_workflow.py (the same pipeline after applying the framework decorators). Run the plain script with python examples/plain_nipype_workflow.py and dry-run the annotated one with thesis run --script examples/annotated_nipype_workflow.py -p P001 -c default --dry-run to see the before→after diff this guide walks through, end to end.

The single biggest shift#

In the standalone script, importing the file runs the pipeline: the module body builds nodes, calls .run() on some of them, wires the rest into a Workflow, and finishes with dbsdti.run() (line 486).

In the framework, all work happens inside one factory function decorated with @workflow. The factory only constructs and returns a nipype.Workflow; it never calls .run(). Importing the file only registers the workflow — the framework’s executor runs it later.

Everything below is one application of that principle.
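The difference between "importing runs the pipeline" and "importing only registers a factory" can be sketched with a toy registry. Every name here (REGISTRY, RUN_LOG, the workflow stand-in, execute) is hypothetical and stdlib-only. It is not the framework's implementation, only an illustration of the principle.

```python
from typing import Callable

# Hypothetical stand-ins for the framework's registry (illustration only).
REGISTRY: dict[str, Callable[[], list[str]]] = {}
RUN_LOG: list[str] = []


def workflow(name: str):
    """Toy decorator: record the factory under `name` without calling it."""
    def register(factory):
        REGISTRY[name] = factory
        return factory
    return register


@workflow(name="dbsdti")
def build_workflow() -> list[str]:
    # The factory only describes the work; it returns a plan and runs nothing.
    RUN_LOG.append("factory called")
    return ["topup", "eddy", "dtifit"]


def execute(name: str) -> list[str]:
    """Toy executor: the only place where the factory is called."""
    plan = REGISTRY[name]()
    return [f"ran {step}" for step in plan]


# After "import", the workflow is registered but nothing has been built or run.
registered_before_run = list(REGISTRY)
log_before_run = list(RUN_LOG)
result = execute("dbsdti")
```

The factory body runs only when the executor asks for it, which is why a decorated file can be imported safely for listing, validation, or a dry run.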


Step 1 — Wrap the script body in a single @workflow function#

Before — work at module top level, executed on import:

patient_no = 'DTI_LDF002'
home_dir = '/local/data1/terhe81/DTI_LDF/'
save_dir = home_dir + run_name
Path(save_dir).mkdir(parents=True, exist_ok=True)

dbsdti = Workflow(name='dbsdti')
dbsdti.connect([...])
dbsdti.base_dir = save_dir
dbsdti.run()                          # runs at import time

After — one factory that returns the graph:

from nipype import Workflow

from thesis.core.context import ProcessingContext
from thesis.core.decorators import workflow


@workflow(
    name="dbsdti",
    description="TOPUP/Eddy/DTIFit/BedpostX diffusion preprocessing.",
    protocol="dbsdti",
    default_config="dbsdti",
)
def build_workflow(*, context: ProcessingContext) -> Workflow:
    wf = Workflow(name=f"dbsdti_{context.patient_id}")
    # build nodes, then:
    #   wf.connect([...])
    return wf                         # NEVER wf.run()

Notes that carry through every later step:

  • @workflow must be the outermost decorator. The inner decorators you add next (@requires, @produces, @verify) are pure metadata-attachers and may appear in any order.

  • The function signature is keyword-only (note the leading *). It declares only the kwargs it actually uses; config and context are injected only if the signature names them.

  • Drop dbsdti.write_graph(...), dbsdti.base_dir = ..., and dbsdti.run(). Graph rendering is thesis run ... --graph; the working directory is context.working_dir; execution is the framework executor. The factory only returns wf.
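One way to picture "injected only if the signature names them" is signature inspection. The sketch below is an assumption about how such injection could work, not the framework's code; inject, the two toy factories, and the dictionary values are all hypothetical.

```python
import inspect


def inject(factory, available: dict) -> dict:
    """Select only the keyword arguments that `factory` names in its signature."""
    wanted = inspect.signature(factory).parameters
    return {name: value for name, value in available.items() if name in wanted}


def needs_context_only(*, context):
    # Keyword-only: the bare * forbids positional arguments.
    return f"wf_{context['patient_id']}"


def needs_both(*, config, context):
    return f"wf_{context['patient_id']}_{config['n_fibres']}"


# Hypothetical values an executor might have on hand.
available = {"config": {"n_fibres": 3}, "context": {"patient_id": "P001"}}

name_a = needs_context_only(**inject(needs_context_only, available))
name_b = needs_both(**inject(needs_both, available))
```

A factory that never reads config simply leaves it out of the signature and is never handed it.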


Step 2 — Replace hardcoded input paths with @requires / PatientFile#

Before — patient inputs assembled from absolute globals:

data_dir = home_dir + 'Patients/' + patient_no + '/'
image_X  = data_dir + patient_no + '_dmri_AP.nii.gz'
bval_X   = data_dir + patient_no + '_dmri_AP.bval'
bvec_X   = data_dir + patient_no + '_dmri_AP.bvec'
T1_img   = data_dir + patient_no + '_T1.nii.gz'
T2_img   = data_dir + patient_no + '_T2.nii.gz'

After — declare inputs with @requires; each becomes a pathlib.Path keyword argument resolved against context.input_dir:

from pathlib import Path

from thesis.core.decorators import requires, workflow
from thesis.core.path_declarations import ExternalFile, PatientFile


@workflow(name="dbsdti", description="...", protocol="dbsdti", default_config="dbsdti")
@requires(
    dwi_ap=PatientFile(
        default="{patient_id}_dmri_AP.nii.gz",
        config_path="dbsdti.dwi_ap",
    ),
    dwi_ap_bval=PatientFile(
        default="{patient_id}_dmri_AP.bval",
        config_path="dbsdti.dwi_ap_bval",
    ),
    dwi_ap_bvec=PatientFile(
        default="{patient_id}_dmri_AP.bvec",
        config_path="dbsdti.dwi_ap_bvec",
    ),
    t1=PatientFile(default="{patient_id}_T1.nii.gz", config_path="dbsdti.t1_image", optional=True),
    t2=PatientFile(default="{patient_id}_T2.nii.gz", config_path="dbsdti.t2_image", optional=True),
    # Cohort-shared assets (the MNI template) are NOT patient inputs:
    mni_template=ExternalFile(config_path="dbsdti.mni_template", optional=True),
)
def build_workflow(
    *,
    dwi_ap: Path,
    dwi_ap_bval: Path,
    dwi_ap_bvec: Path,
    t1: Path | None,
    t2: Path | None,
    mni_template: Path | None,
    context: ProcessingContext,
) -> Workflow:
    ...

What changed and why:

  • The {patient_id} placeholder replaces the manual patient_no + '_...' string-building; the resolver substitutes the CLI -p value.

  • config_path="dbsdti.dwi_ap" lets a YAML file override the default filename without touching the code. config_paths=[...] (plural) tries several keys in order; the first non-None value wins, else the default applies.

  • A non-optional PatientFile generates an implicit existence check: the framework verifies the file exists before running. Mark genuinely optional inputs with optional=True; they can resolve to None, so type them as Path | None in the signature.

  • The MNI template is a cohort-shared asset, not a per-patient input, so it is an ExternalFile (resolved from a config value, supporting absolute paths, ~, $ENV, and base_dirs). A file that lives under your data_dir could instead use DataFile("MNI/...").
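The lookup order described above (config keys in order, first non-None value wins, otherwise the default, then {patient_id} substitution) can be sketched in plain Python. The helper names lookup and resolve_patient_file, the nested-dict config, and the dbsdti.dwi_alt key are hypothetical. Whether the real resolver also formats override values is an assumption made here for simplicity.

```python
from pathlib import Path
from typing import Any, Optional


def lookup(config: dict, dotted_key: str) -> Optional[Any]:
    """Walk a nested dict using a dotted key such as 'dbsdti.dwi_ap'."""
    node: Any = config
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def resolve_patient_file(input_dir, patient_id, default, config_paths, config) -> Path:
    """First non-None config value wins; otherwise fall back to the default."""
    template = default
    for key in config_paths:
        value = lookup(config, key)
        if value is not None:
            template = value
            break
    # Assumption: the chosen template is formatted with patient_id only.
    return Path(input_dir) / template.format(patient_id=patient_id)


# Hypothetical config: one key unset, one key carrying an override.
config = {"dbsdti": {"dwi_ap": None, "dwi_alt": "custom_dwi.nii.gz"}}
default = "{patient_id}_dmri_AP.nii.gz"

from_default = resolve_patient_file("/data/in", "P001", default, ["dbsdti.dwi_ap"], config)
from_config = resolve_patient_file(
    "/data/in", "P001", default, ["dbsdti.dwi_ap", "dbsdti.dwi_alt"], config
)
```

Here from_default falls back to the templated filename because the only key is unset, while from_config picks up the second key in the list.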

Inside the body, pass these Path objects to nodes as strings exactly where the script used the globals:

extb0 = Node(ExtractROI(t_min=0, t_size=1, output_type="NIFTI"), name="extb0")
extb0.inputs.in_file = str(dwi_ap)

Step 3 — Replace DataSink / manual output dirs with @produces / OutputDir#

Before — output directory built by hand, plus a DataSink:

save_dir = home_dir + run_name
Path(save_dir).mkdir(parents=True, exist_ok=True)

dsDTIscalars = Node(DataSink(), name='dsDTIscalars')
dsDTIscalars.inputs.base_directory = os.path.join(save_dir, 'DTIscalars')
dbsdti.connect([(dti, dsDTIscalars, [('FA', 'FA')]), ...])

After — declare the output with @produces; OutputDir resolves to context.output_dir / <subdir> and creates it for you (mkdir(parents=True, exist_ok=True)):

from thesis.core.decorators import produces
from thesis.core.path_declarations import OutputDir


@produces(out_dir=OutputDir("dbsdti"))
def build_workflow(*, out_dir: Path, context: ProcessingContext) -> Workflow:
    ...
    dti = Node(DTIFit(save_tensor=True, sse=True, args="--wls"), name="dti")
    dti.inputs.base_name = str(out_dir / context.patient_id)
    ...

What changed and why:

  • Drop the DataSink entirely. Either point each node’s out_file / base_name directly under out_dir, or expose results on an outputnode IdentityInterface so a meta-workflow can consume them contract-to-contract (see attach_outputnode in thesis.core.contracts).

  • @produces declarations generate no existence check — the directory is created when the path is resolved, then injected as a Path.

  • OutputDir("") (empty subdir) resolves to context.output_dir itself, as in minimal.py.
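The resolution rule for output directories (join the subdirectory onto the output root, create it, and treat an empty subdirectory as the root itself) is small enough to sketch. resolve_output_dir is a hypothetical helper written for this illustration, not the framework's OutputDir.

```python
import tempfile
from pathlib import Path


def resolve_output_dir(output_dir: Path, subdir: str) -> Path:
    """Join `subdir` onto `output_dir` (or keep the root) and create it."""
    resolved = output_dir / subdir if subdir else output_dir
    resolved.mkdir(parents=True, exist_ok=True)
    return resolved


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "out"                      # stands in for context.output_dir
    nested = resolve_output_dir(root, "dbsdti")   # like OutputDir("dbsdti")
    same = resolve_output_dir(root, "")           # like OutputDir("")
    nested_exists = nested.is_dir()
    relative = nested.relative_to(root).as_posix()
    root_is_same = same == root
    base_name = str(nested / "P001")              # what a node's base_name would receive
```

Because the directory already exists by the time the factory body runs, node inputs such as base_name can point into it without any manual mkdir.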


Step 4 — Thread subject identity through ProcessingContext, not globals#

Before — the subject id is a hardcoded module global, reused for paths, node names, and log directories:

patient_no = 'DTI_LDF002'
run_name   = dateNr + '_' + patient_no + '_diffpy/'
T1_img     = data_dir + patient_no + '_T1.nii.gz'

After — the subject comes from the CLI -p flag as context.patient_id; never hardcode it:

def build_workflow(*, out_dir: Path, context: ProcessingContext) -> Workflow:
    wf = Workflow(name=f"dbsdti_{context.patient_id}")
    ...
    dti.inputs.base_name = str(out_dir / context.patient_id)

ProcessingContext carries patient_id, the merged config, and the resolved data_dir / input_dir / output_dir / working_dir. You read those fields instead of re-deriving directories from a home dir. The path declarations in Steps 2–3 already use the context’s directories under the hood, so most save_dir + ... concatenations simply disappear.


Step 5 — Remove sys.path / importlib hacks and sibling-module imports#

Before — the two most framework-hostile patterns in the script:

# 1) Inject a foreign SynthSeg module into sys.modules
module_path = "/home/terhe81/SynthSeg-master/scripts/commands/SynthSeg_predict.py"
spec = importlib.util.spec_from_file_location("SynthSeg_predict", module_path)
SynthSeg_predict = importlib.util.module_from_spec(spec)
sys.modules["SynthSeg_predict"] = SynthSeg_predict

# 2) Import a local helper that only exists in the author's directory
from alt_bX import concat_files, extbX, rmbX, modbX

After — delete both; use clean imports:

  • SynthSeg: the framework already wraps it. Use the standalone synthseg workflow (mri_synthseg) or the preprocess SynthSeg node. No sys.path / sys.modules mutation is permitted in a framework workflow.

  • alt_bX helpers: the framework’s preprocess package ships vendored node-builders for the same operations (the b-val modification, index-file creation, b0 extraction, and merge that modbX / extbX / rmbX did). Reuse those, or — if you truly need a missing helper — vendor it into your own workflow package (e.g. workflows/dbsdti/operations/) and import it by a normal absolute path. Never rely on a sibling-directory import.

The net effect: the top of your file becomes ordinary imports only.

from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, DTIFit, Eddy, ExtractROI, Merge, TOPUP

from thesis.core.config import PipelineConfig
from thesis.core.config.validators import BaseConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import ExternalFile, OutputDir, PatientFile

While you are here, also remove the eager .run() calls. The script runs extb0AP.run() and merge_bX.run() inline, then chains their .outputs into later nodes. Replace those with graph edges so the engine resolves the DAG:

# Before:
b0AP   = extb0AP.run()
bX_img = merge_bX.run()

# After:
# fsl.Merge takes a list on in_files; in1/in2 belong to utility.Merge.
wf.connect([(extb0, merge, [("roi_file", "in_files")]),
            (merge, topup, [("merged_file", "in_file")])])
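Declaring edges instead of calling .run() works because an execution order can always be derived from the edges alone. The sketch below uses the standard library's graphlib to show the idea on the two edges above. It is not how Nipype schedules nodes internally, only a minimal illustration of resolving a DAG.

```python
from graphlib import TopologicalSorter

# Edges copied from the wf.connect(...) call above: (upstream, downstream).
edges = [("extb0", "merge"), ("merge", "topup")]

sorter = TopologicalSorter()
for upstream, downstream in edges:
    sorter.add(downstream, upstream)  # add(node, *predecessors)

# A valid execution order derived purely from the declared dependencies.
order = list(sorter.static_order())
```

For this linear chain the only valid order is extb0, then merge, then topup, which is what the eager .run() calls were enforcing by hand.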

Step 6 — Declare a Pydantic schema for the workflow’s YAML block#

Before — run settings live as module globals, baked into each node:

dataset = 'AP'
mode    = 'reg'      # 'reg' (run registration) or 'avoid'
fs      = 'avoid'
n_fib   = 3
bw      = 1685       # acquisition bandwidth
pe      = 96         # phase-encoding directions
frac    = 0.3        # used in BET(frac=0.3)

After — a BaseConfig subclass that owns a top-level YAML key. The string mode flags (mode == 'reg', fs == 'run') become typed booleans:

from pydantic import Field

from thesis.core.config.validators import BaseConfig


class DbsdtiConfig(BaseConfig):
    """Schema for the `dbsdti:` block in YAML (extra keys are rejected)."""

    bandwidth: float = Field(default=1685.0, gt=0.0)
    phase_encoding_dirs: int = Field(default=96, gt=1)
    n_fibres: int = Field(default=3, gt=0)
    bet_frac: float = Field(default=0.3, ge=0.0, le=1.0)
    run_registration: bool = True
    topup_config: str = "b02b0_1.cnf"

The numeric bounds (gt=0.0, gt=1, ge=0.0/le=1.0) are an added validation improvement, not a transcription of the original script — the script kept these as plain module globals (bw = 1685, pe = 96). They reject nonsensical YAML overrides at load time; tighten or drop them to suit your data.

Wire it into the decorator with config_namespace + config_schema:

@workflow(
    name="dbsdti",
    description="TOPUP/Eddy/DTIFit/BedpostX diffusion preprocessing.",
    protocol="dbsdti",
    default_config="dbsdti",
    config_namespace="dbsdti",
    config_schema=DbsdtiConfig,
)

Then read it inside the body as a typed object:

def build_workflow(*, t2: Path | None, config: PipelineConfig, context: ProcessingContext) -> Workflow:
    cfg = config.dbsdti                    # a validated DbsdtiConfig instance
    bet = Node(BET(mask=True, frac=cfg.bet_frac, output_type="NIFTI"), name="bet")
    ...
    if cfg.run_registration and t2 is not None:
        ...                                # the N4/BET/ANTs branch, gated by config

Why this matters:

  • BaseConfig sets extra="forbid", so a typo in the dbsdti: block is rejected at load time instead of being silently ignored.

  • Registering the namespace lets your workflow add its own config section without editing core/config/validators.py. An unknown top-level key that is not a registered namespace is still rejected.

  • config_schema requires config_namespace; setting the schema without the namespace raises TypeError. If you omit config_schema, a schema is auto-derived from your @requires config_paths (each becomes an optional str field) — see the “Auto-derived schema” section of Custom workflows.

  • Thread/worker and GPU settings come from config.hardware (e.g. getattr(config.hardware, "gpu_enabled", False)), not from os.sched_getaffinity(0).
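The two load-time failures described above (an unknown key and an out-of-range value) can be illustrated without any dependencies. The sketch below is a stdlib dataclass stand-in, deliberately not Pydantic: DbsdtiSettings and load are hypothetical names, and the TypeError on unknown keys only loosely mirrors what extra="forbid" does in the real schema.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DbsdtiSettings:
    """Stdlib stand-in for the DbsdtiConfig schema (not Pydantic)."""

    bandwidth: float = 1685.0
    phase_encoding_dirs: int = 96
    n_fibres: int = 3
    bet_frac: float = 0.3
    run_registration: bool = True

    def __post_init__(self) -> None:
        # Same bounds as the Field(...) constraints in the schema above.
        if not self.bandwidth > 0.0:
            raise ValueError("bandwidth must be > 0")
        if not self.phase_encoding_dirs > 1:
            raise ValueError("phase_encoding_dirs must be > 1")
        if not self.n_fibres > 0:
            raise ValueError("n_fibres must be > 0")
        if not 0.0 <= self.bet_frac <= 1.0:
            raise ValueError("bet_frac must be within [0, 1]")


def load(block: dict) -> DbsdtiSettings:
    """Build settings from a parsed YAML block; unknown keys raise TypeError."""
    return DbsdtiSettings(**block)


ok = load({"bet_frac": 0.25, "run_registration": False})

errors = []
for bad_block in ({"bet_fraction": 0.25}, {"bet_frac": 1.5}):
    try:
        load(bad_block)
    except (TypeError, ValueError) as exc:
        errors.append(type(exc).__name__)
```

The misspelled bet_fraction key and the out-of-range bet_frac are both rejected before any node is built, which is the behaviour the typed schema buys you.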

You can also add an explicit preflight check with @verify. A verifier returns [] on success or a list of human-readable error strings; opt into receiving resolved paths by adding **kwargs:

def _check_inputs(config, context, **kwargs) -> list[str]:
    errors: list[str] = []
    if kwargs.get("dwi_ap") is None:
        errors.append("dbsdti requires a DWI AP volume (set dbsdti.dwi_ap).")
    return errors


@verify(_check_inputs)
def build_workflow(*, context: ProcessingContext) -> Workflow:
    ...

Explicit checks run after the implicit existence checks generated from @requires.
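The ordering of implicit and explicit checks can be sketched as a small preflight routine. Everything here is hypothetical (implicit_checks, check_bvals, preflight), and whether the real framework still runs explicit verifiers after an implicit check fails is an assumption of this sketch.

```python
import tempfile
from pathlib import Path


def implicit_checks(required: dict) -> list:
    """One error per required path that does not exist on disk."""
    return [
        f"missing input '{name}': {path}"
        for name, path in required.items()
        if not Path(path).exists()
    ]


def check_bvals(config, context, **kwargs) -> list:
    """Explicit verifier: [] on success, human-readable messages on failure."""
    bval = kwargs.get("dwi_ap_bval")
    if bval is not None and Path(bval).exists() and not Path(bval).read_text().split():
        return ["dwi_ap_bval is empty"]
    return []


def preflight(required, verifiers, config, context) -> list:
    errors = implicit_checks(required)      # implicit existence checks first
    for verifier in verifiers:              # then explicit verifiers, in order
        errors.extend(verifier(config, context, **required))
    return errors


with tempfile.TemporaryDirectory() as tmp:
    bval = Path(tmp) / "P001_dmri_AP.bval"
    bval.write_text("")                     # exists, but has no b-values
    required = {"dwi_ap": Path(tmp) / "P001_dmri_AP.nii.gz", "dwi_ap_bval": bval}
    errors = preflight(required, [check_bvals], config={}, context={})
    n_errors = len(errors)
    first_is_missing = errors[0].startswith("missing input 'dwi_ap'")
    second = errors[1]
```

Collecting every message into one list lets a dry run report the missing volume and the empty b-value file together instead of one failure per attempt.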


Step 7 — Fix logging (print inside Function nodes)#

Before — module-level logging reconfiguration and print for debugging:

config.update_config({'logging': {'log_directory': save_dir, 'log_to_file': True}})
logging.update_logging(config)
print(image_noX)

After: use the framework logger at module scope. Do not use the stdlib logging module, and do not call Nipype's logging.update_logging (the call in the "before" snippet); the framework owns log routing:

from thesis.core.logging import get_logger

logger = get_logger(__name__)

Critical exception: inside a nipype Function node body, use print() — the loguru logger cannot be pickled into the worker process. All imports inside a Function node must also be local to the function:

def _write_acqparams(out_dir: str, bandwidth: float, pe_dirs: int) -> str:
    from pathlib import Path                       # local import: separate process

    value = (pe_dirs - 1) / bandwidth
    path = Path(out_dir) / "acqparams.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"0 1 0 {value:.4f}\n0 -1 0 {value:.4f}\n", encoding="utf-8")
    print(f"[dbsdti] wrote {path}")                # print, NOT logger.*
    return str(path)

This is exactly the pattern in minimal.py’s _write_marker: the manual acqparams.txt write the script did at module level (lines 108–113) becomes a Function node that runs in the engine, deriving its value from cfg.bandwidth / cfg.phase_encoding_dirs.
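The local-import rule is easier to see if the function is rebuilt from its source text in an empty namespace, which is roughly the situation a Function node body finds itself in. That description of the worker is an assumption stated for illustration; the sketch only shows that _write_acqparams is self-contained and what it writes for the default bandwidth (1685) and phase-encoding count (96).

```python
import tempfile
from pathlib import Path

# Source of the node function as text, standing in for what a worker receives.
SOURCE = '''
def _write_acqparams(out_dir, bandwidth, pe_dirs):
    from pathlib import Path  # local import keeps the function self-contained

    value = (pe_dirs - 1) / bandwidth
    path = Path(out_dir) / "acqparams.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"0 1 0 {value:.4f}\\n0 -1 0 {value:.4f}\\n", encoding="utf-8")
    print(f"[dbsdti] wrote {path}")
    return str(path)
'''

namespace: dict = {}   # fresh namespace: no module-level imports, no logger
exec(SOURCE, namespace)

with tempfile.TemporaryDirectory() as tmp:
    written = namespace["_write_acqparams"](tmp, 1685.0, 96)
    content = Path(written).read_text(encoding="utf-8")
```

With these defaults the value is (96 - 1) / 1685, about 0.0564, so the file holds the two lines "0 1 0 0.0564" and "0 -1 0 0.0564". A version that relied on a module-level Path import or logger would fail with NameError in the same empty namespace.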


Putting it together — the assembled file#

Each step above showed a fragment with only the kwargs that step introduced. The finished dbsdti_workflow.py reconciles them into one decorator stack and one build_workflow signature that lists every injected kwarg. This is the single runnable file you point --script at:

from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, DTIFit, Eddy, ExtractROI, Merge, TOPUP
from nipype.interfaces.utility import Function
from pydantic import Field

from thesis.core.config import PipelineConfig
from thesis.core.config.validators import BaseConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import ExternalFile, OutputDir, PatientFile

logger = get_logger(__name__)


class DbsdtiConfig(BaseConfig):
    """Schema for the `dbsdti:` block in YAML (extra keys are rejected)."""

    # Numeric bounds are an added validation improvement, not in the original script.
    bandwidth: float = Field(default=1685.0, gt=0.0)
    phase_encoding_dirs: int = Field(default=96, gt=1)
    n_fibres: int = Field(default=3, gt=0)
    bet_frac: float = Field(default=0.3, ge=0.0, le=1.0)
    run_registration: bool = True
    topup_config: str = "b02b0_1.cnf"


def _write_acqparams(out_dir: str, bandwidth: float, pe_dirs: int) -> str:
    from pathlib import Path  # local import: this runs in a separate process

    value = (pe_dirs - 1) / bandwidth
    path = Path(out_dir) / "acqparams.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"0 1 0 {value:.4f}\n0 -1 0 {value:.4f}\n", encoding="utf-8")
    print(f"[dbsdti] wrote {path}")  # print, NOT logger.* inside a Function node
    return str(path)


def _check_inputs(config, context, **kwargs) -> list[str]:
    errors: list[str] = []
    if kwargs.get("dwi_ap") is None:
        errors.append("dbsdti requires a DWI AP volume (set dbsdti.dwi_ap).")
    return errors


@workflow(
    name="dbsdti",
    description="TOPUP/Eddy/DTIFit/BedpostX diffusion preprocessing.",
    protocol="dbsdti",
    default_config="dbsdti",
    config_namespace="dbsdti",
    config_schema=DbsdtiConfig,
)
@requires(
    dwi_ap=PatientFile(default="{patient_id}_dmri_AP.nii.gz", config_path="dbsdti.dwi_ap"),
    dwi_ap_bval=PatientFile(default="{patient_id}_dmri_AP.bval", config_path="dbsdti.dwi_ap_bval"),
    dwi_ap_bvec=PatientFile(default="{patient_id}_dmri_AP.bvec", config_path="dbsdti.dwi_ap_bvec"),
    t1=PatientFile(default="{patient_id}_T1.nii.gz", config_path="dbsdti.t1_image", optional=True),
    t2=PatientFile(default="{patient_id}_T2.nii.gz", config_path="dbsdti.t2_image", optional=True),
    mni_template=ExternalFile(config_path="dbsdti.mni_template", optional=True),
)
@produces(out_dir=OutputDir("dbsdti"))
@verify(_check_inputs)
def build_workflow(
    *,
    dwi_ap: Path,
    dwi_ap_bval: Path,
    dwi_ap_bvec: Path,
    t1: Path | None,
    t2: Path | None,
    mni_template: Path | None,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    cfg = config.dbsdti  # a validated DbsdtiConfig instance
    wf = Workflow(name=f"dbsdti_{context.patient_id}")

    # --- b0 extraction -> acqparams -> TOPUP -> Eddy -> DTIFit ---
    extb0 = Node(ExtractROI(t_min=0, t_size=1, output_type="NIFTI"), name="extb0")
    extb0.inputs.in_file = str(dwi_ap)

    acq = Node(
        Function(
            input_names=["out_dir", "bandwidth", "pe_dirs"],
            output_names=["acqparams"],
            function=_write_acqparams,
        ),
        name="acqparams",
    )
    acq.inputs.out_dir = str(out_dir)
    acq.inputs.bandwidth = cfg.bandwidth
    acq.inputs.pe_dirs = cfg.phase_encoding_dirs

    merge = Node(Merge(dimension="t", output_type="NIFTI_GZ"), name="merge")
    topup = Node(TOPUP(config=cfg.topup_config, output_type="NIFTI_GZ"), name="topup")
    eddy = Node(Eddy(), name="eddy")
    eddy.inputs.in_bval = str(dwi_ap_bval)
    eddy.inputs.in_bvec = str(dwi_ap_bvec)

    dti = Node(DTIFit(save_tensor=True, sse=True, args="--wls"), name="dti")
    dti.inputs.base_name = str(out_dir / context.patient_id)

    wf.connect(
        [
            (extb0, merge, [("roi_file", "in_files")]),  # fsl.Merge input is in_files (a list), not in1
            (merge, topup, [("merged_file", "in_file")]),
            (acq, topup, [("acqparams", "encoding_file")]),
            (topup, eddy, [("out_corrected", "in_file")]),
            (eddy, dti, [("out_corrected", "dwi")]),
        ]
    )

    # --- optional N4/BET/ANTs registration branch, gated by config ---
    if cfg.run_registration and t2 is not None:
        bet = Node(BET(mask=True, frac=cfg.bet_frac, output_type="NIFTI"), name="bet")
        bet.inputs.in_file = str(t2)
        wf.add_nodes([bet])
        # ... wire N4 / ANTs nodes (using mni_template) here ...

    return wf  # NEVER wf.run()

The node/connect body above is illustrative wiring; adapt the FSL input/output field names to your data. The load-bearing parts are the decorator stack (@workflow carrying config_namespace + config_schema, then @requires, @produces, @verify in any order beneath it) and the single keyword-only signature that names every injected kwarg: the six patient/external inputs, out_dir, config, and context.
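One plausible reason @workflow has to sit on top is that it reads the metadata the inner decorators have already attached. The toy decorators below are hypothetical stand-ins that share only their names with the framework's API; the real decorators may behave differently, for example by raising on a misordered stack.

```python
REGISTRY: dict = {}


def requires(**inputs):
    def attach(fn):
        fn._requires = inputs      # metadata only; fn is returned unchanged
        return fn
    return attach


def produces(**outputs):
    def attach(fn):
        fn._produces = outputs
        return fn
    return attach


def workflow(name: str):
    def register(fn):
        # Snapshot whatever metadata is attached at decoration time.
        REGISTRY[name] = {
            "requires": getattr(fn, "_requires", {}),
            "produces": getattr(fn, "_produces", {}),
        }
        return fn
    return register


@workflow(name="good")
@produces(out_dir="dbsdti")
@requires(dwi_ap="{patient_id}_dmri_AP.nii.gz")
def good(**kwargs): ...


@requires(dwi_ap="{patient_id}_dmri_AP.nii.gz")
@workflow(name="misordered")       # applied before @requires has run
def misordered(**kwargs): ...
```

Decorators apply bottom-up, so in the misordered stack the registration snapshot is taken before @requires attaches anything, and the registry entry ends up with no declared inputs. Swapping @produces and @requires in the good stack changes nothing.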


Step 8 — Register and run it#

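Two supported paths (no edit to framework source needed). Option A is to pass the file directly with --script, as in the commands under “Verify it runs” below. Option B is auto-discovery from a workflows directory.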

Option B — a workflows directory (auto-discovery)#

Set paths.scripts_dir in your config and drop scripts into it:

# config/dbsdti.yaml
paths:
  scripts_dir: ./my_workflows

thesis list-workflows then scans that directory (non-recursive *.py, skipping names starting with _) and lists each user script with a (user script: <path>) suffix:

thesis list-workflows
thesis run -w dbsdti -p DTI_LDF002 -c dbsdti
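The scan rule stated above (non-recursive, *.py only, names starting with an underscore skipped) can be reproduced with pathlib to check what a given directory would expose. discover_scripts is a hypothetical helper for illustration; the framework's own discovery code may apply further rules.

```python
import tempfile
from pathlib import Path


def discover_scripts(scripts_dir: Path) -> list:
    """Non-recursive *.py scan that skips names starting with an underscore."""
    return sorted(
        path.name
        for path in scripts_dir.glob("*.py")
        if path.is_file() and not path.name.startswith("_")
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "dbsdti_workflow.py").write_text("# workflow\n")
    (root / "_helpers.py").write_text("# skipped: leading underscore\n")
    (root / "notes.txt").write_text("skipped: not a .py file\n")
    (root / "nested").mkdir()
    (root / "nested" / "other.py").write_text("# skipped: subdirectory\n")
    found = discover_scripts(root)
```

Only dbsdti_workflow.py is picked up, so shared helpers can live next to the workflow in underscore-prefixed files without being mistaken for workflows.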

Verify it runs#

Validate wiring without executing the (multi-hour) pipeline:

# Build the graph, run preflight checks, but do not execute:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti --dry-run

# Render the DAG to <output>/workflow_graphs/workflow.png:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti --dry-run --graph

# Full event/log stream while debugging:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti -v

--dry-run builds the Workflow and runs the implicit @requires existence checks and any @verify callables, so a missing input or a typo in the dbsdti: config block surfaces immediately — long before a real run would.

Troubleshooting#

  • “more than one workflow registered” / load error. A --script file must contain exactly one @workflow. Split helpers and extra workflows into separate files.

  • “name collides with a built-in.” Rename your workflow; built-in names (e.g. preprocess, hcp) cannot be overwritten by a user script.

  • TypeError: config_schema requires config_namespace. You set config_schema=... without config_namespace=.... Add the namespace, or drop the schema to auto-derive one.

  • Unknown-key rejection in your YAML block. A key under dbsdti: that is not a field of DbsdtiConfig is rejected (extra="forbid"). Check for typos, or add the field to the schema.

  • {placeholder} KeyError when resolving a path. Only {patient_id} is substituted in PatientFile / OutputDir templates; any other {...} raises KeyError. Move that value into config instead.

  • Loguru pickling error from a Function node. You used logger.* inside a node body. Use print() and keep all imports local to the function.

  • Workflow “has no attribute run”. Your factory returned something other than a nipype.Workflow. Build and return wf; never call wf.run() yourself.
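The {placeholder} KeyError in the list above is the ordinary behaviour of Python string formatting when a template names a field that was not supplied. The sketch assumes str.format-style substitution, which is consistent with the KeyError but is not confirmed framework internals; render and the {session} placeholder are hypothetical.

```python
def render(template: str, patient_id: str) -> str:
    """Substitute {patient_id}; any other placeholder raises KeyError."""
    return template.format(patient_id=patient_id)


ok = render("{patient_id}_dmri_AP.nii.gz", "P001")

try:
    render("{patient_id}_{session}_dmri.nii.gz", "P001")
    failure = None
except KeyError as exc:
    failure = exc.args[0]   # name of the placeholder that could not be filled
```

The exception names the offending placeholder, which tells you which value to move into the config block instead of the filename template.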

See also#

  • Custom workflows — the --script / scripts_dir mechanics, one-workflow-per-file rules, and auto-derived schemas.

  • Workflow usage guide — the built-in workflows, including preprocess, the idiomatic counterpart to this script.

  • Architecture — the decorator API, path declarations, and registry in depth.