Core Package
The thesis.core package provides the fundamental building blocks for the
dMRI pipeline: the @workflow / @verify decorator API, declarative path
types, workflow I/O contract helpers, configuration management, processing
context, workflow registry, Nipype integration, and I/O utilities.
thesis.core
Core infrastructure for the thesis framework.
Provides the workflow registry, @workflow decorator API, declarative path
types, hierarchical configuration loading, processing context, structured
output, Nipype execution bridge, logging, and I/O utilities. Workflow
implementations depend only on this package — there are no cross-workflow
imports.
- class thesis.core.ConfigManager
Bases: object
Manages configuration loading, merging, and validation.
Supports hierarchical configuration with defaults, environment-specific overrides, and patient-specific configs. Uses Pydantic for validation and type safety.
Config hierarchy (later configs override earlier ones):
1. Default config
2. Hardware config
3. Protocol config
4. Patient-specific config
5. Runtime overrides
Example
>>> manager = ConfigManager(config_dir="./config")
>>> config = manager.load_config("default")
>>> patient_config = manager.load_config("default", patient_id="DTI_LDF001")
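The override rule above ("later configs override earlier ones") can be pictured as a recursive dictionary merge applied layer by layer. The sketch below assumes that behaviour. `deep_merge` and `merge_layers` are hypothetical helpers and the layer contents are invented. ConfigManager's actual merge code and its Pydantic validation step are not shown here.

```python
from typing import Any, Dict, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a new dict in which keys from `override` win; nested dicts merge recursively."""
    merged: Dict[str, Any] = dict(base)
    for key, value in override.items():
        existing = merged.get(key)
        if isinstance(existing, Mapping) and isinstance(value, Mapping):
            merged[key] = deep_merge(existing, value)  # merge sub-sections key by key
        else:
            merged[key] = value  # scalars and lists are replaced outright
    return merged


def merge_layers(*layers: Mapping[str, Any]) -> Dict[str, Any]:
    """Apply layers in order: default, hardware, protocol, patient, runtime."""
    result: Dict[str, Any] = {}
    for layer in layers:
        result = deep_merge(result, layer)
    return result


# Hypothetical layer contents, for illustration only.
default = {"preprocessing": {"threads": 1, "denoise": True}, "paths": {"assets_dir": "./assets"}}
protocol = {"preprocessing": {"denoise": False}}
patient = {"preprocessing": {"threads": 4}}
runtime = {"preprocessing": {"threads": 8}}

config = merge_layers(default, protocol, patient, runtime)
print(config["preprocessing"])  # {'threads': 8, 'denoise': False}
```

Because the merge is recursive, a runtime override such as `{"preprocessing": {"threads": 8}}` replaces only that one key. The rest of the `preprocessing` section is left intact, and the input layers are never mutated.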
- __init__(config_dir=None)
Initialize the configuration manager.
- load_config(config_name='default', patient_id=None, protocol=None, overrides=None, protocol_required=False)
Load a configuration with optional overrides.
- Parameters:
config_name (str) – Name of the base config to load
patient_id (Optional[str]) – Optional patient ID for patient-specific config
protocol (Optional[str]) – Optional protocol name for protocol-specific config
overrides (Optional[Mapping[str, object]]) – Optional dictionary of runtime overrides
protocol_required (bool) – If True, raise ConfigurationError when protocol is given but no matching file exists. Set this when the protocol was explicitly requested by the user (e.g. via --protocol) rather than supplied as a workflow’s default_protocol fallback.
- Return type:
PipelineConfig
- Returns:
Validated PipelineConfig object
- Raises:
ConfigurationError – If the base config_name file is missing, or if protocol_required is True and the protocol file is missing.
Example
>>> config = manager.load_config(
...     config_name="default",
...     patient_id="DTI_LDF001",
...     overrides={"preprocessing": {"threads": 8}},
... )
- load_config_dict(name, subdir=None)
Load a raw config dictionary without validation.
- save_config(config, name, subdir=None)
Save a configuration to a YAML file.
- list_configs(subdir=None)
List available config files.
- get_config_dir(subdir=None)
Get the path to a config directory.
- class thesis.core.ProcessingContext
Bases: object
Context object for medical imaging processing pipelines.
Carries all relevant information about a processing run, including patient ID, data paths, configuration, and intermediate results.
- Variables:
patient_id – Unique patient identifier
config – Configuration for this processing run
data_dir – Shared assets base (config paths.assets_dir): templates, atlases, ROIs. Not a parent of input_dir/output_dir.
input_dir – Per-patient input data directory (inputs_dir/<patient_id>)
output_dir – Per-patient output directory (output_dir/<patient_id>)
working_dir – Temporary working/scratch directory
metadata – Additional metadata
results – Dictionary to store intermediate results
Example
>>> ctx = ProcessingContext(
...     patient_id="DTI_LDF001",
...     config=my_config,
...     data_dir=Path("./data"),
... )
>>> ctx.add_result("registration", transform_matrix)
>>> transform = ctx.get_result("registration")
- Parameters:
- patient_id: str
- config: PipelineConfig
- data_dir: Path
- add_result(key, value)
Store a processing result.
Example
>>> ctx.add_result("brain_mask", mask_path)
- get_result(key, default=None)
Retrieve a processing result.
- Returns:
Result data or default
Example
>>> mask_path = ctx.get_result("brain_mask")
- has_result(key)
Check if a result exists.
- add_metadata(key, value)
Add metadata to the context.
Example
>>> ctx.add_metadata("acquisition_date", "2025-01-15")
- get_metadata(key, default=None)
Retrieve metadata.
- get_input_path(filename)
Get full path for an input file.
- Parameters:
filename (str) – Input filename
- Returns:
Full path to input file
- Raises:
ValueError – If a relative filename escapes input_dir.
Example
>>> t1_path = ctx.get_input_path("DTI_LDF001_T1.nii.gz")
- get_output_path(filename, subdir=None)
Get full path for an output file.
- Returns:
Full path to output file
- Raises:
ValueError – If a relative path escapes output_dir.
Example
>>> reg_path = ctx.get_output_path("T1_registered.nii.gz", "registration")
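The "escapes output_dir" guard can be sketched by resolving the joined path and checking that it still lies under the resolved base. `safe_output_path` is a hypothetical stand-in, not the real method. The docstrings only mention *relative* paths escaping, so this sketch passes absolute paths through unchanged. That part is an assumption. `Path.is_relative_to` requires Python 3.9+.

```python
from pathlib import Path
from typing import Optional


def safe_output_path(output_dir: Path, filename: str, subdir: Optional[str] = None) -> Path:
    """Join filename (optionally under subdir) onto output_dir, refusing '..' escapes."""
    candidate = Path(filename)
    if candidate.is_absolute():
        # Assumption: absolute paths are returned as-is, because the documented
        # ValueError only concerns relative paths that escape output_dir.
        return candidate
    base = output_dir.resolve()
    target = base / subdir / candidate if subdir else base / candidate
    resolved = target.resolve()  # collapses '..' segments and follows symlinks
    if not resolved.is_relative_to(base):
        raise ValueError(f"{filename!r} escapes {base}")
    return resolved


out = Path("derivatives/DTI_LDF001")  # hypothetical per-patient output_dir
print(safe_output_path(out, "T1_registered.nii.gz", "registration"))
try:
    safe_output_path(out, "../../etc/passwd")
except ValueError as exc:
    print("rejected:", exc)
```

Resolving before the comparison matters: a purely string-based prefix check would accept `registration/../../x`.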
- get_working_path(filename)
Get full path for a temporary working file.
Example
>>> temp_path = ctx.get_working_path("temp_mask.nii.gz")
- cleanup_working_dir()
Remove all files from the working directory.
Use with caution: this deletes temporary files.
- list_input_files(pattern='*')
List files in the input directory.
- Parameters:
pattern (str) – Glob pattern for matching files
- Returns:
List of matching file paths
Example
>>> nii_files = ctx.list_input_files("*.nii.gz")
- list_output_files(pattern='*')
List files in the output directory.
- to_dict()
Convert context to dictionary.
- __init__(patient_id, config, data_dir, input_dir=None, output_dir=None, working_dir=None, metadata=<factory>, results=<factory>)
- thesis.core.create_context(patient_id, config, data_dir=None, **kwargs)
Factory function to create a processing context.
- Parameters:
patient_id (str) – Patient identifier
config (PipelineConfig) – Configuration object
data_dir (Optional[Path]) – Shared assets base (reads config paths.assets_dir if not provided)
**kwargs – Additional arguments passed to ProcessingContext
- Return type:
ProcessingContext
- Returns:
Initialized ProcessingContext
Example
>>> config = load_config("default")
>>> ctx = create_context("DTI_LDF001", config)
- thesis.core.to_path(v)
Convert a string or Path to a Path object, expanding ~ and environment variables.
- Overloads:
v (None) → None
v (Union[str, Path]) → Path
- Return type:
Path | None
- thesis.core.resolve_path(base, value)
Resolve a path relative to a base directory, expanding ~ and environment variables.
- thesis.core.write_list_file(paths, out_file)
Write a list of paths to a text file, one per line.
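The expansion behaviour described for to_path and resolve_path can be sketched with `os.path.expandvars` and `os.path.expanduser`. It uses the same None-in/None-out overloads listed above. `expand_path` and `resolve_against` are hypothetical names. Two details are assumptions, not taken from the source: the order of expansion (variables first, then `~`), and the rule that only still-relative values are joined onto `base`.

```python
import os
from pathlib import Path
from typing import Optional, Union, overload

PathLike = Union[str, Path]


@overload
def expand_path(v: None) -> None: ...
@overload
def expand_path(v: PathLike) -> Path: ...
def expand_path(v: Optional[PathLike]) -> Optional[Path]:
    """Expand $VARS and ~, then wrap the result in a Path (None passes through)."""
    if v is None:
        return None
    return Path(os.path.expanduser(os.path.expandvars(str(v))))


def resolve_against(base: PathLike, value: PathLike) -> Path:
    """Expand value; if it is still relative, interpret it relative to base."""
    path = expand_path(value)
    return path if path.is_absolute() else expand_path(base) / path


os.environ["THESIS_DEMO_ROOT"] = "/data/demo"  # hypothetical variable
print(expand_path("$THESIS_DEMO_ROOT/atlas.nii.gz"))  # /data/demo/atlas.nii.gz
print(resolve_against("/data/demo", "rois/left.nii.gz"))  # /data/demo/rois/left.nii.gz
```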
- thesis.core.get_logger(name=None)
Get a logger instance for a module.
This function returns the global loguru logger with context about the calling module. The logger is automatically configured on first use.
- Parameters:
name (Optional[str]) – Name of the module/component (typically __name__)
- Returns:
Logger instance with module context
Example
>>> logger = get_logger(__name__)
>>> logger.info("Processing image")
- thesis.core.setup_logging(log_dir=None, log_level='INFO', console_output=True, file_output=True, rotation='10 MB', retention='7 days', compression='zip')
Configure the logging system for the application.
- Parameters:
log_dir (Union[str, Path, None]) – Directory for log files. If None, uses './logs'
log_level (str) – Minimum log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
console_output (bool) – Whether to output logs to the console
file_output (bool) – Whether to output logs to a file
rotation (str) – When to rotate log files (size- or time-based)
retention (str) – How long to keep old log files
compression (str) – Compression format for rotated logs
Example
>>> setup_logging(log_dir="./logs", log_level="DEBUG")
>>> logger = get_logger(__name__)
>>> logger.info("Logging configured")
- thesis.core.reset_logging()
Reset the logging system (primarily for testing).
Removes all handlers and resets the initialization flag.
- class thesis.core.WorkflowEntry
Bases: object
Metadata and factory for a registered workflow.
- Variables:
name – Short identifier used on the CLI (e.g. "hcp").
factory – Callable that accepts (config, context) and returns a Nipype Workflow.
verifier – Optional callable (config, context) -> List[str] that performs pre-run requirement checks. An empty list means all clear.
description – Human-readable description shown by thesis list-workflows.
default_protocol – Protocol name used when neither the CLI --protocol flag nor a per-patient config supplies one.
default_config – Config name to use when no -c flag is provided (None falls back to the CLI default, "default").
- Parameters:
name (str)
factory (Callable[[PipelineConfig, ProcessingContext], object])
verifier (Optional[Callable[[PipelineConfig, ProcessingContext], list[str]]])
description (str)
is_cohort_level (bool)
- name: str
- factory: Callable[[PipelineConfig, ProcessingContext], object]
- verifier: Callable[[PipelineConfig, ProcessingContext], list[str]] | None = None
- description: str = ''
- is_cohort_level: bool = False
- __init__(name, factory, verifier=None, description='', default_protocol=None, default_config=None, is_cohort_level=False)
- Parameters:
name (str)
factory (Callable[[PipelineConfig, ProcessingContext], object])
verifier (Optional[Callable[[PipelineConfig, ProcessingContext], list[str]]])
description (str)
is_cohort_level (bool)
- Return type:
None
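A verifier follows the `(config, context) -> List[str]` contract described above. It returns one human-readable problem per failed check, and an empty list when everything is in place. The sketch uses a hypothetical `StubContext` in place of ProcessingContext and invents the required filenames. It only illustrates the return-value convention.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import List


@dataclass
class StubContext:
    """Hypothetical stand-in for ProcessingContext with only the fields read here."""
    patient_id: str
    input_dir: Path


# Hypothetical required inputs for a DWI workflow.
REQUIRED_INPUTS = ("dwi.nii.gz", "dwi.bval", "dwi.bvec")


def verify_dwi_inputs(config: object, context: StubContext) -> List[str]:
    """Return one message per missing input; an empty list means all clear."""
    problems: List[str] = []
    for name in REQUIRED_INPUTS:
        if not (context.input_dir / name).exists():
            problems.append(f"{context.patient_id}: missing {name}")
    return problems


problems = verify_dwi_inputs(None, StubContext("P001", Path("/nonexistent/P001")))
for line in problems:
    print(line)
```

Returning messages rather than raising on the first failure lets the CLI report every missing requirement in a single preflight pass.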
- class thesis.core.WorkflowRegistry
Bases: object
Registry for WorkflowEntry objects.
Each workflow self-registers at module import time. The recommended path is the workflow() decorator, which builds a WorkflowEntry and calls register() for you:
    from thesis.core.decorators import workflow

    @workflow(name="hcp", description="HCP tractography workflow.", protocol="hcp")
    def build_workflow(*, config, context):
        ...
Direct registration via register() is still supported for tests or introspection tooling:
>>> WORKFLOW_REGISTRY.register(WorkflowEntry(
...     name="hcp",
...     factory=build_workflow,
...     description="HCP tractography workflow.",
...     default_protocol="hcp",
... ))
>>> entry = WORKFLOW_REGISTRY.get("hcp")
- __init__()
- Return type:
None
- register(entry)
Register a workflow entry.
- Parameters:
entry (WorkflowEntry) – The WorkflowEntry to register.
- get(name)
Return the entry for name.
- all_entries()
Return all entries sorted by name.
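The import-time self-registration pattern can be reduced to a decorator that builds an entry and hands it to a module-level registry, then returns the function unchanged. `MiniEntry`, `MiniRegistry` and `mini_workflow` are hypothetical stand-ins, not the thesis API. In particular, the real `get()` may handle unknown names differently from the plain dict lookup used here.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class MiniEntry:
    """Hypothetical stand-in for WorkflowEntry (subset of fields)."""
    name: str
    factory: Callable[..., object]
    description: str = ""
    default_protocol: Optional[str] = None


class MiniRegistry:
    """Hypothetical stand-in for WorkflowRegistry."""

    def __init__(self) -> None:
        self._entries: Dict[str, MiniEntry] = {}

    def register(self, entry: MiniEntry) -> None:
        self._entries[entry.name] = entry

    def get(self, name: str) -> MiniEntry:
        return self._entries[name]  # KeyError for unknown names in this sketch

    def all_entries(self) -> List[MiniEntry]:
        return sorted(self._entries.values(), key=lambda e: e.name)


REGISTRY = MiniRegistry()


def mini_workflow(*, name: str, description: str = "", protocol: Optional[str] = None):
    """Decorator: register the decorated factory when its module is imported."""
    def decorator(fn: Callable[..., object]) -> Callable[..., object]:
        REGISTRY.register(MiniEntry(name, fn, description, protocol))
        return fn  # the factory itself is left untouched
    return decorator


@mini_workflow(name="hcp", description="HCP tractography workflow.", protocol="hcp")
def build_workflow(*, config, context):
    return f"workflow for {context}"


print([e.name for e in REGISTRY.all_entries()])  # ['hcp']
```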
- exception thesis.core.ThesisError
Bases: Exception
Base exception for all thesis framework errors.
- exception thesis.core.ConfigurationError
Bases: ThesisError
Raised when configuration is invalid or missing.
Examples
Missing required configuration file
Invalid configuration values
Configuration validation failure
- exception thesis.core.ValidationError
Bases: ThesisError
Raised when input validation fails.
Examples
Invalid image dimensions
Missing required files
Invalid parameter values
- exception thesis.core.ProcessingError
Bases: ThesisError
Raised when a processing operation fails.
Examples
Algorithm failure
Computation error
Unexpected processing result
- exception thesis.core.RegistrationError
Bases: ProcessingError
Raised when image registration fails.
- exception thesis.core.SegmentationError
Bases: ProcessingError
Raised when image segmentation fails.
- exception thesis.core.TractographyError
Bases: ProcessingError
Raised when tractography processing fails.
- exception thesis.core.FileIOError
Bases: ThesisError
Raised when an I/O operation fails.
Examples
Unable to read file
Unable to write file
Corrupted data
- exception thesis.core.DependencyError
Bases: ThesisError
Raised when an external dependency is missing or incompatible.
Examples
FSL not installed
ANTs not found
Incompatible library version
- exception thesis.core.PipelineError
Bases: ProcessingError
Raised when a pipeline step produces an invalid or unrecoverable result.
Examples
Warped ROI mask is entirely empty
Pipeline invariant violated during execution
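The hierarchy above lets callers handle errors at the granularity they need. For example, one `except ProcessingError` clause also catches RegistrationError, SegmentationError, TractographyError and PipelineError. The sketch re-declares the classes locally with the documented bases so it runs on its own. The handler messages in `describe_failure` are invented for illustration.

```python
# Local stand-ins mirroring the documented bases (not imported from thesis.core).
class ThesisError(Exception): ...
class ConfigurationError(ThesisError): ...
class ValidationError(ThesisError): ...
class ProcessingError(ThesisError): ...
class RegistrationError(ProcessingError): ...
class SegmentationError(ProcessingError): ...
class TractographyError(ProcessingError): ...
class PipelineError(ProcessingError): ...
class FileIOError(ThesisError): ...
class DependencyError(ThesisError): ...


def describe_failure(exc: ThesisError) -> str:
    """Map an error to a hypothetical hint; most specific handlers come first."""
    try:
        raise exc
    except ConfigurationError:
        return "fix the configuration"
    except DependencyError:
        return "install or update the external tool"
    except ProcessingError:
        # Catches every ProcessingError subclass, including PipelineError.
        return "inspect the failing processing step"
    except ThesisError:
        return "generic framework error"


print(describe_failure(RegistrationError("warp failed")))  # inspect the failing processing step
```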
- class thesis.core.NipypeExecutor
Bases: object
Executes Nipype workflows within the thesis framework.
This executor bridges the thesis ProcessingContext with Nipype’s workflow engine, managing:
- Workflow base directory and working directories
- Plugin selection and configuration
- Logging and error handling
Workflow outputs are written by individual nodes to their configured output_dir paths under workflow.base_dir; downstream code reads them back from the filesystem rather than from a returned dict.
Example
>>> from pathlib import Path
>>> from nipype import Workflow
>>> from thesis.core import ConfigManager, NipypeExecutor, create_context
>>>
>>> config = ConfigManager().load_config("default", patient_id="patient_001")
>>> context = create_context("patient_001", config, Path("./data"))
>>> workflow = Workflow(name="example")
>>>
>>> executor = NipypeExecutor(context, config.nipype)
>>> executor.execute(workflow)
- Parameters:
context (ProcessingContext)
config (Union[Dict[str, Any], NipypeConfig, None])
- __init__(context, config=None)
Initialize the executor.
- Parameters:
context (ProcessingContext) – ProcessingContext for the current run
config (Union[Dict[str, Any], NipypeConfig, None]) – Nipype configuration (dict or NipypeConfig instance)
- execute(workflow, *, event_bus=None, progress=None)
Execute a Nipype workflow.
Workflow outputs are written to disk under workflow.base_dir (and whatever output_dir paths individual nodes were configured with); downstream code reads them from the filesystem rather than from a returned dict.
- Parameters:
workflow (Workflow) – nipype.pipeline.engine.Workflow instance
event_bus (Optional[EventBus]) – Optional event bus for emitting execution events.
progress (Optional[NodeProgressProtocol]) – Optional workflow progress tracker.
- get_working_directory()
Get the workflow working directory.
- Returns:
Path to the workflow working directory
- class thesis.core.NipypeConfig
Bases: BaseConfig
Configuration for Nipype workflow execution.
- Variables:
working_dir – Base working directory.
crash_dir – Optional crash dump directory.
plugin – Nipype execution plugin.
plugin_args – Plugin arguments passed to workflow.run.
stop_on_first_crash – Whether execution stops after the first crash.
remove_unnecessary_outputs – Whether intermediate outputs are cleaned.
keep_inputs – Whether node input files remain in the working directory.
hash_method – Caching hash strategy.
use_profiler – Whether the Nipype profiler is enabled.
- Parameters:
data (Any)
- working_dir: Path
- plugin: str
- stop_on_first_crash: bool
- remove_unnecessary_outputs: bool
- keep_inputs: bool
- hash_method: str
- use_profiler: bool
- classmethod validate_plugin(v)
Validate the Nipype execution plugin against the supported allow-list.
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
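The `plugin`/`plugin_args` pair corresponds to the keyword arguments of Nipype's `Workflow.run(plugin=..., plugin_args=...)`. The sketch checks the plugin name against an allow-list and builds those kwargs in plain Python. The allow-list contents are hypothetical, because the real validate_plugin list is not documented here. `Linear`, `MultiProc` and `SLURM` are genuine Nipype plugin names, and `n_procs` is a real MultiProc argument.

```python
from typing import Any, Dict, Optional

# Hypothetical allow-list; the real NipypeConfig.validate_plugin list is not shown in this page.
ALLOWED_PLUGINS = {"Linear", "MultiProc", "SLURM"}


def validate_plugin(v: str) -> str:
    """Reject plugin names outside the allow-list (mirrors a validator's contract)."""
    if v not in ALLOWED_PLUGINS:
        raise ValueError(f"unsupported Nipype plugin {v!r}; expected one of {sorted(ALLOWED_PLUGINS)}")
    return v


def run_kwargs(plugin: str, plugin_args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build keyword arguments suitable for nipype's Workflow.run()."""
    return {"plugin": validate_plugin(plugin), "plugin_args": dict(plugin_args or {})}


kwargs = run_kwargs("MultiProc", {"n_procs": 8})
print(kwargs)  # {'plugin': 'MultiProc', 'plugin_args': {'n_procs': 8}}
# With nipype installed: workflow.run(**kwargs)
```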
- thesis.core.run_workflow(workflow, context, config=None, *, event_bus=None, progress=None)
Convenience helper to execute a Nipype workflow with the thesis context.
- Parameters:
workflow (Workflow) – nipype.pipeline.engine.Workflow instance
context (ProcessingContext) – ProcessingContext
config (Union[Dict[str, Any], NipypeConfig, None]) – Optional NipypeConfig or dict overrides
progress (Optional[NodeProgressProtocol])
- class thesis.core.Event
Bases: object
A structured output event emitted during pipeline execution.
- Variables:
message – Human-readable event description.
level – Importance level controlling visibility.
timestamp – Unix timestamp when the event was created.
category – Optional grouping tag (e.g. "workflow", "tool", "file", "test").
patient_id – Patient this event relates to, if any.
metadata – Arbitrary key-value data attached to the event.
- message: str
- level: EventLevel = 20
- timestamp: float
- category: str = ''
- patient_id: str = ''
- class thesis.core.EventBus
Bases: object
Central event pipeline that decouples emission from rendering.
Listeners subscribe to receive events and can filter by minimum level. The bus is thread-safe: events can be emitted from worker threads while the main thread renders output.
Example
>>> bus = EventBus()
>>> bus.subscribe(lambda e: print(e.message))
>>> bus.emit("hello")
- Parameters:
max_events (int)
- subscribe(listener)
Register a listener that will receive all emitted events.
- unsubscribe(listener)
Remove a previously registered listener.
- emit(message, *, level=EventLevel.INFO, category='', patient_id='', metadata=None)
Create and dispatch an event to all listeners.
- Return type:
Event
- Returns:
The created Event instance.
- emit_event(event)
Dispatch a pre-built event to all listeners.
- get_events(min_level=EventLevel.DEBUG, category=None, patient_id=None)
Return stored events matching the given filters.
- Parameters:
min_level (EventLevel) – Minimum importance level to include.
category (Optional[str]) – If set, only return events with this category.
patient_id (Optional[str]) – If set, only return events for this patient.
- Returns:
List of matching events in chronological order.
- property event_count: int
Number of stored events.
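A thread-safe bus of this shape can be sketched with a lock around the listener list and a bounded deque for stored events. `MiniEventBus` is a hypothetical stand-in, not the real EventBus. Two behaviours are assumptions: that `max_events` caps storage by dropping the oldest events, and that listeners run outside the lock. The level values match EventLevel as documented.

```python
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Callable, Deque, Dict, List, Optional


class EventLevel(IntEnum):
    DEBUG = 10
    INFO = 20
    IMPORTANT = 30
    WARNING = 40
    ERROR = 50


@dataclass
class Event:
    message: str
    level: EventLevel = EventLevel.INFO
    timestamp: float = field(default_factory=time.time)
    category: str = ""
    patient_id: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)


Listener = Callable[[Event], None]


class MiniEventBus:
    def __init__(self, max_events: int = 1000) -> None:  # default is hypothetical
        self._lock = threading.Lock()
        self._listeners: List[Listener] = []
        self._events: Deque[Event] = deque(maxlen=max_events)  # oldest dropped when full

    def subscribe(self, listener: Listener) -> None:
        with self._lock:
            self._listeners.append(listener)

    def emit(self, message: str, *, level: EventLevel = EventLevel.INFO, category: str = "",
             patient_id: str = "", metadata: Optional[Dict[str, Any]] = None) -> Event:
        event = Event(message, level, category=category, patient_id=patient_id,
                      metadata=dict(metadata or {}))
        with self._lock:
            self._events.append(event)
            listeners = list(self._listeners)  # snapshot so subscribers can change safely
        for listener in listeners:  # called outside the lock: slow renderers don't block emitters
            listener(event)
        return event

    def get_events(self, min_level: EventLevel = EventLevel.DEBUG, category: Optional[str] = None,
                   patient_id: Optional[str] = None) -> List[Event]:
        with self._lock:
            snapshot = list(self._events)
        return [e for e in snapshot
                if e.level >= min_level
                and (category is None or e.category == category)
                and (patient_id is None or e.patient_id == patient_id)]


bus = MiniEventBus(max_events=2)
seen: List[Event] = []
bus.subscribe(seen.append)
bus.emit("workflow started", level=EventLevel.IMPORTANT, category="workflow")
bus.emit("node detail")
bus.emit("bet failed", level=EventLevel.ERROR, patient_id="P001")
print([e.message for e in bus.get_events()])  # ['node detail', 'bet failed']
```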
- class thesis.core.EventLevel
Bases: IntEnum
Event importance levels, ordered from most to least critical.
ERROR: Failures that stop processing.
WARNING: Issues that may affect outcome but do not stop processing.
IMPORTANT: Key milestones – tool starts/completions, file changes, build/test results, skipped actions.
INFO: Detailed informational messages (full tool logs, timing, etc.).
DEBUG: Internal implementation details.
Default output mode shows ERROR, WARNING, IMPORTANT. Verbose mode shows everything. Quiet mode shows ERROR only plus the final result.
- ERROR = 50
- WARNING = 40
- IMPORTANT = 30
- INFO = 20
- DEBUG = 10
- __new__(value)
- thesis.core.get_event_bus()
Return the process-wide EventBus singleton.
The bus is created on first call and reused thereafter.
- Return type:
EventBus
- Returns:
The global EventBus instance.
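"Created on first call and reused thereafter" is the lazy-singleton pattern. The sketch below uses double-checked locking so that concurrent first calls still produce a single instance. `Bus` and `get_bus` are hypothetical stand-ins. The source does not say whether the real get_event_bus takes a lock.

```python
import threading
from typing import Optional


class Bus:
    """Hypothetical stand-in for EventBus."""


_bus: Optional[Bus] = None
_bus_lock = threading.Lock()


def get_bus() -> Bus:
    """Create the bus lazily on first call; return the same instance afterwards."""
    global _bus
    if _bus is None:  # fast path once initialised
        with _bus_lock:
            if _bus is None:  # re-check: another thread may have won the race
                _bus = Bus()
    return _bus


print(get_bus() is get_bus())  # True
```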
- class thesis.core.OutputMode
Verbosity level for console output.
QUIET: errors and final result only.
NORMAL: concise, human-friendly progress and key events.
VERBOSE: full informational logs, timing, debug context.
- QUIET = 'quiet'
- NORMAL = 'normal'
- VERBOSE = 'verbose'
- __new__(value)
- class thesis.core.OutputConfig
Bases: BaseModel
Configuration for the output system.
Controls verbosity, summary detail, progress display, and format. CLI flags override values set here.
- Variables:
mode – Verbosity level.
summary – Summary detail level.
progress – Whether to show progress bars/spinners. None means auto-detect (enabled for TTY, disabled otherwise).
output_format – Serialization format.
- Parameters:
data (Any)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- mode: OutputMode
- summary: SummaryDetail
- output_format: OutputFormat
- property min_event_level: EventLevel
Minimum event level to display in the current mode.
- property show_progress: bool
Whether animated progress UI should be shown.
Progress remains enabled across output modes unless the user explicitly disables it. The CLI and logging layers are responsible for routing non-progress output in a progress-safe way.
- property is_verbose: bool
Shorthand for checking verbose mode.
- property is_quiet: bool
Shorthand for checking quiet mode.
- property is_json: bool
Shorthand for checking JSON output format.
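The EventLevel description fixes what each mode shows. QUIET shows ERROR only, NORMAL shows ERROR, WARNING and IMPORTANT, and VERBOSE shows everything. That implies a minimum level per mode, as sketched below. The enums are re-declared locally. Defining OutputMode as a `str` enum is an assumption based on its string values. `min_event_level` here is a free function, not the real property.

```python
from enum import Enum, IntEnum


class EventLevel(IntEnum):
    DEBUG = 10
    INFO = 20
    IMPORTANT = 30
    WARNING = 40
    ERROR = 50


class OutputMode(str, Enum):
    QUIET = "quiet"
    NORMAL = "normal"
    VERBOSE = "verbose"


# Lowest level that is still displayed in each mode, per the EventLevel docstring.
_MIN_LEVEL = {
    OutputMode.QUIET: EventLevel.ERROR,
    OutputMode.NORMAL: EventLevel.IMPORTANT,
    OutputMode.VERBOSE: EventLevel.DEBUG,
}


def min_event_level(mode: OutputMode) -> EventLevel:
    return _MIN_LEVEL[mode]


def is_visible(level: EventLevel, mode: OutputMode) -> bool:
    return level >= min_event_level(mode)


print(is_visible(EventLevel.WARNING, OutputMode.NORMAL))  # True
print(is_visible(EventLevel.INFO, OutputMode.NORMAL))     # False
```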
- class thesis.core.SummaryDetail
Level of detail in the end-of-run summary.
OFF: no summary printed.
COMPACT: one-line status + 3-6 key bullets.
FULL: expanded summary with all metadata.
- OFF = 'off'
- COMPACT = 'compact'
- FULL = 'full'
- __new__(value)
- class thesis.core.RunStatus
Final status of a single patient run.
SUCCESS: completed without errors.
PARTIAL: some steps completed, others failed or were skipped.
FAILED: processing failed with an error.
BLOCKED: could not start (e.g. preflight check failure, missing data).
SKIPPED: intentionally not run (e.g. dry run, user cancellation).
- SUCCESS = 'success'
- PARTIAL = 'partial'
- FAILED = 'failed'
- BLOCKED = 'blocked'
- SKIPPED = 'skipped'
- __new__(value)
- class thesis.core.RunResult
Bases: BaseModel
Detailed result for a single patient run.
Populated incrementally during execution and used to build the final RunSummary.
- Variables:
patient_id – Patient identifier.
status – Final run status.
elapsed_seconds – Wall-clock duration in seconds.
workflow – Workflow name that was executed.
error_message – Primary failure reason, if any.
error_type – Exception class name, if any.
error_history – Per-attempt failure messages in order (one entry per failed retry attempt), used to show error progression across retries.
warnings – Warning messages collected during the run.
steps_completed – Names of successfully completed steps.
steps_failed – Names of steps that failed.
files_changed – Paths of files created or modified.
commands_run – External commands executed (e.g. FSL, ANTs).
retry_count – Number of retries attempted.
suggested_next_step – Actionable suggestion for the user.
metadata – Arbitrary extra data.
- Parameters:
data (Any)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- patient_id: str
- status: RunStatus
- elapsed_seconds: float
- workflow: str
- error_message: str
- error_type: str
- retry_count: int
- suggested_next_step: str
- class thesis.core.RunSummary
Bases: BaseModel
Structured summary of a single patient run.
Built from a RunResult after execution completes. Contains the information needed to render the compact footer shown to the user.
- Variables:
patient_id – Patient identifier.
workflow – Workflow name.
status – Final outcome.
elapsed_seconds – Wall-clock duration.
headline – One-line status headline (e.g. "SUCCESS hcp P001 2m 13s").
bullets – Key summary bullets (3-6 items).
next_step – Optional suggested next action.
result – Full result data for verbose/JSON output.
- Parameters:
data (Any)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- patient_id: str
- workflow: str
- status: RunStatus
- elapsed_seconds: float
- headline: str
- next_step: str
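The headline example "SUCCESS hcp P001 2m 13s" combines the status, workflow, patient ID and a compact duration. The sketch reproduces that format. `format_duration` and `headline` are hypothetical helpers, and the hours branch is an assumption, since the source shows only minutes and seconds.

```python
def format_duration(seconds: float) -> str:
    """Render seconds as '45s', '2m 13s', or (assumed) '1h 2m 5s'."""
    total = int(round(seconds))
    hours, rem = divmod(total, 3600)
    minutes, secs = divmod(rem, 60)
    if hours:
        return f"{hours}h {minutes}m {secs}s"
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"


def headline(status: str, workflow: str, patient_id: str, elapsed_seconds: float) -> str:
    return f"{status.upper()} {workflow} {patient_id} {format_duration(elapsed_seconds)}"


print(headline("success", "hcp", "P001", 133.0))  # SUCCESS hcp P001 2m 13s
```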
- class thesis.core.BatchSummary
Bases: BaseModel
Aggregated summary for a batch of patient runs.
Collects child RunSummary objects and computes aggregate statistics for the batch-level footer.
- Variables:
workflow – Workflow name.
total – Total number of patient runs.
succeeded – Count of successful runs.
partial – Count of partially successful runs.
failed – Count of failed runs.
blocked – Count of blocked runs.
skipped – Count of skipped runs.
total_elapsed_seconds – Total wall-clock time for the batch.
avg_elapsed_seconds – Average per-patient duration.
retries – Total retries across all patients.
run_summaries – Per-patient summaries in execution order.
failure_reasons – Grouped failure reasons with patient lists.
suggested_follow_up – Actionable follow-up suggestion.
- Parameters:
data (Any)
- model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- workflow: str
- total: int
- succeeded: int
- partial: int
- failed: int
- blocked: int
- skipped: int
- total_elapsed_seconds: float
- avg_elapsed_seconds: float
- retries: int
- run_summaries: List[RunSummary]
- suggested_follow_up: str
- classmethod from_results(results, workflow='', batch_elapsed=0.0)
Build a batch summary from individual run results.
- Return type:
BatchSummary
- Returns:
Populated BatchSummary.
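The aggregate fields listed for BatchSummary can be computed in one pass over the per-patient results. This covers the per-status counts, average duration, total retries and failure reasons grouped with patient lists. The sketch uses simplified hypothetical dataclasses (`Result`, `Batch`) rather than the real Pydantic models, and makes three assumptions. A non-zero `batch_elapsed` is taken as the batch wall-clock total. The average is taken over per-patient durations. Only failed or blocked runs feed `failure_reasons`.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass, field
from typing import Dict, List

STATUSES = ("success", "partial", "failed", "blocked", "skipped")


@dataclass
class Result:  # hypothetical stand-in for RunResult
    patient_id: str
    status: str
    elapsed_seconds: float
    retry_count: int = 0
    error_message: str = ""


@dataclass
class Batch:  # hypothetical stand-in for BatchSummary
    total: int
    counts: Dict[str, int]
    total_elapsed_seconds: float
    avg_elapsed_seconds: float
    retries: int
    failure_reasons: Dict[str, List[str]] = field(default_factory=dict)


def summarize(results: List[Result], batch_elapsed: float = 0.0) -> Batch:
    counts = Counter(r.status for r in results)
    per_patient_total = sum(r.elapsed_seconds for r in results)
    reasons: Dict[str, List[str]] = defaultdict(list)
    for r in results:
        if r.status in ("failed", "blocked") and r.error_message:
            reasons[r.error_message].append(r.patient_id)  # group patients by reason
    return Batch(
        total=len(results),
        counts={s: counts.get(s, 0) for s in STATUSES},
        # Parallel batches finish faster than the sum of per-patient times.
        total_elapsed_seconds=batch_elapsed or per_patient_total,
        avg_elapsed_seconds=per_patient_total / len(results) if results else 0.0,
        retries=sum(r.retry_count for r in results),
        failure_reasons=dict(reasons),
    )


batch = summarize([
    Result("P001", "success", 120.0),
    Result("P002", "failed", 30.0, retry_count=2, error_message="bet failed"),
    Result("P003", "failed", 45.0, retry_count=1, error_message="bet failed"),
    Result("P004", "blocked", 0.0, error_message="missing DWI"),
], batch_elapsed=150.0)
print(batch.counts, batch.failure_reasons)
```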
- class thesis.core.OutputRenderer
Bases: object
Renders structured output to the terminal or as JSON.
Connects to the EventBus and prints events that pass the verbosity filter. Also provides methods for rendering run and batch summaries.
- __init__(config=None, event_bus=None, file=None)
- attach(bus=None)
Subscribe to an event bus to render events in real time.
- render_event(event)
Render a single event to the output stream.
- render_run_summary(summary)
Render a single-run summary footer.
- Parameters:
summary (RunSummary) – The run summary to render.
- render_batch_summary(batch)
Render a batch summary footer.
- Parameters:
batch (BatchSummary) – The batch summary to render.
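Rendering an event comes down to two decisions: whether it passes the verbosity filter, and whether to write it as text or as a JSON line to the configured stream. The sketch makes both decisions with a hypothetical `Ev` record and a free `render_event` function writing to any text stream. The `[category]` text prefix and one-JSON-object-per-line layout are invented for illustration and are not the real OutputRenderer format.

```python
import io
import json
from dataclasses import asdict, dataclass
from typing import TextIO


@dataclass
class Ev:  # hypothetical minimal event record
    message: str
    level: int
    category: str = ""


def render_event(event: Ev, *, min_level: int, as_json: bool, out: TextIO) -> bool:
    """Write the event if it passes the filter; return whether anything was written."""
    if event.level < min_level:
        return False
    if as_json:
        out.write(json.dumps(asdict(event)) + "\n")  # one JSON object per line
    else:
        prefix = f"[{event.category}] " if event.category else ""
        out.write(f"{prefix}{event.message}\n")
    return True


buf = io.StringIO()
render_event(Ev("bet finished", 30, "tool"), min_level=30, as_json=False, out=buf)
print(buf.getvalue(), end="")  # [tool] bet finished
```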