# Source code for thesis.core.decorators

"""Workflow decorator API.

This module provides the small set of decorators that let workflow
authors register a workflow without manually constructing a
:class:`~thesis.core.registry.WorkflowEntry` and without unpacking
``config`` / ``context`` by hand.

The decorators are:

- :func:`workflow` — outermost; reads accumulated metadata, synthesizes
  an adapter, and registers the workflow.
- :func:`requires` — declares input paths (``PatientFile`` / ``PatientDir``).
- :func:`produces` — declares output paths (``OutputDir`` / ``WorkingFile``
  / ``CohortDir``).
- :func:`verify` — attaches preflight check callables.

The inner three decorators are pure metadata-attachers. ``@workflow`` is
the only decorator with side effects.

Example:
    A complete patient-scope workflow that requires a T1, produces an
    output subdirectory, and adds a custom preflight check::

        from pathlib import Path

        from nipype import Workflow

        from thesis.core.config import PipelineConfig
        from thesis.core.context import ProcessingContext
        from thesis.core.decorators import produces, requires, verify, workflow
        from thesis.core.path_declarations import OutputDir, PatientFile


        def _check_modality(config, context, **kwargs) -> list[str]:
            t1: Path = kwargs["t1"]
            if t1.suffix not in {".gz", ".nii"}:
                return [f"t1 must be NIfTI, got {t1.suffix!r}"]
            return []


        @workflow(
            name="my_workflow",
            description="One-line summary shown by `thesis list-workflows`.",
            protocol="my_workflow",
            scope="patient",
        )
        @requires(
            t1=PatientFile(
                default="T1w/T1w_acpc_dc_restore.nii.gz",
                config_paths=["my_workflow.t1_image", "hcp.t1_image"],
            ),
        )
        @produces(out_dir=OutputDir("my_workflow"))
        @verify(_check_modality)
        def build_workflow(
            *,
            t1: Path,
            out_dir: Path,
            config: PipelineConfig,
            context: ProcessingContext,
        ) -> Workflow:
            return Workflow(name=f"my_workflow_{context.patient_id}")

    Decoration order: ``@workflow`` must be **outermost**; the inner
    metadata decorators can appear in any order. Resolved paths are
    injected as keyword-only arguments named after the decorator keys.
    ``config`` / ``context`` are forwarded only when the body declares
    them.

See ``markdowns/plans/workflow-decorator/01-design-spec.md`` for the
full design rationale.
"""

from __future__ import annotations

import inspect
import os
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Literal,
    Optional,
)

from thesis.core.logging import get_logger
from thesis.core.path_declarations import (
    CohortDir,
    PathDeclaration,
    PatientDir,
    PatientFile,
)
from thesis.core.registry import WORKFLOW_REGISTRY, WorkflowEntry

if TYPE_CHECKING:  # pragma: no cover - typing only
    from thesis.core.config import PipelineConfig
    from thesis.core.context import ProcessingContext

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Public decorator API; every other name in this module is a private helper.
__all__ = ["workflow", "requires", "produces", "verify"]


# A verifier callable. The historical signature is
# ``(config, context) -> list[str]``; from Phase A.3 onwards verifiers
# may also accept resolved declared kwargs (either via a matching named
# parameter or a ``**kwargs`` catch-all). The composite verifier inspects
# the signature each time it invokes a check (see ``_call_verifier``) and
# chooses the call form accordingly.
VerifierFn = Callable[..., List[str]]


@dataclass
class _Meta:
    """Accumulated metadata attached to a decorated function as
    ``fn.__thesis_meta__``.

    The inner decorators (``@requires`` / ``@produces`` / ``@verify``)
    write into this object; ``@workflow`` reads it to build the adapter
    and the composite verifier.
    """

    # Keyword name -> input declaration, filled in by ``@requires``.
    requires: Dict[str, PathDeclaration] = field(default_factory=dict)
    # Keyword name -> output declaration, filled in by ``@produces``.
    produces: Dict[str, PathDeclaration] = field(default_factory=dict)
    # Explicit preflight checks, appended by ``@verify`` in attachment order.
    verifiers: List[VerifierFn] = field(default_factory=list)


def _meta(fn: Callable[..., Any]) -> _Meta:
    """Fetch the ``_Meta`` record stored on *fn*, attaching a new one first
    when the function does not carry one yet.

    The record lives in the ``__thesis_meta__`` attribute so that stacked
    decorators all accumulate into the same object.
    """
    existing = getattr(fn, "__thesis_meta__", None)
    if existing is not None:
        return existing

    # First decorator to touch this function: create and attach the record.
    fresh = _Meta()
    fn.__thesis_meta__ = fresh
    return fresh


# ---------------------------------------------------------------------------
# Inner decorators (metadata-only)
# ---------------------------------------------------------------------------


def requires(
    **path_decls: PathDeclaration,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Declare the input paths a workflow body expects.

    Every keyword names an argument the decorated workflow body will
    receive and maps it to a
    :class:`~thesis.core.path_declarations.PathDeclaration`. The
    declarations are merged into the function's ``__thesis_meta__``
    record; when ``@requires`` is stacked, a decoration applied later
    replaces any earlier entry with the same name.

    Returns:
        A decorator that records the declarations and returns the
        function itself (no wrapper is created).
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        # ``dict.update`` preserves insertion order and overwrites
        # same-name keys, just like assigning the entries one at a time.
        _meta(fn).requires.update(path_decls)
        return fn

    return decorator


def produces(
    **path_decls: PathDeclaration,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Declare the output paths a workflow body writes to.

    Output declarations are recorded separately from inputs and are not
    run through the implicit existence checks. The adapter still resolves
    each one and injects the resulting value under the declared keyword.
    NOTE(review): whether resolution creates directories is decided by
    the declaration classes in ``thesis.core.path_declarations`` — confirm
    there.

    Returns:
        A decorator that records the declarations and returns the
        function itself (no wrapper is created).
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        # Same-name keys from an earlier ``@produces`` are overwritten.
        _meta(fn).produces.update(path_decls)
        return fn

    return decorator


def verify(
    *checks: VerifierFn,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Attach preflight check callables to a workflow.

    A check is always called with ``(config, context)`` and must return
    an empty list on success or a list of human-readable error strings
    on failure. A check may additionally opt in to receiving the
    resolved declared paths as keyword arguments, either by naming them
    as parameters or by declaring ``**kwargs`` (see
    :func:`_call_verifier`).

    Checks run after the implicit existence checks generated from
    ``@requires`` declarations, in the order they were attached.

    Returns:
        A decorator that records the checks and returns the function
        itself (no wrapper is created).
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        registered = _meta(fn).verifiers
        for check in checks:
            registered.append(check)
        return fn

    return decorator


# ---------------------------------------------------------------------------
# Helper: scope validation
# ---------------------------------------------------------------------------


def _validate_scope(name: str, scope: str, meta: _Meta) -> None:
    """Raise ``TypeError`` when a declaration contradicts the workflow scope.

    A cohort-scope workflow may not require ``PatientFile`` / ``PatientDir``
    inputs, and a patient-scope workflow may not produce a ``CohortDir``.
    Only the first offending declaration is reported.
    """
    if scope == "cohort":
        kw = next(
            (
                key
                for key, decl in meta.requires.items()
                if isinstance(decl, (PatientFile, PatientDir))
            ),
            None,
        )
        if kw is not None:
            raise TypeError(
                f"Workflow '{name}' has scope='cohort' but @requires "
                f"declares per-patient input '{kw}'. Cohort-scope "
                f"workflows must not declare PatientFile / PatientDir "
                f"inputs (they have no patient to bind to)."
            )
    elif scope == "patient":
        kw = next(
            (key for key, decl in meta.produces.items() if isinstance(decl, CohortDir)),
            None,
        )
        if kw is not None:
            raise TypeError(
                f"Workflow '{name}' has scope='patient' but @produces "
                f"declares CohortDir '{kw}'. Use OutputDir for "
                f"per-patient outputs."
            )


# ---------------------------------------------------------------------------
# Helper: verifier synthesis
# ---------------------------------------------------------------------------


def _call_verifier(
    fn: Callable[..., List[str]],
    config: "PipelineConfig",
    context: "ProcessingContext",
    declared: Dict[str, Any],
) -> List[str]:
    """Call a user verifier, passing resolved declared values if it asks.

    The call form is chosen from the verifier's signature:

    - a ``**kwargs`` catch-all receives every entry of *declared*;
    - otherwise only the entries whose names match a parameter are passed;
    - a plain ``(config, context)`` verifier, or one whose signature
      cannot be introspected, is called with just those two arguments.
    """
    try:
        params = inspect.signature(fn).parameters
    except (TypeError, ValueError):
        # No introspectable signature: fall back to the historical form.
        return fn(config, context)

    takes_var_kw = any(
        param.kind is inspect.Parameter.VAR_KEYWORD for param in params.values()
    )
    if takes_var_kw:
        extra: Dict[str, Any] = declared
    else:
        extra = {key: value for key, value in declared.items() if key in params}

    # An empty ``extra`` makes this identical to ``fn(config, context)``.
    return fn(config, context, **extra)


def _build_verifier(meta: _Meta) -> Optional[Callable[..., List[str]]]:
    """Combine implicit existence checks with the explicit ``@verify`` checks.

    Returns ``None`` when there is nothing to verify: no ``@verify`` check
    was attached and no ``@requires`` declaration carries an ``optional``
    attribute that is falsy.

    The returned callable takes ``(config, context)``. It first collects
    ``existence_errors`` from every ``@requires`` declaration, then — only
    if explicit checks exist — resolves all declared paths once and runs
    each check through :func:`_call_verifier`, which forwards the resolved
    values to checks that opt in via their signature.
    """
    required = list(meta.requires.items())

    # Only a declaration with ``optional`` present and falsy forces a
    # verifier into existence; declarations without the attribute (the
    # output-only types, per the original design note) do not.
    needs_existence_check = any(
        not getattr(decl, "optional", True) for _, decl in required
    )
    if not (needs_existence_check or meta.verifiers):
        return None

    # Snapshot the metadata so the verifier reflects decoration-time state.
    checks = list(meta.verifiers)
    declarations = list(meta.requires.items()) + list(meta.produces.items())

    def composite(config: "PipelineConfig", context: "ProcessingContext") -> List[str]:
        errors: List[str] = []
        for name, decl in required:
            errors.extend(decl.existence_errors(config, context, name))

        if not checks:
            return errors

        declared: Dict[str, Any] = {
            name: decl.resolve(config, context) for name, decl in declarations
        }
        for check in checks:
            errors.extend(_call_verifier(check, config, context, declared))
        return errors

    return composite


# ---------------------------------------------------------------------------
# Helper: adapter synthesis
# ---------------------------------------------------------------------------


def _build_adapter(
    fn: Callable[..., Any], meta: _Meta
) -> Callable[["PipelineConfig", "ProcessingContext"], Any]:
    """Wrap *fn* in a ``(config, context) -> Workflow`` adapter.

    On each call the adapter resolves every ``@requires`` and then every
    ``@produces`` declaration against ``(config, context)`` and passes the
    results to *fn* as keyword arguments. ``config`` / ``context`` are
    forwarded as well, but only when *fn* has parameters with those names.
    """
    accepted = inspect.signature(fn).parameters
    wants_config = "config" in accepted
    wants_context = "context" in accepted

    def adapter(config: "PipelineConfig", context: "ProcessingContext") -> Any:
        call_kwargs: Dict[str, Any] = {
            name: decl.resolve(config, context)
            for name, decl in meta.requires.items()
        }
        # Outputs are resolved after inputs; a same-name output wins.
        call_kwargs.update(
            (name, decl.resolve(config, context))
            for name, decl in meta.produces.items()
        )
        if wants_config:
            call_kwargs["config"] = config
        if wants_context:
            call_kwargs["context"] = context
        return fn(**call_kwargs)

    # Pin the adapter's introspected signature to (config, context). Without
    # this, inspect.signature() follows __wrapped__ to the underlying
    # kwarg-only function, which breaks cli.py:_build_workflow's positional-
    # arg dispatch (it would call the adapter with zero args).
    adapter.__signature__ = inspect.Signature(  # type: ignore[attr-defined]
        parameters=[
            inspect.Parameter(arg, inspect.Parameter.POSITIONAL_OR_KEYWORD)
            for arg in ("config", "context")
        ]
    )
    adapter.__wrapped__ = fn  # type: ignore[attr-defined]
    return adapter


# ---------------------------------------------------------------------------
# @workflow — outermost decorator
# ---------------------------------------------------------------------------


def workflow(
    name: str,
    *,
    description: str = "",
    protocol: Optional[str] = None,
    default_config: Optional[str] = None,
    scope: Literal["patient", "cohort"] = "patient",
    config_namespace: Optional[str] = None,
    config_schema: Optional[type] = None,
    **metadata: Any,  # forward-compat hooks, ignored today
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Register a function as a workflow factory.

    Must be the outermost decorator. Reads the accumulated
    ``__thesis_meta__`` attached by ``@requires`` / ``@produces`` /
    ``@verify``, synthesizes a ``(config, context) -> Workflow`` adapter
    and a composite verifier, and registers a
    :class:`~thesis.core.registry.WorkflowEntry` with
    :data:`~thesis.core.registry.WORKFLOW_REGISTRY`.

    Everything that can fail (scope validation, strict-mode checks,
    schema derivation, adapter / verifier / entry construction) runs
    before either registry is modified, so a failure during decoration
    does not leave a namespace registered without its workflow.

    Args:
        name: CLI identifier; must be unique in the registry.
        description: Human-readable description for
            ``thesis list-workflows``.
        protocol: Default protocol name
            (``WorkflowEntry.default_protocol``).
        default_config: Default config name when ``-c`` is omitted.
        scope: ``"patient"`` (default) or ``"cohort"``. ``"cohort"``
            sets ``is_cohort_level=True`` and rejects per-patient path
            declarations at decoration time.
        config_namespace: Optional top-level YAML key the workflow owns.
            When set, registers a schema under this key with
            :data:`~thesis.core.config.namespace_registry.NAMESPACE_REGISTRY`
            so :class:`~thesis.core.config.PipelineConfig` can validate
            the section without ``core/config/validators.py`` knowing
            about it.
        config_schema: Optional :class:`BaseConfig` subclass that defines
            the schema for *config_namespace*. When omitted (but
            *config_namespace* is set), the schema is auto-derived from
            the accumulated metadata via
            :func:`~thesis.core.config.derive.derive_namespace_model`.
        **metadata: Reserved for future extensions (ignored).

    Returns:
        A decorator. Applying it registers the workflow and returns the
        original function, unmodified except for the ``__thesis_meta__``
        attribute.

    Raises:
        ValueError: When *scope* is neither ``"patient"`` nor
            ``"cohort"``; or when ``THESIS_STRICT_REGISTRY=1`` and *name*
            or *config_namespace* is already registered.
        TypeError: When ``scope="cohort"`` and ``@requires`` declares a
            ``PatientFile`` / ``PatientDir``, when ``scope="patient"``
            and ``@produces`` declares a ``CohortDir``, or when
            ``config_schema`` is set without ``config_namespace``.
    """
    del metadata  # not used in v1

    if scope not in ("patient", "cohort"):
        raise ValueError(f"scope must be 'patient' or 'cohort', got {scope!r}")

    if config_schema is not None and config_namespace is None:
        raise TypeError(
            "config_schema requires config_namespace to be set — there is "
            "no key to register the schema under."
        )

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        meta = _meta(fn)
        _validate_scope(name, scope, meta)

        strict = os.environ.get("THESIS_STRICT_REGISTRY") == "1"
        if strict and name in WORKFLOW_REGISTRY:
            raise ValueError(
                f"Strict registry mode: workflow '{name}' is already registered."
            )

        namespace_registry = None
        schema = None
        if config_namespace is not None:
            # Imported lazily to keep ``core.decorators`` importable even
            # before the config package is fully initialised.
            from thesis.core.config.derive import derive_namespace_model
            from thesis.core.config.namespace_registry import NAMESPACE_REGISTRY

            if strict and config_namespace in NAMESPACE_REGISTRY:
                raise ValueError(
                    f"Strict registry mode: namespace '{config_namespace}' "
                    "is already registered."
                )
            # Explicit ``is not None`` mirrors the guard above instead of
            # relying on the truthiness of the schema class.
            if config_schema is not None:
                schema = config_schema
            else:
                schema = derive_namespace_model(config_namespace, meta)
            namespace_registry = NAMESPACE_REGISTRY

        # Build every fallible piece before mutating any registry.
        entry = WorkflowEntry(
            name=name,
            factory=_build_adapter(fn, meta),
            verifier=_build_verifier(meta),
            description=description,
            default_protocol=protocol,
            default_config=default_config,
            is_cohort_level=(scope == "cohort"),
        )

        if namespace_registry is not None:
            namespace_registry.register(config_namespace, schema)
        WORKFLOW_REGISTRY.register(entry)
        return fn

    return decorator