Core Package

The thesis.core package provides the fundamental building blocks for the dMRI pipeline: the @workflow / @verify decorator API, declarative path types, workflow I/O contract helpers, configuration management, processing context, workflow registry, Nipype integration, and I/O utilities.

thesis.core

Core infrastructure for the thesis framework.

Provides the workflow registry, @workflow decorator API, declarative path types, hierarchical configuration loading, processing context, structured output, Nipype execution bridge, logging, and I/O utilities. Workflow implementations depend only on this package — there are no cross-workflow imports.

class thesis.core.ConfigManager

Bases: object

Manages configuration loading, merging, and validation.

Supports hierarchical configuration with defaults, environment-specific overrides, and patient-specific configs. Uses Pydantic for validation and type safety.

Config hierarchy (later configs override earlier):
  1. Default config

  2. Hardware config

  3. Protocol config

  4. Patient-specific config

  5. Runtime overrides
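
The layering rule above can be illustrated with a small stand-alone sketch. It is not the ConfigManager implementation: it assumes nested sections are merged key by key (a deep merge), which the text does not state, and the layer contents are hypothetical.

```python
from typing import Any, Mapping


def deep_merge(base: Mapping[str, Any], override: Mapping[str, Any]) -> dict[str, Any]:
    """Return a new dict in which values from `override` win over `base`."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, Mapping) and isinstance(merged.get(key), Mapping):
            # Both sides are mappings: merge them key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Scalars, lists, and type mismatches: the later layer replaces the earlier one.
            merged[key] = value
    return merged


# Hypothetical layers, listed in the documented order (earliest first).
layers = [
    {"preprocessing": {"threads": 4, "denoise": True}},  # 1. default
    {"preprocessing": {"threads": 16}},                  # 2. hardware
    {"tractography": {"step_size": 0.5}},                # 3. protocol
    {},                                                  # 4. patient-specific (none here)
    {"preprocessing": {"threads": 8}},                   # 5. runtime overrides
]

config: dict[str, Any] = {}
for layer in layers:
    config = deep_merge(config, layer)

print(config)
```

The runtime override changes only `threads`; `denoise` from the default layer and the protocol section survive because each layer is merged into the result of the previous ones.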

Example

>>> manager = ConfigManager(config_dir="./config")
>>> config = manager.load_config("default")
>>> patient_config = manager.load_config("default", patient_id="DTI_LDF001")
Parameters:

config_dir (Union[str, Path, None])

__init__(config_dir=None)

Initialize the configuration manager.

Parameters:

config_dir (Union[str, Path, None]) – Base directory for config files. If None, uses ‘./config’

load_config(config_name='default', patient_id=None, protocol=None, overrides=None, protocol_required=False)

Load a configuration with optional overrides.

Parameters:
  • config_name (str) – Name of the base config to load

  • patient_id (Optional[str]) – Optional patient ID for patient-specific config

  • protocol (Optional[str]) – Optional protocol name for protocol-specific config

  • overrides (Optional[Mapping[str, object]]) – Optional dictionary of runtime overrides

  • protocol_required (bool) – If True, raise ConfigurationError when protocol is given but no matching file exists. Set this when the protocol was explicitly requested by the user (e.g. via --protocol) rather than supplied as a workflow’s default_protocol fallback.

Return type:

PipelineConfig

Returns:

Validated PipelineConfig object

Raises:

ConfigurationError – If the base config_name file is missing, or if protocol_required is True and the protocol file is missing.

Example

>>> config = manager.load_config(
...     config_name="default",
...     patient_id="DTI_LDF001",
...     overrides={"preprocessing": {"threads": 8}}
... )
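
The difference between a fallback protocol and an explicitly requested one can be sketched as follows. This is an illustration under stated assumptions, not the library code: the `protocols` subdirectory, the helper name `find_protocol_file`, and the local `ConfigurationError` stand-in are all hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Optional


class ConfigurationError(Exception):
    """Local stand-in for thesis.core.ConfigurationError."""


def find_protocol_file(
    config_dir: Path, protocol: Optional[str], protocol_required: bool = False
) -> Optional[Path]:
    """Locate a protocol file; fail loudly only when the protocol was required."""
    if protocol is None:
        return None
    candidate = config_dir / "protocols" / f"{protocol}.yaml"  # assumed layout
    if candidate.is_file():
        return candidate
    if protocol_required:
        # Explicit request (for example via --protocol): a missing file is an error.
        raise ConfigurationError(f"Protocol config not found: {candidate}")
    # Fallback request (for example a workflow's default_protocol): skip the layer.
    return None


with tempfile.TemporaryDirectory() as tmp:
    config_dir = Path(tmp)
    fallback = find_protocol_file(config_dir, "hcp")
    try:
        find_protocol_file(config_dir, "hcp", protocol_required=True)
        strict_raised = False
    except ConfigurationError:
        strict_raised = True
```
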
load_config_dict(name, subdir=None)

Load a raw config dictionary without validation.

Parameters:
  • name (str) – Config file name without the extension.

  • subdir (Optional[str]) – Optional config subdirectory.

Return type:

Optional[dict[str, object]]

Returns:

Raw config dictionary, or None when the file does not exist.

save_config(config, name, subdir=None)

Save a configuration to a YAML file.

Return type:

Path

Returns:

Path to saved file

list_configs(subdir=None)

List available config files.

Parameters:

subdir (Optional[str]) – Optional subdirectory to search

Return type:

list[str]

Returns:

List of config names (without .yaml extension)
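
A minimal sketch of the listing behaviour, assuming config files are the `*.yaml` files directly inside the chosen directory and that names are returned sorted. The helper name and the `protocols` and `patients` subdirectories are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Optional


def list_config_names(config_dir: Path, subdir: Optional[str] = None) -> list[str]:
    """Return config names (file stems) found in config_dir or one subdirectory."""
    target = config_dir / subdir if subdir else config_dir
    if not target.is_dir():
        return []
    # Path.stem drops the ".yaml" suffix; non-YAML files are ignored by the glob.
    return sorted(path.stem for path in target.glob("*.yaml"))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "protocols").mkdir()
    (root / "default.yaml").write_text("{}\n")
    (root / "notes.txt").write_text("not a config\n")
    (root / "protocols" / "hcp.yaml").write_text("{}\n")

    top_level = list_config_names(root)
    protocols = list_config_names(root, "protocols")
    missing = list_config_names(root, "patients")
```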

get_config_dir(subdir=None)

Get the path to a config directory.

Parameters:

subdir (Optional[str]) – Optional subdirectory

Return type:

Path

Returns:

Path object

clear_cache()

Clear the configuration cache.

Return type:

None

class thesis.core.ProcessingContext

Bases: object

Context object for medical imaging processing pipelines.

Carries all relevant information about a processing run, including patient ID, data paths, configuration, and intermediate results.

Variables:
  • patient_id – Unique patient identifier

  • config – Configuration for this processing run

  • data_dir – Shared assets base (config paths.assets_dir): templates, atlases, ROIs. Not a parent of input_dir/output_dir.

  • input_dir – Per-patient input data directory (inputs_dir/<patient_id>)

  • output_dir – Per-patient output directory (output_dir/<patient_id>)

  • working_dir – Temporary working/scratch directory

  • metadata – Additional metadata

  • results – Dictionary to store intermediate results

Example

>>> ctx = ProcessingContext(
...     patient_id="DTI_LDF001",
...     config=my_config,
...     data_dir=Path("./data")
... )
>>> ctx.add_result("registration", transform_matrix)
>>> transform = ctx.get_result("registration")
Parameters:
patient_id: str
config: PipelineConfig
data_dir: Path
input_dir: Path | None = None
output_dir: Path | None = None
working_dir: Path | None = None
metadata: Dict[str, Any]
results: Dict[str, Any]
add_result(key, value)

Store a processing result.

Parameters:
  • key (str) – Result identifier

  • value (Any) – Result data

Return type:

None

Example

>>> ctx.add_result("brain_mask", mask_path)
get_result(key, default=None)

Retrieve a processing result.

Parameters:
  • key (str) – Result identifier

  • default (Any) – Default value if key not found

Return type:

Any

Returns:

Result data or default

Example

>>> mask_path = ctx.get_result("brain_mask")
has_result(key)

Check if a result exists.

Parameters:

key (str) – Result identifier

Return type:

bool

Returns:

True if result exists, False otherwise
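
The three result helpers behave like thin wrappers around a per-context dictionary. The sketch below assumes a dataclass whose `results` field uses a default factory (the `<factory>` defaults in the constructor signature suggest this, but the source does not show it); `ContextSketch` is a hypothetical stand-in, not the real class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ContextSketch:
    """Stand-in holding only the fields needed to show the result store."""

    patient_id: str
    # default_factory gives every instance its own dict instead of a shared one.
    results: Dict[str, Any] = field(default_factory=dict)

    def add_result(self, key: str, value: Any) -> None:
        self.results[key] = value

    def get_result(self, key: str, default: Any = None) -> Any:
        return self.results.get(key, default)

    def has_result(self, key: str) -> bool:
        return key in self.results


ctx_a = ContextSketch("DTI_LDF001")
ctx_b = ContextSketch("DTI_LDF002")
ctx_a.add_result("brain_mask", "/out/DTI_LDF001/mask.nii.gz")
```

Because each context owns its dictionary, a result stored for one patient is not visible from another patient's context.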

add_metadata(key, value)

Add metadata to the context.

Parameters:
  • key (str) – Metadata key

  • value (Any) – Metadata value

Return type:

None

Example

>>> ctx.add_metadata("acquisition_date", "2025-01-15")
get_metadata(key, default=None)

Retrieve metadata.

Parameters:
  • key (str) – Metadata key

  • default (Any) – Default value if key not found

Return type:

Any

Returns:

Metadata value or default

get_input_path(filename)

Get full path for an input file.

Parameters:

filename (str) – Input filename

Return type:

Path

Returns:

Full path to input file

Raises:

ValueError – If a relative filename escapes input_dir.

Example

>>> t1_path = ctx.get_input_path("DTI_LDF001_T1.nii.gz")
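
The ValueError for escaping paths corresponds to a containment check on the resolved path. The sketch below shows one common way to write such a check with pathlib (Python 3.9+ for `is_relative_to`). It is an assumption about the approach, and it passes absolute filenames through unchanged, which the source does not confirm; `safe_join` is a hypothetical name.

```python
from pathlib import Path


def safe_join(base: Path, filename: str) -> Path:
    """Join filename onto base, rejecting relative paths that leave base."""
    candidate = Path(filename)
    if candidate.is_absolute():
        # Assumption: absolute paths are returned as given.
        return candidate
    base_resolved = base.resolve()
    resolved = (base_resolved / candidate).resolve()  # collapses ".." segments
    if not resolved.is_relative_to(base_resolved):
        raise ValueError(f"{filename!r} escapes {base_resolved}")
    return resolved


base = Path("inputs") / "DTI_LDF001"
ok = safe_join(base, "DTI_LDF001_T1.nii.gz")
try:
    safe_join(base, "../DTI_LDF002/T1.nii.gz")
    escaped = False
except ValueError:
    escaped = True
```
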
get_output_path(filename, subdir=None)

Get full path for an output file.

Parameters:
  • filename (str) – Output filename

  • subdir (Optional[str]) – Optional subdirectory within output_dir

Return type:

Path

Returns:

Full path to output file

Raises:

ValueError – If a relative path escapes output_dir.

Example

>>> reg_path = ctx.get_output_path("T1_registered.nii.gz", "registration")
get_working_path(filename)

Get full path for a temporary working file.

Parameters:

filename (str) – Working filename

Return type:

Path

Returns:

Full path to working file

Example

>>> temp_path = ctx.get_working_path("temp_mask.nii.gz")
cleanup_working_dir()

Remove all files from the working directory.

Use with caution - this deletes temporary files.

Return type:

None
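
As a rough illustration of what emptying a scratch directory involves, the sketch below deletes every entry inside a directory while keeping the directory itself. Whether cleanup_working_dir keeps the directory or recurses into subdirectories is not stated, so both choices here are assumptions; `clear_directory` is a hypothetical helper.

```python
import shutil
import tempfile
from pathlib import Path


def clear_directory(directory: Path) -> int:
    """Delete all entries inside directory and return how many were removed."""
    removed = 0
    if not directory.is_dir():
        return removed
    for child in directory.iterdir():
        if child.is_dir() and not child.is_symlink():
            shutil.rmtree(child)  # real subdirectory: remove it recursively
        else:
            child.unlink()        # regular file or symlink
        removed += 1
    return removed


with tempfile.TemporaryDirectory() as tmp:
    work = Path(tmp) / "work"
    (work / "node_a").mkdir(parents=True)
    (work / "node_a" / "temp_mask.nii.gz").write_bytes(b"")
    (work / "scratch.txt").write_text("x")

    removed = clear_directory(work)
    still_there = work.is_dir()
    leftover = list(work.iterdir())
```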

list_input_files(pattern='*')

List files in the input directory.

Parameters:

pattern (str) – Glob pattern for matching files

Return type:

list[Path]

Returns:

List of matching file paths

Example

>>> nii_files = ctx.list_input_files("*.nii.gz")
list_output_files(pattern='*')

List files in the output directory.

Parameters:

pattern (str) – Glob pattern for matching files

Return type:

list[Path]

Returns:

List of matching file paths

to_dict()

Convert context to dictionary.

Return type:

Dict[str, Any]

Returns:

Dictionary representation of context

__init__(patient_id, config, data_dir, input_dir=None, output_dir=None, working_dir=None, metadata=<factory>, results=<factory>)
Return type:

None

thesis.core.create_context(patient_id, config, data_dir=None, **kwargs)

Factory function to create a processing context.

Parameters:
  • patient_id (str) – Patient identifier

  • config (PipelineConfig) – Configuration object

  • data_dir (Optional[Path]) – Shared assets base (reads config paths.assets_dir if not provided)

  • **kwargs – Additional arguments passed to ProcessingContext

Return type:

ProcessingContext

Returns:

Initialized ProcessingContext

Example

>>> config = ConfigManager().load_config("default")
>>> ctx = create_context("DTI_LDF001", config)
thesis.core.to_path(v)

Convert a string or Path to a Path object, expanding ~ and environment variables.

Overloads:
  • v (None) → None

  • v (Union[str, Path]) → Path

Parameters:

v (Union[str, Path, None]) – Path value as string, Path, or None

Return type:

Path | None

Returns:

Expanded Path object, or None if input was None

thesis.core.resolve_path(base, value)

Resolve a path relative to a base directory, expanding ~ and environment variables.

Parameters:
  • base (Path) – Base directory for relative paths

  • value (Union[Path, str]) – Path to resolve (absolute or relative)

Return type:

Path

Returns:

Resolved Path object. If value is absolute, returns it expanded. Otherwise, returns base / expanded_value.
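
The expansion and resolution rules for to_path and resolve_path can be sketched with the standard library. This is one plausible reading of the descriptions above, not the package source: the helper names `expand` and `resolve_against` and the `THESIS_DEMO_ASSETS` variable are hypothetical, and the example paths assume a POSIX system.

```python
import os
from pathlib import Path
from typing import Optional, Union

PathLike = Union[str, Path]


def expand(value: Optional[PathLike]) -> Optional[Path]:
    """Expand environment variables and ~, passing None through unchanged."""
    if value is None:
        return None
    return Path(os.path.expandvars(str(value))).expanduser()


def resolve_against(base: Path, value: PathLike) -> Path:
    """Return the expanded path if absolute, otherwise base / expanded path."""
    expanded = expand(value)
    return expanded if expanded.is_absolute() else base / expanded


os.environ["THESIS_DEMO_ASSETS"] = "/srv/assets"  # hypothetical variable for the demo
base = Path("/project/config")

relative = resolve_against(base, "rois/m1.nii.gz")
from_env = resolve_against(base, "$THESIS_DEMO_ASSETS/atlas.nii.gz")
nothing = expand(None)
```

The relative value is anchored at `base`, while the value that expands to an absolute path ignores `base` entirely.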

thesis.core.write_list_file(paths, out_file)

Write a list of paths to a text file, one per line.

Parameters:
  • paths (Iterable[Path]) – Iterable of Path objects to write

  • out_file (Path) – Output file path

Return type:

Path

Returns:

Path to the written file
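
List files of this kind are typically handed to command-line tools that expect one path per line. A minimal sketch of the idea follows; it assumes a trailing newline after each entry and that missing parent directories are created, neither of which is stated above. `write_list` is a hypothetical stand-in.

```python
import tempfile
from pathlib import Path
from typing import Iterable


def write_list(paths: Iterable[Path], out_file: Path) -> Path:
    """Write each path on its own line and return the output file path."""
    out_file.parent.mkdir(parents=True, exist_ok=True)  # assumption: create parents
    out_file.write_text("".join(f"{path}\n" for path in paths))
    return out_file


with tempfile.TemporaryDirectory() as tmp:
    volumes = [Path("/data/P001/dwi_AP.nii.gz"), Path("/data/P001/dwi_PA.nii.gz")]
    written = write_list(volumes, Path(tmp) / "lists" / "merge_inputs.txt")
    lines = written.read_text().splitlines()
```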

thesis.core.get_logger(name=None)

Get a logger instance for a module.

This function returns the global loguru logger with context about the calling module. The logger is automatically configured on first use.

Parameters:

name (Optional[str]) – Name of the module/component (typically __name__)

Return type:

Any

Returns:

Logger instance with module context

Example

>>> logger = get_logger(__name__)
>>> logger.info("Processing image")
thesis.core.setup_logging(log_dir=None, log_level='INFO', console_output=True, file_output=True, rotation='10 MB', retention='7 days', compression='zip')

Configure the logging system for the application.

Parameters:
  • log_dir (Union[str, Path, None]) – Directory for log files. If None, uses ‘./logs’

  • log_level (str) – Minimum log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

  • console_output (bool) – Whether to output logs to console

  • file_output (bool) – Whether to output logs to file

  • rotation (str) – When to rotate log files (size or time-based)

  • retention (str) – How long to keep old log files

  • compression (str) – Compression format for rotated logs

Return type:

None

Example

>>> setup_logging(log_dir="./logs", log_level="DEBUG")
>>> logger = get_logger(__name__)
>>> logger.info("Logging configured")
thesis.core.reset_logging()

Reset the logging system (primarily for testing).

Removes all handlers and resets the initialization flag.

Return type:

None

class thesis.core.WorkflowEntry

Bases: object

Metadata and factory for a registered workflow.

Variables:
  • name – Short identifier used on the CLI (e.g. "hcp").

  • factory – Callable that accepts (config, context) and returns a Nipype Workflow.

  • verifier – Optional callable (config, context) -> List[str] that performs pre-run requirement checks. An empty list means all clear.

  • description – Human-readable description shown by thesis list-workflows.

  • default_protocol – Protocol name used when neither the CLI --protocol flag nor a per-patient config supplies one.

  • default_config – Config name to use when no -c flag is provided (None falls back to the CLI default, "default").

Parameters:
name: str
factory: Callable[[PipelineConfig, ProcessingContext], object]
verifier: Callable[[PipelineConfig, ProcessingContext], list[str]] | None = None
description: str = ''
default_protocol: str | None = None
default_config: str | None = None
is_cohort_level: bool = False
__init__(name, factory, verifier=None, description='', default_protocol=None, default_config=None, is_cohort_level=False)
Return type:

None
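
The verifier contract (take config and context, return a list of problem strings, empty when all is clear) can be sketched as below. The required file names are hypothetical, and a SimpleNamespace stands in for ProcessingContext so the example runs without the framework.

```python
import tempfile
from pathlib import Path
from types import SimpleNamespace
from typing import List

# Hypothetical inputs a diffusion workflow might require.
REQUIRED_INPUTS = ("dwi.nii.gz", "dwi.bval", "dwi.bvec")


def verify_dwi_inputs(config, context) -> List[str]:
    """Return one message per missing input; an empty list means all clear."""
    problems: List[str] = []
    for name in REQUIRED_INPUTS:
        if not (context.input_dir / name).is_file():
            problems.append(f"{context.patient_id}: missing input {name}")
    return problems


with tempfile.TemporaryDirectory() as tmp:
    ctx = SimpleNamespace(patient_id="P001", input_dir=Path(tmp))
    (ctx.input_dir / "dwi.nii.gz").write_bytes(b"")
    before = verify_dwi_inputs(None, ctx)

    (ctx.input_dir / "dwi.bval").write_text("0 1000\n")
    (ctx.input_dir / "dwi.bvec").write_text("0 1\n0 0\n0 0\n")
    after = verify_dwi_inputs(None, ctx)
```

Returning messages instead of raising lets a caller collect every problem for a patient in one pass before deciding whether to start the run.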

class thesis.core.WorkflowRegistry

Bases: object

Registry for WorkflowEntry objects.

Each workflow self-registers at module import time. The recommended path is the workflow() decorator, which builds a WorkflowEntry and calls register() for you:

from thesis.core.decorators import workflow

@workflow(name="hcp", description="HCP tractography workflow.", protocol="hcp")
def build_workflow(*, config, context):
    ...

Direct registration via register() is still supported for tests or introspection tooling:

>>> WORKFLOW_REGISTRY.register(WorkflowEntry(
...     name="hcp",
...     factory=build_workflow,
...     description="HCP tractography workflow.",
...     default_protocol="hcp",
... ))
>>> entry = WORKFLOW_REGISTRY.get("hcp")
__init__()
Return type:

None

register(entry)

Register a workflow entry.

Parameters:

entry (WorkflowEntry) – The WorkflowEntry to register.

Return type:

None

get(name)

Return the entry for name.

Parameters:

name (str) – Registered workflow name.

Return type:

WorkflowEntry

Returns:

The matching WorkflowEntry.

Raises:

KeyError – If name is not registered, with a helpful message listing available workflows.

has(name)

Return True if name is registered.

Parameters:

name (str)

Return type:

bool

list()

Return sorted list of registered workflow names.

Return type:

List[str]

all_entries()

Return all entries sorted by name.

Return type:

List[WorkflowEntry]
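
The registry methods above amount to a name-keyed dictionary with a friendlier lookup error. The following stand-alone sketch mirrors that behaviour under assumptions: `EntrySketch` and `RegistrySketch` are hypothetical stand-ins, the "clinical" workflow name is invented, and re-registering a name simply overwrites the earlier entry here (the source does not say how duplicates are handled).

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class EntrySketch:
    name: str
    factory: Callable[..., object]
    description: str = ""


class RegistrySketch:
    def __init__(self) -> None:
        self._entries: Dict[str, EntrySketch] = {}

    def register(self, entry: EntrySketch) -> None:
        self._entries[entry.name] = entry

    def get(self, name: str) -> EntrySketch:
        try:
            return self._entries[name]
        except KeyError:
            # Turn a bare KeyError into a message that lists valid choices.
            available = ", ".join(self.list()) or "<none>"
            raise KeyError(f"Unknown workflow {name!r}. Available: {available}") from None

    def has(self, name: str) -> bool:
        return name in self._entries

    def list(self) -> List[str]:
        return sorted(self._entries)


registry = RegistrySketch()
registry.register(EntrySketch("hcp", factory=lambda **kwargs: None))
registry.register(EntrySketch("clinical", factory=lambda **kwargs: None))

try:
    registry.get("hpc")  # typo for "hcp"
    message = ""
except KeyError as exc:
    message = exc.args[0]
```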

exception thesis.core.ThesisError

Bases: Exception

Base exception for all thesis framework errors.

exception thesis.core.ConfigurationError

Bases: ThesisError

Raised when configuration is invalid or missing.

Examples

  • Missing required configuration file

  • Invalid configuration values

  • Configuration validation failure

exception thesis.core.ValidationError

Bases: ThesisError

Raised when input validation fails.

Examples

  • Invalid image dimensions

  • Missing required files

  • Invalid parameter values

exception thesis.core.ProcessingError

Bases: ThesisError

Raised when a processing operation fails.

Examples

  • Algorithm failure

  • Computation error

  • Unexpected processing result

exception thesis.core.RegistrationError

Bases: ProcessingError

Raised when image registration fails.

exception thesis.core.SegmentationError

Bases: ProcessingError

Raised when image segmentation fails.

exception thesis.core.TractographyError

Bases: ProcessingError

Raised when tractography processing fails.

exception thesis.core.FileIOError

Bases: ThesisError

Raised when an I/O operation fails.

Examples

  • Unable to read file

  • Unable to write file

  • Corrupted data

exception thesis.core.DependencyError

Bases: ThesisError

Raised when an external dependency is missing or incompatible.

Examples

  • FSL not installed

  • ANTs not found

  • Incompatible library version

exception thesis.core.PipelineError

Bases: ProcessingError

Raised when a pipeline step produces an invalid or unrecoverable result.

Examples

  • Warped ROI mask is entirely empty

  • Pipeline invariant violated during execution
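
The base classes listed above determine which handlers catch which failures. The sketch below re-declares part of the hierarchy locally (same names and bases as documented, but stand-ins rather than imports) to show how handler order interacts with it; `run_step` and the step functions are hypothetical.

```python
class ThesisError(Exception):
    """Stand-in for the framework base exception."""


class ProcessingError(ThesisError):
    pass


class RegistrationError(ProcessingError):
    pass


class DependencyError(ThesisError):
    pass


def run_step(step) -> str:
    """Run a step and classify any framework error it raises."""
    try:
        step()
    except ProcessingError as exc:
        # Also catches RegistrationError, SegmentationError, PipelineError, ...
        return f"processing failure: {exc}"
    except ThesisError as exc:
        # Remaining framework errors such as DependencyError.
        return f"framework failure: {exc}"
    return "ok"


def failing_registration():
    raise RegistrationError("affine registration did not converge")


def missing_tool():
    raise DependencyError("FSL not installed")


outcomes = [run_step(failing_registration), run_step(missing_tool), run_step(lambda: None)]
```

The more specific handler has to come first: placing `except ThesisError` ahead of `except ProcessingError` would route every framework error to the general branch.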

class thesis.core.NipypeExecutor

Bases: object

Executes Nipype workflows within the thesis framework.

This executor bridges the thesis processing context with Nipype’s workflow engine. It manages:
  • Workflow base directory and working directories

  • Plugin selection and configuration

  • Logging and error handling

Workflow outputs are written by individual nodes to their configured output_dir paths under workflow.base_dir; downstream code reads them back from the filesystem rather than from a returned dict.

Example

>>> from pathlib import Path
>>> from thesis.core import ConfigManager, NipypeExecutor, create_context
>>> from nipype import Workflow
>>>
>>> config = ConfigManager().load_config("default", patient_id="patient_001")
>>> context = create_context("patient_001", config, Path("./data"))
>>> workflow = Workflow(name="example")
>>>
>>> executor = NipypeExecutor(context, config.nipype)
>>> executor.execute(workflow)
__init__(context, config=None)

Initialize the executor.

execute(workflow, *, event_bus=None, progress=None)

Execute a Nipype workflow.

Workflow outputs are written to disk under workflow.base_dir (and whatever output_dir paths individual nodes were configured with); downstream code reads them from the filesystem rather than from a returned dict.

Parameters:
  • workflow (Workflow) – nipype.pipeline.engine.Workflow instance

  • event_bus (Optional[EventBus]) – Optional event bus for emitting execution events.

  • progress (Optional[NodeProgressProtocol]) – Optional workflow progress tracker.

Return type:

None

get_working_directory()

Get the workflow working directory.

Return type:

Path

Returns:

Path to the workflow working directory

cleanup(remove_intermediate=None)

Clean up workflow intermediate files.

Parameters:

remove_intermediate (Optional[bool]) – Whether to remove intermediate outputs. If None, uses config setting.

Return type:

None

class thesis.core.NipypeConfig

Bases: BaseConfig

Configuration for Nipype workflow execution.

Variables:
  • working_dir – Base working directory.

  • crash_dir – Optional crash dump directory.

  • plugin – Nipype execution plugin.

  • plugin_args – Plugin arguments passed to workflow.run.

  • stop_on_first_crash – Whether execution stops after the first crash.

  • remove_unnecessary_outputs – Whether intermediate outputs are cleaned.

  • keep_inputs – Whether node input files remain in the working directory.

  • hash_method – Caching hash strategy.

  • use_profiler – Whether the Nipype profiler is enabled.

Parameters:

data (Any)

working_dir: Path
crash_dir: Path | None
plugin: str
plugin_args: Dict[str, Any]
stop_on_first_crash: bool
remove_unnecessary_outputs: bool
keep_inputs: bool
hash_method: str
use_profiler: bool
classmethod validate_plugin(v)

Validate the Nipype execution plugin against the supported allow-list.

Parameters:

v (str)

Return type:

str

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
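
Allow-list validation of the plugin name can be illustrated without Pydantic. The sketch below uses a plain dataclass instead of the real model validator, and its allow-list is hypothetical: "Linear" and "MultiProc" are genuine Nipype plugin names, but which plugins the framework actually accepts is not stated here.

```python
from dataclasses import dataclass

# Hypothetical allow-list; the framework's real list may differ.
ALLOWED_PLUGINS = frozenset({"Linear", "MultiProc"})


@dataclass
class NipypeSettingsSketch:
    """Stand-in for NipypeConfig holding only the plugin field."""

    plugin: str

    def __post_init__(self) -> None:
        if self.plugin not in ALLOWED_PLUGINS:
            allowed = ", ".join(sorted(ALLOWED_PLUGINS))
            raise ValueError(
                f"Unsupported Nipype plugin {self.plugin!r}; expected one of: {allowed}"
            )


accepted = NipypeSettingsSketch("MultiProc")
try:
    NipypeSettingsSketch("multiproc")  # wrong case is rejected by this exact-match check
    error = ""
except ValueError as exc:
    error = str(exc)
```

Rejecting an unknown name when the configuration is loaded surfaces a typo immediately rather than at the point where the workflow starts running.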

thesis.core.run_workflow(workflow, context, config=None, *, event_bus=None, progress=None)

Convenience helper to execute a Nipype workflow with the thesis context.

Return type:

None

class thesis.core.Event

Bases: object

A structured output event emitted during pipeline execution.

Variables:
  • message – Human-readable event description.

  • level – Importance level controlling visibility.

  • timestamp – Unix timestamp when the event was created.

  • category – Optional grouping tag (e.g. "workflow", "tool", "file", "test").

  • patient_id – Patient this event relates to, if any.

  • metadata – Arbitrary key-value data attached to the event.

Parameters:
message: str
level: EventLevel = EventLevel.INFO
timestamp: float
category: str = ''
patient_id: str = ''
metadata: Dict[str, Any]
__init__(message, level=EventLevel.INFO, timestamp=<factory>, category='', patient_id='', metadata=<factory>)
Return type:

None

class thesis.core.EventBus

Bases: object

Central event pipeline that decouples emission from rendering.

Listeners subscribe to receive events and can filter by minimum level. The bus is thread-safe: events can be emitted from worker threads while the main thread renders output.

Example

>>> bus = EventBus()
>>> bus.subscribe(lambda e: print(e.message))
>>> bus.emit("hello")
Parameters:

max_events (int)

__init__(max_events=10000)
Parameters:

max_events (int)

Return type:

None

subscribe(listener)

Register a listener that will receive all emitted events.

Parameters:

listener (Callable[[Event], None]) – Callable that accepts an Event.

Return type:

None

unsubscribe(listener)

Remove a previously registered listener.

Parameters:

listener (Callable[[Event], None]) – The listener to remove. No-op if not found.

Return type:

None

emit(message, *, level=EventLevel.INFO, category='', patient_id='', metadata=None)

Create and dispatch an event to all listeners.

Parameters:
  • message (str) – Human-readable description of the event.

  • level (EventLevel) – Importance level.

  • category (str) – Optional grouping tag.

  • patient_id (str) – Patient identifier, if applicable.

  • metadata (Optional[Dict[str, Any]]) – Extra structured data.

Return type:

Event

Returns:

The created Event instance.

emit_event(event)

Dispatch a pre-built event to all listeners.

Parameters:

event (Event) – Event instance to dispatch.

Return type:

None

get_events(min_level=EventLevel.DEBUG, category=None, patient_id=None)

Return stored events matching the given filters.

Parameters:
  • min_level (EventLevel) – Minimum importance level to include.

  • category (Optional[str]) – If set, only return events with this category.

  • patient_id (Optional[str]) – If set, only return events for this patient.

Return type:

List[Event]

Returns:

List of matching events in chronological order.

clear()

Remove all stored events.

Return type:

None

property event_count: int

Number of stored events.
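
To make the thread-safety and max_events behaviour concrete, here is a compact sketch of a bus with the same shape. It is not the EventBus implementation: it assumes a single lock guards both the listener list and the stored events, and that the oldest events are dropped once max_events is reached. `EventSketch` and `BusSketch` are hypothetical names.

```python
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, List


@dataclass
class EventSketch:
    message: str
    level: int = 20
    timestamp: float = field(default_factory=time.time)


class BusSketch:
    def __init__(self, max_events: int = 10000) -> None:
        self._lock = threading.Lock()
        self._listeners: List[Callable[[EventSketch], None]] = []
        # A bounded deque silently discards the oldest event when full.
        self._events: Deque[EventSketch] = deque(maxlen=max_events)

    def subscribe(self, listener: Callable[[EventSketch], None]) -> None:
        with self._lock:
            self._listeners.append(listener)

    def emit(self, message: str, level: int = 20) -> EventSketch:
        event = EventSketch(message, level)
        with self._lock:
            self._events.append(event)
            listeners = list(self._listeners)  # snapshot taken under the lock
        for listener in listeners:
            listener(event)  # called outside the lock so listeners cannot deadlock the bus
        return event

    @property
    def event_count(self) -> int:
        with self._lock:
            return len(self._events)


bus = BusSketch(max_events=100)
seen: List[str] = []
bus.subscribe(lambda event: seen.append(event.message))

workers = [threading.Thread(target=bus.emit, args=(f"node {i} done",)) for i in range(8)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

Delivery order across threads is not deterministic, so consumers that need ordering should sort by timestamp rather than rely on arrival order.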

class thesis.core.EventLevel

Bases: IntEnum

Event importance levels, ordered from most to least critical.

  • ERROR: Failures that stop processing.

  • WARNING: Issues that may affect outcome but do not stop processing.

  • IMPORTANT: Key milestones – tool starts/completions, file changes, build/test results, skipped actions.

  • INFO: Detailed informational messages (full tool logs, timing, etc.).

  • DEBUG: Internal implementation details.

Default output mode shows ERROR, WARNING, IMPORTANT. Verbose mode shows everything. Quiet mode shows ERROR only plus the final result.

ERROR = 50
WARNING = 40
IMPORTANT = 30
INFO = 20
DEBUG = 10
__new__(value)
thesis.core.get_event_bus()

Return the process-wide EventBus singleton.

The bus is created on first call and reused thereafter.

Return type:

EventBus

Returns:

The global EventBus instance.

class thesis.core.OutputMode

Bases: str, Enum

Verbosity level for console output.

  • QUIET: errors and final result only.

  • NORMAL: concise, human-friendly progress and key events.

  • VERBOSE: full informational logs, timing, debug context.

QUIET = 'quiet'
NORMAL = 'normal'
VERBOSE = 'verbose'
__new__(value)
class thesis.core.OutputConfig

Bases: BaseModel

Configuration for the output system.

Controls verbosity, summary detail, progress display, and format. CLI flags override values set here.

Variables:
  • mode – Verbosity level.

  • summary – Summary detail level.

  • progress – Whether to show progress bars/spinners. None means auto-detect (enabled for TTY, disabled otherwise).

  • output_format – Serialization format.

Parameters:

data (Any)

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

mode: OutputMode
summary: SummaryDetail
progress: bool | None
output_format: OutputFormat
property min_event_level: EventLevel

Minimum event level to display in the current mode.

property show_progress: bool

Whether animated progress UI should be shown.

Progress remains enabled across output modes unless the user explicitly disables it. The CLI and logging layers are responsible for routing non-progress output in a progress-safe way.

property is_verbose: bool

Shorthand for checking verbose mode.

property is_quiet: bool

Shorthand for checking quiet mode.

property is_json: bool

Shorthand for checking JSON output format.
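
The relationship between output mode and minimum event level, as described for EventLevel and min_event_level, can be written out as a lookup table. The enum values below are copied from this page; the `MIN_LEVEL` table and `visible` helper are a hypothetical sketch of the filtering rule, not the OutputConfig code.

```python
from enum import Enum, IntEnum


class EventLevel(IntEnum):
    ERROR = 50
    WARNING = 40
    IMPORTANT = 30
    INFO = 20
    DEBUG = 10


class OutputMode(str, Enum):
    QUIET = "quiet"
    NORMAL = "normal"
    VERBOSE = "verbose"


# Quiet shows errors only, normal shows IMPORTANT and above, verbose shows everything.
MIN_LEVEL = {
    OutputMode.QUIET: EventLevel.ERROR,
    OutputMode.NORMAL: EventLevel.IMPORTANT,
    OutputMode.VERBOSE: EventLevel.DEBUG,
}


def visible(level: EventLevel, mode: OutputMode) -> bool:
    """An event is shown when its level reaches the mode's minimum."""
    return level >= MIN_LEVEL[mode]


shown_in_normal = [lvl.name for lvl in EventLevel if visible(lvl, OutputMode.NORMAL)]
shown_in_quiet = [lvl.name for lvl in EventLevel if visible(lvl, OutputMode.QUIET)]
```

Because EventLevel is an IntEnum, the comparison is a plain integer comparison, which is what makes a single threshold per mode sufficient.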

class thesis.core.SummaryDetail

Bases: str, Enum

Level of detail in the end-of-run summary.

  • OFF: no summary printed.

  • COMPACT: one-line status + 3-6 key bullets.

  • FULL: expanded summary with all metadata.

OFF = 'off'
COMPACT = 'compact'
FULL = 'full'
__new__(value)
class thesis.core.RunStatus

Bases: str, Enum

Final status of a single patient run.

  • SUCCESS: completed without errors.

  • PARTIAL: some steps completed, others failed or were skipped.

  • FAILED: processing failed with an error.

  • BLOCKED: could not start (e.g. preflight check failure, missing data).

  • SKIPPED: intentionally not run (e.g. dry run, user cancellation).

SUCCESS = 'success'
PARTIAL = 'partial'
FAILED = 'failed'
BLOCKED = 'blocked'
SKIPPED = 'skipped'
__new__(value)
class thesis.core.RunResult

Bases: BaseModel

Detailed result for a single patient run.

Populated incrementally during execution and used to build the final RunSummary.

Variables:
  • patient_id – Patient identifier.

  • status – Final run status.

  • elapsed_seconds – Wall-clock duration in seconds.

  • workflow – Workflow name that was executed.

  • error_message – Primary failure reason, if any.

  • error_type – Exception class name, if any.

  • error_history – Per-attempt failure messages in order (one entry per failed retry attempt), used to show error progression across retries.

  • warnings – Warning messages collected during the run.

  • steps_completed – Names of successfully completed steps.

  • steps_failed – Names of steps that failed.

  • files_changed – Paths of files created or modified.

  • commands_run – External commands executed (e.g. FSL, ANTs).

  • retry_count – Number of retries attempted.

  • suggested_next_step – Actionable suggestion for the user.

  • metadata – Arbitrary extra data.

Parameters:

data (Any)

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

patient_id: str
status: RunStatus
elapsed_seconds: float
workflow: str
error_message: str
error_type: str
error_history: List[str]
warnings: List[str]
steps_completed: List[str]
steps_failed: List[str]
files_changed: List[str]
commands_run: List[str]
retry_count: int
suggested_next_step: str
metadata: Dict[str, Any]
class thesis.core.RunSummary

Bases: BaseModel

Structured summary of a single patient run.

Built from a RunResult after execution completes. Contains the information needed to render the compact footer shown to the user.

Variables:
  • patient_id – Patient identifier.

  • workflow – Workflow name.

  • status – Final outcome.

  • elapsed_seconds – Wall-clock duration.

  • headline – One-line status headline (e.g. "SUCCESS  hcp  P001  2m 13s").

  • bullets – Key summary bullets (3-6 items).

  • next_step – Optional suggested next action.

  • result – Full result data for verbose/JSON output.

Parameters:

data (Any)

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

patient_id: str
workflow: str
status: RunStatus
elapsed_seconds: float
headline: str
bullets: List[str]
next_step: str
result: RunResult | None
classmethod from_result(result)

Build a summary from a completed run result.

Auto-generates the headline and bullet points from the result data.

Parameters:

result (RunResult) – Completed run result.

Return type:

RunSummary

Returns:

Populated RunSummary.
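
The headline example given for RunSummary ("SUCCESS  hcp  P001  2m 13s") suggests a status, workflow, patient ID, and a human-readable duration joined by two spaces. The sketch below reproduces that shape. The helper names and the exact duration format, especially for runs of an hour or more, are assumptions.

```python
def format_elapsed(seconds: float) -> str:
    """Format a duration as '13s', '2m 13s', or '1h 05m'."""
    total = int(round(seconds))
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}h {minutes:02d}m"  # assumed format for long runs
    if minutes:
        return f"{minutes}m {secs}s"
    return f"{secs}s"


def build_headline(status: str, workflow: str, patient_id: str, elapsed_seconds: float) -> str:
    """Join the headline fields with two spaces, status in upper case."""
    return "  ".join([status.upper(), workflow, patient_id, format_elapsed(elapsed_seconds)])


headline = build_headline("success", "hcp", "P001", 133.0)
print(headline)
```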

class thesis.core.BatchSummary

Bases: BaseModel

Aggregated summary for a batch of patient runs.

Collects child RunSummary objects and computes aggregate statistics for the batch-level footer.

Variables:
  • workflow – Workflow name.

  • total – Total number of patient runs.

  • succeeded – Count of successful runs.

  • partial – Count of partially successful runs.

  • failed – Count of failed runs.

  • blocked – Count of blocked runs.

  • skipped – Count of skipped runs.

  • total_elapsed_seconds – Total wall-clock time for the batch.

  • avg_elapsed_seconds – Average per-patient duration.

  • retries – Total retries across all patients.

  • run_summaries – Per-patient summaries in execution order.

  • failure_reasons – Grouped failure reasons with patient lists.

  • suggested_follow_up – Actionable follow-up suggestion.

Parameters:

data (Any)

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

workflow: str
total: int
succeeded: int
partial: int
failed: int
blocked: int
skipped: int
total_elapsed_seconds: float
avg_elapsed_seconds: float
retries: int
run_summaries: List[RunSummary]
failure_reasons: Dict[str, List[str]]
suggested_follow_up: str
classmethod from_results(results, workflow='', batch_elapsed=0.0)

Build a batch summary from individual run results.

Parameters:
  • results (List[RunResult]) – List of completed run results.

  • workflow (str) – Workflow name.

  • batch_elapsed (float) – Total wall-clock time for the batch.

Return type:

BatchSummary

Returns:

Populated BatchSummary.
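
The aggregate fields of BatchSummary can be derived from per-patient results in a few lines. The sketch below is an approximation under assumptions: `ResultSketch` is a plain dataclass stand-in for RunResult, statuses are plain strings, and failures are grouped by their exact error message, which may differ from how the framework groups them.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class ResultSketch:
    patient_id: str
    status: str
    elapsed_seconds: float
    error_message: str = ""
    retry_count: int = 0


def summarize(results: List[ResultSketch], batch_elapsed: float) -> Dict[str, Any]:
    """Compute batch-level counts, timing, and grouped failure reasons."""
    counts = Counter(result.status for result in results)
    reasons: Dict[str, List[str]] = defaultdict(list)
    for result in results:
        if result.status == "failed" and result.error_message:
            reasons[result.error_message].append(result.patient_id)
    total = len(results)
    return {
        "total": total,
        "succeeded": counts["success"],
        "failed": counts["failed"],
        "skipped": counts["skipped"],
        "total_elapsed_seconds": batch_elapsed,  # wall clock for the whole batch
        "avg_elapsed_seconds": sum(r.elapsed_seconds for r in results) / total if total else 0.0,
        "retries": sum(r.retry_count for r in results),
        "failure_reasons": dict(reasons),
    }


results = [
    ResultSketch("P001", "success", 120.0),
    ResultSketch("P002", "failed", 30.0, "FSL not installed", retry_count=1),
    ResultSketch("P003", "failed", 30.0, "FSL not installed"),
    ResultSketch("P004", "skipped", 0.0),
]
summary = summarize(results, batch_elapsed=95.0)
```

The batch wall-clock time is passed in separately because it can be shorter than the sum of per-patient durations when patients are processed in parallel.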

class thesis.core.OutputRenderer

Bases: object

Renders structured output to the terminal or as JSON.

Connects to the EventBus and prints events that pass the verbosity filter. Also provides methods for rendering run and batch summaries.

Parameters:
  • config (Optional[OutputConfig]) – Output configuration controlling verbosity and format.

  • event_bus (Optional[EventBus]) – Event bus to subscribe to. Uses the global singleton if not provided.

  • file (Optional[IO[str]]) – Output stream (default: stderr).

__init__(config=None, event_bus=None, file=None)
Return type:

None

attach(bus=None)

Subscribe to an event bus to render events in real time.

Parameters:

bus (Optional[EventBus]) – Event bus to subscribe to. If None, uses the global singleton.

Return type:

None

detach()

Unsubscribe from the event bus.

Return type:

None

render_event(event)

Render a single event to the output stream.

Parameters:

event (Event) – The event to render.

Return type:

None

render_run_summary(summary)

Render a single-run summary footer.

Parameters:

summary (RunSummary) – The run summary to render.

Return type:

None

render_batch_summary(batch)

Render a batch summary footer.

Parameters:

batch (BatchSummary) – The batch summary to render.

Return type:

None