Adapting an arbitrary Nipype script to the framework
If you already have a working — or half-working — standalone Nipype script,
you do not need to rewrite it from scratch to run it inside the framework.
This tutorial walks through converting a real, messy research script into a
single @workflow-decorated function that the CLI can discover, configure,
validate, and execute.
The “before” example is a genuine lab script,
scripts/nipype_workflow_okt2025.py (the dbsdti pipeline: TOPUP → Eddy →
DTIFit → BedpostX, plus N4/BET/ANTs registration of T1/T2/CT/MNI). It carries
every anti-pattern you are likely to meet in the wild:
- hardcoded absolute paths (/local/data1/terhe81/DTI_LDF/, /home/terhe81/SynthSeg-master/...);
- a patient_no = 'DTI_LDF002' baked into the source;
- output directories built by hand, plus a DataSink;
- a sys.modules / importlib.util hack to load SynthSeg;
- a sibling helper module (from alt_bX import ...) that only exists in the author’s directory;
- run settings as module globals (bw = 1685, pe = 96, n_fib = 3, mode = 'reg');
- eager .run() calls mixed with a declarative Workflow;
- a trailing for loop of ApplyTransforms referencing undefined names.
It even has a syntax error at lines 351–352 (a missing comma after
fixed_image=... in the regMNI node), so the script as shipped does not
parse. That is a useful teaching point: the framework’s import-time
registration and @verify preflight catch problems before a multi-hour run,
whereas a standalone script fails late — or never imports at all.
The idiomatic targets to compare against are src/thesis/workflows/minimal.py (the smallest decorated workflow) and src/thesis/workflows/preprocess/workflow.py (the framework’s own TOPUP/Eddy/BedpostX/DTIFit workflow, the direct counterpart to this script). See also Custom workflows for the out-of-tree script mechanics.
Tip
For a compact, runnable companion to this tutorial, compare
examples/plain_nipype_workflow.py (an ordinary, un-annotated Nipype script)
with examples/annotated_nipype_workflow.py (the same pipeline after applying
the framework decorators). Run the plain script with
python examples/plain_nipype_workflow.py and dry-run the annotated one with
thesis run --script examples/annotated_nipype_workflow.py -p P001 -c default --dry-run
to see the before→after diff this guide walks through, end to end.
The single biggest shift
In the standalone script, importing the file runs the pipeline: the module
body builds nodes, calls .run() on some of them, wires the rest into a
Workflow, and finishes with dbsdti.run() (line 486).
In the framework, all work happens inside one factory function decorated
with @workflow. The factory only constructs and returns a
nipype.Workflow; it never calls .run(). Importing the file only
registers the workflow — the framework’s executor runs it later.
Everything below is one application of that principle.
Step 1 — Wrap the script body in a single @workflow function
Before — work at module top level, executed on import:
patient_no = 'DTI_LDF002'
home_dir = '/local/data1/terhe81/DTI_LDF/'
save_dir = home_dir + run_name
Path(save_dir).mkdir(parents=True, exist_ok=True)
dbsdti = Workflow(name='dbsdti')
dbsdti.connect([...])
dbsdti.base_dir = save_dir
dbsdti.run() # runs at import time
After — one factory that returns the graph:
from nipype import Workflow
from thesis.core.context import ProcessingContext
from thesis.core.decorators import workflow
@workflow(
    name="dbsdti",
    description="TOPUP/Eddy/DTIFit/BedpostX diffusion preprocessing.",
    protocol="dbsdti",
    default_config="dbsdti",
)
def build_workflow(*, context: ProcessingContext) -> Workflow:
    wf = Workflow(name=f"dbsdti_{context.patient_id}")
    # build nodes, then:
    # wf.connect([...])
    return wf  # NEVER wf.run()
Notes that carry through every later step:
- @workflow must be the outermost decorator. The inner decorators you add next (@requires, @produces, @verify) are pure metadata-attachers and may appear in any order.
- The function signature is keyword-only (*,) and declares only the kwargs the body actually uses; config and context are injected only if the signature names them.
- Drop dbsdti.write_graph(...), dbsdti.base_dir = ..., and dbsdti.run(). Graph rendering is thesis run ... --graph; the working directory is context.working_dir; execution is the framework executor. The factory only returns wf.
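A rough sketch of what "injected only if the signature names them" means, using `inspect.signature`. The `call_with_injection` helper and the `available` mapping are hypothetical, and the framework's own injector may differ in detail; the sketch also rejects parameters that are not keyword-only.

```python
import inspect
from typing import Any, Callable


def call_with_injection(factory: Callable[..., Any], available: dict) -> Any:
    """Pass only the keyword arguments that `factory` declares (hypothetical helper)."""
    params = inspect.signature(factory).parameters
    for p in params.values():
        if p.kind is not inspect.Parameter.KEYWORD_ONLY:
            raise TypeError(f"{factory.__name__}: parameter {p.name!r} must be keyword-only")
    kwargs = {name: available[name] for name in params if name in available}
    return factory(**kwargs)


def build_workflow(*, context: dict) -> str:
    # Declares only `context`, so `config` is never passed in.
    return f"dbsdti_{context['patient_id']}"


available = {"config": {"dbsdti": {}}, "context": {"patient_id": "P001"}}
print(call_with_injection(build_workflow, available))  # dbsdti_P001
```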
Step 2 — Replace hardcoded input paths with @requires / PatientFile
Before — patient inputs assembled from absolute globals:
data_dir = home_dir + 'Patients/' + patient_no + '/'
image_X = data_dir + patient_no + '_dmri_AP.nii.gz'
bval_X = data_dir + patient_no + '_dmri_AP.bval'
bvec_X = data_dir + patient_no + '_dmri_AP.bvec'
T1_img = data_dir + patient_no + '_T1.nii.gz'
T2_img = data_dir + patient_no + '_T2.nii.gz'
After — declare inputs with @requires; each becomes a pathlib.Path
keyword argument resolved against context.input_dir:
from pathlib import Path
from thesis.core.decorators import requires, workflow
from thesis.core.path_declarations import ExternalFile, PatientFile
@workflow(name="dbsdti", description="...", protocol="dbsdti", default_config="dbsdti")
@requires(
    dwi_ap=PatientFile(
        default="{patient_id}_dmri_AP.nii.gz",
        config_path="dbsdti.dwi_ap",
    ),
    dwi_ap_bval=PatientFile(
        default="{patient_id}_dmri_AP.bval",
        config_path="dbsdti.dwi_ap_bval",
    ),
    dwi_ap_bvec=PatientFile(
        default="{patient_id}_dmri_AP.bvec",
        config_path="dbsdti.dwi_ap_bvec",
    ),
    t1=PatientFile(default="{patient_id}_T1.nii.gz", config_path="dbsdti.t1_image", optional=True),
    t2=PatientFile(default="{patient_id}_T2.nii.gz", config_path="dbsdti.t2_image", optional=True),
    # Cohort-shared assets (the MNI template) are NOT patient inputs:
    mni_template=ExternalFile(config_path="dbsdti.mni_template", optional=True),
)
def build_workflow(
    *,
    dwi_ap: Path,
    dwi_ap_bval: Path,
    dwi_ap_bvec: Path,
    t1: Path | None,
    t2: Path | None,
    mni_template: Path | None,
    context: ProcessingContext,
) -> Workflow:
    ...
What changed and why:
- The {patient_id} placeholder replaces the manual patient_no + '_...' string-building; the resolver substitutes the CLI -p value.
- config_path="dbsdti.dwi_ap" lets a YAML file override the default filename without touching the code. config_paths=[...] (plural) tries several keys in order; the first non-None value wins, else the default applies.
- A non-optional PatientFile generates an implicit existence check: the framework verifies the file exists before running. Mark genuinely optional inputs with optional=True; they resolve to None.
- The MNI template is a cohort-shared asset, not a per-patient input, so it is an ExternalFile (resolved from a config value, supporting absolute paths, ~, $ENV, and base_dirs). A file that lives under your data_dir could instead use DataFile("MNI/...").
Inside the body, pass these Path objects to nodes as strings exactly where the
script used the globals:
extb0 = Node(ExtractROI(t_min=0, t_size=1, output_type="NIFTI"), name="extb0")
extb0.inputs.in_file = str(dwi_ap)
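The resolution rules listed above can be sketched in a few lines of stdlib Python. `resolve_patient_file` and the dotted-key `_lookup` are hypothetical illustrations of the documented behaviour (config override, first non-None of several config_paths, {patient_id} substitution, KeyError for any other placeholder); the real PatientFile resolver is not shown here. Applying the template to config-supplied names as well is an assumption of this sketch.

```python
from pathlib import Path
from typing import Any, Optional


def _lookup(config: dict, dotted: str) -> Optional[Any]:
    """Follow a dotted key such as 'dbsdti.dwi_ap'; None if any part is missing."""
    node: Any = config
    for part in dotted.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def resolve_patient_file(
    default: str,
    config: dict,
    config_paths: list,
    patient_id: str,
    input_dir: Path,
) -> Path:
    """Hypothetical model of PatientFile resolution (not the framework's code)."""
    # The first config key with a non-None value wins; otherwise use the default.
    template = next(
        (value for value in (_lookup(config, key) for key in config_paths) if value is not None),
        default,
    )
    # Assumption: config-supplied names are templated too. Only {patient_id} is
    # supplied, so any other {placeholder} raises KeyError.
    return input_dir / template.format(patient_id=patient_id)


cfg = {"dbsdti": {"dwi_ap": "{patient_id}_dwi_AP_v2.nii.gz"}}
print(resolve_patient_file("{patient_id}_dmri_AP.nii.gz", cfg, ["dbsdti.dwi_ap"], "P001", Path("inputs")))
```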
Step 3 — Replace DataSink / manual output dirs with @produces / OutputDir
Before — output directory built by hand, plus a DataSink:
save_dir = home_dir + run_name
Path(save_dir).mkdir(parents=True, exist_ok=True)
dsDTIscalars = Node(DataSink(), name='dsDTIscalars')
dsDTIscalars.inputs.base_directory = os.path.join(save_dir, 'DTIscalars')
dbsdti.connect([(dti, dsDTIscalars, [('FA', 'FA')]), ...])
After — declare the output with @produces; OutputDir resolves to
context.output_dir / <subdir> and creates it for you
(mkdir(parents=True, exist_ok=True)):
from thesis.core.decorators import produces
from thesis.core.path_declarations import OutputDir
@produces(out_dir=OutputDir("dbsdti"))
def build_workflow(*, out_dir: Path, context: ProcessingContext) -> Workflow:
    ...
    dti = Node(DTIFit(save_tensor=True, sse=True, args="--wls"), name="dti")
    dti.inputs.base_name = str(out_dir / context.patient_id)
    ...
What changed and why:
- Drop the DataSink entirely. Either point each node’s out_file / base_name directly under out_dir, or expose results on an outputnode IdentityInterface so a meta-workflow can consume them contract-to-contract (see attach_outputnode in thesis.core.contracts).
- @produces declarations generate no existence check: the directory is created when the path is resolved, then injected as a Path.
- OutputDir("") (empty subdir) resolves to context.output_dir itself, as in minimal.py.
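A stdlib sketch of the OutputDir behaviour described above: join the subdirectory onto the output root and create it, with no existence check. `resolve_output_dir` is a hypothetical helper, not the framework's implementation. It also shows why an empty subdir maps to the output root: in pathlib, `root / ""` is simply `root`.

```python
import tempfile
from pathlib import Path


def resolve_output_dir(output_dir: Path, subdir: str) -> Path:
    """Hypothetical stand-in for OutputDir(subdir): resolve, create, return."""
    target = output_dir / subdir  # output_dir / "" == output_dir
    target.mkdir(parents=True, exist_ok=True)  # safe to call repeatedly
    return target


with tempfile.TemporaryDirectory() as tmp:
    out_dir = resolve_output_dir(Path(tmp) / "outputs", "dbsdti")
    print(out_dir.name, out_dir.is_dir())
```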
Step 4 — Thread subject identity through ProcessingContext, not globals
Before — the subject id is a hardcoded module global, reused for paths, node names, and log directories:
patient_no = 'DTI_LDF002'
run_name = dateNr + '_' + patient_no + '_diffpy/'
T1_img = data_dir + patient_no + '_T1.nii.gz'
After — the subject comes from the CLI -p flag as
context.patient_id; never hardcode it:
def build_workflow(*, context: ProcessingContext, ...) -> Workflow:
    wf = Workflow(name=f"dbsdti_{context.patient_id}")
    ...
    dti.inputs.base_name = str(out_dir / context.patient_id)
ProcessingContext carries patient_id, the merged config, and the resolved
data_dir / input_dir / output_dir / working_dir. You read those fields
instead of re-deriving directories from a home dir. The path declarations in
Steps 2–3 already use the context’s directories under the hood, so most
save_dir + ... concatenations simply disappear.
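When unit-testing helpers outside the CLI, a small test double with the same fields can stand in for the context. `FakeContext` below is hypothetical (the real ProcessingContext is a framework class, and its config is a PipelineConfig rather than a dict), and the directory layout is made up; the two helpers mirror the name and base_name expressions shown above.

```python
from dataclasses import dataclass, field
from pathlib import Path


@dataclass(frozen=True)
class FakeContext:
    """Hypothetical test double with the fields ProcessingContext is documented to carry."""

    patient_id: str
    data_dir: Path
    input_dir: Path
    output_dir: Path
    working_dir: Path
    config: dict = field(default_factory=dict)


def workflow_name(context: FakeContext) -> str:
    # Mirrors: Workflow(name=f"dbsdti_{context.patient_id}")
    return f"dbsdti_{context.patient_id}"


def dti_base_name(context: FakeContext, out_dir: Path) -> str:
    # Mirrors: dti.inputs.base_name = str(out_dir / context.patient_id)
    return str(out_dir / context.patient_id)


root = Path("/tmp/study")  # made-up layout, for illustration only
ctx = FakeContext(
    patient_id="DTI_LDF002",
    data_dir=root,
    input_dir=root / "Patients" / "DTI_LDF002",
    output_dir=root / "out" / "DTI_LDF002",
    working_dir=root / "work" / "DTI_LDF002",
)
print(workflow_name(ctx), dti_base_name(ctx, ctx.output_dir / "dbsdti"))
```

Changing patient_id is all it takes to point the same code at another subject, which is the role the CLI -p flag plays for the real context.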
Step 5 — Remove sys.path / importlib hacks and sibling-module imports
Before — the two most framework-hostile patterns in the script:
# 1) Inject a foreign SynthSeg module into sys.modules
module_path = "/home/terhe81/SynthSeg-master/scripts/commands/SynthSeg_predict.py"
spec = importlib.util.spec_from_file_location("SynthSeg_predict", module_path)
SynthSeg_predict = importlib.util.module_from_spec(spec)
sys.modules["SynthSeg_predict"] = SynthSeg_predict
# 2) Import a local helper that only exists in the author's directory
from alt_bX import concat_files, extbX, rmbX, modbX
After — delete both; use clean imports:
- SynthSeg: the framework already wraps it. Use the standalone synthseg workflow (mri_synthseg) or the preprocess SynthSeg node. No sys.path / sys.modules mutation is permitted in a framework workflow.
- alt_bX helpers: the framework’s preprocess package ships vendored node-builders for the same operations (the b-val modification, index-file creation, b0 extraction, and merge that modbX / extbX / rmbX did). Reuse those, or, if you truly need a missing helper, vendor it into your own workflow package (e.g. workflows/dbsdti/operations/) and import it by a normal absolute path. Never rely on a sibling-directory import.
The net effect: the top of your file becomes ordinary imports only.
from pathlib import Path
from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, DTIFit, Eddy, ExtractROI, Merge, TOPUP
from thesis.core.config import PipelineConfig
from thesis.core.config.validators import BaseConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import ExternalFile, OutputDir, PatientFile
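If you do end up vendoring a helper, keep it a plain, importable function with no path hacks. The sketch below is a hypothetical write_eddy_index, not the framework's vendored builder and not a copy of alt_bX; it writes an FSL eddy index file in the usual form, one acqparams row number per DWI volume (here all 1, i.e. every volume uses the first row of acqparams.txt). The module path in the first comment is just the example layout mentioned above.

```python
# Hypothetical module: workflows/dbsdti/operations/eddy_index.py
import tempfile
from pathlib import Path


def write_eddy_index(n_volumes: int, out_path: str, acqp_row: int = 1) -> str:
    """Write an eddy index file: one acqparams row number per DWI volume."""
    from pathlib import Path  # local import, so the helper also works as a Function node

    if n_volumes < 1:
        raise ValueError("n_volumes must be at least 1")
    path = Path(out_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(" ".join([str(acqp_row)] * n_volumes) + "\n", encoding="utf-8")
    return str(path)


with tempfile.TemporaryDirectory() as tmp:
    index_text = Path(write_eddy_index(4, str(Path(tmp) / "index.txt"))).read_text(encoding="utf-8")
print(repr(index_text))  # '1 1 1 1\n'
```

Your workflow then imports it with an ordinary absolute import from its own package, the same way it imports anything else.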
While you are here, also remove the eager .run() calls. The script runs
extb0AP.run() and merge_bX.run() inline, then chains their .outputs into
later nodes. Replace those with graph edges so the engine resolves the DAG:
# Before:
b0AP = extb0AP.run()
bX_img = merge_bX.run()
# After:
wf.connect([(extb0, merge, [("roi_file", "in1")]),
            (merge, topup, [("merged_file", "in_file")])])
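Why edges instead of eager calls: once dependencies are declared, an engine can derive a valid execution order on its own. The stdlib graphlib.TopologicalSorter (Python 3.9+) is used here only as a rough model of that idea; it is not what nipype uses internally. The node names mirror the extb0/merge/topup edges above plus the later Eddy/DTIFit steps.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (its upstream edges).
edges = {
    "merge": {"extb0"},
    "topup": {"merge", "acqparams"},
    "eddy": {"topup"},
    "dti": {"eddy"},
}
order = list(TopologicalSorter(edges).static_order())
print(order)  # extb0/acqparams first (in either order), then merge, topup, eddy, dti
```

Independent nodes such as extb0 and acqparams have no required order between them, which is exactly the freedom a parallel executor exploits and an eager .run() sequence throws away.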
Step 6 — Declare a Pydantic schema for the workflow’s YAML block
Before — run settings live as module globals, baked into each node:
dataset = 'AP'
mode = 'reg' # 'reg' (run registration) or 'avoid'
fs = 'avoid'
n_fib = 3
bw = 1685 # acquisition bandwidth
pe = 96 # phase-encoding directions
frac = 0.3 # used in BET(frac=0.3)
After — a BaseConfig subclass that owns a top-level YAML key. The string
mode flags (mode == 'reg', fs == 'run') become typed booleans:
from pydantic import Field
from thesis.core.config.validators import BaseConfig
class DbsdtiConfig(BaseConfig):
    """Schema for the `dbsdti:` block in YAML (extra keys are rejected)."""
    bandwidth: float = Field(default=1685.0, gt=0.0)
    phase_encoding_dirs: int = Field(default=96, gt=1)
    n_fibres: int = Field(default=3, gt=0)
    bet_frac: float = Field(default=0.3, ge=0.0, le=1.0)
    run_registration: bool = True
    topup_config: str = "b02b0_1.cnf"
The numeric bounds (gt=0.0, gt=1, ge=0.0 / le=1.0) are an added validation improvement, not a transcription of the original script; the script kept these as plain module globals (bw = 1685, pe = 96). They reject nonsensical YAML overrides at load time; tighten or drop them to suit your data.
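To see what those bounds buy you without pulling in Pydantic, here is a stdlib-only stand-in that applies the same defaults, the same gt/ge/le limits, and the same rejection of unknown keys. validate_dbsdti_block is hypothetical and only approximates DbsdtiConfig; in the real workflow Pydantic does this for you, and it also coerces and type-checks values, which this sketch does not.

```python
from typing import Any

DEFAULTS: dict = {
    "bandwidth": 1685.0,
    "phase_encoding_dirs": 96,
    "n_fibres": 3,
    "bet_frac": 0.3,
    "run_registration": True,
    "topup_config": "b02b0_1.cnf",
}


def validate_dbsdti_block(block: dict) -> dict:
    """Hypothetical stand-in for DbsdtiConfig: merge defaults, forbid extras, check bounds."""
    unknown = sorted(set(block) - set(DEFAULTS))
    if unknown:  # mimics extra="forbid"
        raise ValueError(f"unknown keys in dbsdti block: {unknown}")
    cfg: dict = {**DEFAULTS, **block}
    errors = []
    if not cfg["bandwidth"] > 0.0:
        errors.append("bandwidth must be > 0")
    if not cfg["phase_encoding_dirs"] > 1:
        errors.append("phase_encoding_dirs must be > 1")
    if not cfg["n_fibres"] > 0:
        errors.append("n_fibres must be > 0")
    if not 0.0 <= cfg["bet_frac"] <= 1.0:
        errors.append("bet_frac must be within [0, 1]")
    if errors:
        raise ValueError("; ".join(errors))
    return cfg


print(validate_dbsdti_block({"bet_frac": 0.25})["bet_frac"])  # 0.25
```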
Wire it into the decorator with config_namespace + config_schema:
@workflow(
    name="dbsdti",
    description="TOPUP/Eddy/DTIFit/BedpostX diffusion preprocessing.",
    protocol="dbsdti",
    default_config="dbsdti",
    config_namespace="dbsdti",
    config_schema=DbsdtiConfig,
)
Then read it inside the body as a typed object:
def build_workflow(*, config: PipelineConfig, ...) -> Workflow:
    cfg = config.dbsdti  # a validated DbsdtiConfig instance
    bet = Node(BET(mask=True, frac=cfg.bet_frac, output_type="NIFTI"), name="bet")
    ...
    if cfg.run_registration and t2 is not None:
        ...  # the N4/BET/ANTs branch, gated by config
Why this matters:
- BaseConfig sets extra="forbid", so a typo in the dbsdti: block is rejected at load time instead of being silently ignored.
- Registering the namespace lets your workflow add its own config section without editing core/config/validators.py. An unknown top-level key that is not a registered namespace is still rejected.
- config_schema requires config_namespace; setting the schema without the namespace raises TypeError. If you omit config_schema, a schema is auto-derived from your @requires config_paths (each becomes an optional str field); see the “Auto-derived schema” section of Custom workflows.
- Thread/worker and GPU settings come from config.hardware (e.g. getattr(config.hardware, "gpu_enabled", False)), not from os.sched_getaffinity(0).
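The two registration rules in that list can be modelled as below. check_workflow_args, check_top_level_keys, and DbsdtiSchema are hypothetical names, and the set of core keys is an illustrative subset made up for the example; only the behaviour (TypeError for a schema without a namespace, rejection of unregistered top-level keys) comes from the rules above.

```python
from typing import Optional

REGISTERED_NAMESPACES: set = set()
CORE_KEYS = {"paths", "hardware"}  # illustrative subset, not the framework's full list


class DbsdtiSchema:
    """Placeholder standing in for a BaseConfig subclass."""


def check_workflow_args(config_namespace: Optional[str], config_schema: Optional[type]) -> None:
    # Documented rule: a schema without a namespace is a TypeError.
    if config_schema is not None and config_namespace is None:
        raise TypeError("config_schema requires config_namespace")
    if config_namespace is not None:
        REGISTERED_NAMESPACES.add(config_namespace)


def check_top_level_keys(yaml_data: dict) -> list:
    # Unknown top-level keys are rejected unless a workflow registered them.
    allowed = CORE_KEYS | REGISTERED_NAMESPACES
    return [f"unknown top-level key: {key!r}" for key in yaml_data if key not in allowed]


check_workflow_args("dbsdti", DbsdtiSchema)
print(check_top_level_keys({"paths": {}, "dbsdti": {}, "dbstdi": {}}))
```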
You can also add an explicit preflight check with @verify. A verifier returns
[] on success or a list of human-readable error strings; opt into receiving
resolved paths by adding **kwargs:
def _check_inputs(config, context, **kwargs) -> list[str]:
    errors: list[str] = []
    if kwargs.get("dwi_ap") is None:
        errors.append("dbsdti requires a DWI AP volume (set dbsdti.dwi_ap).")
    return errors
@verify(_check_inputs)
def build_workflow(...):
    ...
Explicit checks run after the implicit existence checks generated from
@requires.
Step 7 — Fix logging (print inside Function nodes)
Before — module-level logging reconfiguration and print for debugging:
config.update_config({'logging': {'log_directory': save_dir, 'log_to_file': True}})
logging.update_logging(config)
print(image_noX)
After — use the framework logger at module scope; never the stdlib
logging module, and never call logging.update_logging. The framework
owns log routing:
from thesis.core.logging import get_logger
logger = get_logger(__name__)
Critical exception: inside a nipype Function node body, use print() —
the loguru logger cannot be pickled into the worker process. All imports inside
a Function node must also be local to the function:
def _write_acqparams(out_dir: str, bandwidth: float, pe_dirs: int) -> str:
    from pathlib import Path  # local import: separate process
    value = (pe_dirs - 1) / bandwidth
    path = Path(out_dir) / "acqparams.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"0 1 0 {value:.4f}\n0 -1 0 {value:.4f}\n", encoding="utf-8")
    print(f"[dbsdti] wrote {path}")  # print, NOT logger.*
    return str(path)
This is exactly the pattern in minimal.py’s _write_marker: the manual
acqparams.txt write the script did at module level (lines 108–113) becomes a
Function node that runs in the engine, deriving its value from
cfg.bandwidth / cfg.phase_encoding_dirs.
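Why the imports must be local: nipype's Function interface stores the function's source text and rebuilds the function when the node runs, so names imported at the top of your module are not available inside it. The sketch below models that with plain exec on source strings in an empty namespace; it is a simplification for illustration, not nipype's actual code path.

```python
from typing import Any, Callable

GOOD_SRC = '''
def marker(out_dir):
    from pathlib import Path  # imported inside the function body
    return str(Path(out_dir) / "done.txt")
'''

BAD_SRC = '''
def marker(out_dir):
    return str(Path(out_dir) / "done.txt")  # relies on a module-level import
'''


def rebuild(source: str, name: str) -> Callable[..., Any]:
    """Re-create a function from its source in an empty namespace (rough model of a worker)."""
    namespace: dict = {}
    exec(source, namespace)
    return namespace[name]


good = rebuild(GOOD_SRC, "marker")("out")
try:
    rebuild(BAD_SRC, "marker")("out")
    bad_error = None
except NameError as exc:
    bad_error = str(exc)
print(good, "|", bad_error)
```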
Putting it together — the assembled file
Each step above showed a fragment with only the kwargs that step introduced.
The finished dbsdti_workflow.py reconciles them into one decorator stack
and one build_workflow signature that lists every injected kwarg. This is
the single runnable file you point --script at:
from pathlib import Path
from nipype import Node, Workflow
from nipype.interfaces.fsl import BET, DTIFit, Eddy, ExtractROI, Merge, TOPUP
from nipype.interfaces.utility import Function
from pydantic import Field
from thesis.core.config import PipelineConfig
from thesis.core.config.validators import BaseConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.logging import get_logger
from thesis.core.path_declarations import ExternalFile, OutputDir, PatientFile
logger = get_logger(__name__)
class DbsdtiConfig(BaseConfig):
    """Schema for the `dbsdti:` block in YAML (extra keys are rejected)."""
    # Numeric bounds are an added validation improvement, not in the original script.
    bandwidth: float = Field(default=1685.0, gt=0.0)
    phase_encoding_dirs: int = Field(default=96, gt=1)
    n_fibres: int = Field(default=3, gt=0)
    bet_frac: float = Field(default=0.3, ge=0.0, le=1.0)
    run_registration: bool = True
    topup_config: str = "b02b0_1.cnf"
def _write_acqparams(out_dir: str, bandwidth: float, pe_dirs: int) -> str:
    from pathlib import Path  # local import: this runs in a separate process
    value = (pe_dirs - 1) / bandwidth
    path = Path(out_dir) / "acqparams.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"0 1 0 {value:.4f}\n0 -1 0 {value:.4f}\n", encoding="utf-8")
    print(f"[dbsdti] wrote {path}")  # print, NOT logger.* inside a Function node
    return str(path)
def _check_inputs(config, context, **kwargs) -> list[str]:
    errors: list[str] = []
    if kwargs.get("dwi_ap") is None:
        errors.append("dbsdti requires a DWI AP volume (set dbsdti.dwi_ap).")
    return errors
@workflow(
    name="dbsdti",
    description="TOPUP/Eddy/DTIFit/BedpostX diffusion preprocessing.",
    protocol="dbsdti",
    default_config="dbsdti",
    config_namespace="dbsdti",
    config_schema=DbsdtiConfig,
)
@requires(
    dwi_ap=PatientFile(default="{patient_id}_dmri_AP.nii.gz", config_path="dbsdti.dwi_ap"),
    dwi_ap_bval=PatientFile(default="{patient_id}_dmri_AP.bval", config_path="dbsdti.dwi_ap_bval"),
    dwi_ap_bvec=PatientFile(default="{patient_id}_dmri_AP.bvec", config_path="dbsdti.dwi_ap_bvec"),
    t1=PatientFile(default="{patient_id}_T1.nii.gz", config_path="dbsdti.t1_image", optional=True),
    t2=PatientFile(default="{patient_id}_T2.nii.gz", config_path="dbsdti.t2_image", optional=True),
    mni_template=ExternalFile(config_path="dbsdti.mni_template", optional=True),
)
@produces(out_dir=OutputDir("dbsdti"))
@verify(_check_inputs)
def build_workflow(
    *,
    dwi_ap: Path,
    dwi_ap_bval: Path,
    dwi_ap_bvec: Path,
    t1: Path | None,
    t2: Path | None,
    mni_template: Path | None,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    cfg = config.dbsdti  # a validated DbsdtiConfig instance
    wf = Workflow(name=f"dbsdti_{context.patient_id}")
    # --- b0 extraction -> acqparams -> TOPUP -> Eddy -> DTIFit ---
    extb0 = Node(ExtractROI(t_min=0, t_size=1, output_type="NIFTI"), name="extb0")
    extb0.inputs.in_file = str(dwi_ap)
    acq = Node(
        Function(
            input_names=["out_dir", "bandwidth", "pe_dirs"],
            output_names=["acqparams"],
            function=_write_acqparams,
        ),
        name="acqparams",
    )
    acq.inputs.out_dir = str(out_dir)
    acq.inputs.bandwidth = cfg.bandwidth
    acq.inputs.pe_dirs = cfg.phase_encoding_dirs
    merge = Node(Merge(dimension="t", output_type="NIFTI_GZ"), name="merge")
    topup = Node(TOPUP(config=cfg.topup_config, output_type="NIFTI_GZ"), name="topup")
    eddy = Node(Eddy(), name="eddy")
    eddy.inputs.in_bval = str(dwi_ap_bval)
    eddy.inputs.in_bvec = str(dwi_ap_bvec)
    dti = Node(DTIFit(save_tensor=True, sse=True, args="--wls"), name="dti")
    dti.inputs.base_name = str(out_dir / context.patient_id)
    wf.connect(
        [
            (extb0, merge, [("roi_file", "in1")]),
            (merge, topup, [("merged_file", "in_file")]),
            (acq, topup, [("acqparams", "encoding_file")]),
            (topup, eddy, [("out_corrected", "in_file")]),
            (eddy, dti, [("out_corrected", "dwi")]),
        ]
    )
    # --- optional N4/BET/ANTs registration branch, gated by config ---
    if cfg.run_registration and t2 is not None:
        bet = Node(BET(mask=True, frac=cfg.bet_frac, output_type="NIFTI"), name="bet")
        bet.inputs.in_file = str(t2)
        wf.add_nodes([bet])
        # ... wire N4 / ANTs nodes (using mni_template) here ...
    return wf  # NEVER wf.run()
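The node/connect body above is illustrative wiring, not a complete pipeline: adapt FSL input/output
field names to your data (for example, nipype's fsl.Merge takes a list in_files rather than in1, so merging AP and PA b0 volumes normally goes through a utility Merge node first). The load-bearing parts are the decorator stack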
(@workflow carrying config_namespace + config_schema, then @requires,
@produces, @verify in any order beneath it) and the single keyword-only
signature that names every injected kwarg: the five patient/external inputs,
out_dir, config, and context.
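Because _write_acqparams and _check_inputs are plain functions, you can sanity-check them without nipype or the CLI. The sketch below copies both unchanged from the file above and exercises them in a temporary directory; with the defaults, (96 - 1) / 1685 is about 0.05638, which the :.4f format writes as 0.0564.

```python
import tempfile
from pathlib import Path


def _write_acqparams(out_dir: str, bandwidth: float, pe_dirs: int) -> str:
    from pathlib import Path  # local import: this runs in a separate process

    value = (pe_dirs - 1) / bandwidth
    path = Path(out_dir) / "acqparams.txt"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(f"0 1 0 {value:.4f}\n0 -1 0 {value:.4f}\n", encoding="utf-8")
    print(f"[dbsdti] wrote {path}")
    return str(path)


def _check_inputs(config, context, **kwargs) -> list:
    errors: list = []
    if kwargs.get("dwi_ap") is None:
        errors.append("dbsdti requires a DWI AP volume (set dbsdti.dwi_ap).")
    return errors


with tempfile.TemporaryDirectory() as tmp:
    written = Path(_write_acqparams(tmp, 1685.0, 96)).read_text(encoding="utf-8")

missing = _check_inputs(None, None, dwi_ap=None)
present = _check_inputs(None, None, dwi_ap=Path("P001_dmri_AP.nii.gz"))
print(repr(written), missing, present)
```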
Step 8 — Register and run it
Two supported paths (no edit to framework source needed):
Option A — out-of-tree script (recommended for adaptation)
Keep the adapted file anywhere and point the CLI at it:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti
--script imports the file, which self-registers via @workflow. Constraints:
- the file must have a .py extension;
- it must register exactly one @workflow (more than one is rejected; split each into its own file);
- the name must not collide with a built-in (so name it dbsdti, not preprocess).
If you pass both --script and -w and they differ, the CLI warns and uses the
script’s registered name.
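Those rules amount to a few checks on what the imported file registered. validate_script below is a hypothetical model of them, not the CLI's loader; the built-in name set is an illustrative subset, and using Python's warnings module for the -w mismatch is this sketch's choice, not necessarily how the CLI reports it.

```python
import warnings
from pathlib import Path
from typing import Optional

BUILTIN_WORKFLOWS = {"preprocess", "hcp"}  # illustrative subset of built-in names


def validate_script(path: Path, registered: list, requested: Optional[str] = None) -> str:
    """Return the workflow name to run, enforcing the --script constraints."""
    if path.suffix != ".py":
        raise ValueError(f"{path}: --script needs a .py file")
    if len(registered) != 1:
        raise ValueError(f"{path}: expected exactly one @workflow, found {len(registered)}")
    name = registered[0]
    if name in BUILTIN_WORKFLOWS:
        raise ValueError(f"{path}: workflow name {name!r} collides with a built-in")
    if requested is not None and requested != name:
        warnings.warn(f"-w {requested!r} ignored; using the script's workflow {name!r}")
    return name


print(validate_script(Path("dbsdti_workflow.py"), ["dbsdti"]))  # dbsdti
```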
Option B — a workflows directory (auto-discovery)
Set paths.scripts_dir in your config and drop scripts into it:
# config/dbsdti.yaml
paths:
  scripts_dir: ./my_workflows
thesis list-workflows then scans that directory (non-recursive *.py,
skipping names starting with _) and lists each user script with a
(user script: <path>) suffix:
thesis list-workflows
thesis run -w dbsdti -p DTI_LDF002 -c dbsdti
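The discovery rule (non-recursive *.py, skipping names that start with _) is easy to reproduce when you want to check what a directory will expose. discover_scripts is a hypothetical helper, not the CLI's scanner.

```python
import tempfile
from pathlib import Path


def discover_scripts(scripts_dir: Path) -> list:
    """Top-level *.py files whose names do not start with '_' (no recursion)."""
    return sorted(
        p for p in scripts_dir.glob("*.py") if p.is_file() and not p.name.startswith("_")
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for rel in ("dbsdti.py", "_helpers.py", "notes.txt", "sub/nested.py"):
        (root / rel).parent.mkdir(parents=True, exist_ok=True)
        (root / rel).touch()
    found = [p.name for p in discover_scripts(root)]
print(found)  # ['dbsdti.py']
```

Prefixing a helper module with _ is therefore a simple way to keep it next to your workflows without it being treated as one.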
Verify it runs
Validate wiring without executing the (multi-hour) pipeline:
# Build the graph, run preflight checks, but do not execute:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti --dry-run
# Render the DAG to <output>/workflow_graphs/workflow.png:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti --dry-run --graph
# Full event/log stream while debugging:
thesis run --script ./dbsdti_workflow.py -p DTI_LDF002 -c dbsdti -v
--dry-run builds the Workflow and runs the implicit @requires existence
checks and any @verify callables, so a missing input or a typo in the
dbsdti: config block surfaces immediately — long before a real run would.
Troubleshooting
- “more than one workflow registered” / load error. A --script file must contain exactly one @workflow. Split helpers and extra workflows into separate files.
- “name collides with a built-in.” Rename your workflow; built-in names (e.g. preprocess, hcp) cannot be overwritten by a user script.
- TypeError: config_schema requires config_namespace. You set config_schema=... without config_namespace=.... Add the namespace, or drop the schema to auto-derive one.
- Unknown-key rejection in your YAML block. A key under dbsdti: that is not a field of DbsdtiConfig is rejected (extra="forbid"). Check for typos, or add the field to the schema.
- {placeholder} KeyError when resolving a path. Only {patient_id} is substituted in PatientFile / OutputDir templates; any other {...} raises KeyError. Move that value into config instead.
- Loguru pickling error from a Function node. You used logger.* inside a node body. Use print() and keep all imports local to the function.
- Workflow “has no attribute run”. Your factory returned something other than a nipype.Workflow. Build and return wf; never call wf.run() yourself.
See also
- Custom workflows: the --script / scripts_dir mechanics, one-workflow-per-file rules, and auto-derived schemas.
- Workflow usage guide: the built-in workflows, including preprocess, the idiomatic counterpart to this script.
- Architecture: the decorator API, path declarations, and registry in depth.