Source code for thesis.core.registry
"""
Component registry for the thesis framework.
Provides a registry system for modules, processors, and other components
to enable dynamic loading and plugin architecture.
"""
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.logging import get_logger
logger = get_logger(__name__)
__all__ = [
"WorkflowEntry",
"WorkflowRegistry",
"WORKFLOW_REGISTRY",
]
# ---------------------------------------------------------------------------
# Workflow registry
# ---------------------------------------------------------------------------
[docs]
@dataclass
class WorkflowEntry:
"""Metadata and factory for a registered workflow.
Attributes:
name: Short identifier used on the CLI (e.g. ``"hcp"``).
factory: Callable that accepts ``(config, context)`` and returns a
Nipype ``Workflow``.
verifier: Optional callable ``(config, context) -> List[str]`` that
performs pre-run requirement checks. An empty list means all
clear.
description: Human-readable description shown by ``thesis list-workflows``.
default_protocol: Protocol name used when neither the CLI ``--protocol``
flag nor a per-patient config supplies one.
default_config: Config name to use when no ``-c`` flag is provided
(``None`` falls back to the CLI default, ``"default"``).
"""
name: str
factory: Callable[[PipelineConfig, ProcessingContext], object]
verifier: Optional[Callable[[PipelineConfig, ProcessingContext], list[str]]] = None
description: str = ""
default_protocol: Optional[str] = None
default_config: Optional[str] = None
is_cohort_level: bool = False
[docs]
class WorkflowRegistry:
"""Registry for :class:`WorkflowEntry` objects.
Each workflow self-registers at module import time. The recommended path
is the :func:`~thesis.core.decorators.workflow` decorator, which builds a
:class:`WorkflowEntry` and calls :meth:`register` for you::
from thesis.core.decorators import workflow
@workflow(name="hcp", description="HCP tractography workflow.", protocol="hcp")
def build_workflow(*, config, context):
...
Direct registration via :meth:`register` is still supported for tests or
introspection tooling::
>>> WORKFLOW_REGISTRY.register(WorkflowEntry(
... name="hcp",
... factory=build_workflow,
... description="HCP tractography workflow.",
... default_protocol="hcp",
... ))
>>> entry = WORKFLOW_REGISTRY.get("hcp")
"""
[docs]
def __init__(self) -> None:
self._entries: Dict[str, WorkflowEntry] = {}
[docs]
def register(self, entry: WorkflowEntry) -> None:
"""Register a workflow entry.
Args:
entry: The :class:`WorkflowEntry` to register.
"""
if entry.name in self._entries:
logger.warning(f"Overwriting existing workflow registration: {entry.name}")
self._entries[entry.name] = entry
logger.debug(f"Registered workflow: {entry.name}")
[docs]
def get(self, name: str) -> WorkflowEntry:
"""Return the entry for *name*.
Args:
name: Registered workflow name.
Returns:
The matching :class:`WorkflowEntry`.
Raises:
KeyError: If *name* is not registered, with a helpful message
listing available workflows.
"""
if name not in self._entries:
available = ", ".join(sorted(self._entries.keys())) or "<none>"
raise KeyError(f"Workflow '{name}' not found in registry. Available: {available}")
return self._entries[name]
[docs]
def has(self, name: str) -> bool:
"""Return ``True`` if *name* is registered."""
return name in self._entries
[docs]
def list(self) -> List[str]:
"""Return sorted list of registered workflow names."""
return sorted(self._entries.keys())
[docs]
def all_entries(self) -> List[WorkflowEntry]:
"""Return all entries sorted by name."""
return [self._entries[k] for k in sorted(self._entries.keys())]
def __contains__(self, name: str) -> bool:
return name in self._entries
def __len__(self) -> int:
return len(self._entries)
def __repr__(self) -> str:
return f"WorkflowRegistry(workflows={self.list()})"
WORKFLOW_REGISTRY = WorkflowRegistry()