"""Declarative path types for the workflow decorator API.
These types let workflow authors declare which patient inputs, outputs,
and working files their workflow needs without manually unpacking
``config`` and ``context``. The ``@workflow`` decorator's synthesized
adapter resolves each declaration against the current
:class:`~thesis.core.context.ProcessingContext` at workflow-build time
and injects the resolved :class:`pathlib.Path` as a keyword argument
to the decorated function.
See ``markdowns/plans/workflow-decorator/01-design-spec.md`` and
``02-api-reference.md`` for the full design.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union
from thesis.core.exceptions import ConfigurationError
if TYPE_CHECKING: # pragma: no cover - typing only
from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
# Names exported by ``from <this module> import *``. ``_config_lookup`` is
# listed explicitly, so it is exported despite its leading underscore.
__all__ = [
    "PathDeclaration",
    "PatientFile",
    "PatientDir",
    "OutputDir",
    "WorkingFile",
    "CohortDir",
    "PriorOutput",
    "DataFile",
    "DataDir",
    "ExternalFile",
    "GlobMatch",
    "GlobGroup",
    "GlobGroupResult",
    "ConfigList",
    "ConfigListItem",
    "CohortPatients",
    "CohortPatient",
    "_config_lookup",
]
# Symbolic directory names accepted wherever a declaration takes a list of
# base directories. ``_resolve_named_dir`` maps "." to the current working
# directory and reads every other name as an attribute of the context.
_VALID_FALLBACK_DIR_NAMES = frozenset({"input_dir", "output_dir", "working_dir", "data_dir", "."})
def _resolve_named_dir(name: str, context: "ProcessingContext") -> Optional[Path]:
    """Translate a symbolic directory name into a base directory.

    ``"."`` always stands for the current working directory. Any other
    accepted name is read as an attribute of *context*.

    Args:
        name: One of the names in ``_VALID_FALLBACK_DIR_NAMES``.
        context: Object carrying the named directory attributes.

    Returns:
        The directory, or ``None`` when *context* lacks the attribute or the
        attribute itself is ``None``.

    Raises:
        ConfigurationError: If *name* is not an accepted directory name.
    """
    if name == ".":
        return Path.cwd()
    if name in _VALID_FALLBACK_DIR_NAMES:
        # A missing attribute is reported as "not configured", not as an error.
        return getattr(context, name, None)
    raise ConfigurationError(
        f"Invalid fallback_dirs entry {name!r}. Valid names: "
        f"{sorted(_VALID_FALLBACK_DIR_NAMES)}."
    )
def _validate_fallback_dirs(fallback_dirs: Optional[List[str]]) -> None:
    """Reject any *fallback_dirs* entry that is not an accepted directory name.

    ``None`` and empty sequences pass without inspection.

    Raises:
        ConfigurationError: On the first entry that is not in
            ``_VALID_FALLBACK_DIR_NAMES``.
    """
    for candidate in fallback_dirs or ():
        if candidate in _VALID_FALLBACK_DIR_NAMES:
            continue
        raise ConfigurationError(
            f"Invalid fallback_dirs entry {candidate!r}. Valid names: "
            f"{sorted(_VALID_FALLBACK_DIR_NAMES)}."
        )
def _config_lookup(config: Any, dotted: str) -> Any:
    """Follow a dotted path through a config object and return what it finds.

    Each segment is looked up on the value reached so far: ``dict`` values
    are read with ``.get`` and everything else with ``getattr``, so plain
    dictionary sections and attribute-style sections can be mixed freely
    along one path.

    Args:
        config: Root object to start from.
        dotted: Dotted path such as ``"hcp.t1_image"``.

    Returns:
        The value at the end of the path, or ``None`` when *dotted* is
        empty, a segment is missing, or an intermediate value is ``None``.
    """
    if not dotted:
        return None
    node: Any = config
    for part in dotted.split("."):
        if node is None:
            # Nothing left to descend into.
            return None
        node = node.get(part) if isinstance(node, dict) else getattr(node, part, None)
    return node
def _build_config_path_chain(
    config_path: Optional[str],
    config_paths: Optional[Union[str, List[str]]],
) -> List[str]:
    """Merge the two config-path arguments into one priority-ordered list.

    Entries from ``config_paths`` come first (a bare string counts as a
    one-element list). ``config_path`` is placed last, and only when it is
    set and not already among the earlier entries.

    Returns:
        A new list; the caller's ``config_paths`` is never modified.
    """
    if config_paths is None:
        ordered: List[str] = []
    elif isinstance(config_paths, str):
        ordered = [config_paths]
    else:
        ordered = list(config_paths)

    if config_path is not None and config_path not in ordered:
        ordered.append(config_path)
    return ordered
def _resolve_config_name(
    config: Any,
    chain: List[str],
    default: Optional[str] = None,
    *,
    skip_empty: bool = False,
) -> Optional[str]:
    """Pick the first usable config value along *chain*, stringified.

    Args:
        config: Object the dotted paths are looked up on.
        chain: Dotted paths in priority order.
        default: Returned unchanged when no path yields a usable value.
        skip_empty: If ``True``, a value whose string form is empty is
            passed over exactly like an unset one.

    Returns:
        ``str(value)`` for the first usable value, otherwise *default*.
    """
    # Lazy generator: later paths are only looked up when earlier ones miss.
    looked_up = (_config_lookup(config, dotted) for dotted in chain)
    for found in looked_up:
        if found is None:
            continue
        text = str(found)
        if skip_empty and text == "":
            continue
        return text
    return default
def _require_path_source(
    cls_name: str,
    *,
    has_default: bool,
    config_path: Optional[str],
    config_paths: Optional[Union[str, List[str]]],
    optional: bool,
) -> None:
    """Ensure a declaration has some way to obtain a path.

    A declaration is acceptable when it has a default, a ``config_path``, a
    ``config_paths`` value, or is marked optional.

    Args:
        cls_name: Declaration class name, used only in the error message.
        has_default: Whether the declaration carries a default.
        config_path: The declaration's single config path, if any.
        config_paths: The declaration's config path(s), if any.
        optional: Whether the declaration is optional.

    Raises:
        ConfigurationError: When none of the four conditions holds.
    """
    satisfied_by = (
        has_default,
        config_path is not None,
        config_paths is not None,
        optional,
    )
    if not any(satisfied_by):
        raise ConfigurationError(
            f"{cls_name} requires at least one of 'default', 'config_path', "
            "'config_paths', or 'optional=True' — otherwise no path can be derived."
        )
def _substitute_patient_id(name: str, context: "ProcessingContext") -> str:
    """Fill the ``{patient_id}`` placeholder in a path component.

    Strings without a ``{`` are returned untouched and *context* is not
    consulted. Otherwise the string is formatted with a mapping holding
    only ``patient_id``, so any other named placeholder raises ``KeyError``
    instead of slipping through unreplaced.
    """
    needs_formatting = "{" in name
    if needs_formatting:
        return name.format_map({"patient_id": context.patient_id})
    return name
def _search_dirs_for_file(
    filename: str,
    dir_names: List[str],
    context: "ProcessingContext",
) -> Optional[Path]:
    """Look for *filename* beneath each named directory, in order.

    Every entry of *dir_names* is turned into a base directory with
    :func:`_resolve_named_dir`; names that resolve to ``None`` are skipped.

    Returns:
        ``base / filename`` for the first base where that path exists, or
        ``None`` when no base has it.
    """
    for dir_name in dir_names:
        root = _resolve_named_dir(dir_name, context)
        if root is not None:
            hit = root / filename
            if hit.exists():
                return hit
    return None
def _search_dirs_for_dir(
    dirname: str,
    dir_names: List[str],
    context: "ProcessingContext",
) -> Optional[Path]:
    """Look for a subdirectory *dirname* beneath each named directory, in order.

    Every entry of *dir_names* is turned into a base directory with
    :func:`_resolve_named_dir`; names that resolve to ``None`` are skipped.

    Returns:
        ``base / dirname`` for the first base where that path exists and is
        a directory, or ``None`` when no base qualifies.
    """
    for dir_name in dir_names:
        root = _resolve_named_dir(dir_name, context)
        if root is not None:
            hit = root / dirname
            if hit.exists() and hit.is_dir():
                return hit
    return None
[docs]
class PathDeclaration:
    """Common base for every path declaration.

    A subclass provides two things: :meth:`resolve`, which turns the
    declaration into a concrete value for a given
    :class:`ProcessingContext`, and :meth:`existence_errors`, the implicit
    preflight check consumed by the synthesized verifier.
    """

    # Scope tag read by ``@workflow(scope="cohort")`` validation:
    # "patient" for patient-side declarations, "cohort" for cohort-side
    # ones, and "any" for declarations usable in either scope.
    _kind: str = "any"

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> Any:  # pragma: no cover - overridden
        """Produce the concrete value handed to the workflow body.

        What comes back depends on the subclass: a single
        :class:`pathlib.Path`, a ``list[Path]`` or namespace object for glob
        declarations, or a list of dataclass instances for structured ones.

        Raises:
            NotImplementedError: Always, on this base class.
        """
        raise NotImplementedError

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """List preflight error messages; the base class has nothing to check."""
        no_errors: list[str] = []
        return no_errors
[docs]
@dataclass(frozen=True)
class PatientFile(PathDeclaration):
    """Per-patient input file located beneath ``context.input_dir``.

    The filename is taken from configuration when a config path yields a
    value and from ``default`` otherwise. A ``{patient_id}`` placeholder in
    the filename is substituted before the path is built.

    Args:
        default: Filename used when no config path yields a non-``None``
            value. Interpreted relative to ``context.input_dir`` (or to the
            matching ``fallback_dirs`` entry).
        config_path: One dotted path on the
            :class:`~thesis.core.config.PipelineConfig` whose value, when
            non-``None``, replaces the default. Retained for older call
            sites; new code should prefer ``config_paths``.
        config_paths: One dotted path or a list of them, tried in order;
            the first non-``None`` value wins. When ``config_path`` is also
            given it is tried after these.
        fallback_dirs: Base-directory names searched, in order, when the
            file is absent from ``context.input_dir``. Accepted names are
            ``"input_dir"``, ``"output_dir"``, ``"working_dir"``,
            ``"data_dir"`` and ``"."``.
        optional: When ``True`` the implicit existence check is skipped and
            an unresolved name yields ``None``.

    Raises:
        ConfigurationError: If ``default``, ``config_path`` and
            ``config_paths`` are all unset while ``optional`` is ``False``,
            or if ``fallback_dirs`` holds an unknown name.
    """

    default: Optional[str] = None
    config_path: Optional[str] = None
    config_paths: Optional[Union[str, List[str]]] = None
    fallback_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "patient"

    def __post_init__(self) -> None:
        _require_path_source(
            "PatientFile",
            has_default=self.default is not None,
            config_path=self.config_path,
            config_paths=self.config_paths,
            optional=self.optional,
        )
        _validate_fallback_dirs(self.fallback_dirs)

    def _config_path_chain(self) -> List[str]:
        """Dotted config paths to try, highest priority first."""
        return _build_config_path_chain(self.config_path, self.config_paths)

    def _resolved_name(self, config: "PipelineConfig") -> Optional[str]:
        """Filename from config if available, else ``default`` (may be ``None``)."""
        return _resolve_config_name(config, self._config_path_chain(), self.default)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Return the file's path, or ``None`` when no filename can be derived."""
        raw_name = self._resolved_name(config)
        if raw_name is None:
            return None
        filename = _substitute_patient_id(raw_name, context)
        canonical = context.get_input_path(filename)
        if not self.fallback_dirs or canonical.exists():
            return canonical
        located = _search_dirs_for_file(filename, self.fallback_dirs, context)
        # With no fallback hit, hand back the input_dir path so that
        # existence_errors can name the location that was expected.
        return canonical if located is None else located

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report a missing or non-file input; optional declarations never fail."""
        if self.optional:
            return []
        target = self.resolve(config, context)
        if target is None or not target.exists():
            return [f"Required input '{name}' not found at: {target}"]
        if target.is_file():
            return []
        return [f"Required input '{name}' is not a regular file: {target}"]
[docs]
@dataclass(frozen=True)
class PatientDir(PathDeclaration):
    """Per-patient input directory located beneath ``context.input_dir``.

    Takes the same arguments as :class:`PatientFile` (``default``,
    ``config_path``, ``config_paths``, ``fallback_dirs``, ``optional``),
    but the resolved path has to be a directory. A ``{patient_id}``
    placeholder in the name is substituted.

    Raises:
        ConfigurationError: If no path source is configured and the
            declaration is not optional, or if ``fallback_dirs`` holds an
            unknown name.
    """

    default: Optional[str] = None
    config_path: Optional[str] = None
    config_paths: Optional[Union[str, List[str]]] = None
    fallback_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "patient"

    def __post_init__(self) -> None:
        _require_path_source(
            "PatientDir",
            has_default=self.default is not None,
            config_path=self.config_path,
            config_paths=self.config_paths,
            optional=self.optional,
        )
        _validate_fallback_dirs(self.fallback_dirs)

    def _config_path_chain(self) -> List[str]:
        """Dotted config paths to try, highest priority first."""
        return _build_config_path_chain(self.config_path, self.config_paths)

    def _resolved_name(self, config: "PipelineConfig") -> Optional[str]:
        """Directory name from config if available, else ``default``."""
        return _resolve_config_name(config, self._config_path_chain(), self.default)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Return the directory's path, or ``None`` when no name can be derived."""
        raw_name = self._resolved_name(config)
        if raw_name is None:
            return None
        dirname = _substitute_patient_id(raw_name, context)
        canonical = context.get_input_path(dirname)
        if not self.fallback_dirs or (canonical.exists() and canonical.is_dir()):
            return canonical
        located = _search_dirs_for_dir(dirname, self.fallback_dirs, context)
        # No fallback hit: report the input_dir location as the expected one.
        return canonical if located is None else located

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report a missing or non-directory input; optional declarations never fail."""
        if self.optional:
            return []
        target = self.resolve(config, context)
        if target is None or not target.exists():
            return [f"Required directory '{name}' not found at: {target}"]
        if target.is_dir():
            return []
        return [f"Required path '{name}' is not a directory: {target}"]
[docs]
@dataclass(frozen=True)
class OutputDir(PathDeclaration):
    """Output subdirectory beneath ``context.output_dir``.

    Resolving the declaration creates the directory, including missing
    parents, and tolerates one that already exists. No implicit existence
    check is defined for it.

    Args:
        subdir: Path relative to ``context.output_dir``; an empty string
            selects ``context.output_dir`` itself.
    """

    subdir: str = ""
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Create the directory if needed and return its path.

        Raises:
            ConfigurationError: If ``context.output_dir`` is ``None``.
        """
        root = context.output_dir
        if root is None:  # pragma: no cover - defensive
            raise ConfigurationError("output_dir is not set on ProcessingContext")
        destination = root / self.subdir if self.subdir else root
        destination.mkdir(parents=True, exist_ok=True)
        return destination
[docs]
@dataclass(frozen=True)
class WorkingFile(PathDeclaration):
    """Temporary file beneath ``context.working_dir``.

    Usable with both ``scope="patient"`` and ``scope="cohort"``. No
    implicit existence check is defined for it.

    Args:
        name: File name passed to ``context.get_working_path``.
    """

    name: str = ""
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Return the path that *context* assigns to this working file."""
        working_path = context.get_working_path(self.name)
        return working_path
[docs]
@dataclass(frozen=True)
class CohortDir(PathDeclaration):
    """Cohort-level output directory.

    Resolves to ``context.output_dir / subdir``; for cohort-scope workflows
    the cohort dispatch path in ``cli.py`` points ``output_dir`` at the
    cohort root. Resolving the declaration creates the directory.

    Args:
        subdir: Path relative to ``context.output_dir``; an empty string
            selects ``context.output_dir`` itself.
    """

    subdir: str = ""
    _kind: str = "cohort"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Create the directory if needed and return its path.

        Raises:
            ConfigurationError: If ``context.output_dir`` is ``None``.
        """
        cohort_root = context.output_dir
        if cohort_root is None:  # pragma: no cover - defensive
            raise ConfigurationError("output_dir is not set on ProcessingContext")
        if self.subdir:
            destination = cohort_root / self.subdir
        else:
            destination = cohort_root
        destination.mkdir(parents=True, exist_ok=True)
        return destination
[docs]
@dataclass(frozen=True)
class PriorOutput(PathDeclaration):
    """Declare an existing file (or set of files) inside ``context.output_dir``
    produced by an upstream workflow.

    Used by downstream workflows (qc, tract_similarity) that consume the
    HCP / preprocess / atlas results stored under the patient's output
    tree.

    Two resolution modes:

    - **Single file** — set *filename*. Resolution returns a single
      :class:`pathlib.Path` (``context.output_dir / subdir / filename``).
      Supports ``{patient_id}`` template substitution.
    - **Glob discovery** — set *glob_pattern*. Resolution returns a sorted
      ``list[Path]`` of matches under ``context.output_dir / subdir`` (or
      recursively if *recursive* is ``True``). Returns ``[]`` when nothing
      matches.

    A field counts as "set" when it is a non-empty string; ``None`` and
    ``""`` are both treated as unset, in construction-time validation and
    in resolution alike. When both fields are set, glob discovery is used.

    Args:
        filename: Single-file mode filename, relative to
            ``context.output_dir`` or *subdir*.
        glob_pattern: Glob expression for the multi-file mode.
        subdir: Optional subdirectory under ``context.output_dir``.
            ``{patient_id}`` is substituted.
        fallback_dirs: Optional ordered list of base-directory names to
            search if the primary location (``output_dir``) does not
            satisfy the lookup. Valid names: see ``PatientFile``.
        recursive: When ``True`` and *glob_pattern* is set, use
            :meth:`Path.rglob` instead of :meth:`Path.glob`.
        optional: Skip the implicit existence check.

    Raises:
        ConfigurationError: When neither *filename* nor *glob_pattern* is
            set, or when *fallback_dirs* contains an unknown name.
    """

    filename: Optional[str] = None
    glob_pattern: Optional[str] = None
    subdir: Optional[str] = None
    fallback_dirs: Optional[List[str]] = None
    recursive: bool = False
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        if not self.filename and not self.glob_pattern:
            raise ConfigurationError(
                "PriorOutput requires either 'filename' or 'glob_pattern' to be set."
            )
        _validate_fallback_dirs(self.fallback_dirs)

    def _uses_glob(self) -> bool:
        """Return ``True`` when this declaration resolves by glob discovery.

        Uses the same truthiness rule as ``__post_init__`` so that an empty
        ``glob_pattern`` paired with a real ``filename`` is handled as
        single-file mode instead of globbing with an empty pattern.
        """
        return bool(self.glob_pattern)

    def _candidate_bases(self, context: "ProcessingContext") -> List[Path]:
        """Return the ordered list of base directories to search.

        The primary base (``output_dir`` joined with the substituted
        *subdir*, if any) comes first, followed by each resolvable
        ``fallback_dirs`` entry joined with the same *subdir*, without
        duplicates.

        Raises:
            ConfigurationError: If ``context.output_dir`` is ``None``.
        """
        if context.output_dir is None:  # pragma: no cover - defensive
            raise ConfigurationError("output_dir is not set on ProcessingContext")
        subdir = _substitute_patient_id(self.subdir, context) if self.subdir else None
        primary = context.output_dir / subdir if subdir else context.output_dir
        bases: List[Path] = [primary]
        for name in self.fallback_dirs or ():
            base = _resolve_named_dir(name, context)
            if base is None:
                continue
            candidate = base / subdir if subdir else base
            if candidate not in bases:
                bases.append(candidate)
        return bases

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> Union[Path, List[Path]]:
        """Resolve to a single path (file mode) or a sorted match list (glob mode).

        In glob mode the first existing base that yields any match wins and
        ``[]`` is returned when none does. In single-file mode the first
        base containing the file wins; when none does, the path under the
        primary base is returned so the caller can report it.
        """
        bases = self._candidate_bases(context)

        if self._uses_glob():
            pattern = _substitute_patient_id(self.glob_pattern or "", context)
            for base in bases:
                if not base.exists():
                    continue
                matches = sorted(base.rglob(pattern) if self.recursive else base.glob(pattern))
                if matches:
                    return matches
            return []

        # Single-file mode
        filename = _substitute_patient_id(self.filename or "", context)
        for base in bases:
            candidate = base / filename
            if candidate.exists():
                return candidate
        # Nothing found — return the canonical (primary) path so
        # existence_errors can report it.
        return bases[0] / filename

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Return preflight errors for a missing upstream output.

        Optional declarations never produce errors. Glob mode fails when
        there are no matches; single-file mode fails when the path is
        missing or is not a regular file.
        """
        if self.optional:
            return []
        resolved = self.resolve(config, context)

        if self._uses_glob():
            if not isinstance(resolved, list) or len(resolved) == 0:
                bases = self._candidate_bases(context)
                pattern = _substitute_patient_id(self.glob_pattern or "", context)
                return [
                    f"Required upstream output '{name}' "
                    f"(pattern {pattern!r}) not found under: {bases[0]}"
                ]
            return []

        # Single-file mode: resolve() always returns a Path here; the assert
        # only narrows the Union for type checkers.
        assert isinstance(resolved, Path)
        if not resolved.exists():
            return [f"Required upstream output '{name}' not found at: {resolved}"]
        if not resolved.is_file():
            return [f"Required upstream output '{name}' is not a regular file: {resolved}"]
        return []
def _check_data_path_traversal(resolved: Path, base: Path, label: str) -> None:
    """Verify that *resolved* stays inside *base*.

    Both paths are passed through :meth:`Path.resolve` before the
    containment test.

    Args:
        resolved: Path to check.
        base: Directory that must contain it.
        label: Name for *base* used in the error message.

    Raises:
        ConfigurationError: If *resolved* lies outside *base*.
    """
    try:
        real_target = resolved.resolve()
        real_target.relative_to(base.resolve())
    except ValueError as exc:
        raise ConfigurationError(
            f"Path traversal detected: '{resolved}' escapes {label} '{base}'"
        ) from exc
[docs]
@dataclass(frozen=True)
class DataFile(PathDeclaration):
    """Input file located beneath ``context.data_dir``.

    Meant for cohort-shared assets (templates, atlases, reference images)
    kept in the project's data directory instead of the per-patient input
    tree. A ``{patient_id}`` placeholder in the filename is substituted.

    Relative filenames must stay inside ``context.data_dir``; absolute
    filenames are returned unchanged. Unless ``optional`` is set, the
    implicit check requires the path to exist and be a regular file.

    Args:
        filename: File name, relative to ``context.data_dir`` or absolute.
        optional: Skip the implicit existence check.
    """

    filename: str = ""
    optional: bool = False
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Return the file's path.

        Raises:
            ConfigurationError: If ``context.data_dir`` is ``None`` or a
                relative filename escapes ``context.data_dir``.
        """
        data_root = context.data_dir
        if data_root is None:  # pragma: no cover - defensive
            raise ConfigurationError("data_dir is not set on ProcessingContext")
        requested = Path(_substitute_patient_id(self.filename, context))
        if requested.is_absolute():
            # An absolute path names an explicit location; the traversal
            # guard only covers relative paths under data_dir.
            return requested
        anchored = data_root / requested
        _check_data_path_traversal(anchored, data_root, "data_dir")
        return anchored

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report a missing or non-file data path; optional declarations never fail."""
        if self.optional:
            return []
        target = self.resolve(config, context)
        if not target.exists():
            return [f"Required data file '{name}' not found at: {target}"]
        if target.is_file():
            return []
        return [f"Required data file '{name}' is not a regular file: {target}"]
[docs]
@dataclass(frozen=True)
class DataDir(PathDeclaration):
    """Input directory located beneath ``context.data_dir``.

    Directory counterpart of :class:`DataFile`: ``{patient_id}`` is
    substituted, relative names must stay inside ``context.data_dir``,
    absolute names are returned unchanged, and the implicit check requires
    an existing directory unless ``optional`` is set.

    Args:
        dirname: Directory name, relative to ``context.data_dir`` or absolute.
        optional: Skip the implicit existence check.
    """

    dirname: str = ""
    optional: bool = False
    _kind: str = "any"

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Path:
        """Return the directory's path.

        Raises:
            ConfigurationError: If ``context.data_dir`` is ``None`` or a
                relative name escapes ``context.data_dir``.
        """
        data_root = context.data_dir
        if data_root is None:  # pragma: no cover - defensive
            raise ConfigurationError("data_dir is not set on ProcessingContext")
        requested = Path(_substitute_patient_id(self.dirname, context))
        if requested.is_absolute():
            # An absolute path names an explicit location; the traversal
            # guard only covers relative paths under data_dir.
            return requested
        anchored = data_root / requested
        _check_data_path_traversal(anchored, data_root, "data_dir")
        return anchored

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report a missing or non-directory data path; optional declarations never fail."""
        if self.optional:
            return []
        target = self.resolve(config, context)
        if not target.exists():
            return [f"Required data directory '{name}' not found at: {target}"]
        if target.is_dir():
            return []
        return [f"Required path '{name}' is not a directory: {target}"]
[docs]
@dataclass(frozen=True)
class ExternalFile(PathDeclaration):
    """Input file whose location is supplied by a config value.

    In contrast to :class:`PatientFile` and :class:`DataFile`, the path is
    **not** anchored to ``input_dir`` or ``data_dir`` and **no
    path-traversal guard** is applied. The declaration targets
    cohort-shared assets that live outside the patient tree, such as a
    registration template, a transform warp, or an atlas reference image.

    Resolution reads the first non-empty value along ``config_paths`` /
    ``config_path``, substitutes ``{patient_id}``, and passes the result
    together with a base directory to ``thesis.core.utils.resolve_path``.
    The base directory is the first ``base_dirs`` entry that is configured
    on the context (``data_dir`` then the current directory by default).
    Unless ``optional`` is set, the implicit check requires a regular file.

    Args:
        config_path: Single dotted config path (e.g. ``"registration.fixed_image"``).
        config_paths: One path or an ordered list, tried before ``config_path``.
        base_dirs: Ordered base-directory names. Accepted names are
            ``"input_dir"``, ``"output_dir"``, ``"working_dir"``,
            ``"data_dir"`` and ``"."``; defaults to ``["data_dir", "."]``.
        optional: When ``True`` the implicit existence check is skipped and
            an unset value resolves to ``None``.

    Raises:
        ConfigurationError: If neither ``config_path`` nor ``config_paths``
            is given and the declaration is not optional, or if
            ``base_dirs`` holds an unknown name.
    """

    config_path: Optional[str] = None
    config_paths: Optional[Union[str, List[str]]] = None
    base_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        has_source = self.config_path is not None or self.config_paths is not None
        if not (has_source or self.optional):
            raise ConfigurationError(
                "ExternalFile requires at least one of 'config_path', 'config_paths', "
                "or 'optional=True' — otherwise no path can be derived."
            )
        _validate_fallback_dirs(self.base_dirs)

    def _config_path_chain(self) -> List[str]:
        """Dotted config paths to try, highest priority first."""
        return _build_config_path_chain(self.config_path, self.config_paths)

    def _resolved_name(self, config: "PipelineConfig") -> Optional[str]:
        """First non-empty config value, or ``None``.

        Blank values are skipped so that an empty setting cannot resolve to
        the base directory itself.
        """
        return _resolve_config_name(config, self._config_path_chain(), skip_empty=True)

    def _base(self, context: "ProcessingContext") -> Path:
        """First configured base directory, falling back to the current directory."""
        search_order = self.base_dirs or ["data_dir", "."]
        for dir_name in search_order:
            root = _resolve_named_dir(dir_name, context)
            if root is not None:
                return root
        return Path.cwd()

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Return the file's path, or ``None`` when no config value is set."""
        from thesis.core.utils import resolve_path

        raw_value = self._resolved_name(config)
        if raw_value is None:
            return None
        expanded = _substitute_patient_id(raw_value, context)
        return resolve_path(self._base(context), expanded)

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report an unconfigured, missing, or non-file input.

        Messages include the config paths that were consulted (or *name*
        when there are none). Optional declarations never fail.
        """
        if self.optional:
            return []
        label = " / ".join(self._config_path_chain()) or name
        target = self.resolve(config, context)
        if target is None:
            return [f"Required input '{name}' ({label}) is not configured."]
        if not target.exists():
            return [f"Required input '{name}' ({label}) not found at: {target}"]
        if target.is_file():
            return []
        return [f"Required input '{name}' ({label}) is not a regular file: {target}"]
# ---------------------------------------------------------------------------
# Phase A.3 primitives: glob discovery, structured lists, cohort iteration
# ---------------------------------------------------------------------------
def _glob_candidate_bases(
    primary_dir: Optional["PathDeclaration"],
    fallback_dirs: Optional[List[str]],
    config: "PipelineConfig",
    context: "ProcessingContext",
) -> List[Path]:
    """Build the ordered list of directories a glob declaration searches.

    The first entry is the resolved *primary_dir* (kept only when it
    resolves to a :class:`pathlib.Path`) or, when no *primary_dir* is
    given, ``context.input_dir`` if that is set. Each ``fallback_dirs``
    name that resolves to a directory follows, without duplicates.
    """
    roots: List[Path] = []

    if primary_dir is None:
        if context.input_dir is not None:
            roots.append(context.input_dir)
    else:
        head = primary_dir.resolve(config, context)
        if isinstance(head, Path):
            roots.append(head)

    for dir_name in fallback_dirs or ():
        extra = _resolve_named_dir(dir_name, context)
        if extra is not None and extra not in roots:
            roots.append(extra)
    return roots
def _glob_in_base(base: Path, pattern: str, recursive_fallback: bool) -> List[Path]:
    """Return the sorted matches of *pattern* inside *base*.

    A non-existent *base* yields ``[]``. When the plain glob finds nothing
    and *recursive_fallback* is true, the pattern is retried with
    :meth:`Path.rglob`.
    """
    if not base.exists():
        return []
    direct = sorted(base.glob(pattern))
    if direct or not recursive_fallback:
        return direct
    return sorted(base.rglob(pattern))
[docs]
@dataclass(frozen=True)
class GlobMatch(PathDeclaration):
    """Find files by glob pattern, trying several directories in turn.

    Resolution yields a sorted ``list[Path]`` from the first candidate
    directory that produces any match. Unless ``optional`` is set, the
    implicit check requires at least ``min_matches`` results.

    Args:
        pattern: Glob expression (e.g. ``"merged_th*samples.nii.gz"``);
            ``{patient_id}`` is substituted.
        primary_dir: Declaration naming the first directory to search.
            ``None`` means start at ``context.input_dir``.
        fallback_dirs: Base-directory names tried, in order, after the
            primary. Accepted names are ``"input_dir"``, ``"output_dir"``,
            ``"working_dir"``, ``"data_dir"`` and ``"."``.
        recursive_fallback: When ``True``, a directory with no direct match
            is searched again with :meth:`Path.rglob` before the next
            candidate is tried.
        min_matches: Minimum number of matches demanded by the implicit
            check (default ``1``).
        optional: Skip the implicit check; resolution may return ``[]``.

    Raises:
        ConfigurationError: If *pattern* is empty, *min_matches* is
            negative, or *fallback_dirs* holds an unknown name.
    """

    pattern: str = ""
    primary_dir: Optional[PathDeclaration] = None
    fallback_dirs: Optional[List[str]] = None
    recursive_fallback: bool = False
    min_matches: int = 1
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        if not self.pattern:
            raise ConfigurationError("GlobMatch requires a non-empty 'pattern'.")
        if self.min_matches < 0:
            raise ConfigurationError(f"GlobMatch.min_matches must be >= 0, got {self.min_matches}")
        _validate_fallback_dirs(self.fallback_dirs)

    def _candidate_bases(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[Path]:
        """Directories to search, primary first."""
        return _glob_candidate_bases(self.primary_dir, self.fallback_dirs, config, context)

    def _glob_in(self, base: Path, pattern: str) -> List[Path]:
        """Sorted matches of *pattern* within one directory."""
        return _glob_in_base(base, pattern, self.recursive_fallback)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> List[Path]:
        """Return the matches from the first directory that has any, else ``[]``."""
        substituted = _substitute_patient_id(self.pattern, context)
        for root in self._candidate_bases(config, context):
            hits = self._glob_in(root, substituted)
            if hits:
                return hits
        return []

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report too few matches; optional declarations never fail."""
        if self.optional:
            return []
        found = self.resolve(config, context)
        if len(found) >= self.min_matches:
            return []
        roots = self._candidate_bases(config, context)
        # Only the first candidate directory is named in the message.
        first_root = roots[0] if roots else None
        return [
            f"Required glob '{name}' (pattern {self.pattern!r}) "
            f"matched {len(found)} files (need >= {self.min_matches}) "
            f"under: {first_root}"
        ]
[docs]
class GlobGroupResult:
    """Value returned by :meth:`GlobGroup.resolve`.

    Each declared item is readable as an attribute holding its
    ``list[Path]``. ``_found_dir`` holds the directory the group was found
    in, or ``None``. The object is not iterable; read items by name.
    """

    __slots__ = ("_items", "_found_dir")

    def __init__(self, items: dict, found_dir: Optional[Path]) -> None:
        # Store a copy so later changes to the caller's mapping have no effect.
        object.__setattr__(self, "_items", dict(items))
        object.__setattr__(self, "_found_dir", found_dir)

    def __getattr__(self, key: str) -> Any:
        # Reached only after normal attribute lookup fails, i.e. for any
        # name that is not one of the two slots.
        mapping = object.__getattribute__(self, "_items")
        if key not in mapping:
            raise AttributeError(key)
        return mapping[key]

    def __repr__(self) -> str:
        return f"GlobGroupResult(_found_dir={self._found_dir!r}, items={self._items!r})"

    def __eq__(self, other: object) -> bool:
        if isinstance(other, GlobGroupResult):
            same_items = self._items == other._items
            return bool(same_items and self._found_dir == other._found_dir)
        return NotImplemented
[docs]
@dataclass(frozen=True)
class GlobGroup(PathDeclaration):
    """Several related globs that must all be taken from one directory.

    The first item decides which directory is used: candidate directories
    are tried in order until one has a match for the first pattern, and
    every remaining pattern is then evaluated in that same directory. This
    follows the directory-discovery semantics of ``prepare_hcp_paths``.

    Args:
        items: Mapping from result-attribute name to glob pattern;
            ``{patient_id}`` is substituted in each pattern.
        primary_dir: Declaration naming the first directory to search
            (default: ``context.input_dir``).
        fallback_dirs: Ordered fallback names (see :class:`GlobMatch`).
        recursive_fallback: When ``True``, ``rglob`` is tried in a
            directory before the next candidate is considered.
        optional: Skip the implicit existence check.

    Returns:
        Resolution yields a :class:`GlobGroupResult` with one ``list[Path]``
        attribute per ``items`` key, plus ``_found_dir``.

    Raises:
        ConfigurationError: If *items* is empty, contains a non-string key
            or pattern, or *fallback_dirs* holds an unknown name.
    """

    items: dict = field(default_factory=dict)
    primary_dir: Optional[PathDeclaration] = None
    fallback_dirs: Optional[List[str]] = None
    recursive_fallback: bool = False
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        if not self.items:
            raise ConfigurationError("GlobGroup requires a non-empty 'items' mapping.")
        for key, pat in self.items.items():
            if isinstance(key, str) and isinstance(pat, str):
                continue
            raise ConfigurationError(
                "GlobGroup.items must map str -> str (got "
                f"{type(key).__name__} -> {type(pat).__name__})."
            )
        _validate_fallback_dirs(self.fallback_dirs)

    def _candidate_bases(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[Path]:
        """Directories to search, primary first."""
        return _glob_candidate_bases(self.primary_dir, self.fallback_dirs, config, context)

    def _glob_in(self, base: Path, pattern: str) -> List[Path]:
        """Sorted matches of *pattern* within one directory."""
        return _glob_in_base(base, pattern, self.recursive_fallback)

    def resolve(self, config: "PipelineConfig", context: "ProcessingContext") -> GlobGroupResult:
        """Resolve every item against the first directory matching the first item.

        When no candidate directory matches the first pattern, every item
        is an empty list and ``_found_dir`` is ``None``.
        """
        entries = list(self.items.items())
        (lead_key, lead_pattern), trailing = entries[0], entries[1:]
        lead_glob = _substitute_patient_id(lead_pattern, context)

        for root in self._candidate_bases(config, context):
            lead_hits = self._glob_in(root, lead_glob)
            if not lead_hits:
                continue
            found: dict = {lead_key: lead_hits}
            # Remaining patterns are substituted only once a directory is chosen.
            for key, pat in trailing:
                found[key] = self._glob_in(root, _substitute_patient_id(pat, context))
            return GlobGroupResult(found, root)

        return GlobGroupResult({key: [] for key, _ in entries}, None)

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report a group found nowhere, or individual items without matches.

        Optional declarations never fail.
        """
        if self.optional:
            return []
        outcome = self.resolve(config, context)
        if outcome._found_dir is None:
            patterns = ", ".join(repr(p) for p in self.items.values())
            return [
                f"Required glob group '{name}' (patterns {patterns}) "
                f"matched nothing in any candidate directory."
            ]
        return [
            f"Required glob group '{name}.{key}' "
            f"(pattern {self.items[key]!r}) matched no files in {outcome._found_dir}."
            for key in self.items
            if not getattr(outcome, key)
        ]
[docs]
class ConfigListItem:
    """One resolved entry produced by a :class:`ConfigList`.

    The instance is a thin attribute-style view over the mapping handed to
    the constructor: ``item.<key>`` returns the stored value for ``<key>``
    and raises :class:`AttributeError` for keys that were never stored.
    The parent declaration fills the mapping from its ``file_fields`` /
    ``dir_fields`` / ``str_fields``; values are :class:`pathlib.Path`
    (or ``list[Path]``) for file / dir fields and ``str`` for string
    fields, with ``None`` for keys missing from the YAML entry.
    """

    __slots__ = ("_fields",)

    def __init__(self, fields: dict) -> None:
        # Keep a shallow copy so later changes to the caller's dict do not
        # leak into this item.
        snapshot = dict(fields)
        object.__setattr__(self, "_fields", snapshot)

    def __getattr__(self, key: str) -> Any:
        # Only reached when normal lookup fails. Go through
        # object.__getattribute__ so a missing slot cannot recurse here.
        stored = object.__getattribute__(self, "_fields")
        if key not in stored:
            raise AttributeError(key)
        return stored[key]

    def __repr__(self) -> str:
        stored = self._fields
        return f"ConfigListItem({stored!r})"

    def __eq__(self, other: object) -> bool:
        # Items are equal when their stored mappings are equal.
        if isinstance(other, ConfigListItem):
            return bool(self._fields == other._fields)
        return NotImplemented
[docs]
@dataclass(frozen=True)
class ConfigList(PathDeclaration):
    """Iterate a structured YAML list and resolve per-item paths.

    Useful for jobs / batch specifications where the YAML carries a list
    of dicts and each dict contains filenames the workflow needs as
    :class:`pathlib.Path` objects.

    Args:
        config_path: Dotted path on the
            :class:`~thesis.core.config.PipelineConfig` to the list.
        file_fields: YAML keys whose values should be resolved as files
            (relative paths anchored under ``context.input_dir`` or one
            of *fallback_dirs*; absolute paths are kept as-is). List
            values become ``list[Path]``.
        dir_fields: YAML keys whose values should be resolved as
            directories. Same anchoring rules as *file_fields*.
        str_fields: YAML keys kept as plain ``str`` (no resolution).
        fallback_dirs: Same semantics as :class:`PatientFile`. Defaults
            to ``["input_dir"]`` if unset.
        optional: When ``True``, missing config path or empty list is
            permitted; otherwise the implicit existence check fails.

    Returns:
        ``list[ConfigListItem]`` — one per YAML list entry.

    Raises:
        ConfigurationError: When *config_path* is empty, a field name is
            declared more than once across *file_fields* / *dir_fields* /
            *str_fields*, or *fallback_dirs* contains an unknown name.
    """

    config_path: str = ""
    file_fields: Tuple[str, ...] = ()
    dir_fields: Tuple[str, ...] = ()
    str_fields: Tuple[str, ...] = ()
    fallback_dirs: Optional[List[str]] = None
    optional: bool = False
    _kind: str = "any"

    def __post_init__(self) -> None:
        """Validate the declaration and normalise list-valued field names."""
        if not self.config_path:
            raise ConfigurationError("ConfigList requires a non-empty 'config_path'.")
        # Tolerate list inputs by normalising to tuples for hashability.
        # The dataclass is frozen, so assignment goes through object.__setattr__.
        for field_name in ("file_fields", "dir_fields", "str_fields"):
            value = getattr(self, field_name)
            if isinstance(value, list):
                object.__setattr__(self, field_name, tuple(value))
        seen: set = set()
        for f in (*self.file_fields, *self.dir_fields, *self.str_fields):
            if f in seen:
                raise ConfigurationError(
                    f"ConfigList field {f!r} appears in more than one of "
                    "file_fields / dir_fields / str_fields."
                )
            seen.add(f)
        _validate_fallback_dirs(self.fallback_dirs)

    def _resolve_one_path(
        self,
        raw: Any,
        context: "ProcessingContext",
        must_be_dir: bool,
    ) -> Optional[Path]:
        """Resolve a single raw value into a Path using fallback search.

        Args:
            raw: Raw YAML value; ``None`` is passed through, anything else
                is converted with ``str``.
            context: Processing context providing the named directories.
            must_be_dir: Selects the directory search helper instead of
                the file search helper.

        Returns:
            ``None`` for a ``None`` input; the path unchanged when it is
            absolute; the search helper's hit when it finds one; otherwise
            the path anchored under the first configured named directory
            (it may not exist) so error messages show a canonical location.
        """
        if raw is None:
            return None
        text = str(raw)
        candidate = Path(text)
        if candidate.is_absolute():
            return candidate
        dir_names = self.fallback_dirs or ["input_dir"]
        if must_be_dir:
            found = _search_dirs_for_dir(text, dir_names, context)
        else:
            found = _search_dirs_for_file(text, dir_names, context)
        if found is not None:
            return found
        # Anchor to first valid named dir for canonical error path.
        for dir_name in dir_names:
            base = _resolve_named_dir(dir_name, context)
            if base is not None:
                return base / text
        return candidate  # pragma: no cover - all named dirs are None

    def _resolve_field(
        self,
        raw: Any,
        context: "ProcessingContext",
        kind: str,
    ) -> Any:
        """Resolve one raw field value according to *kind*.

        ``"str"`` values are stringified (``None`` stays ``None``). Any
        other kind is resolved as a path, element-wise for list values;
        ``kind == "dir"`` selects directory resolution.
        """
        if kind == "str":
            return None if raw is None else str(raw)
        must_be_dir = kind == "dir"
        if isinstance(raw, list):
            return [self._resolve_one_path(v, context, must_be_dir) for v in raw]
        return self._resolve_one_path(raw, context, must_be_dir)

    def _raw_list(self, config: "PipelineConfig") -> Optional[List[Any]]:
        """Look up the raw list at ``config_path``.

        Returns:
            ``None`` when the lookup yields ``None``; otherwise the list.

        Raises:
            ConfigurationError: When the value found is not a ``list``.
        """
        raw = _config_lookup(config, self.config_path)
        if raw is None:
            return None
        if not isinstance(raw, list):
            raise ConfigurationError(
                f"ConfigList expects a list at {self.config_path!r}, " f"got {type(raw).__name__}."
            )
        return raw

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[ConfigListItem]:
        """Build one :class:`ConfigListItem` per entry of the configured list.

        Entries may be dicts (read with ``dict.get``) or arbitrary objects
        (read with ``getattr``); a key missing from an entry resolves to
        ``None``. An unset or empty list yields ``[]``.
        """
        raw_list = self._raw_list(config)
        if not raw_list:
            return []
        field_groups = (
            (self.file_fields, "file"),
            (self.dir_fields, "dir"),
            (self.str_fields, "str"),
        )
        items: List[ConfigListItem] = []
        for entry in raw_list:
            # ``e=entry`` binds the current entry at definition time so the
            # lambda does not depend on the loop variable afterwards.
            getter = (
                entry.get
                if isinstance(entry, dict)
                else lambda k, default=None, e=entry: getattr(e, k, default)
            )
            fields: dict = {}
            for names, kind in field_groups:
                for f in names:
                    fields[f] = self._resolve_field(getter(f), context, kind)
            items.append(ConfigListItem(fields))
        return items

    @staticmethod
    def _path_errors(value: Any, label: str, must_be_dir: bool) -> list[str]:
        """Return the error messages for one resolved file / dir field value.

        Args:
            value: ``None``, a single path, or a list whose elements are
                paths or ``None``.
            label: Message prefix identifying the declaration, item index
                and field.
            must_be_dir: ``True`` to require directories, ``False`` to
                require regular files.
        """
        noun = "directory" if must_be_dir else "file"
        missing = f"{label}: missing required {noun}."
        if value is None:
            return [missing]
        errors: list[str] = []
        for path in value if isinstance(value, list) else [value]:
            if path is None:
                # A null element inside a list-valued field resolves to
                # None; report it instead of calling a Path method on it.
                errors.append(missing)
                continue
            # is_dir() / is_file() are already False for a missing path,
            # so no separate exists() check is needed.
            present = path.is_dir() if must_be_dir else path.is_file()
            if not present:
                errors.append(f"{label}: {noun} not found at {path}.")
        return errors

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Check that every declared file / directory of every item exists.

        Args:
            config: Pipeline configuration holding the list.
            context: Processing context used to resolve relative paths.
            name: Label of this declaration used in the messages.

        Returns:
            ``[]`` when the declaration is optional or everything exists;
            a single message when the list is empty or unset; otherwise
            one message per missing value or path, ordered by item, with
            file fields before directory fields.
        """
        if self.optional:
            return []
        if not self._raw_list(config):
            return [f"Required ConfigList '{name}' at {self.config_path!r} is " "empty or unset."]
        errors: list[str] = []
        for idx, item in enumerate(self.resolve(config, context)):
            for f in self.file_fields:
                label = f"ConfigList '{name}'[{idx}].{f}"
                errors.extend(self._path_errors(getattr(item, f, None), label, False))
            for f in self.dir_fields:
                label = f"ConfigList '{name}'[{idx}].{f}"
                errors.extend(self._path_errors(getattr(item, f, None), label, True))
        return errors
[docs]
class CohortPatient:
    """One per-patient entry returned by :class:`CohortPatients`.

    The instance is an attribute-style view over the mapping passed to the
    constructor; unknown names raise :class:`AttributeError`.

    Attributes:
        patient_id: Subdirectory name treated as the patient identifier.
        patient_dir: Directory recorded for the patient by the producing
            :class:`CohortPatients` declaration.

    Additional attributes named after the keys of
    :attr:`CohortPatients.file_patterns` carry the first matching
    :class:`pathlib.Path` for that pattern.
    """

    __slots__ = ("_fields",)

    def __init__(self, fields: dict) -> None:
        # Store a shallow copy so the caller's dict can change freely.
        snapshot = dict(fields)
        object.__setattr__(self, "_fields", snapshot)

    def __getattr__(self, key: str) -> Any:
        # Only reached when normal lookup fails. Go through
        # object.__getattribute__ so a missing slot cannot recurse here.
        stored = object.__getattribute__(self, "_fields")
        if key not in stored:
            raise AttributeError(key)
        return stored[key]

    def __repr__(self) -> str:
        stored = self._fields
        return f"CohortPatient({stored!r})"

    def __eq__(self, other: object) -> bool:
        # Entries are equal when their stored mappings are equal.
        if isinstance(other, CohortPatient):
            return bool(self._fields == other._fields)
        return NotImplemented
[docs]
@dataclass(frozen=True)
class CohortPatients(PathDeclaration):
    """Iterate per-patient subdirectories under a cohort root.

    Args:
        root_dir: Cohort root (default: ``context.output_dir``).
        subdir: Optional per-patient subdirectory that must exist for
            the patient to be included. When set, ``patient.patient_dir``
            points at this subdirectory and *file_patterns* are globbed
            inside it.
        exclude: Patient-id names to skip (case-sensitive).
        min_patients: Minimum patient count for the implicit existence
            check.
        file_patterns: Optional ``key -> glob`` map. Patients missing
            *any* pattern are excluded. Each pattern resolves to the
            first (sorted) match under the patient's directory and is
            exposed as ``patient.<key>``.
        optional: Skip the implicit existence check.

    Returns:
        ``list[CohortPatient]`` sorted by child directory path.

    Raises:
        ConfigurationError: When *min_patients* is negative or
            *file_patterns* is not a ``str -> str`` mapping.
    """

    root_dir: Optional[PathDeclaration] = None
    subdir: Optional[str] = None
    exclude: Tuple[str, ...] = ("cohort", "atlas", "_meta")
    min_patients: int = 1
    file_patterns: Optional[dict] = None
    optional: bool = False
    _kind: str = "cohort"

    def __post_init__(self) -> None:
        """Validate the declaration and normalise ``exclude`` to a tuple."""
        if self.min_patients < 0:
            raise ConfigurationError(
                f"CohortPatients.min_patients must be >= 0, got {self.min_patients}"
            )
        if self.file_patterns is not None:
            for k, v in self.file_patterns.items():
                if not isinstance(k, str) or not isinstance(v, str):
                    raise ConfigurationError("CohortPatients.file_patterns must map str -> str.")
        # Normalise list -> tuple for the exclude field. The dataclass is
        # frozen, so assignment goes through object.__setattr__.
        if isinstance(self.exclude, list):
            object.__setattr__(self, "exclude", tuple(self.exclude))

    def _root(self, config: "PipelineConfig", context: "ProcessingContext") -> Optional[Path]:
        """Return the cohort root, or ``None`` when it cannot be determined.

        An explicit ``root_dir`` declaration is resolved first; a result
        that is not a :class:`pathlib.Path` is treated as "no root".
        Without ``root_dir`` the context's ``output_dir`` is used.
        """
        if self.root_dir is not None:
            resolved = self.root_dir.resolve(config, context)
            return resolved if isinstance(resolved, Path) else None
        return context.output_dir

    def _is_candidate(self, path: Path) -> bool:
        """Return ``True`` for directories that are neither excluded nor hidden."""
        if not path.is_dir():
            return False
        if path.name in self.exclude:
            return False
        return not path.name.startswith(".")

    def _pattern_matches(self, patient_dir: Path) -> Optional[dict]:
        """Return ``key -> first sorted match`` for every file pattern.

        Returns ``None`` as soon as one pattern has no match under
        *patient_dir*, and an empty dict when no patterns are configured.
        """
        found: dict = {}
        for key, pattern in (self.file_patterns or {}).items():
            matches = sorted(patient_dir.glob(pattern))
            if not matches:
                return None
            found[key] = matches[0]
        return found

    def resolve(
        self, config: "PipelineConfig", context: "ProcessingContext"
    ) -> List[CohortPatient]:
        """Collect the patients found under the cohort root.

        Returns:
            One :class:`CohortPatient` per qualifying child directory, in
            sorted order. The list is empty when the root is unknown,
            missing, or not a directory.
        """
        root = self._root(config, context)
        # is_dir() is False for a missing path *and* for a regular file, so
        # a root that is not a directory yields an empty cohort instead of
        # letting iterdir() raise NotADirectoryError.
        if root is None or not root.is_dir():
            return []
        patients: List[CohortPatient] = []
        for child in sorted(root.iterdir()):
            if not self._is_candidate(child):
                continue
            patient_dir = child
            if self.subdir is not None:
                patient_dir = child / self.subdir
                if not patient_dir.is_dir():
                    continue
            matched = self._pattern_matches(patient_dir)
            if matched is None:
                # At least one required pattern is missing for this patient.
                continue
            fields: dict = {
                "patient_id": child.name,
                "patient_dir": patient_dir,
            }
            fields.update(matched)
            patients.append(CohortPatient(fields))
        return patients

    def existence_errors(
        self,
        config: "PipelineConfig",
        context: "ProcessingContext",
        name: str,
    ) -> list[str]:
        """Report a cohort that has fewer than ``min_patients`` patients.

        Args:
            config: Pipeline configuration forwarded to :meth:`resolve`.
            context: Processing context forwarded to :meth:`resolve`.
            name: Label of this declaration used in the message.

        Returns:
            ``[]`` when the declaration is optional or enough patients
            were found; otherwise a single message with the count, the
            required minimum and the root that was searched.
        """
        if self.optional:
            return []
        patients = self.resolve(config, context)
        if len(patients) < self.min_patients:
            root = self._root(config, context)
            return [
                f"Required cohort '{name}' yielded {len(patients)} patients "
                f"(need >= {self.min_patients}) under: {root}"
            ]
        return []