Building a workflow
This is the complete reference for building a workflow with the thesis
decorator API. It applies equally to the package’s built-in workflows under
src/thesis/workflows/ and to out-of-tree scripts you run with --script or
auto-discover from paths.scripts_dir.
A workflow is a single factory function that builds and returns a raw
nipype.Workflow. You decorate it to:
- give it a name and metadata (@workflow),
- declare the inputs it needs and where they come from (@requires),
- declare the outputs it writes (@produces), and
- (optionally) attach preflight checks (@verify).
The framework does the rest: it resolves your declared paths into
pathlib.Path objects, injects them as keyword arguments, runs your preflight
checks, registers the workflow so the CLI can find it, and hands the returned
graph to the Nipype executor. Your factory never calls .run() — it only
constructs the graph.
Tip
If you are porting an existing standalone Nipype script into this framework,
read the step-by-step tutorial first:
Adapting a standalone Nipype script. It maps every
“messy real-world” pattern (hardcoded paths, eager .run(), sys.modules
hacks) onto the idiom described here.
TL;DR
# Run an out-of-tree script directly:
thesis run --script ./my_workflow.py -p DTI_001 -c default

# Or auto-discover a whole directory of scripts via config/default.yaml:
paths:
  scripts_dir: ./my_workflows

thesis list-workflows  # discovers everything in paths.scripts_dir
thesis run -w my_workflow -p DTI_001 -c default
The four decorators
All four live in thesis.core.decorators. Three of them — @requires,
@produces, @verify — are pure metadata attachers: they accumulate
declarations on the function and have no other side effect. Only @workflow
does real work (it builds the adapter and registers the workflow).
Important
Decoration order matters in exactly one way: @workflow must be the
outermost decorator. The inner three (@requires, @produces, @verify)
may appear in any order relative to each other.
@workflow(name="my_workflow", ...)   # OUTERMOST: the only one with side effects
@requires(t1=PatientFile(...))       # inner three: any order
@produces(out_dir=OutputDir(...))
@verify(_check_inputs)
def build_workflow(*, t1, out_dir, config, context):
    ...
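To make the split between "metadata attachers" and "the one decorator that does real work" concrete, here is a minimal standard-library sketch of the pattern. It is not the code in thesis.core.decorators: SKETCH_REGISTRY, the _requires/_produces attribute names, and the string "declarations" are all hypothetical stand-ins, and a real adapter would resolve each declaration against the ProcessingContext instead of passing it through.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in for the framework's registry.
SKETCH_REGISTRY: Dict[str, Callable[..., Any]] = {}


def requires(**decls: Any) -> Callable:
    """Attach input declarations to the function; no other side effect."""
    def attach(fn: Callable) -> Callable:
        merged = dict(getattr(fn, "_requires", {}))
        merged.update(decls)
        fn._requires = merged
        return fn
    return attach


def produces(**decls: Any) -> Callable:
    """Attach output declarations to the function; no other side effect."""
    def attach(fn: Callable) -> Callable:
        merged = dict(getattr(fn, "_produces", {}))
        merged.update(decls)
        fn._produces = merged
        return fn
    return attach


def workflow(name: str) -> Callable:
    """Read the attached metadata, build an adapter, and register it."""
    def register(fn: Callable) -> Callable:
        declared = {**getattr(fn, "_requires", {}), **getattr(fn, "_produces", {})}

        def adapter(config: Any, context: Any) -> Any:
            # A real adapter would resolve each declaration to a Path here.
            return fn(**declared)

        SKETCH_REGISTRY[name] = adapter
        return fn  # the original function is returned unchanged
    return register


@workflow(name="demo")
@requires(t1="T1w.nii.gz")
@produces(out_dir="demo_out")
def build_demo(*, t1: str, out_dir: str) -> str:
    return f"{t1} -> {out_dir}"


print(SKETCH_REGISTRY["demo"](None, None))  # T1w.nii.gz -> demo_out
```

Because the inner decorators run first (bottom-up), the outermost one can see everything they attached by the time it registers the adapter.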
@workflow(...): register the workflow
The outermost decorator. It reads the metadata the inner decorators attached,
synthesises a (config, context) -> Workflow adapter, builds a composite
verifier, and registers the result in WORKFLOW_REGISTRY at import time.
It returns the original function unchanged.
@workflow(
    name="my_workflow",              # required: unique CLI id / registry key
    description="One-liner shown by `thesis list-workflows`.",
    protocol="hcp",                  # optional: default protocol (-> default_protocol)
    default_config="my_workflow",    # optional: default config when -c is omitted
    scope="patient",                 # "patient" (default) or "cohort"
    config_namespace="my_workflow",  # optional: own a top-level YAML key (see below)
    config_schema=MyWorkflowConfig,  # optional: Pydantic schema for that key
)
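| Parameter | Effect |
|---|---|
| name | Required. Unique registry key; also the CLI id. |
| description | Shown by thesis list-workflows. |
| protocol | Becomes WorkflowEntry.default_protocol. |
| default_config | The default config when -c is omitted. |
| scope | "patient" (default) or "cohort". |
| config_namespace | A top-level YAML key this workflow owns (see Owning a config section). |
| config_schema | A Pydantic schema for that key. |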
Errors and validation at decoration time:
- ValueError if scope is neither "patient" nor "cohort".
- TypeError if config_schema is set without config_namespace.
- TypeError (scope/declaration mismatch): a scope="cohort" workflow may not declare a PatientFile/PatientDir requirement, and a scope="patient" workflow may not declare a CohortDir output.
- With THESIS_STRICT_REGISTRY=1, a duplicate workflow name or duplicate config namespace raises ValueError instead of warning, which is useful for catching accidental double-imports in tests.
@requires(**path_decls): declare inputs
Each keyword names a keyword argument your factory body will receive, mapped to
a path declaration. The adapter resolves each declaration
against the active ProcessingContext and injects the resulting Path (or
None, or list[Path], depending on the declaration type).
@requires(
    t1=PatientFile(default="T1w/T1w_acpc_dc_restore.nii.gz"),
    diffusion=PatientDir(default="T1w/Diffusion"),
)
def build_workflow(*, t1: Path, diffusion: Path, ...): ...
Every non-optional @requires declaration also generates an implicit
existence check that runs before your workflow is built. There is no
boilerplate to write for “the input file must exist”.
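As an illustration of what such an implicit check amounts to, the sketch below turns a mapping of resolved paths into a list of error strings, skipping anything marked optional. The function name and the error wording are hypothetical; the framework's real check may differ in both.

```python
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Set


def existence_errors(resolved: Dict[str, Optional[Path]], optional: Set[str]) -> List[str]:
    """Return one message per required input that is missing on disk."""
    errors: List[str] = []
    for key, path in resolved.items():
        if key in optional:
            continue  # optional inputs never fail preflight
        if path is None or not path.exists():
            errors.append(f"required input '{key}' not found: {path}")
    return errors


with tempfile.TemporaryDirectory() as tmp:
    t1 = Path(tmp) / "T1w.nii.gz"
    t1.touch()
    mask = Path(tmp) / "mask.nii.gz"  # deliberately not created

    print(existence_errors({"t1": t1, "mask": mask}, optional=set()))
    print(existence_errors({"t1": t1, "mask": mask}, optional={"mask"}))  # []
```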
@produces(**path_decls): declare outputs
Same shape as @requires, but for outputs. Output declarations generate no
existence check (the file does not exist yet). The adapter still resolves them
— and OutputDir/CohortDir create the directory (mkdir(parents=True, exist_ok=True)) on resolve — then injects the resulting Path.
@produces(out_dir=OutputDir("my_workflow")) # -> context.output_dir / "my_workflow", created
def build_workflow(*, out_dir: Path, ...): ...
@verify(*checks): custom preflight checks
Attaches one or more preflight callables. Each returns a list[str] of
human-readable error messages (an empty list means “all clear”). They run
after the implicit existence checks from @requires.
A verifier may use either signature:
- The historical (config, context) -> list[str].
- An opt-in form that also receives resolved declared values: add a **kwargs catch-all (it receives all declared values keyed by their declaration name), or name a parameter exactly after a declared key.

def _check_t1_is_nifti(config, context, **kwargs) -> list[str]:
    t1: Path = kwargs["t1"]  # same key as @requires(t1=...)
    if t1 is not None and t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be NIfTI, got {t1.suffix!r}"]
    return []
Note
When a workflow has no required inputs and no @verify checks, the
composite verifier is None (the “no verifier” convention). Otherwise it
resolves all declared paths once, runs the existence checks, then dispatches
your explicit checks.
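One plausible way to support both verifier signatures is to inspect the callable and forward only what it asks for. The sketch below is an assumption about the mechanism, not the framework's dispatcher; call_verifier and the three example checks are hypothetical names.

```python
import inspect
from typing import Any, Callable, Dict, List


def call_verifier(check: Callable[..., List[str]], config: Any, context: Any,
                  resolved: Dict[str, Any]) -> List[str]:
    """Call a preflight check, forwarding resolved values only if it opts in."""
    params = inspect.signature(check).parameters
    takes_var_kw = any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())
    if takes_var_kw:
        extra = dict(resolved)  # **kwargs catch-all: forward everything
    else:
        extra = {k: v for k, v in resolved.items() if k in params}  # named keys only
    return check(config, context, **extra)


def legacy(config, context) -> List[str]:
    return []


def by_name(config, context, t1) -> List[str]:
    return [] if t1.endswith(".nii.gz") else [f"t1 must be NIfTI, got {t1!r}"]


def catch_all(config, context, **kwargs) -> List[str]:
    return [f"missing {key}" for key, value in kwargs.items() if value is None]


resolved = {"t1": "scan.nii.gz", "mask": None}
print(call_verifier(legacy, None, None, resolved))     # []
print(call_verifier(by_name, None, None, resolved))    # []
print(call_verifier(catch_all, None, None, resolved))  # ['missing mask']
```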
Path declarations
Path declarations (thesis.core.path_declarations) describe where an input
comes from or where an output goes — declaratively, without hardcoding
absolute paths. The @workflow adapter calls each declaration’s resolve()
against the ProcessingContext and passes the result to your body.
The most common ones:
| Declaration | Resolves to | Anchored at | Implicit existence check |
|---|---|---|---|
| | | | must exist + be a file, unless |
| | | | must exist + be a dir, unless |
| | | | none |
| | | | none |
| | | | none |
| | | | exists/file, or glob ≥1; unless |
| | | | exists + file, unless |
| | | | exists + dir, unless |
| | | config value; absolute/ | exists + file, unless |
| | | | |
| | | shared base dir; group falls through together | each item non-empty, unless |
| | | YAML list at | per-item file/dir existence, unless |
| | | | |
See docs/api/core.path_declarations for
the exhaustive list and per-field reference.
Where a value comes from: the config chain
PatientFile, PatientDir, and ExternalFile let you point at one or more
config keys, falling back to a default:
- config_paths (a str or list of dotted keys) is consulted in order; config_path is appended to the chain if not already present.
- The first non-None config value wins; if all are absent, the declaration’s default is used.
- A declaration must supply at least one of default, config_path, config_paths, or optional=True; otherwise ConfigurationError is raised at construction.

t1=PatientFile(
    default="T1w/T1w_acpc_dc_restore.nii.gz",               # last-resort fallback
    config_paths=["my_workflow.t1_image", "hcp.t1_image"],  # first non-None wins
)
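The "first non-None wins, else default" rule can be sketched against a plain nested dict. This is only an illustration of the lookup order described above: lookup and resolve_value are hypothetical helpers, and the real resolver works on a PipelineConfig rather than a dict.

```python
from typing import Any, Mapping, Optional, Sequence


def lookup(config: Mapping[str, Any], dotted_key: str) -> Optional[Any]:
    """Walk a dotted key such as 'hcp.t1_image'; return None if any part is absent."""
    node: Any = config
    for part in dotted_key.split("."):
        if not isinstance(node, Mapping) or part not in node:
            return None
        node = node[part]
    return node


def resolve_value(config: Mapping[str, Any], config_paths: Sequence[str],
                  default: Optional[str]) -> Optional[str]:
    """Return the first non-None config value in order, else the default."""
    for key in config_paths:
        value = lookup(config, key)
        if value is not None:
            return value
    return default


chain = ["my_workflow.t1_image", "hcp.t1_image"]
fallback = "T1w/T1w_acpc_dc_restore.nii.gz"

print(resolve_value({"hcp": {"t1_image": "T1w/custom.nii.gz"}}, chain, fallback))
# T1w/custom.nii.gz
print(resolve_value({}, chain, fallback))
# T1w/T1w_acpc_dc_restore.nii.gz
```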
{patient_id} templating
Filenames, directory names, subdir names, and glob patterns support the
{patient_id} placeholder, which is substituted from context.patient_id:
dwi_ap=PatientFile(default="{patient_id}_dmri_AP.nii.gz")
Any other {placeholder} raises KeyError — only {patient_id} is valid.
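The KeyError behaviour is what plain str.format gives when only patient_id is supplied, so a minimal sketch (assuming the framework substitutes in a comparable way; render is a hypothetical helper) looks like this:

```python
def render(template: str, patient_id: str) -> str:
    """Substitute {patient_id}; any other placeholder raises KeyError."""
    return template.format(patient_id=patient_id)


print(render("{patient_id}_dmri_AP.nii.gz", "P001"))  # P001_dmri_AP.nii.gz

try:
    render("{session}_dmri_AP.nii.gz", "P001")
except KeyError as exc:
    print(f"unknown placeholder: {exc}")  # unknown placeholder: 'session'
```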
fallback_dirs
When set on PatientFile/PatientDir, fallback_dirs becomes the complete
search order — "input_dir" is no longer implied, so include it explicitly if
you want it. Valid names: "input_dir", "output_dir", "working_dir",
"data_dir", ".".
mask=PatientFile(
    default="brain_mask.nii.gz",
    fallback_dirs=["input_dir", "output_dir", "data_dir"],  # searched in this order
)
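The "complete search order" semantics can be pictured as a first-hit scan over named roots. The sketch below assumes that reading; find_first and the roots mapping are hypothetical, and the temporary directories merely stand in for the context's real directories.

```python
import tempfile
from pathlib import Path
from typing import Dict, Optional, Sequence


def find_first(relative: str, fallback_dirs: Sequence[str],
               roots: Dict[str, Path]) -> Optional[Path]:
    """Return the first existing candidate, searching roots in the given order."""
    for name in fallback_dirs:
        candidate = roots[name] / relative
        if candidate.exists():
            return candidate
    return None


with tempfile.TemporaryDirectory() as tmp:
    roots = {name: Path(tmp) / name for name in ("input_dir", "output_dir", "data_dir")}
    for root in roots.values():
        root.mkdir()
    (roots["output_dir"] / "brain_mask.nii.gz").touch()

    hit = find_first("brain_mask.nii.gz", ["input_dir", "output_dir", "data_dir"], roots)
    print(hit.parent.name)  # output_dir

    # input_dir alone: the file under output_dir is never considered.
    print(find_first("brain_mask.nii.gz", ["input_dir"], roots))  # None
```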
A complete, annotated example
This is the smallest correct decorated workflow that exercises every decorator
and the most common declarations. Save it as my_workflow.py and run it
out-of-tree.
"""my_workflow.py: a minimal correct decorator-based thesis workflow.

Run it out-of-tree:
    thesis run --script ./my_workflow.py -p P001 -c default
Or place it in paths.scripts_dir and it appears in `thesis list-workflows`.
"""
from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.utility import Function

from thesis.core.config import PipelineConfig      # type-only / optional
from thesis.core.context import ProcessingContext  # type-only / optional
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


# --- Function-node body: runs in a SEPARATE process -------------------------
# Rule: NO loguru here (it can't be pickled). Use print(). All imports LOCAL.
def _copy_header_note(t1_path: str, out_dir: str, patient_id: str) -> str:
    from pathlib import Path  # local import is mandatory inside Function nodes

    note = Path(out_dir) / f"{patient_id}_note.txt"
    note.parent.mkdir(parents=True, exist_ok=True)
    note.write_text(f"processed {t1_path}\n", encoding="utf-8")
    print(f"[my_workflow] wrote {note}")  # NOT logger.*; print only
    return str(note)


# --- Optional custom preflight check ----------------------------------------
# Opts into resolved kwargs via **kwargs. Returns [] on success, else error
# strings. Runs AFTER the implicit existence check from @requires.
def _check_t1_is_nifti(config, context, **kwargs) -> list[str]:
    t1: Path = kwargs["t1"]  # same name as the @requires key
    if t1 is not None and t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be NIfTI, got {t1.suffix!r}"]
    return []


# --- The workflow ------------------------------------------------------------
# DECORATION ORDER: @workflow MUST be outermost. Inner three: any order.
@workflow(
    name="my_workflow",  # unique CLI id (registry key)
    description="One-line summary for `thesis list-workflows`.",
    protocol="hcp",      # -> WorkflowEntry.default_protocol (optional)
    scope="patient",     # "patient" (default) or "cohort"
)
@requires(
    # Injected as a keyword-only `t1: Path`. Resolves config_paths in order
    # (first non-None wins), else `default`, anchored at input_dir. Generates an
    # IMPLICIT existence check (the file must exist) unless optional=True.
    t1=PatientFile(
        default="T1w/T1w_acpc_dc_restore.nii.gz",
        config_paths=["my_workflow.t1_image", "hcp.t1_image"],
    ),
)
@produces(
    # Injected as `out_dir: Path`. Created on resolve. No existence check.
    out_dir=OutputDir("my_workflow"),  # -> context.output_dir / "my_workflow"
)
@verify(_check_t1_is_nifti)  # runs AFTER the implicit @requires check
def build_workflow(
    *,                           # body is keyword-only
    t1: Path,                    # from @requires(t1=...)
    out_dir: Path,               # from @produces(out_dir=...)
    config: PipelineConfig,      # injected ONLY because declared here
    context: ProcessingContext,  # injected ONLY because declared here
) -> Workflow:
    """Build the workflow. Return a raw nipype.Workflow; there is no base class."""
    wf = Workflow(name=f"my_workflow_{context.patient_id}")
    node = Node(
        Function(
            input_names=["t1_path", "out_dir", "patient_id"],
            output_names=["out_file"],
            function=_copy_header_note,
        ),
        name="note",
    )
    node.inputs.t1_path = str(t1)
    node.inputs.out_dir = str(out_dir)
    node.inputs.patient_id = context.patient_id
    wf.add_nodes([node])
    return wf  # framework executes it; never wf.run()
The smallest in-tree example is src/thesis/workflows/minimal.py (no
@requires, no @verify, a single @produces(out_dir=OutputDir("")) — note
that OutputDir("") resolves to context.output_dir itself). A complete
runnable out-of-tree example lives at examples/user_workflow_example.py.
What the body receives
The body is keyword-only (*,). Declare only the kwargs you actually
use:
- One keyword per @requires/@produces key, carrying the resolved value.
- config: PipelineConfig and/or context: ProcessingContext are injected only if your signature names them. The adapter inspects the signature and forwards them on demand.
The body returns a raw nipype.Workflow. There is no base class to
inherit — BaseModule, BasePipelineStep, BasePipeline, and NipypeStep do
not exist. Just build nipype.Node(...) objects, wire them with
wf.connect(...), and return wf.
ProcessingContext: the state carrier
ProcessingContext (thesis.core.context) is created by the CLI before your
workflow is built and carries everything a workflow needs:
| Field | Meaning |
|---|---|
| | The |
| | The merged |
| data_dir | Cohort-shared data root (anchors |
| input_dir | Per-patient input root (anchors |
| output_dir | Per-patient (or cohort) output root (anchors |
| working_dir | Nipype working directory (anchors |
| | Free-form dicts for cross-stage handoffs. |
output_dir and working_dir are resolved and created automatically when
the context is constructed. Path declarations call the context’s
traversal-guarded helpers (get_input_path, get_output_path,
get_working_path) — you rarely touch these directly; the declarations do it
for you. Inside the body, the most common direct use is
context.patient_id (e.g. for a unique workflow name).
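"Traversal-guarded" here presumably means that a relative path may not escape its root directory. A minimal sketch of such a guard, assuming that reading (guarded_join is a hypothetical helper, not the body of get_input_path or its siblings):

```python
from pathlib import Path


def guarded_join(root: Path, relative: str) -> Path:
    """Join relative onto root, refusing results that land outside root."""
    root = root.resolve()
    candidate = (root / relative).resolve()
    if candidate != root and root not in candidate.parents:
        raise ValueError(f"{relative!r} escapes {root}")
    return candidate


root = Path("patients") / "P001"
print(guarded_join(root, "T1w/T1w.nii.gz").name)  # T1w.nii.gz

try:
    guarded_join(root, "../P002/T1w.nii.gz")
except ValueError:
    print("rejected: path escapes the patient root")
```

Resolving both sides before comparing means that ".." segments and absolute inputs are both caught by the same check.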
inputnode / outputnode contracts (for composability)
A standalone single-patient workflow runs as-is. To embed it in a
meta-pipeline (full_pipeline, tract_synthseg), it must publish a
stable I/O contract so the meta-workflow can wire it contract-to-contract
rather than reaching into internal node names.
Use the three helpers from thesis.core.contracts:
- attach_inputnode(wf, fields, defaults=...): create an IdentityInterface inputnode. defaults lets a standalone run resolve statically (a None default is skipped so the trait stays Undefined); a meta-workflow edge overrides a field at run time. A defaults key not in fields raises ValueError.
- attach_outputnode(wf, fields): create the outputnode.
- fan_out(wf, inputnode, field, targets): connect one inputnode field to several internal consumer (node, port) targets.
Published field-name tuples (so both ends agree on spelling) live in
thesis.core.contracts, e.g.
ROI_OUTPUT_FIELDS = ("roi_seed", "roi_stop", "roi_avoid", "roi_target").
from thesis.core.contracts import attach_inputnode, attach_outputnode, fan_out
inputnode = attach_inputnode(wf, ["moving_image"], defaults={"moving_image": str(t1)})
fan_out(wf, inputnode, "moving_image", [(reg_node, "moving_image")])
outputnode = attach_outputnode(wf, ["transform", "t1_brain"])
wf.connect(reg_node, "out_transform", outputnode, "transform")
A meta-workflow then connects only the boundary nodes:
meta.connect(upstream_wf, "outputnode.t1_brain", downstream_wf, "inputnode.moving_image")
See nipype_integration.md for the full contracts
walk-through and src/thesis/workflows/full_pipeline/_core.py for the worked
reference.
Logging rule (and the Function-node exception)
In normal module code, use loguru via the framework helper — never the stdlib
logging module:
from thesis.core.logging import get_logger
logger = get_logger(__name__)
Warning
Inside a Nipype Function node body, use print() — not loguru. Function
nodes run in a separate process, and the loguru logger cannot be pickled
into the worker. For the same reason, all imports inside a Function node must
be local to the function (see _copy_header_note in the example above, which
re-imports pathlib locally).
Owning a config section
By default, PipelineConfig rejects unknown top-level YAML keys. A workflow can
claim its own top-level key without editing core/config/validators.py by
passing config_namespace to @workflow:
from pydantic import Field
from thesis.core.config.validators import BaseConfig  # the schema base class


class MyWorkflowConfig(BaseConfig):  # BaseConfig forbids extra keys (catches typos)
    threshold: float = Field(default=0.5, ge=0.0, le=1.0)
    label: str = Field(default="run")


@workflow(
    name="my_workflow",
    config_namespace="my_workflow",  # owns the `my_workflow:` YAML key
    config_schema=MyWorkflowConfig,  # validates that key
)
@produces(out_dir=OutputDir("my_workflow"))
def build_workflow(*, out_dir: Path, config: PipelineConfig, context) -> Workflow:
    cfg = config.my_workflow  # typed MyWorkflowConfig instance
    ...
Now this YAML block is accepted and validated:
my_workflow:
  threshold: 0.8
  label: trial-A
How it works: the decorator registers MyWorkflowConfig under
"my_workflow" in NAMESPACE_REGISTRY at import time. PipelineConfig
consults that registry during validation — a top-level key matching a
registered namespace is validated by its schema; an unknown key with no
registered namespace is still rejected.
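The registry lookup described above can be sketched without Pydantic. Everything here is a hypothetical stand-in: SKETCH_NAMESPACES plays the role of NAMESPACE_REGISTRY, CORE_KEYS is an assumed set of built-in top-level keys, and the hand-written validator replaces a real schema class.

```python
from typing import Any, Callable, Dict, Mapping

SKETCH_NAMESPACES: Dict[str, Callable[[Mapping[str, Any]], None]] = {}
CORE_KEYS = {"paths"}  # assumed built-in top-level keys, for illustration only


def validate_my_workflow(block: Mapping[str, Any]) -> None:
    """Hand-rolled equivalent of a schema that forbids extra keys."""
    unknown = set(block) - {"threshold", "label"}
    if unknown:
        raise ValueError(f"unexpected keys in my_workflow: {sorted(unknown)}")
    if not 0.0 <= block.get("threshold", 0.5) <= 1.0:
        raise ValueError("threshold must be within [0, 1]")


SKETCH_NAMESPACES["my_workflow"] = validate_my_workflow  # done at import time


def validate_top_level(raw: Mapping[str, Any]) -> None:
    """Accept core keys and registered namespaces; reject everything else."""
    for key, block in raw.items():
        if key in CORE_KEYS:
            continue
        validator = SKETCH_NAMESPACES.get(key)
        if validator is None:
            raise ValueError(f"unknown top-level key: {key!r}")
        validator(block)


validate_top_level({"paths": {}, "my_workflow": {"threshold": 0.8, "label": "trial-A"}})
print("accepted")

try:
    validate_top_level({"my_workflo": {"threshold": 0.8}})  # typo in the key
except ValueError as exc:
    print(exc)  # unknown top-level key: 'my_workflo'
```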
Note
config_schema requires config_namespace (otherwise TypeError). The
schema class must subclass thesis.core.config.validators.BaseConfig (a
Pydantic v2 BaseModel with extra="forbid"). There is no core/base.py
workflow base class — BaseConfig here is only a config-validation base model.
Auto-derived schema (omit config_schema)
If you set config_namespace but omit config_schema, the schema is
auto-derived from the @requires config_paths that point at your own
namespace. Every such key becomes an optional str field (the resolver
still applies the declaration’s default when the YAML value is absent):
@workflow(name="my_workflow", config_namespace="my_workflow")  # no config_schema
@requires(t1=PatientFile(config_paths=["my_workflow.t1_image", "hcp.t1_image"]))
@produces(out_dir=OutputDir("my_workflow"))
def build_workflow(*, t1: Path, out_dir: Path, context: ProcessingContext) -> Workflow:
    ...

accepts exactly:

my_workflow:
  t1_image: /path/to/T1w.nii.gz  # optional; falls back to the PatientFile default
Limits of auto-derivation:
- Only keys named in config_paths within your own namespace are derived; foreign-namespace paths (hcp.t1_image above) belong to hcp’s schema and are ignored here.
- Every derived field is Optional[str]: no numeric types, bounds, or cross-field validators. For those, write an explicit config_schema.
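The derivation rule amounts to filtering config_paths by namespace prefix. A small sketch under that assumption (derive_fields is a hypothetical helper; the real derivation builds a schema class rather than a list of names):

```python
from typing import Iterable, List


def derive_fields(namespace: str, config_paths: Iterable[str]) -> List[str]:
    """Keep only keys inside the given namespace, stripped of their prefix."""
    prefix = namespace + "."
    fields: List[str] = []
    for path in config_paths:
        if path.startswith(prefix):
            name = path[len(prefix):]
            if name not in fields:
                fields.append(name)
    return fields


print(derive_fields("my_workflow", ["my_workflow.t1_image", "hcp.t1_image"]))
# ['t1_image']
```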
Registration and discovery
Workflows self-register at import time (the @workflow decorator runs on
import). There are two supported ways to get the framework to import your file.
--script PATH.py (out-of-tree, recommended for experiments)
thesis run --script ./my_workflow.py -p DTI_001 -c my_workflow
The file is imported as an isolated module (thesis_user_scripts.<hash>),
which triggers self-registration. --script overrides -w; if you pass both
and they name different workflows, the CLI warns and uses the script’s
registered name.
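Importing a file under an isolated, hash-derived module name can be done with importlib alone. The sketch below shows one way to do it; the package prefix, the choice of SHA-256, and the 12-character digest are assumptions, not the CLI's actual scheme.

```python
import hashlib
import importlib.util
import sys
import tempfile
from pathlib import Path
from types import ModuleType


def import_script(path: Path, package: str = "sketch_user_scripts") -> ModuleType:
    """Import a file under '<package>.<hash of its absolute path>'."""
    path = path.resolve()
    digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:12]
    module_name = f"{package}.{digest}"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # register before executing the module body
    spec.loader.exec_module(module)    # decorators in the file run here
    return module


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "my_workflow.py"
    script.write_text("WORKFLOW_NAME = 'my_workflow'\n", encoding="utf-8")
    module = import_script(script)
    print(module.WORKFLOW_NAME)  # my_workflow
```

Deriving the module name from the path keeps two scripts with the same filename in different directories from colliding in sys.modules.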
paths.scripts_dir (auto-discovery)
Point your config at a directory; the framework scans it
non-recursively for *.py files (skipping names starting with _):
paths:
  scripts_dir: ./my_workflows
thesis list-workflows then lists each discovered script with a
(user script: <path>) suffix (load failures are reported but do not abort);
thesis run -w my_workflow ... runs it.
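The scan rule (top level only, *.py, no leading underscore) maps directly onto pathlib. A minimal sketch, with discover_scripts as a hypothetical name and sorted order as an added assumption:

```python
import tempfile
from pathlib import Path
from typing import List


def discover_scripts(scripts_dir: Path) -> List[Path]:
    """List top-level *.py files, skipping names that start with '_'."""
    return sorted(
        p for p in scripts_dir.glob("*.py")  # glob("*.py") does not recurse
        if p.is_file() and not p.name.startswith("_")
    )


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "my_workflow.py").touch()
    (base / "_helpers.py").touch()        # skipped: leading underscore
    (base / "nested").mkdir()
    (base / "nested" / "deep.py").touch()  # skipped: not top level
    print([p.name for p in discover_scripts(base)])  # ['my_workflow.py']
```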
Config hierarchy
User scripts go through the same five-level config merge as built-in workflows:
1. default.yaml
2. hardware.yaml
3. protocol config (from @workflow(protocol=...) or --protocol)
4. per-patient config config/patients/<patient_id>.yaml
5. CLI flags / overrides
If your script declares config_namespace, the matching YAML block in any of
those layers is validated against your schema at load time.
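A layered merge of this kind is commonly implemented as a recursive dict merge in which later layers win. The sketch below assumes exactly that: precedence increasing from default.yaml to CLI overrides, and nested mappings merged key by key. Both points are assumptions, as are the helper names and the sample values.

```python
from typing import Any, Dict, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
    """Return base updated by override, merging nested mappings recursively."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def merge_layers(*layers: Mapping[str, Any]) -> Dict[str, Any]:
    """Fold layers left to right; the last layer has the highest precedence."""
    result: Dict[str, Any] = {}
    for layer in layers:
        result = deep_merge(result, layer)
    return result


default = {"my_workflow": {"threshold": 0.5, "label": "run"}}
patient = {"my_workflow": {"threshold": 0.8}}
cli = {"my_workflow": {"label": "trial-A"}}

print(merge_layers(default, patient, cli))
# {'my_workflow': {'threshold': 0.8, 'label': 'trial-A'}}
```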
Common gotchas
- @workflow must be outermost. If an inner decorator ends up outermost, the registration never happens. The inner three may be in any order.
- One @workflow per --script file. A file that registers zero or more than one workflow is rejected with a ClickException; split each @workflow into its own file.
- No name collisions with built-ins. A user script that registers a name already used by a built-in (e.g. hcp, preprocess) is rejected. Pick a unique name.
- Never call .run() in the factory. Return the Workflow; the framework executes it. Eager node.run() / wf.run() defeats the DAG scheduler and breaks dry-runs and graph rendering.
- The returned object must be a nipype.Workflow (the CLI requires a .run attribute). Don’t return a Node, a path, or None.
- Declare only the kwargs you use. config/context arrive only if your signature names them; a typo’d or missing declared key is a TypeError at call time.
- print() inside Function nodes, loguru everywhere else, and keep all imports local inside Function-node bodies (they run in a separate process).
- @requires → implicit existence check; @produces → none. A required input that does not exist fails preflight (unless optional=True); an output directory is created for you on resolve.
- fallback_dirs replaces the default search order. When you set it on a PatientFile/PatientDir, include "input_dir" explicitly if you still want it searched.
- Only {patient_id} is a valid template placeholder. Any other {placeholder} raises KeyError.
- config_schema needs config_namespace (otherwise TypeError), and the schema must subclass thesis.core.config.validators.BaseConfig.
- Cohort vs patient scope is validated. A scope="cohort" workflow may not declare PatientFile/PatientDir; a scope="patient" workflow may not declare CohortDir. Use the scope-appropriate declarations (CohortDir, CohortPatients, DataFile, …).
Runnable examples
Every example below is a real, runnable file in the repository. Examples without
a @requires input can be exercised without real patient data via --dry-run,
which builds the graph and runs preflight checks but does not execute the
pipeline; an example with a required input surfaces that existence check under
--dry-run instead (see the user_workflow_example.py row).
The commands use -c default to match the rest of these guides — that is your
local config, set up by copying config/default.example.yaml. On a stock
checkout where only the *.example.yaml configs are tracked, substitute
-c default.example (this is exactly why example_nipype_workflow.py loads
default.example internally, so it runs straight from a fresh clone).
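| Example | What it shows | How to run it |
|---|---|---|
| src/thesis/workflows/minimal.py | The smallest in-tree workflow: a single @produces(out_dir=OutputDir("")). | |
| examples/plain_nipype_workflow.py and examples/annotated_nipype_workflow.py | A normal Nipype pipeline before (plain) and after (annotated with the framework decorators). The fastest way to see exactly which annotations a plain script needs. | Before: |
| examples/user_workflow_example.py | An out-of-tree workflow script with a required input (its existence check surfaces under --dry-run). | |
| examples/example_nipype_workflow.py | Running a registered workflow programmatically from Python (no CLI): fetch the adapter from WORKFLOW_REGISTRY. | |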
The plain/annotated pair is the quickest way to read the diff between an
ordinary Nipype script and its framework-annotated form: run
examples/plain_nipype_workflow.py to see the un-annotated “before”, then
compare it line-for-line with examples/annotated_nipype_workflow.py, the
decorated “after” that the CLI can discover and dry-run. The full step-by-step
rationale for that conversion is in
Adapting a standalone Nipype script.
Note
Running a workflow programmatically (as in examples/example_nipype_workflow.py)
goes through the registry adapter, not the decorated factory directly. The
@workflow decorator returns your original keyword-only function unchanged, so
calling build_workflow(config, context) raises TypeError. Fetch the
registered (config, context) -> Workflow adapter instead:
import thesis.workflows.minimal # noqa: F401 — import for @workflow side effect
from thesis.core.registry import WORKFLOW_REGISTRY
entry = WORKFLOW_REGISTRY.get("minimal") # WorkflowEntry with .factory / .verifier
wf = entry.factory(config, context) # adapter resolves @requires/@produces paths
Where to look next
- Adapting a standalone Nipype script: a worked before→after port of a real-world script.
- Project Architecture Overview: the registry, dispatch, and where the build step fits in the data flow.
- Nipype Integration Guide: how the framework wraps and dispatches Nipype, contracts, and the execution model.
- docs/api/core.decorators, docs/api/core.path_declarations, docs/api/core.contracts: API reference.
- src/thesis/workflows/minimal.py: smallest in-tree example.
- src/thesis/workflows/atlas/workflow.py: a fully-fledged cohort workflow with custom @verify checks.