Source code for thesis.core.config.manager

"""Configuration manager for hierarchical config loading and merging."""

from collections.abc import Mapping
from pathlib import Path
from typing import Optional, Union

from pydantic import ValidationError as PydanticValidationError

from thesis.core.exceptions import ConfigurationError
from thesis.core.logging import get_logger
from thesis.core.utils import to_path

from .loaders import ConfigDict, load_yaml, merge_configs, save_yaml
from .validators import PipelineConfig

logger = get_logger(__name__)

__all__ = ["ConfigManager"]


class ConfigManager:
    """
    Manages configuration loading, merging, and validation.

    Supports hierarchical configuration with defaults, environment-specific
    overrides, and patient-specific configs. Uses Pydantic for validation
    and type safety.

    Config hierarchy (later configs override earlier):
        1. Default config
        2. Hardware config
        3. Protocol config
        4. Patient-specific config
        5. Runtime overrides

    Example:
        >>> manager = ConfigManager(config_dir="./config")
        >>> config = manager.load_config("default")
        >>> config = manager.load_config("default", patient_id="DTI_LDF001")
    """

    # Extensions probed for every config name, in priority order.
    _CONFIG_SUFFIXES = (".yaml", ".yml")

    def __init__(self, config_dir: Optional[Union[str, Path]] = None):
        """
        Initialize the configuration manager.

        Args:
            config_dir: Base directory for config files. If None, uses './config'
        """
        if config_dir is None:
            config_dir = Path("config")

        self.config_dir = to_path(config_dir)
        # Key: (config_name, patient_id, protocol, registry fingerprint).
        self._config_cache: dict[
            tuple[str, Optional[str], Optional[str], tuple[str, ...]], PipelineConfig
        ] = {}

        logger.debug(f"ConfigManager initialized with dir: {self.config_dir}")

    def load_config(
        self,
        config_name: str = "default",
        patient_id: Optional[str] = None,
        protocol: Optional[str] = None,
        overrides: Optional[Mapping[str, object]] = None,
        protocol_required: bool = False,
    ) -> PipelineConfig:
        """
        Load a configuration with optional overrides.

        Args:
            config_name: Name of the base config to load
            patient_id: Optional patient ID for patient-specific config
            protocol: Optional protocol name for protocol-specific config
            overrides: Optional dictionary of runtime overrides
            protocol_required: If True, raise ``ConfigurationError`` when
                ``protocol`` is given but no matching file exists. Set this
                when the protocol was explicitly requested by the user (e.g.
                via ``--protocol``) rather than supplied as a workflow's
                ``default_protocol`` fallback.

        Returns:
            Validated PipelineConfig object

        Raises:
            ConfigurationError: If the base ``config_name`` file is missing,
                if ``protocol_required`` is True and the protocol file is
                missing (also when a cached config exists), or if the merged
                config fails validation.

        Example:
            >>> config = manager.load_config(
            ...     config_name="default",
            ...     patient_id="DTI_LDF001",
            ...     overrides={"preprocessing": {"threads": 8}}
            ... )
        """
        # The registry fingerprint is part of the key so a config cached
        # before a workflow namespace was registered is not served once the
        # registry changes (its validated/instantiated namespaces differ).
        from thesis.core.config.namespace_registry import NAMESPACE_REGISTRY

        registry_fingerprint = tuple(NAMESPACE_REGISTRY.list())
        cache_key = (config_name, patient_id, protocol, registry_fingerprint)

        if not overrides and cache_key in self._config_cache:
            # ``protocol_required`` is not part of the cache key, so an entry
            # may have been stored by a lenient call that tolerated a missing
            # protocol file. Re-check here so a strict call still fails.
            if (
                protocol
                and protocol_required
                and self._find_config_file(protocol, subdir="protocols") is None
            ):
                raise self._missing_config_error(protocol, subdir="protocols")
            logger.debug(f"Loading config from cache: {cache_key}")
            return self._config_cache[cache_key]

        # Layers 1-4 (files on disk), lowest priority first.
        layers = self._collect_layers(config_name, patient_id, protocol, protocol_required)
        merged_dict = merge_configs(*layers)

        # 5. Apply runtime overrides
        if overrides:
            merged_dict = merge_configs(merged_dict, dict(overrides))
            logger.debug(f"Applied {len(overrides)} runtime overrides")

        # Add patient_id and protocol to merged config
        if patient_id:
            merged_dict["patient_id"] = patient_id
        if protocol:
            merged_dict["protocol"] = protocol

        # Validate with Pydantic
        try:
            config = PipelineConfig.from_dict(merged_dict)
        except PydanticValidationError as e:
            logger.error(f"Configuration validation failed: {e}")
            raise ConfigurationError(f"Invalid configuration: {e}") from e

        # Results that include runtime overrides are never cached, because
        # the overrides are not part of the cache key.
        if not overrides:
            self._config_cache[cache_key] = config

        logger.info(f"Configuration loaded successfully: {config_name}")
        return config

    def _collect_layers(
        self,
        config_name: str,
        patient_id: Optional[str],
        protocol: Optional[str],
        protocol_required: bool,
    ) -> list[ConfigDict]:
        """Load the on-disk config layers (base, hardware, protocol, patient) in merge order."""
        layers: list[ConfigDict] = []

        # 1. Default/base config — explicitly requested, so a missing file is
        # treated as a hard error rather than silently running on defaults.
        base_config = self._load_config_file(config_name, required=True)
        if base_config:
            layers.append(base_config)
            logger.debug(f"Loaded base config: {config_name}")

        # 2. Hardware config
        hardware_config = self._load_config_file("hardware", subdir=None)
        if hardware_config:
            layers.append(hardware_config)
            logger.debug("Loaded hardware config")

        # 3. Protocol config — required only when explicitly requested by the
        # user; a workflow's default_protocol fallback may legitimately have
        # no dedicated protocols/ file.
        if protocol:
            protocol_config = self._load_config_file(
                protocol, subdir="protocols", required=protocol_required
            )
            if protocol_config:
                layers.append(protocol_config)
                logger.debug(f"Loaded protocol config: {protocol}")

        # 4. Patient-specific config — optional (not every patient has an
        # override), but log at INFO either way so it is visible which patient
        # config was or was not applied.
        if patient_id:
            patient_config = self._load_config_file(patient_id, subdir="patients")
            if patient_config:
                layers.append(patient_config)
                logger.info(f"Applied patient config: config/patients/{patient_id}.yaml")
            else:
                logger.info(
                    f"No patient config for '{patient_id}' "
                    f"(config/patients/{patient_id}.yaml not found); "
                    f"using base/protocol config values."
                )

        return layers

    def _find_config_file(self, name: str, subdir: Optional[str] = None) -> Optional[Path]:
        """Return the first existing ``<name>.yaml`` / ``<name>.yml`` file, or ``None``."""
        base_dir = self.get_config_dir(subdir)
        for suffix in self._CONFIG_SUFFIXES:
            candidate = base_dir / f"{name}{suffix}"
            # is_file() rather than exists(): a directory that happens to be
            # named like a config file must not be handed to the YAML loader.
            if candidate.is_file():
                return candidate
        return None

    def _missing_config_error(
        self, name: str, subdir: Optional[str] = None
    ) -> ConfigurationError:
        """Build the error raised when a required config file is missing."""
        base_dir = self.get_config_dir(subdir)
        # removesuffix drops exactly one plural "s" ("protocols" -> "protocol");
        # rstrip("s") would eat every trailing "s" ("class" -> "cla").
        label = f"{subdir.removesuffix('s')} config" if subdir else "config"
        available = self.list_configs(subdir=subdir)
        hint = (
            f" Available {label}s: {', '.join(available)}."
            if available
            else f" No {label}s found in {base_dir}/."
        )
        return ConfigurationError(
            f"Requested {label} '{name}' not found "
            f"(looked for {base_dir / name}.yaml/.yml).{hint}"
        )

    def _load_config_file(
        self, name: str, subdir: Optional[str] = None, required: bool = False
    ) -> Optional[ConfigDict]:
        """
        Load a single config file.

        Args:
            name: Config file name (without .yaml extension)
            subdir: Optional subdirectory within config_dir
            required: If True, raise ``ConfigurationError`` when the file is
                missing instead of silently returning ``None``. Use for
                explicitly user-requested configs (e.g. ``-c`` /
                ``--protocol``) where a missing file almost always means a
                typo.

        Returns:
            Config dictionary or None if file doesn't exist (and not required)

        Raises:
            ConfigurationError: If ``required`` is True and no matching
                ``.yaml``/``.yml`` file exists.
        """
        config_path = self._find_config_file(name, subdir=subdir)

        if config_path is None:
            if required:
                raise self._missing_config_error(name, subdir=subdir)
            base_dir = self.get_config_dir(subdir)
            logger.debug(f"Config file not found: {base_dir / name}.yaml/.yml")
            return None

        try:
            config = load_yaml(config_path)
        except Exception as e:
            # Log which file failed, then let the loader's error propagate.
            logger.error(f"Error loading config {config_path}: {e}")
            raise

        logger.debug(f"Loaded config file: {config_path}")
        return config

    def load_config_dict(self, name: str, subdir: Optional[str] = None) -> Optional[ConfigDict]:
        """Load a raw config dictionary without validation.

        Args:
            name: Config file name without the extension.
            subdir: Optional config subdirectory.

        Returns:
            Raw config dictionary, or ``None`` when the file does not exist.
        """
        return self._load_config_file(name, subdir=subdir)

    def save_config(
        self, config: Union[PipelineConfig, ConfigDict], name: str, subdir: Optional[str] = None
    ) -> Path:
        """
        Save a configuration to a YAML file.

        Args:
            config: Configuration to save (PipelineConfig or dict)
            name: Name for the config file
            subdir: Optional subdirectory

        Returns:
            Path to saved file
        """
        config_path = self.get_config_dir(subdir) / f"{name}.yaml"
        config_path.parent.mkdir(parents=True, exist_ok=True)

        # Convert PipelineConfig to dict if needed
        config_dict = config.to_dict() if isinstance(config, PipelineConfig) else config

        save_yaml(config_dict, config_path)
        logger.info(f"Config saved to: {config_path}")

        return config_path

    def list_configs(self, subdir: Optional[str] = None) -> list[str]:
        """
        List available config files.

        Args:
            subdir: Optional subdirectory to search

        Returns:
            Sorted list of config names (without extension). Empty when the
            directory does not exist or the path is not a directory.
        """
        search_dir = self.get_config_dir(subdir)

        # is_dir() also covers the "path exists but is a regular file" case,
        # where iterdir() would raise NotADirectoryError.
        if not search_dir.is_dir():
            return []

        # A set collapses foo.yaml / foo.yml into a single name.
        names = {
            entry.stem
            for entry in search_dir.iterdir()
            if entry.is_file() and entry.suffix in self._CONFIG_SUFFIXES
        }
        return sorted(names)

    def get_config_dir(self, subdir: Optional[str] = None) -> Path:
        """
        Get the path to a config directory.

        Args:
            subdir: Optional subdirectory

        Returns:
            ``config_dir / subdir`` when ``subdir`` is given, else ``config_dir``
        """
        return self.config_dir / subdir if subdir else self.config_dir

    def clear_cache(self) -> None:
        """Clear the configuration cache."""
        self._config_cache.clear()
        logger.debug("Config cache cleared")