Workflow Decorators#
The @workflow, @requires, @produces, and @verify decorators
wrap a workflow factory so it self-registers with
thesis.core.registry.WORKFLOW_REGISTRY and exposes declarative path
metadata used for both runtime resolution and preflight verification.
Quick reference#
Decorator |
Purpose |
|---|---|
|
Outermost. Registers a |
|
Declares input paths. Each kwarg names a kwarg the workflow body receives. Implicit existence checks are generated. |
|
Declares output paths. Resolution creates directories. No implicit existence check is generated. |
|
Attaches preflight-check callables of signature
|
Minimal example#
from nipype import Workflow
from thesis.core.decorators import produces, workflow
from thesis.core.path_declarations import OutputDir
@workflow(name="hello", description="Smallest possible workflow.")
@produces(out_dir=OutputDir(""))
def build_workflow(*, out_dir, context):
wf = Workflow(name=f"hello_{context.patient_id}")
# ... add nodes ...
return wf
Full example#
from pathlib import Path
from nipype import Workflow
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile
def _check_t1_modality(config, context, **kwargs) -> list[str]:
t1: Path = kwargs["t1"]
if t1.suffix not in {".gz", ".nii"}:
return [f"t1 must be NIfTI, got {t1.suffix!r}"]
return []
@workflow(
name="my_workflow",
description="Example workflow.",
protocol="my_workflow",
scope="patient",
)
@requires(
t1=PatientFile(
default="T1w/T1w_acpc_dc_restore.nii.gz",
config_paths=["my_workflow.t1_image", "hcp.t1_image"],
),
)
@produces(out_dir=OutputDir("my_workflow"))
@verify(_check_t1_modality)
def build_workflow(
*,
t1: Path,
out_dir: Path,
config: PipelineConfig,
context: ProcessingContext,
) -> Workflow:
return Workflow(name=f"my_workflow_{context.patient_id}")
Cohort-scope workflows set scope="cohort" and use
CohortDir,
CohortPatients,
DataFile /
DataDir, or
ConfigList for their inputs and
outputs. PatientFile / PatientDir requirements are rejected at
decoration time for cohort workflows.
Setting the environment variable THESIS_STRICT_REGISTRY=1 makes
duplicate name= registrations raise ValueError instead of
overwriting silently — useful in tests that re-import workflow modules.
Module reference#
Workflow decorator API.
This module provides the small set of decorators that let workflow
authors register a workflow without manually constructing a
WorkflowEntry and without unpacking
config / context by hand.
The decorators are:
workflow()— outermost; reads accumulated metadata, synthesizes an adapter, and registers the workflow.requires()— declares input paths (PatientFile/PatientDir).produces()— declares output paths (OutputDir/WorkingFile/CohortDir).verify()— attaches preflight check callables.
The inner three decorators are pure metadata-attachers. @workflow is
the only decorator with side effects.
Example
A complete patient-scope workflow that requires a T1, produces an output subdirectory, and adds a custom preflight check:
from pathlib import Path
from nipype import Workflow
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile
def _check_modality(config, context, **kwargs) -> list[str]:
t1: Path = kwargs["t1"]
if t1.suffix not in {".gz", ".nii"}:
return [f"t1 must be NIfTI, got {t1.suffix!r}"]
return []
@workflow(
name="my_workflow",
description="One-line summary shown by `thesis list-workflows`.",
protocol="my_workflow",
scope="patient",
)
@requires(
t1=PatientFile(
default="T1w/T1w_acpc_dc_restore.nii.gz",
config_paths=["my_workflow.t1_image", "hcp.t1_image"],
),
)
@produces(out_dir=OutputDir("my_workflow"))
@verify(_check_modality)
def build_workflow(
*,
t1: Path,
out_dir: Path,
config: PipelineConfig,
context: ProcessingContext,
) -> Workflow:
return Workflow(name=f"my_workflow_{context.patient_id}")
Decoration order: @workflow must be outermost; the inner
metadata decorators can appear in any order. Resolved paths are
injected as keyword-only arguments named after the decorator keys.
config / context are forwarded only when the body declares
them.
See markdowns/plans/workflow-decorator/01-design-spec.md for the
full design rationale.
- thesis.core.decorators.workflow(name, *, description='', protocol=None, default_config=None, scope='patient', config_namespace=None, config_schema=None, **metadata)[source]#
Register a function as a workflow factory.
Must be the outermost decorator. Reads the accumulated
__thesis_meta__attached by@requires/@produces/@verify, synthesizes a(config, context) -> Workflowadapter and a composite verifier, and registers aWorkflowEntrywithWORKFLOW_REGISTRY.- Parameters:
name (
str) – CLI identifier; must be unique in the registry.description (
str) – Human-readable description forthesis list-workflows.protocol (
Optional[str]) – Default protocol name (WorkflowEntry.default_protocol).default_config (
Optional[str]) – Default config name when-cis omitted.scope (
Literal['patient','cohort']) –"patient"(default) or"cohort"."cohort"setsis_cohort_level=Trueand rejects per-patient path declarations at decoration time.config_namespace (
Optional[str]) – Optional top-level YAML key the workflow owns. When set, registers a Pydantic schema under this key withNAMESPACE_REGISTRYsoPipelineConfigcan validate the section withoutcore/config/validators.pyknowing about it.config_schema (
Optional[type]) – OptionalBaseConfigsubclass that defines the schema for config_namespace. When omitted (but config_namespace is set), the schema is auto-derived from@requiresconfig_paths viaderive_namespace_model().**metadata (
Any) – Reserved for future extensions (ignored).
- Return type:
- Returns:
The original decorated function, unmodified except for the
__thesis_meta__attribute that lower decorators attached.- Raises:
TypeError – When
scope="cohort"and@requiresdeclares aPatientFile/PatientDir, whenscope="patient"and@producesdeclares aCohortDir, or whenconfig_schemais set withoutconfig_namespace.ValueError – When
THESIS_STRICT_REGISTRY=1and name is already registered, or the namespace is already registered.
- thesis.core.decorators.requires(**path_decls)[source]#
Declare input paths for a workflow.
Each keyword argument names a kwarg that the decorated workflow body will receive, mapped to a
PathDeclaration. Later@requiresdecorations override earlier same-name entries.
- thesis.core.decorators.produces(**path_decls)[source]#
Declare output paths for a workflow.
Output declarations don’t generate implicit existence checks. The adapter still resolves them (creating directories as needed) and injects the resulting
pathlib.Pathas a kwarg.
- thesis.core.decorators.verify(*checks)[source]#
Attach preflight check callables to a workflow.
Each check has signature
(config, context) -> list[str]and returns an empty list on success or a list of human-readable error strings on failure. Checks run after the implicit existence checks generated from@requiresdeclarations.