Source code for thesis.core.path_declarations

"""Declarative path types for the workflow decorator API.

These types let workflow authors declare which patient inputs, outputs,
and working files their workflow needs without manually unpacking
``config`` and ``context``. The ``@workflow`` decorator's synthesized
adapter resolves each declaration against the current
:class:`~thesis.core.context.ProcessingContext` at workflow-build time
and injects the resolved :class:`pathlib.Path` as a keyword argument
to the decorated function.

See ``markdowns/plans/workflow-decorator/01-design-spec.md`` and
``02-api-reference.md`` for the full design.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union

from thesis.core.exceptions import ConfigurationError

if TYPE_CHECKING:  # pragma: no cover - typing only
    from thesis.core.config import PipelineConfig
    from thesis.core.context import ProcessingContext

# Names exported by ``from thesis.core.path_declarations import *``.
# NOTE(review): ``ConfigList``, ``ConfigListItem``, ``CohortPatients`` and
# ``CohortPatient`` are not defined in the part of the module reviewed here —
# presumably they are declared further down; confirm.
__all__ = [
    "PathDeclaration",
    "PatientFile",
    "PatientDir",
    "OutputDir",
    "WorkingFile",
    "CohortDir",
    "PriorOutput",
    "DataFile",
    "DataDir",
    "ExternalFile",
    "GlobMatch",
    "GlobGroup",
    "GlobGroupResult",
    "ConfigList",
    "ConfigListItem",
    "CohortPatients",
    "CohortPatient",
    # Listed for export even though the leading underscore marks it non-public.
    "_config_lookup",
]


# Directory names accepted wherever a declaration takes ``fallback_dirs`` or
# ``base_dirs``. ``"."`` maps to the current working directory; every other
# name is read as an attribute of the ProcessingContext (see
# ``_resolve_named_dir``).
_VALID_FALLBACK_DIR_NAMES = frozenset({"input_dir", "output_dir", "working_dir", "data_dir", "."})


def _resolve_named_dir(name: str, context: "ProcessingContext") -> Optional[Path]:
    """Resolve a fallback-dir name like ``"input_dir"`` to the corresponding
    directory on *context*.

    Returns ``None`` when the named directory is not configured on *context*
    (e.g. ``working_dir`` may be ``None`` in some edge contexts).
    """
    if name == ".":
        return Path.cwd()
    if name not in _VALID_FALLBACK_DIR_NAMES:
        raise ConfigurationError(
            f"Invalid fallback_dirs entry {name!r}. Valid names: "
            f"{sorted(_VALID_FALLBACK_DIR_NAMES)}."
        )
    return getattr(context, name, None)


def _validate_fallback_dirs(fallback_dirs: Optional[List[str]]) -> None:
    """Validate that every entry in *fallback_dirs* is a known dir name."""
    if not fallback_dirs:
        return
    for entry in fallback_dirs:
        if entry not in _VALID_FALLBACK_DIR_NAMES:
            raise ConfigurationError(
                f"Invalid fallback_dirs entry {entry!r}. Valid names: "
                f"{sorted(_VALID_FALLBACK_DIR_NAMES)}."
            )


def _config_lookup(config: Any, dotted: str) -> Any:
    """Walk a dotted attribute path on a config object.

    Returns the resolved value or ``None`` if any intermediate attribute
    is missing or itself ``None``. Empty paths return ``None``. ``dict``
    values along the path are traversed via ``.get`` so config sections
    declared as ``Dict[str, Any]`` (e.g. ``PipelineConfig.preprocess``)
    work the same as Pydantic-validated sections.

    Args:
        config: A :class:`~thesis.core.config.PipelineConfig` (or any
            object that supports ``getattr``/``__getitem__``).
        dotted: Dotted attribute path, e.g. ``"hcp.t1_image"``.

    Returns:
        The resolved value, or ``None`` when the path cannot be followed.
    """
    if not dotted:
        return None
    current: Any = config
    for segment in dotted.split("."):
        if current is None:
            return None
        if isinstance(current, dict):
            current = current.get(segment)
        else:
            current = getattr(current, segment, None)
    return current


def _build_config_path_chain(
    config_path: Optional[str],
    config_paths: Optional[Union[str, List[str]]],
) -> List[str]:
    """Return the ordered list of dotted config paths to consult.

    ``config_paths`` (string or list) is consulted first, then
    ``config_path`` is appended if it is set and not already present. Shared
    by :class:`PatientFile`, :class:`PatientDir`, and :class:`ExternalFile`.
    """
    chain: List[str] = []
    if config_paths is not None:
        if isinstance(config_paths, str):
            chain.append(config_paths)
        else:
            chain.extend(config_paths)
    if config_path is not None and config_path not in chain:
        chain.append(config_path)
    return chain


def _resolve_config_name(
    config: Any,
    chain: List[str],
    default: Optional[str] = None,
    *,
    skip_empty: bool = False,
) -> Optional[str]:
    """Return the first non-``None`` config value along *chain*, else *default*.

    Args:
        config: Config object to look up values on.
        chain: Ordered dotted paths (from :func:`_build_config_path_chain`).
        default: Fallback returned when no config value resolves.
        skip_empty: When ``True``, an empty-string config value is treated the
            same as unset (used by :class:`ExternalFile` so a blank config value
            does not resolve to a base directory).
    """
    for path in chain:
        value = _config_lookup(config, path)
        if value is None:
            continue
        if skip_empty and str(value) == "":
            continue
        return str(value)
    return default


def _require_path_source(
    cls_name: str,
    *,
    has_default: bool,
    config_path: Optional[str],
    config_paths: Optional[Union[str, List[str]]],
    optional: bool,
) -> None:
    """Raise ``ConfigurationError`` when no path source is configured.

    Shared ``__post_init__`` guard for declarations that need at least one of
    ``default`` / ``config_path`` / ``config_paths`` (or ``optional=True``).
    """
    if has_default or config_path is not None or config_paths is not None or optional:
        return
    raise ConfigurationError(
        f"{cls_name} requires at least one of 'default', 'config_path', "
        "'config_paths', or 'optional=True' — otherwise no path can be derived."
    )


def _substitute_patient_id(name: str, context: "ProcessingContext") -> str:
    """Apply ``{patient_id}`` template substitution to a path component.

    No-op when *name* contains no ``{`` placeholders. When it does, the
    only supported key is ``patient_id`` — any other placeholder raises
    ``KeyError``. ``str.format_map`` with a single-key dict is used so
    missing keys surface immediately rather than silently passing through.
    """
    if "{" not in name:
        return name
    return name.format_map({"patient_id": context.patient_id})


def _search_dirs_for_file(
    filename: str,
    dir_names: List[str],
    context: "ProcessingContext",
) -> Optional[Path]:
    """Search *dir_names* in order for *filename*; return first existing match.

    Each entry in *dir_names* is resolved via :func:`_resolve_named_dir` to
    a base directory on *context* (``input_dir``, ``output_dir`` …). The
    first ``base / filename`` that exists wins. Returns ``None`` if no
    candidate exists.
    """
    for name in dir_names:
        base = _resolve_named_dir(name, context)
        if base is None:
            continue
        candidate = base / filename
        if candidate.exists():
            return candidate
    return None


def _search_dirs_for_dir(
    dirname: str,
    dir_names: List[str],
    context: "ProcessingContext",
) -> Optional[Path]:
    """Search *dir_names* in order for a subdirectory *dirname*."""
    for name in dir_names:
        base = _resolve_named_dir(name, context)
        if base is None:
            continue
        candidate = base / dirname
        if candidate.exists() and candidate.is_dir():
            return candidate
    return None


class PathDeclaration:
    """Common base for every path declaration.

    Concrete declarations override :meth:`resolve` to turn themselves into
    a value for the workflow body and, where a preflight check makes
    sense, :meth:`existence_errors` to report problems before the
    workflow runs.
    """

    # Scope tag: subclasses in this module set it to "patient", "cohort"
    # or leave it as "any".
    _kind: str = "any"

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> Any:  # pragma: no cover - overridden
        """Turn the declaration into the concrete value handed to the workflow.

        What is returned depends on the subclass (for example a single
        :class:`pathlib.Path`, a ``list[Path]`` or a result object). The
        base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight error messages; the base class reports none."""
        no_errors: list[str] = []
        return no_errors
@dataclass(frozen=True)
class PatientFile(PathDeclaration):
    """Declare a per-patient input file under ``context.input_dir``.

    Args:
        default: Filename to use when no ``config_path``/``config_paths``
            value resolves to non-``None``. May contain a ``{patient_id}``
            placeholder.
        config_path: Single dotted attribute path on the
            :class:`~thesis.core.config.PipelineConfig` whose value, if
            non-``None``, overrides the default. Kept for backward
            compatibility — prefer ``config_paths`` for new code.
        config_paths: Single dotted path *or* list of dotted paths consulted
            in priority order. The first non-``None`` value wins. When both
            ``config_path`` and ``config_paths`` are given, ``config_paths``
            is consulted first and ``config_path`` next.
        fallback_dirs: Optional ordered list of base-directory names that
            are searched only when the resolved filename does not exist at
            the primary location (``context.get_input_path(name)``). The
            primary location is always checked first. Valid names:
            ``"input_dir"``, ``"output_dir"``, ``"working_dir"``,
            ``"data_dir"``, ``"."``.
        optional: When ``True``, the implicit existence check is skipped.
            :meth:`resolve` returns ``None`` whenever no filename can be
            derived, regardless of this flag.

    Raises:
        ConfigurationError: When ``default``, ``config_path`` and
            ``config_paths`` are all unset and ``optional=False``, or when
            ``fallback_dirs`` contains an unknown directory name.
    """

    default: Optional[str] = None
    config_path: Optional[str] = None
    config_paths: Optional[Union[str, List[str]]] = None
    fallback_dirs: Optional[List[str]] = None
    optional: bool = False
    # NOTE(review): being annotated, ``_kind`` is a regular dataclass field
    # (and so an ``__init__`` parameter), not a class-level constant.
    _kind: str = "patient"

    def __post_init__(self) -> None:
        _require_path_source(
            "PatientFile",
            has_default=self.default is not None,
            config_path=self.config_path,
            config_paths=self.config_paths,
            optional=self.optional,
        )
        _validate_fallback_dirs(self.fallback_dirs)

    def _config_path_chain(self) -> List[str]:
        """Return the ordered list of dotted config paths to consult."""
        return _build_config_path_chain(self.config_path, self.config_paths)

    def _resolved_name(self, config: "PipelineConfig") -> Optional[str]:
        """Return the configured filename, else ``default`` (may be ``None``)."""
        return _resolve_config_name(config, self._config_path_chain(), self.default)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Resolve to the input file path, or ``None`` when no name is known.

        The primary path is returned when it exists or when no fallback
        directory contains the file, so callers can still report it.
        """
        name = self._resolved_name(config)
        if name is None:
            return None
        name = _substitute_patient_id(name, context)
        primary = context.get_input_path(name)
        if self.fallback_dirs and not primary.exists():
            found = _search_dirs_for_file(name, self.fallback_dirs, context)
            if found is not None:
                return found
            # No fallback hit — fall through to returning the canonical
            # path under input_dir so existence_errors can report it.
        return primary

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a required file (none when optional)."""
        if self.optional:
            return []
        resolved = self.resolve(config, context)
        if resolved is None:
            # No default and every config path was unset: say so explicitly
            # instead of reporting a path of "None".
            checked = " / ".join(self._config_path_chain()) or "default"
            return [
                f"Required input '{name}' not found: no filename is configured "
                f"(checked: {checked})."
            ]
        if not resolved.exists():
            return [f"Required input '{name}' not found at: {resolved}"]
        if not resolved.is_file():
            return [f"Required input '{name}' is not a regular file: {resolved}"]
        return []
@dataclass(frozen=True)
class PatientDir(PathDeclaration):
    """Declare a per-patient input directory under ``context.input_dir``.

    Mirrors :class:`PatientFile` (including ``config_paths`` and
    ``fallback_dirs``) but enforces that the resolved path is a directory.
    Supports ``{patient_id}`` template substitution.

    Raises:
        ConfigurationError: When ``default``, ``config_path`` and
            ``config_paths`` are all unset and ``optional=False``, or when
            ``fallback_dirs`` contains an unknown directory name.
    """

    default: Optional[str] = None
    config_path: Optional[str] = None
    config_paths: Optional[Union[str, List[str]]] = None
    fallback_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "patient"

    def __post_init__(self) -> None:
        _require_path_source(
            "PatientDir",
            has_default=self.default is not None,
            config_path=self.config_path,
            config_paths=self.config_paths,
            optional=self.optional,
        )
        _validate_fallback_dirs(self.fallback_dirs)

    def _config_path_chain(self) -> List[str]:
        """Return the ordered list of dotted config paths to consult."""
        return _build_config_path_chain(self.config_path, self.config_paths)

    def _resolved_name(self, config: "PipelineConfig") -> Optional[str]:
        """Return the configured directory name, else ``default`` (may be ``None``)."""
        return _resolve_config_name(config, self._config_path_chain(), self.default)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Resolve to the input directory, or ``None`` when no name is known.

        Fallback directories are searched only when the primary path is not
        an existing directory; without a fallback hit the primary path is
        returned so it can be reported.
        """
        name = self._resolved_name(config)
        if name is None:
            return None
        name = _substitute_patient_id(name, context)
        primary = context.get_input_path(name)
        if self.fallback_dirs and not (primary.exists() and primary.is_dir()):
            found = _search_dirs_for_dir(name, self.fallback_dirs, context)
            if found is not None:
                return found
        return primary

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a required directory (none when optional)."""
        if self.optional:
            return []
        resolved = self.resolve(config, context)
        if resolved is None:
            # No default and every config path was unset: say so explicitly
            # instead of reporting a path of "None".
            checked = " / ".join(self._config_path_chain()) or "default"
            return [
                f"Required directory '{name}' not found: no directory name is "
                f"configured (checked: {checked})."
            ]
        if not resolved.exists():
            return [f"Required directory '{name}' not found at: {resolved}"]
        if not resolved.is_dir():
            return [f"Required path '{name}' is not a directory: {resolved}"]
        return []
@dataclass(frozen=True)
class OutputDir(PathDeclaration):
    """Declare an output subdirectory under ``context.output_dir``.

    Resolving the declaration creates the directory, including missing
    parents. No implicit existence check is defined.
    """

    subdir: str = ""
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Create and return ``output_dir / subdir`` (or ``output_dir`` itself).

        Raises:
            ConfigurationError: When ``context.output_dir`` is ``None``.
        """
        if context.output_dir is None:  # pragma: no cover - defensive
            raise ConfigurationError("output_dir is not set on ProcessingContext")
        if self.subdir:
            destination = context.output_dir / self.subdir
        else:
            # An empty subdir means the output root itself.
            destination = context.output_dir
        destination.mkdir(parents=True, exist_ok=True)
        return destination
@dataclass(frozen=True)
class WorkingFile(PathDeclaration):
    """Declare a temporary file under ``context.working_dir``.

    Usable with both ``scope="patient"`` and ``scope="cohort"``. No
    implicit existence check is defined.
    """

    name: str = ""
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Return the path *context* assigns to the working file ``name``."""
        working_path = context.get_working_path(self.name)
        return working_path
@dataclass(frozen=True)
class CohortDir(PathDeclaration):
    """Declare a cohort-level output directory.

    Resolves to ``context.output_dir / subdir`` (the cohort dispatch path in
    ``cli.py`` sets ``output_dir`` to the cohort root for cohort-scope
    workflows). The directory is created on resolution.
    """

    subdir: str = ""
    _kind: str = "cohort"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Create and return ``output_dir / subdir`` (or ``output_dir`` itself).

        Raises:
            ConfigurationError: When ``context.output_dir`` is ``None``.
        """
        if context.output_dir is None:  # pragma: no cover - defensive
            raise ConfigurationError("output_dir is not set on ProcessingContext")
        if self.subdir:
            destination = context.output_dir / self.subdir
        else:
            # An empty subdir means the output root itself.
            destination = context.output_dir
        destination.mkdir(parents=True, exist_ok=True)
        return destination
@dataclass(frozen=True)
class PriorOutput(PathDeclaration):
    """Declare an existing file (or set of files) inside ``context.output_dir``
    produced by an upstream workflow.

    Two resolution modes:

    - **Single file** — set *filename*. Resolution returns a single
      :class:`pathlib.Path` (``context.output_dir / subdir / filename``).
      Supports ``{patient_id}`` template substitution.
    - **Glob discovery** — set *glob_pattern*. Resolution returns a sorted
      ``list[Path]`` of matches from the first candidate base that yields
      any (``Path.rglob`` is used when *recursive* is ``True``), or ``[]``
      when nothing matches.

    Args:
        filename: Single-file mode filename, relative to
            ``context.output_dir`` or *subdir*.
        glob_pattern: Glob expression for the multi-file mode. At least one
            of *filename* / *glob_pattern* must be non-empty. When both are
            non-empty, glob mode is used.
        subdir: Optional subdirectory under ``context.output_dir``;
            ``{patient_id}`` is substituted.
        fallback_dirs: Optional ordered list of base-directory names searched
            after ``output_dir``. Valid names: ``"input_dir"``,
            ``"output_dir"``, ``"working_dir"``, ``"data_dir"``, ``"."``.
        recursive: When ``True`` and *glob_pattern* is set, use
            :meth:`Path.rglob` instead of :meth:`Path.glob`.
        optional: Skip the implicit existence check.

    Raises:
        ConfigurationError: When neither *filename* nor *glob_pattern* is
            set, or when *fallback_dirs* contains an unknown name.
    """

    filename: Optional[str] = None
    glob_pattern: Optional[str] = None
    subdir: Optional[str] = None
    fallback_dirs: Optional[List[str]] = None
    recursive: bool = False
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        if not self.filename and not self.glob_pattern:
            raise ConfigurationError(
                "PriorOutput requires either 'filename' or 'glob_pattern' "
                "to be set."
            )
        _validate_fallback_dirs(self.fallback_dirs)

    def _candidate_bases(self, context: "ProcessingContext") -> List[Path]:
        """Return the ordered list of base directories to search."""
        if context.output_dir is None:  # pragma: no cover - defensive
            raise ConfigurationError("output_dir is not set on ProcessingContext")
        subdir = _substitute_patient_id(self.subdir, context) if self.subdir else None
        bases: List[Path] = []
        primary = context.output_dir / subdir if subdir else context.output_dir
        bases.append(primary)
        if self.fallback_dirs:
            for name in self.fallback_dirs:
                base = _resolve_named_dir(name, context)
                if base is None:
                    continue
                candidate = base / subdir if subdir else base
                if candidate not in bases:
                    bases.append(candidate)
        return bases

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> Union[Path, List[Path]]:
        """Resolve to a single path (file mode) or a sorted list (glob mode)."""
        bases = self._candidate_bases(context)

        # Truthiness, not ``is not None``: ``__post_init__`` treats an empty
        # pattern as unset, so mode selection has to agree with it. Otherwise
        # ``filename="x", glob_pattern=""`` would glob an empty pattern.
        if self.glob_pattern:
            pattern = _substitute_patient_id(self.glob_pattern, context)
            for base in bases:
                if not base.exists():
                    continue
                matches = sorted(base.rglob(pattern) if self.recursive else base.glob(pattern))
                if matches:
                    return matches
            return []

        # Single-file mode
        filename = _substitute_patient_id(self.filename or "", context)
        for base in bases:
            candidate = base / filename
            if candidate.exists():
                return candidate
        # Nothing found — return the canonical (primary) path so
        # existence_errors can report it.
        return bases[0] / filename

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a required upstream output."""
        if self.optional:
            return []
        resolved = self.resolve(config, context)

        # Same mode test as ``resolve`` (see the comment there).
        if self.glob_pattern:
            if not isinstance(resolved, list) or len(resolved) == 0:
                bases = self._candidate_bases(context)
                pattern = _substitute_patient_id(self.glob_pattern, context)
                return [
                    f"Required upstream output '{name}' "
                    f"(pattern {pattern!r}) not found under: {bases[0]}"
                ]
            return []

        # Single-file mode
        assert isinstance(resolved, Path)
        if not resolved.exists():
            return [f"Required upstream output '{name}' not found at: {resolved}"]
        if not resolved.is_file():
            return [f"Required upstream output '{name}' is not a regular file: {resolved}"]
        return []
def _check_data_path_traversal(resolved: Path, base: Path, label: str) -> None: """Raise ConfigurationError when *resolved* escapes *base*.""" try: resolved.resolve().relative_to(base.resolve()) except ValueError as exc: raise ConfigurationError( f"Path traversal detected: '{resolved}' escapes {label} '{base}'" ) from exc
@dataclass(frozen=True)
class DataFile(PathDeclaration):
    """Declare an input file under ``context.data_dir``.

    Intended for cohort-shared assets (templates, atlases, reference
    images) kept in the project's data directory. ``{patient_id}`` is
    substituted in *filename*.

    A relative *filename* must stay under ``context.data_dir`` (a
    path-traversal check is applied). An absolute *filename* is returned
    unchanged and is not subject to that check.

    Implicit existence check: the resolved path must exist and be a file
    unless ``optional=True``.
    """

    filename: str = ""
    optional: bool = False
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Return the data file path.

        Raises:
            ConfigurationError: When ``context.data_dir`` is ``None`` or a
                relative filename escapes ``data_dir``.
        """
        data_root = context.data_dir
        if data_root is None:  # pragma: no cover - defensive
            raise ConfigurationError("data_dir is not set on ProcessingContext")
        requested = Path(_substitute_patient_id(self.filename, context))
        if not requested.is_absolute():
            target = context.data_dir / requested
            _check_data_path_traversal(target, context.data_dir, "data_dir")
            return target
        # Absolute path → explicit location; honoured as-is (the traversal
        # guard only protects relative escapes under data_dir).
        return requested

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a required data file."""
        if self.optional:
            return []
        target = self.resolve(config, context)
        if not target.exists():
            return [f"Required data file '{name}' not found at: {target}"]
        if target.is_file():
            return []
        return [f"Required data file '{name}' is not a regular file: {target}"]
@dataclass(frozen=True)
class DataDir(PathDeclaration):
    """Declare an input directory under ``context.data_dir``.

    Counterpart of :class:`DataFile` that requires the resolved path to be
    a directory. ``{patient_id}`` is substituted in *dirname*. A relative
    *dirname* must stay under ``context.data_dir``; an absolute one is
    returned unchanged.
    """

    dirname: str = ""
    optional: bool = False
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Return the data directory path.

        Raises:
            ConfigurationError: When ``context.data_dir`` is ``None`` or a
                relative dirname escapes ``data_dir``.
        """
        data_root = context.data_dir
        if data_root is None:  # pragma: no cover - defensive
            raise ConfigurationError("data_dir is not set on ProcessingContext")
        requested = Path(_substitute_patient_id(self.dirname, context))
        if not requested.is_absolute():
            target = context.data_dir / requested
            _check_data_path_traversal(target, context.data_dir, "data_dir")
            return target
        # Absolute path → explicit location; honoured as-is (the traversal
        # guard only protects relative escapes under data_dir).
        return requested

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a required data directory."""
        if self.optional:
            return []
        target = self.resolve(config, context)
        if not target.exists():
            return [f"Required data directory '{name}' not found at: {target}"]
        if target.is_dir():
            return []
        return [f"Required path '{name}' is not a directory: {target}"]
@dataclass(frozen=True)
class ExternalFile(PathDeclaration):
    """Declare an input file whose location is supplied by configuration.

    Unlike :class:`PatientFile` / :class:`DataFile`, the path is not
    anchored to ``input_dir`` or ``data_dir`` and no path-traversal guard
    is applied here.

    Resolution: the first non-empty value from ``config_paths`` /
    ``config_path`` is read, ``{patient_id}`` is substituted, and the
    result is passed to ``thesis.core.utils.resolve_path`` together with
    the first available ``base_dirs`` entry (default ``data_dir`` then
    the current directory).
    NOTE(review): ``~`` / ``$ENV`` expansion and absolute-path handling
    are presumably done by ``resolve_path`` — confirm there.

    Implicit existence check (must be a regular file) unless ``optional``.

    Args:
        config_path: Single dotted config path
            (e.g. ``"registration.fixed_image"``).
        config_paths: Single path or ordered list consulted before
            ``config_path``.
        base_dirs: Ordered base-dir names for resolving relative values.
            Valid names: ``"input_dir"``, ``"output_dir"``,
            ``"working_dir"``, ``"data_dir"``, ``"."``. Defaults to
            ``["data_dir", "."]``.
        optional: When ``True``, the implicit existence check is skipped.

    Raises:
        ConfigurationError: When neither ``config_path`` nor ``config_paths``
            is set (and not ``optional``), or ``base_dirs`` has an unknown
            name.
    """

    config_path: Optional[str] = None
    config_paths: Optional[Union[str, List[str]]] = None
    base_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        has_source = self.config_path is not None or self.config_paths is not None
        if not (has_source or self.optional):
            raise ConfigurationError(
                "ExternalFile requires at least one of 'config_path', 'config_paths', "
                "or 'optional=True' — otherwise no path can be derived."
            )
        _validate_fallback_dirs(self.base_dirs)

    def _config_path_chain(self) -> List[str]:
        """Return the ordered list of dotted config paths to consult."""
        return _build_config_path_chain(self.config_path, self.config_paths)

    def _resolved_name(self, config: "PipelineConfig") -> Optional[str]:
        """Return the first non-empty configured value, or ``None``."""
        # A blank config value counts as unset so that it can never
        # resolve to a bare base directory.
        chain = self._config_path_chain()
        return _resolve_config_name(config, chain, skip_empty=True)

    def _base(self, context: "ProcessingContext") -> Path:
        """Return the first configured base directory, else the cwd."""
        search_order = self.base_dirs or ["data_dir", "."]
        for dir_name in search_order:
            root = _resolve_named_dir(dir_name, context)
            if root is not None:
                return root
        return Path.cwd()

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Resolve to the configured path, or ``None`` when nothing is configured."""
        from thesis.core.utils import resolve_path

        configured = self._resolved_name(config)
        if configured is None:
            return None
        configured = _substitute_patient_id(configured, context)
        return resolve_path(self._base(context), configured)

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a required external file."""
        if self.optional:
            return []
        # Prefer the config path(s) as the label so the message points at
        # the setting to fix; fall back to the parameter name.
        label = " / ".join(self._config_path_chain()) or name
        target = self.resolve(config, context)
        if target is None:
            return [f"Required input '{name}' ({label}) is not configured."]
        if not target.exists():
            return [f"Required input '{name}' ({label}) not found at: {target}"]
        if target.is_file():
            return []
        return [f"Required input '{name}' ({label}) is not a regular file: {target}"]
# --------------------------------------------------------------------------- # Phase A.3 primitives: glob discovery, structured lists, cohort iteration # --------------------------------------------------------------------------- def _glob_candidate_bases( primary_dir: Optional["PathDeclaration"], fallback_dirs: Optional[List[str]], config: "PipelineConfig", context: "ProcessingContext", ) -> List[Path]: """Return the ordered list of base directories to glob in. The primary directory (or ``context.input_dir`` when *primary_dir* is ``None``) comes first, followed by any resolvable ``fallback_dirs`` not already present. Shared by :class:`GlobMatch` and :class:`GlobGroup`. """ bases: List[Path] = [] if primary_dir is not None: primary = primary_dir.resolve(config, context) if isinstance(primary, Path): bases.append(primary) elif context.input_dir is not None: bases.append(context.input_dir) if fallback_dirs: for name in fallback_dirs: base = _resolve_named_dir(name, context) if base is None: continue if base not in bases: bases.append(base) return bases def _glob_in_base(base: Path, pattern: str, recursive_fallback: bool) -> List[Path]: """Glob *pattern* within *base*, optionally falling back to ``rglob``. Shared by :class:`GlobMatch` and :class:`GlobGroup`. """ if not base.exists(): return [] matches = sorted(base.glob(pattern)) if not matches and recursive_fallback: matches = sorted(base.rglob(pattern)) return matches
@dataclass(frozen=True)
class GlobMatch(PathDeclaration):
    """Discover files matching a glob pattern, with directory-fallback search.

    Resolution returns a sorted ``list[Path]`` taken from the first
    candidate base that produces any match. The implicit existence check
    requires ``len(matches) >= min_matches`` unless ``optional`` is set.

    Args:
        pattern: Glob expression (e.g. ``"merged_th*samples.nii.gz"``).
            ``{patient_id}`` is substituted.
        primary_dir: Declaration resolved to obtain the first directory to
            search. When ``None``, the search starts from
            ``context.input_dir``.
        fallback_dirs: Ordered base-directory names tried after the primary.
            Valid names: ``"input_dir"``, ``"output_dir"``,
            ``"working_dir"``, ``"data_dir"``, ``"."``.
        recursive_fallback: When ``True``, a base whose direct glob finds
            nothing is searched again with :meth:`Path.rglob` before the
            next base is tried.
        min_matches: Minimum match count for the implicit existence check
            (default ``1``).
        optional: Skip the implicit existence check.

    Raises:
        ConfigurationError: When *pattern* is empty, *min_matches* is
            negative, or *fallback_dirs* contains an unknown name.
    """

    pattern: str = ""
    primary_dir: Optional[PathDeclaration] = None
    fallback_dirs: Optional[List[str]] = None
    recursive_fallback: bool = False
    min_matches: int = 1
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        if not self.pattern:
            raise ConfigurationError("GlobMatch requires a non-empty 'pattern'.")
        if self.min_matches < 0:
            raise ConfigurationError(f"GlobMatch.min_matches must be >= 0, got {self.min_matches}")
        _validate_fallback_dirs(self.fallback_dirs)

    def _candidate_bases(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[Path]:
        """Return the ordered directories this declaration searches."""
        return _glob_candidate_bases(self.primary_dir, self.fallback_dirs, config, context)

    def _glob_in(self, base: Path, pattern: str) -> List[Path]:
        """Glob *pattern* in *base*, honouring ``recursive_fallback``."""
        return _glob_in_base(base, pattern, self.recursive_fallback)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> List[Path]:
        """Return the matches from the first base that has any, else ``[]``."""
        concrete_pattern = _substitute_patient_id(self.pattern, context)
        for directory in self._candidate_bases(config, context):
            hits = self._glob_in(directory, concrete_pattern)
            if hits:
                return hits
        return []

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return a preflight error when fewer than ``min_matches`` files match."""
        if self.optional:
            return []
        matches = self.resolve(config, context)
        if len(matches) >= self.min_matches:
            return []
        # Only the first candidate base is named in the message.
        bases = self._candidate_bases(config, context)
        primary = bases[0] if bases else None
        return [
            f"Required glob '{name}' (pattern {self.pattern!r}) "
            f"matched {len(matches)} files (need >= {self.min_matches}) "
            f"under: {primary}"
        ]
class GlobGroupResult:
    """Result object returned by :meth:`GlobGroup.resolve`.

    Every entry of the ``items`` mapping passed to the constructor is
    readable as an attribute of the same name. ``_found_dir`` holds the
    ``found_dir`` value given at construction. The object is not
    iterable; look items up by name.
    """

    __slots__ = ("_items", "_found_dir")

    def __init__(self, items: dict, found_dir: Optional[Path]) -> None:
        # Store a shallow copy so later changes to the caller's dict do
        # not show through.
        self._items = dict(items)
        self._found_dir = found_dir

    def __getattr__(self, key: str) -> Any:
        # Reached only after normal attribute lookup fails, i.e. for
        # names that are neither slots nor methods.
        stored = object.__getattribute__(self, "_items")
        if key not in stored:
            raise AttributeError(key)
        return stored[key]

    def __repr__(self) -> str:
        return f"GlobGroupResult(_found_dir={self._found_dir!r}, items={self._items!r})"

    def __eq__(self, other: object) -> bool:
        if isinstance(other, GlobGroupResult):
            same_items = self._items == other._items
            return bool(same_items and self._found_dir == other._found_dir)
        return NotImplemented
@dataclass(frozen=True)
class GlobGroup(PathDeclaration):
    """Resolve several related globs against one shared search directory.

    The first entry of *items* acts as the anchor: candidate bases are
    tried in order and the first base in which the anchor pattern matches
    is used for *every* item in the group. Bases where the anchor matches
    nothing are skipped entirely. This mirrors the directory-discovery
    semantics of ``prepare_hcp_paths``.

    Args:
        items: Mapping of result-attribute name to glob pattern.
            ``{patient_id}`` is substituted in each pattern.
        primary_dir: First directory to search (default:
            ``context.input_dir``).
        fallback_dirs: Ordered fallback names (see :class:`GlobMatch`).
        recursive_fallback: Forwarded to ``_glob_in_base``; when ``True``,
            ``rglob`` is tried before advancing to the next candidate base.
        optional: When ``True``, skip the implicit existence check.

    Returns:
        A :class:`GlobGroupResult` with one attribute per ``items`` key
        (each a ``list[Path]``) plus ``_found_dir``.

    Raises:
        ConfigurationError: When *items* is empty, is not a ``str -> str``
            mapping, or *fallback_dirs* contains an unknown name.
    """

    items: dict = field(default_factory=dict)
    primary_dir: Optional[PathDeclaration] = None
    fallback_dirs: Optional[List[str]] = None
    recursive_fallback: bool = False
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        """Reject empty or mistyped ``items`` and unknown fallback names."""
        if not self.items:
            raise ConfigurationError("GlobGroup requires a non-empty 'items' mapping.")
        for key, pat in self.items.items():
            well_typed = isinstance(key, str) and isinstance(pat, str)
            if not well_typed:
                raise ConfigurationError(
                    "GlobGroup.items must map str -> str (got "
                    f"{type(key).__name__} -> {type(pat).__name__})."
                )
        _validate_fallback_dirs(self.fallback_dirs)

    def _candidate_bases(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[Path]:
        """Return the ordered base directories to search (primary first)."""
        return _glob_candidate_bases(self.primary_dir, self.fallback_dirs, config, context)

    def _glob_in(self, base: Path, pattern: str) -> List[Path]:
        """Glob *pattern* under *base*, honouring ``recursive_fallback``."""
        return _glob_in_base(base, pattern, self.recursive_fallback)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> GlobGroupResult:
        """Resolve every item against the first base where the anchor matches.

        When no base matches the anchor (first) pattern, every item maps to
        an empty list and ``_found_dir`` is ``None``.
        """
        ordered = list(self.items.items())
        anchor_key, anchor_pat = ordered[0]
        anchor_glob = _substitute_patient_id(anchor_pat, context)
        for base in self._candidate_bases(config, context):
            anchor_hits = self._glob_in(base, anchor_glob)
            if anchor_hits:
                # Remaining patterns are only expanded once a base is chosen.
                found: dict = {anchor_key: anchor_hits}
                found.update(
                    {
                        key: self._glob_in(base, _substitute_patient_id(pat, context))
                        for key, pat in ordered[1:]
                    }
                )
                return GlobGroupResult(found, base)
        # A fresh list per key so callers never share one mutable object.
        return GlobGroupResult({key: [] for key, _ in ordered}, None)

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return one message for a missing group, or one per empty item.

        Returns ``[]`` when the declaration is optional or every item has
        at least one match in the chosen directory.
        """
        if self.optional:
            return []
        outcome = self.resolve(config, context)
        if outcome._found_dir is None:
            patterns = ", ".join(map(repr, self.items.values()))
            return [
                f"Required glob group '{name}' (patterns {patterns}) "
                "matched nothing in any candidate directory."
            ]
        return [
            f"Required glob group '{name}.{key}' "
            f"(pattern {pat!r}) matched no files in {outcome._found_dir}."
            for key, pat in self.items.items()
            if not getattr(outcome, key)
        ]
class ConfigListItem:
    """One resolved entry of a :class:`ConfigList`.

    Attribute names come from the parent declaration's ``file_fields`` /
    ``dir_fields`` / ``str_fields``. File and directory fields hold a
    :class:`pathlib.Path` (or ``list[Path]``), string fields hold ``str``,
    and keys absent from the YAML entry hold ``None``.
    """

    __slots__ = ("_fields",)

    def __init__(self, fields: dict) -> None:
        """Store a shallow copy of *fields*."""
        snapshot = dict(fields)
        object.__setattr__(self, "_fields", snapshot)

    def __getattr__(self, key: str) -> Any:
        """Look *key* up among the stored fields (only reached on a normal-lookup miss)."""
        stored = object.__getattribute__(self, "_fields")
        if key not in stored:
            raise AttributeError(key)
        return stored[key]

    def __repr__(self) -> str:
        stored = self._fields
        return f"ConfigListItem({stored!r})"

    def __eq__(self, other: object) -> bool:
        """Items are equal when their field mappings are equal."""
        if isinstance(other, ConfigListItem):
            return bool(self._fields == other._fields)
        return NotImplemented
@dataclass(frozen=True)
class ConfigList(PathDeclaration):
    """Iterate a structured YAML list and resolve per-item paths.

    Useful for jobs / batch specifications where the YAML carries a list of
    dicts and each dict contains filenames the workflow needs as
    :class:`pathlib.Path` objects.

    Args:
        config_path: Dotted path on the
            :class:`~thesis.core.config.PipelineConfig` to the list.
        file_fields: YAML keys whose values should be resolved as files
            (relative paths are searched for under the *fallback_dirs*;
            absolute paths are kept as-is). List values become
            ``list[Path]``.
        dir_fields: YAML keys whose values should be resolved as
            directories. Same anchoring rules as *file_fields*.
        str_fields: YAML keys kept as plain ``str`` (no resolution).
        fallback_dirs: Same semantics as :class:`PatientFile`. Defaults to
            ``["input_dir"]`` if unset or empty.
        optional: When ``True``, a missing config path or empty list is
            permitted; otherwise the implicit existence check fails.

    Returns:
        ``list[ConfigListItem]`` — one per YAML list entry.

    Raises:
        ConfigurationError: When *config_path* is empty, a field name is
            declared more than once, or *fallback_dirs* contains an
            unknown name.
    """

    config_path: str = ""
    file_fields: Tuple[str, ...] = ()
    dir_fields: Tuple[str, ...] = ()
    str_fields: Tuple[str, ...] = ()
    fallback_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        """Validate the declaration and normalise list inputs to tuples."""
        if not self.config_path:
            raise ConfigurationError("ConfigList requires a non-empty 'config_path'.")
        # Tolerate list inputs by normalising to tuples for hashability.
        # The dataclass is frozen, so the write must bypass __setattr__.
        for field_name in ("file_fields", "dir_fields", "str_fields"):
            value = getattr(self, field_name)
            if isinstance(value, list):
                object.__setattr__(self, field_name, tuple(value))
        all_fields = (*self.file_fields, *self.dir_fields, *self.str_fields)
        seen: set = set()
        for f in all_fields:
            if f in seen:
                raise ConfigurationError(
                    f"ConfigList field {f!r} appears in more than one of "
                    "file_fields / dir_fields / str_fields."
                )
            seen.add(f)
        _validate_fallback_dirs(self.fallback_dirs)

    def _resolve_one_path(
        self,
        raw: Any,
        context: "ProcessingContext",
        must_be_dir: bool,
    ) -> Optional[Path]:
        """Resolve a single raw value into a Path using fallback search.

        ``None`` stays ``None``; absolute paths are returned untouched.
        Relative values are searched for in the configured directories.
        When nothing is found the value is anchored under the first
        configured directory that resolves, so error messages show a
        canonical location.
        """
        if raw is None:
            return None
        text = str(raw)
        candidate = Path(text)
        if candidate.is_absolute():
            return candidate
        dir_names = self.fallback_dirs or ["input_dir"]
        if must_be_dir:
            found = _search_dirs_for_dir(text, dir_names, context)
        else:
            found = _search_dirs_for_file(text, dir_names, context)
        if found is not None:
            return found
        # Anchor to first valid named dir for canonical error path.
        for name in dir_names:
            base = _resolve_named_dir(name, context)
            if base is not None:
                return base / text
        return candidate  # pragma: no cover - all named dirs are None

    def _resolve_field(
        self,
        raw: Any,
        context: "ProcessingContext",
        kind: str,
    ) -> Any:
        """Resolve one YAML value according to *kind* (``"file"``, ``"dir"`` or ``"str"``).

        List values are resolved element-wise; ``None`` elements stay ``None``.
        """
        if kind == "str":
            return None if raw is None else str(raw)
        must_be_dir = kind == "dir"
        if isinstance(raw, list):
            return [self._resolve_one_path(v, context, must_be_dir) for v in raw]
        return self._resolve_one_path(raw, context, must_be_dir)

    def _raw_list(self, config: "PipelineConfig") -> Optional[List[Any]]:
        """Fetch the raw list at ``config_path``; ``None`` when it is unset.

        Raises:
            ConfigurationError: When the value exists but is not a list.
        """
        raw = _config_lookup(config, self.config_path)
        if raw is None:
            return None
        if not isinstance(raw, list):
            raise ConfigurationError(
                f"ConfigList expects a list at {self.config_path!r}, "
                f"got {type(raw).__name__}."
            )
        return raw

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[ConfigListItem]:
        """Build one :class:`ConfigListItem` per entry of the configured list.

        Dict entries are read by key; any other entry is read by attribute,
        with absent keys/attributes yielding ``None``. Returns ``[]`` when
        the list is unset or empty.
        """
        raw_list = self._raw_list(config)
        if not raw_list:
            return []
        items: List[ConfigListItem] = []
        for entry in raw_list:
            fields: dict = {}
            getter = (
                entry.get
                if isinstance(entry, dict)
                # Bind ``entry`` as a default to avoid late-binding surprises.
                else lambda k, default=None, e=entry: getattr(e, k, default)
            )
            for f in self.file_fields:
                fields[f] = self._resolve_field(getter(f), context, "file")
            for f in self.dir_fields:
                fields[f] = self._resolve_field(getter(f), context, "dir")
            for f in self.str_fields:
                fields[f] = self._resolve_field(getter(f), context, "str")
            items.append(ConfigListItem(fields))
        return items

    @staticmethod
    def _path_errors(
        name: str,
        idx: int,
        field_name: str,
        value: Any,
        must_be_dir: bool,
    ) -> List[str]:
        """Return the existence errors for one resolved file/dir field value."""
        noun = "directory" if must_be_dir else "file"
        prefix = f"ConfigList '{name}'[{idx}].{field_name}: "
        if value is None:
            return [f"{prefix}missing required {noun}."]
        errors: List[str] = []
        for path in value if isinstance(value, list) else [value]:
            if path is None:
                # A null element inside a list-valued field resolves to None;
                # report it instead of crashing on ``None.exists()``.
                errors.append(f"{prefix}missing required {noun}.")
                continue
            # is_file() / is_dir() are already False for non-existent paths.
            present = path.is_dir() if must_be_dir else path.is_file()
            if not present:
                errors.append(f"{prefix}{noun} not found at {path}.")
        return errors

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return messages for every missing file/directory in the list.

        Returns ``[]`` when the declaration is optional. An unset or empty
        list yields a single error. Otherwise each item is checked: all
        ``file_fields`` first, then all ``dir_fields``. ``str_fields`` are
        never checked.
        """
        if self.optional:
            return []
        raw_list = self._raw_list(config)
        if not raw_list:
            return [f"Required ConfigList '{name}' at {self.config_path!r} is " "empty or unset."]
        errors: list[str] = []
        for idx, item in enumerate(self.resolve(config, context)):
            for f in self.file_fields:
                errors.extend(self._path_errors(name, idx, f, getattr(item, f, None), False))
            for f in self.dir_fields:
                errors.extend(self._path_errors(name, idx, f, getattr(item, f, None), True))
        return errors
class CohortPatient:
    """One per-patient entry produced by :class:`CohortPatients`.

    Attributes:
        patient_id: Name of the patient's directory under the cohort root.
        patient_dir: Path to the patient's directory, or to the configured
            ``subdir`` inside it when :class:`CohortPatients` sets one.

    Every key of :attr:`CohortPatients.file_patterns` is exposed as an
    additional attribute carrying the first matching
    :class:`pathlib.Path` for that pattern.
    """

    __slots__ = ("_fields",)

    def __init__(self, fields: dict) -> None:
        """Store a shallow copy of *fields*."""
        snapshot = dict(fields)
        object.__setattr__(self, "_fields", snapshot)

    def __getattr__(self, key: str) -> Any:
        """Look *key* up among the stored fields (only reached on a normal-lookup miss)."""
        stored = object.__getattribute__(self, "_fields")
        if key not in stored:
            raise AttributeError(key)
        return stored[key]

    def __repr__(self) -> str:
        stored = self._fields
        return f"CohortPatient({stored!r})"

    def __eq__(self, other: object) -> bool:
        """Patients are equal when their field mappings are equal."""
        if isinstance(other, CohortPatient):
            return bool(self._fields == other._fields)
        return NotImplemented
@dataclass(frozen=True)
class CohortPatients(PathDeclaration):
    """Iterate per-patient subdirectories under a cohort root.

    Args:
        root_dir: Cohort root (default: ``context.output_dir``).
        subdir: Optional per-patient subdirectory that must exist for the
            patient to be included. When set, it also becomes the
            patient's ``patient_dir`` and the base for *file_patterns*.
        exclude: Patient-id names to skip (case-sensitive). Names starting
            with ``"."`` are always skipped.
        min_patients: Minimum patient count for the implicit existence
            check.
        file_patterns: Optional ``key -> glob`` map. Patients missing *any*
            pattern are excluded. Each pattern resolves to the first
            (sorted) match under the patient's ``patient_dir`` and is
            exposed as ``patient.<key>``.
        optional: Skip the implicit existence check.

    Raises:
        ConfigurationError: When *min_patients* is negative or
            *file_patterns* is not a ``str -> str`` mapping.
    """

    root_dir: Optional[PathDeclaration] = None
    subdir: Optional[str] = None
    exclude: Tuple[str, ...] = ("cohort", "atlas", "_meta")
    min_patients: int = 1
    file_patterns: Optional[dict] = None
    optional: bool = False
    _kind: str = "cohort"

    def __post_init__(self) -> None:
        """Validate the declaration and normalise a list ``exclude`` to a tuple."""
        if self.min_patients < 0:
            raise ConfigurationError(
                f"CohortPatients.min_patients must be >= 0, got {self.min_patients}"
            )
        if self.file_patterns is not None:
            for k, v in self.file_patterns.items():
                if not isinstance(k, str) or not isinstance(v, str):
                    raise ConfigurationError("CohortPatients.file_patterns must map str -> str.")
        # Normalise list -> tuple for the exclude field. The dataclass is
        # frozen, so the write must bypass __setattr__.
        if isinstance(self.exclude, list):
            object.__setattr__(self, "exclude", tuple(self.exclude))

    def _root(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Return the cohort root, or ``None`` if *root_dir* does not resolve to a Path."""
        if self.root_dir is not None:
            resolved = self.root_dir.resolve(config, context)
            return resolved if isinstance(resolved, Path) else None
        return context.output_dir

    def _is_candidate(self, path: Path) -> bool:
        """Return whether *path* is a non-hidden, non-excluded directory."""
        if not path.is_dir():
            return False
        if path.name in self.exclude:
            return False
        if path.name.startswith("."):
            return False
        return True

    def _first_matches(self, patient_dir: Path) -> Optional[dict]:
        """Return ``key -> first sorted match`` for every file pattern.

        Returns ``None`` as soon as one pattern has no match, and an empty
        dict when no patterns are configured.
        """
        found: dict = {}
        for key, pat in (self.file_patterns or {}).items():
            matches = sorted(patient_dir.glob(pat))
            if not matches:
                return None
            found[key] = matches[0]
        return found

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[CohortPatient]:
        """Return one :class:`CohortPatient` per qualifying child of the root.

        Children are visited in sorted order. Returns ``[]`` when the root
        is unset, missing, or not a directory.
        """
        root = self._root(config, context)
        # is_dir() is False for a missing path *and* for a regular file, so a
        # root that exists but is not a directory no longer makes iterdir()
        # raise NotADirectoryError.
        if root is None or not root.is_dir():
            return []
        patients: List[CohortPatient] = []
        for child in sorted(root.iterdir()):
            if not self._is_candidate(child):
                continue
            patient_dir = child
            if self.subdir is not None:
                target = patient_dir / self.subdir
                if not target.is_dir():
                    continue
                patient_dir = target
            extra = self._first_matches(patient_dir)
            if extra is None:
                continue
            fields: dict = {
                "patient_id": child.name,
                "patient_dir": patient_dir,
            }
            fields.update(extra)
            patients.append(CohortPatient(fields))
        return patients

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return an error message when fewer than ``min_patients`` are found.

        Returns ``[]`` when the declaration is optional or enough patients
        resolve.
        """
        if self.optional:
            return []
        patients = self.resolve(config, context)
        if len(patients) < self.min_patients:
            root = self._root(config, context)
            return [
                f"Required cohort '{name}' yielded {len(patients)} patients "
                f"(need >= {self.min_patients}) under: {root}"
            ]
        return []