Building a workflow#

This is the complete reference for building a workflow with the thesis decorator API. It applies equally to the package’s built-in workflows under src/thesis/workflows/ and to out-of-tree scripts you run with --script or auto-discover from paths.scripts_dir.

A workflow is a single factory function that builds and returns a raw nipype.Workflow. You decorate it to:

  • give it a name and metadata (@workflow),

  • declare the inputs it needs and where they come from (@requires),

  • declare the outputs it writes (@produces), and

  • (optionally) attach preflight checks (@verify).

The framework does the rest: it resolves your declared paths into pathlib.Path objects, injects them as keyword arguments, runs your preflight checks, registers the workflow so the CLI can find it, and hands the returned graph to the Nipype executor. Your factory never calls .run() — it only constructs the graph.

Tip

If you are porting an existing standalone Nipype script into this framework, read the step-by-step tutorial first: Adapting a standalone Nipype script. It maps every “messy real-world” pattern (hardcoded paths, eager .run(), sys.modules hacks) onto the idiom described here.

TL;DR#

# Run an out-of-tree script directly:
thesis run --script ./my_workflow.py -p DTI_001 -c default

# Or auto-discover a whole directory of scripts. In config/default.yaml:
paths:
  scripts_dir: ./my_workflows

# Then, from the shell:
thesis list-workflows          # discovers everything in paths.scripts_dir
thesis run -w my_workflow -p DTI_001 -c default

The four decorators#

All four live in thesis.core.decorators. Three of them — @requires, @produces, @verify — are pure metadata attachers: they accumulate declarations on the function and have no other side effect. Only @workflow does real work (it builds the adapter and registers the workflow).

Important

Decoration order matters in exactly one way: @workflow must be the outermost decorator. The inner three (@requires, @produces, @verify) may appear in any order relative to each other.

@workflow(name="my_workflow", ...)   # OUTERMOST — the only one with side effects
@requires(t1=PatientFile(...))       # inner three: any order
@produces(out_dir=OutputDir(...))
@verify(_check_inputs)
def build_workflow(*, t1, out_dir, config, context):
    ...
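
The ordering rule follows from how Python applies stacked decorators: bottom-up, so the outermost one runs last and can read everything the inner ones attached. The sketch below illustrates that mechanism in plain Python. The attribute name __sketch_meta__, the REGISTRY dict, and these toy decorators are hypothetical stand-ins, not the real thesis.core.decorators implementation.

```python
REGISTRY = {}  # hypothetical stand-in for WORKFLOW_REGISTRY


def _meta(func):
    """Return the metadata dict attached to func, creating it on first use."""
    if not hasattr(func, "__sketch_meta__"):
        func.__sketch_meta__ = {"requires": {}, "produces": {}, "verify": []}
    return func.__sketch_meta__


def requires(**decls):
    def deco(func):
        _meta(func)["requires"].update(decls)  # metadata only, no side effect
        return func
    return deco


def produces(**decls):
    def deco(func):
        _meta(func)["produces"].update(decls)
        return func
    return deco


def verify(*checks):
    def deco(func):
        _meta(func)["verify"].extend(checks)
        return func
    return deco


def workflow(name):
    def deco(func):
        # Outermost, so it runs last: the inner metadata is already complete.
        REGISTRY[name] = _meta(func)
        return func  # the original function is returned unchanged
    return deco


@workflow(name="demo")
@requires(t1="T1w.nii.gz")
@produces(out_dir="demo")
def build_workflow(*, t1, out_dir):
    return (t1, out_dir)
```

After the module is imported, REGISTRY["demo"] holds both declarations, and build_workflow is still the plain keyword-only function.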

@workflow(...) — register the workflow#

The outermost decorator. It reads the metadata the inner decorators attached, synthesises a (config, context) -> Workflow adapter, builds a composite verifier, and registers the result in WORKFLOW_REGISTRY at import time. It returns the original function unchanged.

@workflow(
    name="my_workflow",            # required: unique CLI id / registry key
    description="One-liner shown by `thesis list-workflows`.",
    protocol="hcp",                # optional: default protocol (-> default_protocol)
    default_config="my_workflow",  # optional: default config when -c is omitted
    scope="patient",               # "patient" (default) or "cohort"
    config_namespace="my_workflow",# optional: own a top-level YAML key (see below)
    config_schema=MyWorkflowConfig,# optional: Pydantic schema for that key
)

| Parameter | Effect |
| --- | --- |
| name | Required. Unique registry key; thesis run -w <name> and --script dispatch by it. |
| description | Shown by thesis list-workflows. |
| protocol | Becomes WorkflowEntry.default_protocol; the config protocol layer used when --protocol is omitted. Defaults to None. |
| default_config | The default -c config name when the user omits it. |
| scope | "patient" (default) or "cohort". scope="cohort" sets is_cohort_level=True, so the CLI ignores -p/--patient-id and --all and dispatches a single cohort run. |
| config_namespace | A top-level YAML key this workflow owns (see Owning a config section). |
| config_schema | A thesis.core.config.validators.BaseConfig subclass that validates that key. Requires config_namespace. |

Errors and validation at decoration time:

  • ValueError if scope is neither "patient" nor "cohort".

  • TypeError if config_schema is set without config_namespace.

  • TypeError (scope/declaration mismatch): a scope="cohort" workflow may not declare a PatientFile/PatientDir requirement, and a scope="patient" workflow may not declare a CohortDir output.

  • With THESIS_STRICT_REGISTRY=1, a duplicate workflow name or duplicate config namespace raises ValueError instead of warning — useful for catching accidental double-imports in tests.
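
A rough sketch of the argument checks and the strict-registry switch listed above may help when reading error messages. The function names check_workflow_args and register are hypothetical, the error texts are invented, and the choice that a non-strict duplicate overwrites the earlier entry is an assumption; only the exception types and the environment variable come from this guide.

```python
import os
import warnings

VALID_SCOPES = ("patient", "cohort")


def check_workflow_args(scope="patient", config_namespace=None, config_schema=None):
    """Sketch of the decoration-time checks; returns the is_cohort_level flag."""
    if scope not in VALID_SCOPES:
        raise ValueError(f"scope must be 'patient' or 'cohort', got {scope!r}")
    if config_schema is not None and config_namespace is None:
        raise TypeError("config_schema requires config_namespace")
    return scope == "cohort"


def register(registry, name, entry, strict=None):
    """Add entry under name; a duplicate warns, or raises in strict mode."""
    if strict is None:
        strict = os.environ.get("THESIS_STRICT_REGISTRY") == "1"
    if name in registry:
        if strict:
            raise ValueError(f"duplicate workflow name: {name!r}")
        warnings.warn(f"duplicate workflow name: {name!r}")
    registry[name] = entry  # assumption: in non-strict mode the later entry wins
```

In a test suite, exporting THESIS_STRICT_REGISTRY=1 turns the accidental double-import from a warning into a hard failure.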

@requires(**path_decls) — declare inputs#

Each keyword names a keyword argument your factory body will receive, mapped to a path declaration. The adapter resolves each declaration against the active ProcessingContext and injects the resulting Path (or None, or list[Path], depending on the declaration type).

@requires(
    t1=PatientFile(default="T1w/T1w_acpc_dc_restore.nii.gz"),
    diffusion=PatientDir(default="T1w/Diffusion"),
)
def build_workflow(*, t1: Path, diffusion: Path, ...): ...

Every non-optional @requires declaration also generates an implicit existence check that runs before your workflow is built. There is no boilerplate to write for “the input file must exist”.
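
To make the implicit check concrete, here is a minimal sketch of what such a preflight pass could look like for file inputs. The helper name existence_errors and the message wording are hypothetical; the real check lives inside the framework and may differ in detail.

```python
import tempfile
from pathlib import Path


def existence_errors(resolved, optional_keys=()):
    """Return one error string per required input that is missing or not a file."""
    errors = []
    for key, path in resolved.items():
        if path is None and key in optional_keys:
            continue  # an optional input may legitimately resolve to None
        if path is None or not Path(path).is_file():
            errors.append(f"{key}: required input not found: {path}")
    return errors


with tempfile.TemporaryDirectory() as tmp:
    t1 = Path(tmp) / "T1w.nii.gz"
    t1.write_bytes(b"")  # an empty placeholder file is enough for the check
    ok = existence_errors({"t1": t1})
    missing = existence_errors({"t1": Path(tmp) / "absent.nii.gz"})
    optional = existence_errors({"mask": None}, optional_keys=("mask",))
```

Here ok and optional are empty lists, while missing carries a single message naming the t1 key.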

@produces(**path_decls) — declare outputs#

Same shape as @requires, but for outputs. Output declarations generate no existence check (the file does not exist yet). The adapter still resolves them — and OutputDir/CohortDir create the directory (mkdir(parents=True, exist_ok=True)) on resolve — then injects the resulting Path.

@produces(out_dir=OutputDir("my_workflow"))   # -> context.output_dir / "my_workflow", created
def build_workflow(*, out_dir: Path, ...): ...

@verify(*checks) — custom preflight checks#

Attaches one or more preflight callables. Each returns a list[str] of human-readable error messages (an empty list means “all clear”). They run after the implicit existence checks from @requires.

A verifier may use either signature:

  • The historical (config, context) -> list[str].

  • An opt-in form that also receives resolved declared values: add a **kwargs catch-all (it receives all declared values keyed by their declaration name), or name a parameter exactly after a declared key.

def _check_t1_is_nifti(config, context, **kwargs) -> list[str]:
    t1: Path = kwargs["t1"]            # same key as @requires(t1=...)
    if t1 is not None and t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be NIfTI, got {t1.suffix!r}"]
    return []

Note

When a workflow has no required inputs and no @verify checks, the composite verifier is None (the “no verifier” convention). Otherwise it resolves all declared paths once, runs the existence checks, then dispatches your explicit checks.
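
The two verifier signatures can be told apart by inspecting the callable. The sketch below shows one way a dispatcher might do that with inspect.signature; call_verifier and the three example checks are hypothetical, and the framework's own dispatch logic is not shown in this guide.

```python
import inspect


def call_verifier(check, config, context, resolved):
    """Call a verifier, passing resolved values only if its signature asks."""
    params = inspect.signature(check).parameters
    takes_var_kw = any(p.kind is p.VAR_KEYWORD for p in params.values())
    if takes_var_kw:
        extra = dict(resolved)  # a **kwargs catch-all receives every declared value
    else:
        extra = {k: v for k, v in resolved.items() if k in params}
    return check(config, context, **extra)


def legacy_check(config, context):
    return []


def named_check(config, context, t1):
    return [] if t1.endswith(".nii.gz") else [f"t1 must be NIfTI, got {t1!r}"]


def kwargs_check(config, context, **kwargs):
    # Reports the keys it received, purely to show what arrives via **kwargs.
    return sorted(kwargs)


resolved = {"t1": "scan.txt", "out_dir": "/tmp/out"}
```

With this dispatcher, legacy_check sees only (config, context), named_check receives t1 alone, and kwargs_check receives both declared keys.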


Path declarations#

Path declarations (thesis.core.path_declarations) describe where an input comes from or where an output goes — declaratively, without hardcoding absolute paths. The @workflow adapter calls each declaration’s resolve() against the ProcessingContext and passes the result to your body.

The most common ones:

| Declaration | Resolves to | Anchored at | Implicit existence check |
| --- | --- | --- | --- |
| PatientFile | Path or None | context.input_dir | must exist + be a file, unless optional |
| PatientDir | Path or None | context.input_dir | must exist + be a dir, unless optional |
| OutputDir | Path (created) | context.output_dir / subdir | none |
| WorkingFile | Path | context.working_dir (valid in patient and cohort scope) | none |
| CohortDir | Path (created) | context.output_dir / subdir (cohort root) | none |
| PriorOutput | Path or list[Path] (glob) | context.output_dir [/ subdir] | exists/file, or glob ≥1; unless optional |
| DataFile | Path | context.data_dir (traversal-guarded) | exists + file, unless optional |
| DataDir | Path | context.data_dir (traversal-guarded) | exists + dir, unless optional |
| ExternalFile | Path or None | config value; absolute/~/$ENV/base_dirs | exists + file, unless optional |
| GlobMatch | list[Path] | primary_dir (or input_dir) + fallback_dirs | len ≥ min_matches (default 1), unless optional |
| GlobGroup | GlobGroupResult | shared base dir; group falls through together | each item non-empty, unless optional |
| ConfigList | list[ConfigListItem] | YAML list at config_path | per-item file/dir existence, unless optional |
| CohortPatients | list[CohortPatient] | root_dir (or context.output_dir) | len ≥ min_patients, unless optional |

See docs/api/core.path_declarations for the exhaustive list and per-field reference.

Where a value comes from: the config chain#

PatientFile, PatientDir, and ExternalFile let you point at one or more config keys, falling back to a default:

  • config_paths (a str or list of dotted keys) is consulted in order; config_path is appended to the chain if not already present.

  • The first non-None config value wins; if all are absent, the declaration’s default is used.

  • A declaration must supply at least one of default, config_path, config_paths, or optional=True — otherwise ConfigurationError is raised at construction.

t1=PatientFile(
    default="T1w/T1w_acpc_dc_restore.nii.gz",   # last-resort fallback
    config_paths=["my_workflow.t1_image", "hcp.t1_image"],  # first non-None wins
)
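
The lookup rules above amount to a short loop. The following sketch reproduces them over a plain nested dict; lookup and resolve_chain are hypothetical names, and the real declarations read from a PipelineConfig rather than a dict.

```python
def lookup(config, dotted_key):
    """Walk a nested dict by a dotted key; return None if any part is absent."""
    node = config
    for part in dotted_key.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def resolve_chain(config, default=None, config_path=None, config_paths=None):
    """First non-None config value wins; otherwise fall back to default."""
    if config_paths is None:
        chain = []
    elif isinstance(config_paths, str):
        chain = [config_paths]
    else:
        chain = list(config_paths)
    if config_path is not None and config_path not in chain:
        chain.append(config_path)  # appended only if not already present
    for key in chain:
        value = lookup(config, key)
        if value is not None:
            return value
    return default


cfg = {"my_workflow": {"t1_image": None}, "hcp": {"t1_image": "hcp/T1w.nii.gz"}}
keys = ["my_workflow.t1_image", "hcp.t1_image"]
from_config = resolve_chain(cfg, default="T1w/default.nii.gz", config_paths=keys)
from_default = resolve_chain({}, default="T1w/default.nii.gz", config_paths=keys)
```

Because my_workflow.t1_image is None, the chain falls through to hcp.t1_image; with an empty config it falls through to the default.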

{patient_id} templating#

Filenames, directory names, subdir names, and glob patterns support the {patient_id} placeholder, which is substituted from context.patient_id:

dwi_ap=PatientFile(default="{patient_id}_dmri_AP.nii.gz")

Any other {placeholder} raises KeyError — only {patient_id} is valid.
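
The KeyError behaviour matches what Python's own str.format does when a placeholder has no matching keyword. Whether the framework uses str.format internally is an assumption; the hypothetical render helper below only illustrates the observable behaviour.

```python
def render(template, patient_id):
    """Substitute {patient_id}; any other placeholder raises KeyError."""
    return template.format(patient_id=patient_id)


name = render("{patient_id}_dmri_AP.nii.gz", "DTI_001")

try:
    render("{patient_id}_{session}.nii.gz", "DTI_001")  # {session} is not supported
    unknown_placeholder_rejected = False
except KeyError:
    unknown_placeholder_rejected = True
```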

fallback_dirs#

When set on PatientFile/PatientDir, fallback_dirs becomes the complete search order — "input_dir" is no longer implied, so include it explicitly if you want it. Valid names: "input_dir", "output_dir", "working_dir", "data_dir", ".".

mask=PatientFile(
    default="brain_mask.nii.gz",
    fallback_dirs=["input_dir", "output_dir", "data_dir"],  # searched in this order
)
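
A small sketch of the search-order semantics, using temporary directories in place of a real context. The find_first helper and the roots mapping are hypothetical; the point is only that the list is the whole search order, so a directory left out of it is never consulted.

```python
import tempfile
from pathlib import Path


def find_first(relative, roots, search_order):
    """Return the first existing candidate, or None if no listed directory has it."""
    for name in search_order:
        candidate = roots[name] / relative
        if candidate.exists():
            return candidate
    return None


with tempfile.TemporaryDirectory() as tmp:
    roots = {n: Path(tmp) / n for n in ("input_dir", "output_dir", "data_dir")}
    for root in roots.values():
        root.mkdir()
    (roots["output_dir"] / "brain_mask.nii.gz").write_bytes(b"")

    order = ["input_dir", "output_dir", "data_dir"]
    found_in = find_first("brain_mask.nii.gz", roots, order).parent.name
    # output_dir is not listed here, so the file there is never seen:
    skipped = find_first("brain_mask.nii.gz", roots, ["input_dir", "data_dir"])
```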

A complete, annotated example#

This is the smallest correct decorated workflow that exercises every decorator and the most common declarations. Save it as my_workflow.py and run it out-of-tree.

"""my_workflow.py — a minimal correct decorator-based thesis workflow.

Run it out-of-tree:
    thesis run --script ./my_workflow.py -p P001 -c default
Or place it in paths.scripts_dir and it appears in `thesis list-workflows`.
"""

from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.utility import Function

from thesis.core.config import PipelineConfig            # type-only / optional
from thesis.core.context import ProcessingContext        # type-only / optional
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


# --- Function-node body: runs in a SEPARATE process -------------------------
# Rule: NO loguru here (it can't be pickled). Use print(). All imports LOCAL.
def _copy_header_note(t1_path: str, out_dir: str, patient_id: str) -> str:
    from pathlib import Path  # local import is mandatory inside Function nodes

    note = Path(out_dir) / f"{patient_id}_note.txt"
    note.parent.mkdir(parents=True, exist_ok=True)
    note.write_text(f"processed {t1_path}\n", encoding="utf-8")
    print(f"[my_workflow] wrote {note}")  # NOT logger.* — print only
    return str(note)


# --- Optional custom preflight check ----------------------------------------
# Opts into resolved kwargs via **kwargs. Returns [] on success, else error
# strings. Runs AFTER the implicit existence check from @requires.
def _check_t1_is_nifti(config, context, **kwargs) -> list[str]:
    t1: Path = kwargs["t1"]                  # same name as the @requires key
    if t1 is not None and t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be NIfTI, got {t1.suffix!r}"]
    return []


# --- The workflow ------------------------------------------------------------
# DECORATION ORDER: @workflow MUST be outermost. Inner three: any order.
@workflow(
    name="my_workflow",               # unique CLI id (registry key)
    description="One-line summary for `thesis list-workflows`.",
    protocol="hcp",                   # -> WorkflowEntry.default_protocol (optional)
    scope="patient",                  # "patient" (default) or "cohort"
)
@requires(
    # Injected as a keyword-only `t1: Path`. Resolves config_paths in order
    # (first non-None wins), else `default`, anchored at input_dir. Generates an
    # IMPLICIT existence check (the file must exist) unless optional=True.
    t1=PatientFile(
        default="T1w/T1w_acpc_dc_restore.nii.gz",
        config_paths=["my_workflow.t1_image", "hcp.t1_image"],
    ),
)
@produces(
    # Injected as `out_dir: Path`. Created on resolve. No existence check.
    out_dir=OutputDir("my_workflow"),     # -> context.output_dir / "my_workflow"
)
@verify(_check_t1_is_nifti)               # runs AFTER the implicit @requires check
def build_workflow(
    *,                                    # body is keyword-only
    t1: Path,                             # from @requires(t1=...)
    out_dir: Path,                        # from @produces(out_dir=...)
    config: PipelineConfig,               # injected ONLY because declared here
    context: ProcessingContext,           # injected ONLY because declared here
) -> Workflow:
    """Build the workflow. Return a raw nipype.Workflow — no base class."""
    wf = Workflow(name=f"my_workflow_{context.patient_id}")

    node = Node(
        Function(
            input_names=["t1_path", "out_dir", "patient_id"],
            output_names=["out_file"],
            function=_copy_header_note,
        ),
        name="note",
    )
    node.inputs.t1_path = str(t1)
    node.inputs.out_dir = str(out_dir)
    node.inputs.patient_id = context.patient_id

    wf.add_nodes([node])
    return wf  # framework executes it; never wf.run()

The smallest in-tree example is src/thesis/workflows/minimal.py (no @requires, no @verify, a single @produces(out_dir=OutputDir("")) — note that OutputDir("") resolves to context.output_dir itself). A complete runnable out-of-tree example lives at examples/user_workflow_example.py.

What the body receives#

The body is keyword-only (*,). Declare only the kwargs you actually use:

  • One keyword per @requires / @produces key, carrying the resolved value.

  • config: PipelineConfig and/or context: ProcessingContext are injected only if your signature names them. The adapter inspects the signature and forwards them on demand.

The body returns a raw nipype.Workflow. There is no base class to inherit — BaseModule, BasePipelineStep, BasePipeline, and NipypeStep do not exist. Just build nipype.Node(...) objects, wire them with wf.connect(...), and return wf.
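
On-demand injection of config and context can be pictured with inspect.signature. This is a sketch under the assumption that the adapter simply checks parameter names; call_body and the example bodies are hypothetical, and plain dicts stand in for PipelineConfig and ProcessingContext.

```python
import inspect


def call_body(body, resolved, config, context):
    """Call a keyword-only body, forwarding config/context only if it names them."""
    params = inspect.signature(body).parameters
    kwargs = dict(resolved)  # one keyword per @requires / @produces key
    if "config" in params:
        kwargs["config"] = config
    if "context" in params:
        kwargs["context"] = context
    return body(**kwargs)


def needs_context(*, out_dir, context):
    return f"{out_dir}/{context['patient_id']}"


def needs_nothing_extra(*, out_dir):
    return out_dir


def typo_body(*, outdir):  # declared key is out_dir, so this call will fail
    return outdir
```

Calling call_body with typo_body raises TypeError, which is the failure mode to expect when a signature misspells or omits a declared key.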


ProcessingContext: the state carrier#

ProcessingContext (thesis.core.context) is created by the CLI before your workflow is built and carries everything a workflow needs:

| Field | Meaning |
| --- | --- |
| patient_id | The -p value ("cohort" for cohort-scope workflows). |
| config | The merged PipelineConfig. |
| data_dir | Cohort-shared data root (anchors DataFile/DataDir). |
| input_dir | Per-patient input root (anchors PatientFile/PatientDir). |
| output_dir | Per-patient (or cohort) output root (anchors OutputDir, etc.). |
| working_dir | Nipype working directory (anchors WorkingFile). |
| metadata / results | Free-form dicts for cross-stage handoffs. |

output_dir and working_dir are resolved and created automatically when the context is constructed. Path declarations call the context’s traversal-guarded helpers (get_input_path, get_output_path, get_working_path) — you rarely touch these directly; the declarations do it for you. Inside the body, the most common direct use is context.patient_id (e.g. for a unique workflow name).
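
For readers unfamiliar with the term, a traversal guard rejects relative paths that would escape their root (for example via ".."). The sketch below shows the general technique with pathlib; guarded_join is a hypothetical name, and the exception type raised by the real get_input_path / get_output_path / get_working_path helpers is not documented here, so ValueError is an assumption.

```python
from pathlib import Path


def guarded_join(root, relative):
    """Join relative onto root, refusing any result that escapes root."""
    root = Path(root).resolve()
    candidate = (root / relative).resolve()
    if candidate != root and root not in candidate.parents:
        raise ValueError(f"{relative!r} escapes {root}")
    return candidate


inside = guarded_join("/tmp/out", "sub/file.txt")

try:
    guarded_join("/tmp/out", "../secrets.txt")
    escaped = True
except ValueError:
    escaped = False
```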


inputnode / outputnode contracts (for composability)#

A standalone single-patient workflow runs as-is. To embed it in a meta-pipeline (full_pipeline, tract_synthseg), it must publish a stable I/O contract so the meta-workflow can wire it contract-to-contract rather than reaching into internal node names.

Use the three helpers from thesis.core.contracts:

  • attach_inputnode(wf, fields, defaults=...) — create an IdentityInterface inputnode. defaults lets a standalone run resolve statically (a None default is skipped so the trait stays Undefined); a meta-workflow edge overrides a field at run time. A defaults key not in fields raises ValueError.

  • attach_outputnode(wf, fields) — create the outputnode.

  • fan_out(wf, inputnode, field, targets) — connect one inputnode field to several internal consumer (node, port) targets.

Published field-name tuples (so both ends agree on spelling) live in thesis.core.contracts, e.g. ROI_OUTPUT_FIELDS = ("roi_seed", "roi_stop", "roi_avoid", "roi_target").

from thesis.core.contracts import attach_inputnode, attach_outputnode, fan_out

inputnode = attach_inputnode(wf, ["moving_image"], defaults={"moving_image": str(t1)})
fan_out(wf, inputnode, "moving_image", [(reg_node, "moving_image")])

outputnode = attach_outputnode(wf, ["transform", "t1_brain"])
wf.connect(reg_node, "out_transform", outputnode, "transform")

A meta-workflow then connects only the boundary nodes:

meta.connect(upstream_wf, "outputnode.t1_brain", downstream_wf, "inputnode.moving_image")

See nipype_integration.md for the full contracts walk-through and src/thesis/workflows/full_pipeline/_core.py for the worked reference.


Logging rule (and the Function-node exception)#

In normal module code, use loguru via the framework helper — never the stdlib logging module:

from thesis.core.logging import get_logger

logger = get_logger(__name__)

Warning

Inside a Nipype Function node body, use print() — not loguru. Function nodes run in a separate process, and the loguru logger cannot be pickled into the worker. For the same reason, all imports inside a Function node must be local to the function (see _copy_header_note in the example above, which re-imports pathlib locally).
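
One way to picture the local-import rule: assume the function body is re-created from its source text in a namespace that does not contain your module's globals. That mechanism is an assumption made here for illustration, and the recreate helper is hypothetical; the sketch only shows why a module-level import would not be visible in such a setting.

```python
LOCAL_IMPORT_SRC = '''
def make_note_path(out_dir, patient_id):
    from pathlib import Path  # local import travels with the function body
    return Path(out_dir) / f"{patient_id}_note.txt"
'''

MODULE_IMPORT_SRC = '''
def make_note_path(out_dir, patient_id):
    return Path(out_dir) / f"{patient_id}_note.txt"  # relies on a module-level import
'''


def recreate(source, name):
    """Rebuild a function from source text in a fresh, empty namespace."""
    namespace = {}
    exec(source, namespace)
    return namespace[name]


works = recreate(LOCAL_IMPORT_SRC, "make_note_path")
breaks = recreate(MODULE_IMPORT_SRC, "make_note_path")

note = works("out", "P001")

try:
    breaks("out", "P001")
    failed = False
except NameError:  # Path was never imported inside the rebuilt function
    failed = True
```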


Owning a config section#

By default, PipelineConfig rejects unknown top-level YAML keys. A workflow can claim its own top-level key without editing core/config/validators.py by passing config_namespace to @workflow:

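from pathlib import Path

from nipype import Workflow
from pydantic import Field

from thesis.core.config import PipelineConfig
from thesis.core.config.validators import BaseConfig   # the schema base class
from thesis.core.decorators import produces, workflow
from thesis.core.path_declarations import OutputDir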


class MyWorkflowConfig(BaseConfig):       # BaseConfig forbids extra keys (catches typos)
    threshold: float = Field(default=0.5, ge=0.0, le=1.0)
    label: str = Field(default="run")


@workflow(
    name="my_workflow",
    config_namespace="my_workflow",       # owns the `my_workflow:` YAML key
    config_schema=MyWorkflowConfig,        # validates that key
)
@produces(out_dir=OutputDir("my_workflow"))
def build_workflow(*, out_dir: Path, config: PipelineConfig, context) -> Workflow:
    cfg = config.my_workflow              # typed MyWorkflowConfig instance
    ...

Now this YAML block is accepted and validated:

my_workflow:
  threshold: 0.8
  label: trial-A

How it works: the decorator registers MyWorkflowConfig under "my_workflow" in NAMESPACE_REGISTRY at import time. PipelineConfig consults that registry during validation — a top-level key matching a registered namespace is validated by its schema; an unknown key with no registered namespace is still rejected.
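
The registry lookup described above can be sketched without Pydantic. Everything here is a simplified stand-in: CORE_KEYS is a hypothetical set of built-in top-level keys, the registry maps a namespace to a plain validator function instead of a BaseConfig subclass, and the error messages are invented.

```python
CORE_KEYS = {"paths", "hcp"}   # hypothetical built-in top-level keys
NAMESPACE_REGISTRY = {}        # stand-in: namespace -> validator callable


def validate_my_workflow(block):
    """Hand-written equivalent of the MyWorkflowConfig schema shown above."""
    extra = set(block) - {"threshold", "label"}
    if extra:
        raise ValueError(f"unknown keys in my_workflow: {sorted(extra)}")
    threshold = block.get("threshold", 0.5)
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("threshold must be within [0.0, 1.0]")
    return {"threshold": threshold, "label": block.get("label", "run")}


def validate_top_level(raw):
    """Validate registered namespaces, pass core keys, reject everything else."""
    validated = {}
    for key, value in raw.items():
        if key in NAMESPACE_REGISTRY:
            validated[key] = NAMESPACE_REGISTRY[key](value)
        elif key in CORE_KEYS:
            validated[key] = value
        else:
            raise ValueError(f"unknown top-level key: {key!r}")
    return validated


# What the decorator does at import time, in miniature:
NAMESPACE_REGISTRY["my_workflow"] = validate_my_workflow

accepted = validate_top_level({"my_workflow": {"threshold": 0.8, "label": "trial-A"}})
```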

Note

config_schema requires config_namespace (otherwise TypeError). The schema class must subclass thesis.core.config.validators.BaseConfig (a Pydantic v2 BaseModel with extra="forbid"). There is no core/base.py workflow base class — BaseConfig here is only a config-validation base model.

Auto-derived schema (omit config_schema)#

If you set config_namespace but omit config_schema, the schema is auto-derived from the @requires config_paths that point at your own namespace. Every such key becomes an optional str field (the resolver still applies the declaration’s default when the YAML value is absent):

@workflow(name="my_workflow", config_namespace="my_workflow")  # no config_schema
@requires(t1=PatientFile(config_paths=["my_workflow.t1_image", "hcp.t1_image"]))
@produces(out_dir=OutputDir("my_workflow"))
def build_workflow(*, t1: Path, out_dir: Path, context: ProcessingContext) -> Workflow:
    ...

accepts exactly:

my_workflow:
  t1_image: /path/to/T1w.nii.gz   # optional; falls back to the PatientFile default

Limits of auto-derivation:

  • Only keys named in config_paths within your own namespace are derived; foreign-namespace paths (hcp.t1_image above) belong to hcp’s schema and are ignored here.

  • Every derived field is Optional[str]: there are no numeric types, bounds, or cross-field validators. For those, write an explicit config_schema.
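
The derivation rule can be sketched as a filter over the declared config_paths. derive_fields is a hypothetical helper, and the input is simplified to a mapping from declaration name to its list of dotted keys; the real decorator builds a schema class rather than a list of names.

```python
def derive_fields(namespace, declarations):
    """Collect field names from config_paths that live under `namespace`."""
    prefix = namespace + "."
    fields = []
    for config_paths in declarations.values():
        for dotted in config_paths:
            if dotted.startswith(prefix):          # foreign namespaces are ignored
                field = dotted[len(prefix):]
                if field not in fields:
                    fields.append(field)
    return fields


fields = derive_fields(
    "my_workflow",
    {"t1": ["my_workflow.t1_image", "hcp.t1_image"]},
)
```

Only t1_image is derived; hcp.t1_image is skipped because it belongs to another namespace.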


Registration and discovery#

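Workflows self-register at import time (the @workflow decorator runs on import). There are two supported ways to get the framework to import your file: pass it explicitly with thesis run --script, or let the framework auto-discover it from paths.scripts_dir.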

paths.scripts_dir (auto-discovery)#

Point your config at a directory; the framework scans it non-recursively for *.py files (skipping names starting with _):

paths:
  scripts_dir: ./my_workflows

thesis list-workflows then lists each discovered script with a (user script: <path>) suffix (load failures are reported but do not abort); thesis run -w my_workflow ... runs it.
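
The scan rule (non-recursive, *.py only, names starting with _ skipped) is easy to reproduce with pathlib. discover_scripts is a hypothetical helper that mirrors the rule as described; it does not import or register anything.

```python
import tempfile
from pathlib import Path


def discover_scripts(scripts_dir):
    """List *.py files directly inside scripts_dir, skipping names starting with '_'."""
    return sorted(
        p for p in Path(scripts_dir).glob("*.py")  # glob, not rglob: non-recursive
        if p.is_file() and not p.name.startswith("_")
    )


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "my_workflow.py").write_text("")
    (root / "_helpers.py").write_text("")      # skipped: leading underscore
    (root / "notes.txt").write_text("")        # skipped: not a .py file
    (root / "nested").mkdir()
    (root / "nested" / "deep.py").write_text("")  # skipped: subdirectory
    found = [p.name for p in discover_scripts(root)]
```

A practical consequence: shared helper modules can sit next to your workflows as long as their filenames start with an underscore.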

Config hierarchy#

User scripts go through the same five-level config merge as built-in workflows:

  1. default.yaml

  2. hardware.yaml

  3. protocol config (from @workflow(protocol=...) or --protocol)

  4. per-patient config config/patients/<patient_id>.yaml

  5. CLI flags / overrides

If your script declares config_namespace, the matching YAML block in any of those layers is validated against your schema at load time.
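
The layering can be pictured as successive merges where later layers override earlier ones. The sketch assumes a recursive (deep) merge of nested mappings, which this guide does not state explicitly; the hardware.n_procs key and the values are made up for illustration.

```python
def deep_merge(base, override):
    """Return a new dict where override wins, merging nested dicts key by key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def merge_layers(*layers):
    """Fold the layers left to right; later layers override earlier ones."""
    result = {}
    for layer in layers:
        result = deep_merge(result, layer)
    return result


merged = merge_layers(
    {"paths": {"scripts_dir": "./my_workflows"}, "my_workflow": {"threshold": 0.5}},  # 1. default.yaml
    {"hardware": {"n_procs": 4}},                # 2. hardware.yaml (hypothetical key)
    {"my_workflow": {"label": "hcp"}},           # 3. protocol config
    {"my_workflow": {"threshold": 0.8}},         # 4. per-patient config
    {"my_workflow": {"label": "trial-A"}},       # 5. CLI overrides
)
```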


Common gotchas#

  • @workflow must be outermost. If an inner decorator ends up outermost, the registration never happens. The inner three may be in any order.

  • One @workflow per --script file. A file that registers zero or more than one workflow is rejected with a ClickException; split each @workflow into its own file.

  • No name collisions with built-ins. A user script that registers a name already used by a built-in (e.g. hcp, preprocess) is rejected. Pick a unique name.

  • Never call .run() in the factory. Return the Workflow; the framework executes it. Eager node.run() / wf.run() defeats the DAG scheduler and breaks dry-runs and graph rendering.

  • The returned object must be a nipype.Workflow (the CLI requires a .run attribute). Don’t return a Node, a path, or None.

  • Declare only the kwargs you use. config/context arrive only if your signature names them; a typo’d or missing declared key is a TypeError at call time.

  • print() inside Function nodes, loguru everywhere else — and keep all imports local inside Function-node bodies (they run in a separate process).

  • @requires → implicit existence check; @produces → none. A required input that does not exist fails preflight (unless optional=True); an output directory is created for you on resolve.

  • fallback_dirs replaces the default search order. When you set it on a PatientFile/PatientDir, include "input_dir" explicitly if you still want it searched.

  • Only {patient_id} is a valid template placeholder. Any other {placeholder} raises KeyError.

  • config_schema needs config_namespace (otherwise TypeError), and the schema must subclass thesis.core.config.validators.BaseConfig.

  • Cohort vs patient scope is validated. A scope="cohort" workflow may not declare PatientFile/PatientDir; a scope="patient" workflow may not declare CohortDir. Use the scope-appropriate declarations (CohortDir, CohortPatients, DataFile, …).


Runnable examples#

Every example below is a real, runnable file in the repository. Examples without a @requires input can be exercised without real patient data via --dry-run, which builds the graph and runs preflight checks but does not execute the pipeline; an example with a required input surfaces that existence check under --dry-run instead (see the user_workflow_example.py row).

The commands use -c default to match the rest of these guides — that is your local config, set up by copying config/default.example.yaml. On a stock checkout where only the *.example.yaml configs are tracked, substitute -c default.example (this is exactly why example_nipype_workflow.py loads default.example internally, so it runs straight from a fresh clone).

| Example | What it shows | How to run it |
| --- | --- | --- |
| src/thesis/workflows/minimal.py | The smallest in-tree workflow: a single @produces(out_dir=OutputDir("")), no @requires, no @verify. | thesis run -w minimal -p P001 -c default --protocol hcp --dry-run |
| examples/plain_nipype_workflow.py + examples/annotated_nipype_workflow.py | A normal Nipype pipeline before (plain) and after (annotated with the framework decorators). The fastest way to see exactly which annotations a plain script needs. | Before: python examples/plain_nipype_workflow.py; After: thesis run --script examples/annotated_nipype_workflow.py -p P001 -c default --dry-run |
| examples/user_workflow_example.py | An out-of-tree --script workflow with its own config_namespace + config_schema. Its non-optional @requires(t1=...) check runs under --dry-run, so this one needs a real T1 at data/processed/P001/T1w/...; without it the dry-run fails fast with a PreflightError, which is the point: it demonstrates @requires path resolution. | thesis run --script examples/user_workflow_example.py -p P001 -c default --dry-run |
| examples/example_nipype_workflow.py | Running a registered workflow programmatically from Python (no CLI): fetch the adapter from WORKFLOW_REGISTRY and build/run it directly. | python examples/example_nipype_workflow.py |

The plain/annotated pair is the quickest way to read the diff between an ordinary Nipype script and its framework-annotated form: run examples/plain_nipype_workflow.py to see the un-annotated “before”, then compare it line-for-line with examples/annotated_nipype_workflow.py, the decorated “after” that the CLI can discover and dry-run. The full step-by-step rationale for that conversion is in Adapting a standalone Nipype script.

Note

Running a workflow programmatically (as in examples/example_nipype_workflow.py) goes through the registry adapter, not the decorated factory directly. The @workflow decorator returns your original keyword-only function unchanged, so calling build_workflow(config, context) raises TypeError. Fetch the registered (config, context) -> Workflow adapter instead:

import thesis.workflows.minimal  # noqa: F401 — import for @workflow side effect
from thesis.core.registry import WORKFLOW_REGISTRY

entry = WORKFLOW_REGISTRY.get("minimal")   # WorkflowEntry with .factory / .verifier
wf = entry.factory(config, context)        # adapter resolves @requires/@produces paths
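
The difference between the decorated factory and the registry adapter comes down to calling convention. The sketch below uses a hypothetical make_adapter and dicts in place of the real config and context objects; it only illustrates why a positional call fails while the adapter succeeds.

```python
def build_workflow(*, out_dir, config, context):
    """Keyword-only factory, as returned unchanged by the decorator."""
    return {"out_dir": out_dir, "patient_id": context["patient_id"]}


def make_adapter(body, resolve_paths):
    """Wrap a keyword-only body in a positional (config, context) callable."""
    def adapter(config, context):
        return body(config=config, context=context, **resolve_paths(context))
    return adapter


adapter = make_adapter(build_workflow, lambda ctx: {"out_dir": "/out/" + ctx["patient_id"]})
built = adapter({}, {"patient_id": "P001"})

try:
    build_workflow({}, {"patient_id": "P001"})  # positional call on a keyword-only function
    positional_call_failed = False
except TypeError:
    positional_call_failed = True
```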

Where to look next#