"""Workflow decorator API.
This module provides the small set of decorators that let workflow
authors register a workflow without manually constructing a
:class:`~thesis.core.registry.WorkflowEntry` and without unpacking
``config`` / ``context`` by hand.
The decorators are:

- :func:`workflow` — outermost; reads accumulated metadata, synthesizes
  an adapter and a composite verifier, and registers the workflow.
- :func:`requires` — declares input paths (``PatientFile`` / ``PatientDir``).
- :func:`produces` — declares output paths (``OutputDir`` / ``WorkingFile``
  / ``CohortDir``).
- :func:`verify` — attaches preflight check callables.

The inner three decorators are pure metadata-attachers. ``@workflow`` is
the only decorator with side effects.
Example:
    A complete patient-scope workflow that requires a T1, produces an
    output subdirectory, and adds a custom preflight check::

        from pathlib import Path

        from nipype import Workflow

        from thesis.core.config import PipelineConfig
        from thesis.core.context import ProcessingContext
        from thesis.core.decorators import produces, requires, verify, workflow
        from thesis.core.path_declarations import OutputDir, PatientFile


        def _check_modality(config, context, **kwargs) -> list[str]:
            t1: Path = kwargs["t1"]
            if t1.suffix not in {".gz", ".nii"}:
                return [f"t1 must be NIfTI, got {t1.suffix!r}"]
            return []


        @workflow(
            name="my_workflow",
            description="One-line summary shown by `thesis list-workflows`.",
            protocol="my_workflow",
            scope="patient",
        )
        @requires(
            t1=PatientFile(
                default="T1w/T1w_acpc_dc_restore.nii.gz",
                config_paths=["my_workflow.t1_image", "hcp.t1_image"],
            ),
        )
        @produces(out_dir=OutputDir("my_workflow"))
        @verify(_check_modality)
        def build_workflow(
            *,
            t1: Path,
            out_dir: Path,
            config: PipelineConfig,
            context: ProcessingContext,
        ) -> Workflow:
            return Workflow(name=f"my_workflow_{context.patient_id}")

Decoration order: ``@workflow`` must be **outermost**; the inner
metadata decorators can appear in any order. Resolved paths are
injected as keyword-only arguments named after the decorator keys.
``config`` / ``context`` are forwarded only when the body declares
them.
See ``markdowns/plans/workflow-decorator/01-design-spec.md`` for the
full design rationale.
"""
from __future__ import annotations
import inspect
import os
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Literal,
Optional,
)
from thesis.core.logging import get_logger
from thesis.core.path_declarations import (
CohortDir,
PathDeclaration,
PatientDir,
PatientFile,
)
from thesis.core.registry import WORKFLOW_REGISTRY, WorkflowEntry
if TYPE_CHECKING: # pragma: no cover - typing only
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
logger = get_logger(__name__)

# Public decorator surface; the ``_``-prefixed helpers are internal.
__all__ = [
    "workflow",
    "requires",
    "produces",
    "verify",
]

# Type of a preflight check attached with ``@verify``.
#
# The original contract is ``(config, context) -> list[str]``. Since
# Phase A.3 a check may also receive the resolved declared kwargs, either
# by naming them as parameters or by accepting ``**kwargs``.
# ``_call_verifier`` inspects the check's signature each time it is
# invoked and picks the matching call form, so the alias is kept loose
# (``...``) to cover both shapes.
VerifierFn = Callable[..., List[str]]
@dataclass
class _Meta:
    """Per-function bag of declarations gathered by the metadata decorators.

    Stored on the decorated function as ``fn.__thesis_meta__`` and read
    back by :func:`workflow` when it builds the adapter and verifier.
    Each field uses a ``default_factory`` so instances never share state.
    """

    # kwarg name -> input declaration, filled by @requires
    requires: Dict[str, PathDeclaration] = field(default_factory=dict)
    # kwarg name -> output declaration, filled by @produces
    produces: Dict[str, PathDeclaration] = field(default_factory=dict)
    # explicit preflight checks, in the order @verify attached them
    verifiers: List[VerifierFn] = field(default_factory=list)
def _meta(fn: Callable[..., Any]) -> _Meta:
    """Fetch the declaration bag stored on *fn*, attaching a fresh one first.

    The bag lives in the ``__thesis_meta__`` attribute, so every decorator
    in a stack sees and mutates the same instance. A missing attribute and
    an attribute explicitly set to ``None`` are both treated as "no bag yet".
    """
    existing = getattr(fn, "__thesis_meta__", None)
    if existing is not None:
        return existing
    fresh = _Meta()
    fn.__thesis_meta__ = fresh  # type: ignore[attr-defined]
    return fresh
# ---------------------------------------------------------------------------
# Inner decorators (metadata-only)
# ---------------------------------------------------------------------------
[docs]
def requires(**path_decls: PathDeclaration) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Record input-path declarations on a workflow function.

    Every keyword argument names a kwarg that the workflow body will
    receive. Its value is the
    :class:`~thesis.core.path_declarations.PathDeclaration` describing where
    that input lives. When several ``@requires`` decorators name the same
    kwarg, the one applied last (the outermost) wins.

    Returns:
        A decorator that stores the declarations and returns the function
        itself, unchanged.
    """

    def attach(fn: Callable[..., Any]) -> Callable[..., Any]:
        # dict.update preserves insertion order and lets new keys overwrite
        # existing ones, exactly like assigning entry by entry.
        _meta(fn).requires.update(path_decls)
        return fn

    return attach
[docs]
def produces(**path_decls: PathDeclaration) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Record output-path declarations on a workflow function.

    Unlike ``@requires``, outputs never contribute an implicit existence
    check. The adapter still resolves them (creating directories as
    needed) and injects the resulting :class:`pathlib.Path` as a kwarg
    named after each key. When several ``@produces`` decorators name the
    same kwarg, the outermost one wins.

    Returns:
        A decorator that stores the declarations and returns the function
        itself, unchanged.
    """

    def attach(fn: Callable[..., Any]) -> Callable[..., Any]:
        # Same overwrite semantics as a per-key assignment loop.
        _meta(fn).produces.update(path_decls)
        return fn

    return attach
[docs]
def verify(*checks: VerifierFn) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Attach preflight check callables to a workflow.

    Each check receives ``config`` and ``context`` positionally and returns
    an empty list on success, or a list of human-readable error strings on
    failure. The historical ``(config, context)`` signature keeps working.
    A check may additionally receive the resolved declared paths as keyword
    arguments, either by naming them as parameters (e.g. ``t1``) or by
    accepting ``**kwargs``.

    Checks run after the implicit existence checks generated from
    ``@requires`` declarations, in the order they were attached.

    Raises:
        TypeError: If any argument is not callable. This is raised
            immediately, so the mistake surfaces when the module is
            imported rather than during the first preflight run.
    """
    for position, check in enumerate(checks):
        if not callable(check):
            raise TypeError(
                f"@verify argument {position} must be callable, "
                f"got {type(check).__name__}: {check!r}"
            )

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        meta = _meta(fn)
        meta.verifiers.extend(checks)
        return fn

    return decorator
# ---------------------------------------------------------------------------
# Helper: scope validation
# ---------------------------------------------------------------------------
def _validate_scope(name: str, scope: str, meta: _Meta) -> None:
    """Raise ``TypeError`` when a declaration cannot exist in *scope*.

    * ``scope == "cohort"``: ``@requires`` may not use ``PatientFile`` or
      ``PatientDir``, because there is no patient to bind them to.
    * ``scope == "patient"``: ``@produces`` may not use ``CohortDir``;
      per-patient outputs belong in ``OutputDir``.

    Only the first offending declaration (in declaration order) is
    reported. Any other scope value passes through untouched.
    """
    if scope == "cohort":
        per_patient = [
            kw
            for kw, decl in meta.requires.items()
            if isinstance(decl, (PatientFile, PatientDir))
        ]
        if per_patient:
            bad_kw = per_patient[0]
            raise TypeError(
                f"Workflow '{name}' has scope='cohort' but @requires "
                f"declares per-patient input '{bad_kw}'. Cohort-scope "
                f"workflows must not declare PatientFile / PatientDir "
                f"inputs (they have no patient to bind to)."
            )
    if scope == "patient":
        cohort_level = [
            kw for kw, decl in meta.produces.items() if isinstance(decl, CohortDir)
        ]
        if cohort_level:
            bad_kw = cohort_level[0]
            raise TypeError(
                f"Workflow '{name}' has scope='patient' but @produces "
                f"declares CohortDir '{bad_kw}'. Use OutputDir for "
                f"per-patient outputs."
            )
# ---------------------------------------------------------------------------
# Helper: verifier synthesis
# ---------------------------------------------------------------------------
def _call_verifier(
    fn: Callable[..., List[str]],
    config: "PipelineConfig",
    context: "ProcessingContext",
    declared: Dict[str, Any],
) -> List[str]:
    """Run one explicit ``@verify`` check, forwarding the kwargs it opts into.

    ``config`` and ``context`` are always passed positionally. On top of that:

    * a check with a ``**kwargs`` parameter receives every entry of *declared*;
    * otherwise it receives only the entries whose names match one of its
      parameters;
    * a check matching nothing, including the historical
      ``(config, context)`` form, gets just the two positional arguments.

    Callables whose signature cannot be introspected are called in the
    historical two-argument form.
    """
    try:
        parameters = inspect.signature(fn).parameters
    except (TypeError, ValueError):
        # Some builtins / C-implemented callables expose no signature.
        return fn(config, context)

    takes_any_kwarg = any(
        param.kind is inspect.Parameter.VAR_KEYWORD for param in parameters.values()
    )
    if takes_any_kwarg:
        forwarded = declared
    else:
        forwarded = {key: val for key, val in declared.items() if key in parameters}
    # An empty ``forwarded`` makes this the plain (config, context) call.
    return fn(config, context, **forwarded)
def _build_verifier(meta: _Meta) -> Optional[Callable[..., List[str]]]:
    """Fold implicit existence checks and explicit ``@verify`` checks into one callable.

    Returns ``None`` when there is nothing to check. That is the case when
    there are no explicit checks and no ``@requires`` declaration is
    mandatory, i.e. none has an ``optional`` attribute whose value is falsy.
    Declarations without an ``optional`` attribute never force a verifier
    into existence on their own.

    The returned ``composite(config, context)`` works in three steps:

    1. It collects ``existence_errors`` from every ``@requires`` declaration,
       in declaration order.
    2. If explicit checks exist, it resolves every ``@requires`` and then
       every ``@produces`` declaration once (a ``@produces`` entry wins on a
       name clash). Each check then runs through :func:`_call_verifier`.
    3. It returns all error strings in that order.

    The declaration and check lists are copied here, so later additions to
    *meta* do not affect the composite.
    """
    inputs = list(meta.requires.items())
    checks = list(meta.verifiers)
    everything = inputs + list(meta.produces.items())

    def _mandatory(decl: Any) -> bool:
        # Output-only declaration types carry no ``optional`` flag.
        return hasattr(decl, "optional") and not decl.optional

    if not any(_mandatory(decl) for _, decl in inputs) and not checks:
        return None

    def composite(config: "PipelineConfig", context: "ProcessingContext") -> List[str]:
        problems: List[str] = [
            message
            for key, decl in inputs
            for message in decl.existence_errors(config, context, key)
        ]
        if not checks:
            return problems

        resolved: Dict[str, Any] = {}
        for key, decl in everything:
            resolved[key] = decl.resolve(config, context)
        for check in checks:
            problems.extend(_call_verifier(check, config, context, resolved))
        return problems

    return composite
# ---------------------------------------------------------------------------
# Helper: adapter synthesis
# ---------------------------------------------------------------------------
def _build_adapter(
    fn: Callable[..., Any], meta: _Meta
) -> Callable[["PipelineConfig", "ProcessingContext"], Any]:
    """Wrap *fn* in a ``(config, context) -> Workflow`` adapter.

    When called, the adapter resolves every ``@requires`` declaration and
    then every ``@produces`` declaration against ``(config, context)``. It
    passes the results to *fn* as keyword arguments named after the
    declaration keys. ``config`` / ``context`` themselves are forwarded
    only when *fn*'s signature has parameters with exactly those names.

    Args:
        fn: The decorated workflow body.
        meta: Accumulated declarations for *fn*.

    Returns:
        The adapter. Its introspected signature is pinned to
        ``(config, context)``. ``__wrapped__`` points at *fn*, and
        ``__module__`` / ``__name__`` / ``__qualname__`` / ``__doc__`` are
        copied from *fn* so logs and tracebacks name the real workflow.

    Raises:
        TypeError: At decoration time, in two cases:

            * *fn* has no parameter for a declared kwarg and no ``**kwargs``.
              The call would otherwise fail only when the workflow is built.
            * A declaration named ``config`` / ``context`` would be silently
              overwritten by the forwarded ``config`` / ``context`` object.
    """
    from functools import update_wrapper  # only this helper needs it

    params = inspect.signature(fn).parameters
    accepts_config = "config" in params
    accepts_context = "context" in params
    accepts_any_kwarg = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )

    # Fail fast: surface wiring mistakes when the module is imported rather
    # than when the workflow is first built.
    owner = getattr(fn, "__qualname__", repr(fn))
    for declared_name in (*meta.requires, *meta.produces):
        if (declared_name == "config" and accepts_config) or (
            declared_name == "context" and accepts_context
        ):
            # The adapter assigns config/context after the declared paths,
            # so the resolved path would be replaced without any warning.
            raise TypeError(
                f"Workflow body {owner!r} declares a path named {declared_name!r}, "
                f"which would be overwritten by the {declared_name!r} object the "
                f"adapter forwards. Rename the declaration."
            )
        if declared_name not in params and not accepts_any_kwarg:
            raise TypeError(
                f"Workflow body {owner!r} has no parameter named {declared_name!r} "
                f"(and no **kwargs) to receive the declared path."
            )

    def adapter(config: "PipelineConfig", context: "ProcessingContext") -> Any:
        kwargs: Dict[str, Any] = {}
        for name, decl in meta.requires.items():
            kwargs[name] = decl.resolve(config, context)
        for name, decl in meta.produces.items():
            kwargs[name] = decl.resolve(config, context)
        if accepts_config:
            kwargs["config"] = config
        if accepts_context:
            kwargs["context"] = context
        return fn(**kwargs)

    # Copy identity/doc metadata only. Leaving ``__annotations__`` and
    # ``__dict__`` alone keeps the adapter's own (config, context) contract
    # intact. update_wrapper also sets ``adapter.__wrapped__ = fn``.
    update_wrapper(
        adapter,
        fn,
        assigned=("__module__", "__name__", "__qualname__", "__doc__"),
        updated=(),
    )
    # Pin the adapter's introspected signature to (config, context). Without
    # this, inspect.signature() follows __wrapped__ to the underlying
    # kwarg-only function, which breaks cli.py:_build_workflow's positional-
    # arg dispatch (it would call the adapter with zero args).
    adapter.__signature__ = inspect.Signature(  # type: ignore[attr-defined]
        parameters=[
            inspect.Parameter("config", inspect.Parameter.POSITIONAL_OR_KEYWORD),
            inspect.Parameter("context", inspect.Parameter.POSITIONAL_OR_KEYWORD),
        ]
    )
    return adapter
# ---------------------------------------------------------------------------
# @workflow — outermost decorator
# ---------------------------------------------------------------------------
[docs]
def workflow(
    name: str,
    *,
    description: str = "",
    protocol: Optional[str] = None,
    default_config: Optional[str] = None,
    scope: Literal["patient", "cohort"] = "patient",
    config_namespace: Optional[str] = None,
    config_schema: Optional[type] = None,
    **metadata: Any,  # forward-compat hooks, ignored today
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Register a function as a workflow factory.

    Must be the outermost decorator. It reads the accumulated
    ``__thesis_meta__`` attached by ``@requires`` / ``@produces`` /
    ``@verify`` and synthesizes a ``(config, context) -> Workflow`` adapter
    plus a composite verifier. It then registers a
    :class:`~thesis.core.registry.WorkflowEntry` with
    :data:`~thesis.core.registry.WORKFLOW_REGISTRY`.

    The adapter and verifier are built before anything is registered, so
    a failure while building them leaves both registries untouched.
    ``THESIS_STRICT_REGISTRY`` is read each time a function is decorated.

    Args:
        name: CLI identifier; must be unique in the registry.
        description: Human-readable description for ``thesis list-workflows``.
        protocol: Default protocol name (``WorkflowEntry.default_protocol``).
        default_config: Default config name when ``-c`` is omitted.
        scope: ``"patient"`` (default) or ``"cohort"``. ``"cohort"`` sets
            ``is_cohort_level=True`` and rejects per-patient path
            declarations at decoration time.
        config_namespace: Optional top-level YAML key the workflow owns.
            When set, a Pydantic schema is registered under this key with
            :data:`~thesis.core.config.namespace_registry.NAMESPACE_REGISTRY`.
            This lets :class:`~thesis.core.config.PipelineConfig` validate
            the section without ``core/config/validators.py`` knowing
            about it.
        config_schema: Optional :class:`BaseConfig` subclass that defines
            the schema for *config_namespace*. When omitted (but
            *config_namespace* is set), the schema is auto-derived from
            ``@requires`` config_paths via
            :func:`~thesis.core.config.derive.derive_namespace_model`.
        **metadata: Reserved for future extensions (ignored).

    Returns:
        A decorator that returns the original function. The function is
        unmodified except for the ``__thesis_meta__`` attribute, which the
        lower decorators attached (or which is created empty here if none
        were applied).

    Raises:
        ValueError: When *scope* is neither ``"patient"`` nor ``"cohort"``.
            Raised immediately, before any function is decorated.
        TypeError: When ``scope="cohort"`` and ``@requires`` declares a
            ``PatientFile`` / ``PatientDir``, when ``scope="patient"``
            and ``@produces`` declares a ``CohortDir``, or when
            ``config_schema`` is set without ``config_namespace``. Errors
            raised while building the adapter propagate unchanged.
        ValueError: When ``THESIS_STRICT_REGISTRY=1`` and *name* is
            already registered, or the namespace is already registered.
    """
    del metadata  # not used in v1
    if scope not in ("patient", "cohort"):
        raise ValueError(f"scope must be 'patient' or 'cohort', got {scope!r}")
    if config_schema is not None and config_namespace is None:
        raise TypeError(
            "config_schema requires config_namespace to be set — there is "
            "no key to register the schema under."
        )

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        meta = _meta(fn)
        _validate_scope(name, scope, meta)
        strict = os.environ.get("THESIS_STRICT_REGISTRY") == "1"
        if strict and name in WORKFLOW_REGISTRY:
            raise ValueError(f"Strict registry mode: workflow '{name}' is already registered.")

        # Side-effect-free synthesis first. If either step raises, no
        # namespace has been registered yet, so nothing is left orphaned.
        adapter = _build_adapter(fn, meta)
        verifier = _build_verifier(meta)

        if config_namespace is not None:
            # Imported lazily to keep ``core.decorators`` importable even
            # before the config package is fully initialised.
            from thesis.core.config.derive import derive_namespace_model
            from thesis.core.config.namespace_registry import NAMESPACE_REGISTRY

            if strict and config_namespace in NAMESPACE_REGISTRY:
                raise ValueError(
                    f"Strict registry mode: namespace '{config_namespace}' "
                    "is already registered."
                )
            schema = (
                config_schema
                if config_schema is not None
                else derive_namespace_model(config_namespace, meta)
            )
            NAMESPACE_REGISTRY.register(config_namespace, schema)

        entry = WorkflowEntry(
            name=name,
            factory=adapter,
            verifier=verifier,
            description=description,
            default_protocol=protocol,
            default_config=default_config,
            is_cohort_level=(scope == "cohort"),
        )
        WORKFLOW_REGISTRY.register(entry)
        return fn

    return decorator