"""Configuration manager for hierarchical config loading and merging."""
from collections.abc import Mapping
from pathlib import Path
from typing import Optional, Union
from pydantic import ValidationError as PydanticValidationError
from thesis.core.exceptions import ConfigurationError
from thesis.core.logging import get_logger
from thesis.core.utils import to_path
from .loaders import ConfigDict, load_yaml, merge_configs, save_yaml
from .validators import PipelineConfig
# Module-level logger, created by passing this module's name to the project's
# `get_logger` helper.
logger = get_logger(__name__)
# Public API of this module: only `ConfigManager` is exported by a star import.
__all__ = ["ConfigManager"]
class ConfigManager:
    """
    Manages configuration loading, merging, and validation.

    Supports hierarchical configuration with defaults, environment-specific
    overrides, and patient-specific configs. Uses Pydantic for validation
    and type safety.

    Config hierarchy (later configs override earlier):
        1. Default config
        2. Hardware config
        3. Protocol config
        4. Patient-specific config
        5. Runtime overrides

    Example:
        >>> manager = ConfigManager(config_dir="./config")
        >>> config = manager.load_config("default")
        >>> config = manager.load_config("default", patient_id="DTI_LDF001")
    """

    def __init__(self, config_dir: Optional[Union[str, Path]] = None):
        """
        Initialize the configuration manager.

        Args:
            config_dir: Base directory for config files. If None, uses './config'
        """
        if config_dir is None:
            config_dir = Path("config")
        self.config_dir = to_path(config_dir)
        # Validated configs keyed by
        # (config_name, patient_id, protocol, namespace-registry fingerprint).
        self._config_cache: dict[
            tuple[str, Optional[str], Optional[str], tuple[str, ...]], PipelineConfig
        ] = {}
        logger.debug(f"ConfigManager initialized with dir: {self.config_dir}")

    def load_config(
        self,
        config_name: str = "default",
        patient_id: Optional[str] = None,
        protocol: Optional[str] = None,
        overrides: Optional[Mapping[str, object]] = None,
        protocol_required: bool = False,
    ) -> PipelineConfig:
        """
        Load a configuration with optional overrides.

        Results loaded without runtime overrides are cached; a cache hit
        returns the same ``PipelineConfig`` object that was stored earlier.

        Args:
            config_name: Name of the base config to load
            patient_id: Optional patient ID for patient-specific config
            protocol: Optional protocol name for protocol-specific config
            overrides: Optional dictionary of runtime overrides
            protocol_required: If True, raise ``ConfigurationError`` when
                ``protocol`` is given but no matching file exists. Set this when
                the protocol was explicitly requested by the user (e.g. via
                ``--protocol``) rather than supplied as a workflow's
                ``default_protocol`` fallback.

        Returns:
            Validated PipelineConfig object

        Raises:
            ConfigurationError: If the base ``config_name`` file is missing, if
                ``protocol_required`` is True and the protocol file is missing,
                or if the merged configuration fails Pydantic validation.

        Example:
            >>> config = manager.load_config(
            ...     config_name="default",
            ...     patient_id="DTI_LDF001",
            ...     overrides={"preprocessing": {"threads": 8}}
            ... )
        """
        # Check cache. The registry fingerprint is part of the key so a config
        # cached before a workflow namespace was registered is not served once
        # the registry changes (its validated/instantiated namespaces differ).
        # NOTE(review): imported at call time rather than at module import —
        # presumably to avoid an import cycle; confirm before hoisting.
        from thesis.core.config.namespace_registry import NAMESPACE_REGISTRY

        registry_fingerprint = tuple(NAMESPACE_REGISTRY.list())
        cache_key = (config_name, patient_id, protocol, registry_fingerprint)
        if not overrides:
            cached = self._config_cache.get(cache_key)
            if cached is not None:
                logger.debug(f"Loading config from cache: {cache_key}")
                return cached

        configs_to_merge = []

        # 1. Default/base config — explicitly requested, so a missing file is
        # treated as a hard error rather than silently running on defaults.
        base_config = self._load_config_file(config_name, required=True)
        if base_config:
            configs_to_merge.append(base_config)
            logger.debug(f"Loaded base config: {config_name}")

        # 2. Hardware config
        hardware_config = self._load_config_file("hardware", subdir=None)
        if hardware_config:
            configs_to_merge.append(hardware_config)
            logger.debug("Loaded hardware config")

        # 3. Protocol config — required only when explicitly requested by the
        # user; a workflow's default_protocol fallback may legitimately have
        # no dedicated protocols/ file.
        if protocol:
            protocol_config = self._load_config_file(
                protocol, subdir="protocols", required=protocol_required
            )
            if protocol_config:
                configs_to_merge.append(protocol_config)
                logger.debug(f"Loaded protocol config: {protocol}")

        # 4. Patient-specific config — optional (not every patient has an
        # override), but log at INFO either way so it is visible which patient
        # config was or was not applied.
        if patient_id:
            patient_config = self._load_config_file(patient_id, subdir="patients")
            # Report the path relative to the configured directory instead of
            # a hard-coded "config/" prefix, so the message stays accurate
            # when a non-default config_dir is in use.
            patient_path = self.get_config_dir("patients") / f"{patient_id}.yaml"
            if patient_config:
                configs_to_merge.append(patient_config)
                logger.info(f"Applied patient config: {patient_path}")
            else:
                logger.info(
                    f"No patient config for '{patient_id}' "
                    f"({patient_path} not found); "
                    f"using base/protocol config values."
                )

        # Merge all configs
        merged_dict = merge_configs(*configs_to_merge)

        # 5. Apply runtime overrides
        if overrides:
            merged_dict = merge_configs(merged_dict, dict(overrides))
            logger.debug(f"Applied {len(overrides)} runtime overrides")

        # Record patient_id and protocol last, so neither the config files nor
        # the runtime overrides can replace the values requested by the caller.
        if patient_id:
            merged_dict["patient_id"] = patient_id
        if protocol:
            merged_dict["protocol"] = protocol

        # Validate with Pydantic
        try:
            config = PipelineConfig.from_dict(merged_dict)
        except PydanticValidationError as e:
            logger.error(f"Configuration validation failed: {e}")
            raise ConfigurationError(f"Invalid configuration: {e}") from e

        # Cache only override-free results: the cache key does not encode
        # override contents.
        if not overrides:
            self._config_cache[cache_key] = config

        logger.info(f"Configuration loaded successfully: {config_name}")
        return config

    def _find_config_file(self, name: str, subdir: Optional[str] = None) -> Optional[Path]:
        """Return the first existing ``<name>.yaml`` / ``<name>.yml`` file, or ``None``."""
        base_dir = self.get_config_dir(subdir)
        for suffix in (".yaml", ".yml"):
            candidate = base_dir / f"{name}{suffix}"
            # is_file() (not exists()) so a directory that happens to be named
            # like a config file is not mistaken for one; this matches the
            # filter used by list_configs().
            if candidate.is_file():
                return candidate
        return None

    def _load_config_file(
        self, name: str, subdir: Optional[str] = None, required: bool = False
    ) -> Optional[ConfigDict]:
        """
        Load a single config file.

        Args:
            name: Config file name (without .yaml extension)
            subdir: Optional subdirectory within config_dir
            required: If True, raise ``ConfigurationError`` when the file is
                missing instead of silently returning ``None``. Use for
                explicitly user-requested configs (e.g. ``-c`` / ``--protocol``)
                where a missing file almost always means a typo.

        Returns:
            Config dictionary or None if file doesn't exist (and not required)

        Raises:
            ConfigurationError: If ``required`` is True and no matching
                ``.yaml``/``.yml`` file exists.
        """
        base_dir = self.get_config_dir(subdir)
        # Try both .yaml and .yml extensions
        config_path = self._find_config_file(name, subdir=subdir)

        if config_path is None:
            if required:
                # Build a hint listing what *is* available so a typo is easy
                # to spot from the error message alone.
                label = f"{subdir.rstrip('s')} config" if subdir else "config"
                available = self.list_configs(subdir=subdir)
                hint = (
                    f" Available {label}s: {', '.join(available)}."
                    if available
                    else f" No {label}s found in {base_dir}/."
                )
                raise ConfigurationError(
                    f"Requested {label} '{name}' not found "
                    f"(looked for {base_dir / name}.yaml/.yml).{hint}"
                )
            logger.debug(f"Config file not found: {base_dir / name}.yaml/.yml")
            return None

        try:
            config = load_yaml(config_path)
        except Exception as e:
            # Log which file failed, then let the original error propagate.
            logger.error(f"Error loading config {config_path}: {e}")
            raise
        logger.debug(f"Loaded config file: {config_path}")
        return config

    def load_config_dict(self, name: str, subdir: Optional[str] = None) -> Optional[ConfigDict]:
        """Load a raw config dictionary without validation.

        Args:
            name: Config file name without the extension.
            subdir: Optional config subdirectory.

        Returns:
            Raw config dictionary, or ``None`` when the file does not exist.
        """
        return self._load_config_file(name, subdir=subdir)

    def save_config(
        self, config: Union[PipelineConfig, ConfigDict], name: str, subdir: Optional[str] = None
    ) -> Path:
        """
        Save a configuration to a YAML file.

        Missing parent directories are created. The file is always written
        with the ``.yaml`` extension.

        Args:
            config: Configuration to save (PipelineConfig or dict)
            name: Name for the config file
            subdir: Optional subdirectory

        Returns:
            Path to saved file
        """
        config_path = self.get_config_dir(subdir) / f"{name}.yaml"
        config_path.parent.mkdir(parents=True, exist_ok=True)

        # Convert PipelineConfig to dict if needed
        if isinstance(config, PipelineConfig):
            config_dict = config.to_dict()
        else:
            config_dict = config

        save_yaml(config_dict, config_path)
        logger.info(f"Config saved to: {config_path}")
        return config_path

    def list_configs(self, subdir: Optional[str] = None) -> list[str]:
        """
        List available config files.

        Args:
            subdir: Optional subdirectory to search

        Returns:
            Sorted list of unique config names (without the .yaml/.yml
            extension). Empty when the directory does not exist or is not a
            directory.
        """
        search_dir = self.get_config_dir(subdir)
        # is_dir() rather than exists(): iterating a regular file would raise
        # NotADirectoryError, which would also mask the ConfigurationError
        # that _load_config_file builds its "available configs" hint for.
        if not search_dir.is_dir():
            return []

        # A set collapses "x.yaml" and "x.yml" into a single entry.
        configs = {
            f.stem for f in search_dir.iterdir() if f.is_file() and f.suffix in (".yaml", ".yml")
        }
        return sorted(configs)

    def get_config_dir(self, subdir: Optional[str] = None) -> Path:
        """
        Get the path to a config directory.

        Args:
            subdir: Optional subdirectory

        Returns:
            ``config_dir / subdir`` when ``subdir`` is given (and non-empty),
            otherwise ``config_dir`` itself. The path is not checked for
            existence.
        """
        return self.config_dir / subdir if subdir else self.config_dir

    def clear_cache(self) -> None:
        """Clear the configuration cache."""
        self._config_cache.clear()
        logger.debug("Config cache cleared")