# Source code for thesis.core.registry

"""
Component registry for the thesis framework.

Provides a registry system for modules, processors, and other components
to enable dynamic loading and plugin architecture.
"""

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.logging import get_logger

# Module-level logger, obtained from the project's logging helper and keyed by
# this module's ``__name__``.
logger = get_logger(__name__)

# Public API of this module: the entry dataclass, the registry class, and the
# module-level registry instance created at the bottom of the file.
__all__ = [
    "WorkflowEntry",
    "WorkflowRegistry",
    "WORKFLOW_REGISTRY",
]


# ---------------------------------------------------------------------------
# Workflow registry
# ---------------------------------------------------------------------------


@dataclass
class WorkflowEntry:
    """Metadata and factory for a registered workflow.

    Attributes:
        name: Short identifier used on the CLI (e.g. ``"hcp"``).
        factory: Callable that accepts ``(config, context)`` and returns a
            Nipype ``Workflow``.
        verifier: Optional callable ``(config, context) -> List[str]`` that
            performs pre-run requirement checks. An empty list means all
            clear.
        description: Human-readable description shown by
            ``thesis list-workflows``.
        default_protocol: Protocol name used when neither the CLI
            ``--protocol`` flag nor a per-patient config supplies one.
        default_config: Config name to use when no ``-c`` flag is provided
            (``None`` falls back to the CLI default, ``"default"``).
        is_cohort_level: Boolean flag, ``False`` by default. It is not
            interpreted anywhere in this module; presumably it marks
            workflows that run once per cohort rather than once per
            subject -- TODO confirm against the callers.
    """

    name: str
    factory: Callable[[PipelineConfig, ProcessingContext], object]
    # ``List`` (from ``typing``) rather than the builtin generic ``list[str]``:
    # it matches the rest of this module and stays subscriptable on
    # interpreters that predate builtin generics.
    verifier: Optional[Callable[[PipelineConfig, ProcessingContext], List[str]]] = None
    description: str = ""
    default_protocol: Optional[str] = None
    default_config: Optional[str] = None
    is_cohort_level: bool = False
class WorkflowRegistry:
    """Name-keyed store of :class:`WorkflowEntry` objects.

    Workflows register themselves when their module is imported. The
    preferred route is the :func:`~thesis.core.decorators.workflow`
    decorator, which constructs the :class:`WorkflowEntry` and hands it to
    :meth:`register` on your behalf::

        from thesis.core.decorators import workflow

        @workflow(name="hcp", description="HCP tractography workflow.", protocol="hcp")
        def build_workflow(*, config, context):
            ...

    Calling :meth:`register` directly remains supported, e.g. from tests or
    introspection tooling::

        >>> WORKFLOW_REGISTRY.register(WorkflowEntry(
        ...     name="hcp",
        ...     factory=build_workflow,
        ...     description="HCP tractography workflow.",
        ...     default_protocol="hcp",
        ... ))
        >>> entry = WORKFLOW_REGISTRY.get("hcp")
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        # Maps workflow name -> entry; insertion order is irrelevant because
        # every listing method sorts by name.
        self._entries: Dict[str, WorkflowEntry] = {}

    def register(self, entry: WorkflowEntry) -> None:
        """Store *entry* under ``entry.name``.

        A name that is already present is replaced; a warning is logged
        before the replacement happens.

        Args:
            entry: The :class:`WorkflowEntry` to register.
        """
        key = entry.name
        already_present = key in self._entries
        if already_present:
            logger.warning(f"Overwriting existing workflow registration: {entry.name}")
        self._entries[key] = entry
        logger.debug(f"Registered workflow: {entry.name}")

    def get(self, name: str) -> WorkflowEntry:
        """Look up the entry registered as *name*.

        Args:
            name: Registered workflow name.

        Returns:
            The matching :class:`WorkflowEntry`.

        Raises:
            KeyError: If *name* is unknown. The message lists the registered
                workflow names (or ``<none>`` when the registry is empty).
        """
        if name in self._entries:
            return self._entries[name]

        available = ", ".join(sorted(self._entries))
        if not available:
            available = "<none>"
        raise KeyError(f"Workflow '{name}' not found in registry. Available: {available}")

    def has(self, name: str) -> bool:
        """Tell whether a workflow called *name* has been registered."""
        return name in self._entries

    def list(self) -> List[str]:
        """Return the registered workflow names in sorted order."""
        return sorted(self._entries)

    def all_entries(self) -> List[WorkflowEntry]:
        """Return every registered entry, ordered by workflow name."""
        by_name = sorted(self._entries.items(), key=lambda pair: pair[0])
        return [entry for _, entry in by_name]

    def __contains__(self, name: str) -> bool:
        return name in self._entries

    def __len__(self) -> int:
        return len(self._entries)

    def __repr__(self) -> str:
        return f"WorkflowRegistry(workflows={self.list()})"
# Module-level registry instance, created at import time and exported through
# ``__all__``; it is the object the ``WorkflowRegistry`` docstring examples
# register into and read from.
WORKFLOW_REGISTRY = WorkflowRegistry()