Workflow Decorators#

The @workflow, @requires, @produces, and @verify decorators wrap a workflow factory so it self-registers with thesis.core.registry.WORKFLOW_REGISTRY and exposes declarative path metadata used for both runtime resolution and preflight verification.

Quick reference#

Decorator

Purpose

@workflow(...)

Outermost. Registers a WorkflowEntry with the registry and synthesises a (config, context) -> Workflow adapter.

@requires(...)

Declares input paths. Each kwarg names a kwarg the workflow body receives. Implicit existence checks are generated.

@produces(...)

Declares output paths. Resolution creates directories. No implicit existence check is generated.

@verify(*fns)

Attaches preflight-check callables of signature (config, context, **kwargs) -> list[str].

Minimal example#

from nipype import Workflow

from thesis.core.decorators import produces, workflow
from thesis.core.path_declarations import OutputDir


@workflow(name="hello", description="Smallest possible workflow.")
@produces(out_dir=OutputDir(""))
def build_workflow(*, out_dir, context):
    wf = Workflow(name=f"hello_{context.patient_id}")
    # ... add nodes ...
    return wf

Full example#

from pathlib import Path

from nipype import Workflow

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


def _check_t1_modality(config, context, **kwargs) -> list[str]:
    t1: Path = kwargs["t1"]
    if t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be NIfTI, got {t1.suffix!r}"]
    return []


@workflow(
    name="my_workflow",
    description="Example workflow.",
    protocol="my_workflow",
    scope="patient",
)
@requires(
    t1=PatientFile(
        default="T1w/T1w_acpc_dc_restore.nii.gz",
        config_paths=["my_workflow.t1_image", "hcp.t1_image"],
    ),
)
@produces(out_dir=OutputDir("my_workflow"))
@verify(_check_t1_modality)
def build_workflow(
    *,
    t1: Path,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    return Workflow(name=f"my_workflow_{context.patient_id}")

Cohort-scope workflows set scope="cohort" and use CohortDir, CohortPatients, DataFile / DataDir, or ConfigList for their inputs and outputs. PatientFile / PatientDir requirements are rejected at decoration time for cohort workflows.

Setting the environment variable THESIS_STRICT_REGISTRY=1 makes duplicate name= registrations raise ValueError instead of overwriting silently — useful in tests that re-import workflow modules.

Module reference#

Workflow decorator API.

This module provides the small set of decorators that let workflow authors register a workflow without manually constructing a WorkflowEntry and without unpacking config / context by hand.

The decorators are:

  • workflow() — outermost; reads accumulated metadata, synthesizes an adapter, and registers the workflow.

  • requires() — declares input paths (PatientFile / PatientDir).

  • produces() — declares output paths (OutputDir / WorkingFile / CohortDir).

  • verify() — attaches preflight check callables.

The inner three decorators are pure metadata-attachers. @workflow is the only decorator with side effects.

Example

A complete patient-scope workflow that requires a T1, produces an output subdirectory, and adds a custom preflight check:

from pathlib import Path

from nipype import Workflow

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


def _check_modality(config, context, **kwargs) -> list[str]:
    t1: Path = kwargs["t1"]
    if t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be NIfTI, got {t1.suffix!r}"]
    return []


@workflow(
    name="my_workflow",
    description="One-line summary shown by `thesis list-workflows`.",
    protocol="my_workflow",
    scope="patient",
)
@requires(
    t1=PatientFile(
        default="T1w/T1w_acpc_dc_restore.nii.gz",
        config_paths=["my_workflow.t1_image", "hcp.t1_image"],
    ),
)
@produces(out_dir=OutputDir("my_workflow"))
@verify(_check_modality)
def build_workflow(
    *,
    t1: Path,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    return Workflow(name=f"my_workflow_{context.patient_id}")

Decoration order: @workflow must be outermost; the inner metadata decorators can appear in any order. Resolved paths are injected as keyword-only arguments named after the decorator keys. config / context are forwarded only when the body declares them.

See markdowns/plans/workflow-decorator/01-design-spec.md for the full design rationale.

thesis.core.decorators.workflow(name, *, description='', protocol=None, default_config=None, scope='patient', config_namespace=None, config_schema=None, **metadata)[source]#

Register a function as a workflow factory.

Must be the outermost decorator. Reads the accumulated __thesis_meta__ attached by @requires / @produces / @verify, synthesizes a (config, context) -> Workflow adapter and a composite verifier, and registers a WorkflowEntry with WORKFLOW_REGISTRY.

Parameters:
  • name (str) – CLI identifier; must be unique in the registry.

  • description (str) – Human-readable description for thesis list-workflows.

  • protocol (Optional[str]) – Default protocol name (WorkflowEntry.default_protocol).

  • default_config (Optional[str]) – Default config name when -c is omitted.

  • scope (Literal['patient', 'cohort']) – "patient" (default) or "cohort". "cohort" sets is_cohort_level=True and rejects per-patient path declarations at decoration time.

  • config_namespace (Optional[str]) – Optional top-level YAML key the workflow owns. When set, registers a Pydantic schema under this key with NAMESPACE_REGISTRY so PipelineConfig can validate the section without core/config/validators.py knowing about it.

  • config_schema (Optional[type]) – Optional BaseConfig subclass that defines the schema for config_namespace. When omitted (but config_namespace is set), the schema is auto-derived from @requires config_paths via derive_namespace_model().

  • **metadata (Any) – Reserved for future extensions (ignored).

Return type:

Callable[[Callable[..., Any]], Callable[..., Any]]

Returns:

The original decorated function, unmodified except for the __thesis_meta__ attribute that lower decorators attached.

Raises:
  • TypeError – When scope="cohort" and @requires declares a PatientFile / PatientDir, when scope="patient" and @produces declares a CohortDir, or when config_schema is set without config_namespace.

  • ValueError – When THESIS_STRICT_REGISTRY=1 and name is already registered, or the namespace is already registered.

thesis.core.decorators.requires(**path_decls)[source]#

Declare input paths for a workflow.

Each keyword argument names a kwarg that the decorated workflow body will receive, mapped to a PathDeclaration. Later @requires decorations override earlier same-name entries.

Parameters:

path_decls (PathDeclaration)

Return type:

Callable[[Callable[..., Any]], Callable[..., Any]]

thesis.core.decorators.produces(**path_decls)[source]#

Declare output paths for a workflow.

Output declarations don’t generate implicit existence checks. The adapter still resolves them (creating directories as needed) and injects the resulting pathlib.Path as a kwarg.

Parameters:

path_decls (PathDeclaration)

Return type:

Callable[[Callable[..., Any]], Callable[..., Any]]

thesis.core.decorators.verify(*checks)[source]#

Attach preflight check callables to a workflow.

Each check has signature (config, context) -> list[str] and returns an empty list on success or a list of human-readable error strings on failure. Checks run after the implicit existence checks generated from @requires declarations.

Parameters:

checks (Callable[..., List[str]])

Return type:

Callable[[Callable[..., Any]], Callable[..., Any]]