Project Architecture Overview#
Package Structure#
thesis/
├── src/thesis/
│   ├── __init__.py # Package metadata (version, exports)
│   ├── __main__.py # Enables `python -m thesis`
│   ├── cli.py # Click CLI (run, list-*, show-config, info, stats collect)
│   ├── core/ # Shared infrastructure (no workflow imports allowed)
│   │   ├── decorators.py # @workflow / @requires / @produces / @verify decorators
│   │   ├── path_declarations.py # PatientFile, PatientDir, OutputDir, WorkingFile, CohortDir,
│   │   │                        # PriorOutput, DataFile, DataDir, GlobMatch, GlobGroup,
│   │   │                        # ConfigList, CohortPatients
│   │   ├── context.py # ProcessingContext dataclass (patient_id, config, paths, results)
│   │   ├── exceptions.py # ThesisError hierarchy
│   │   ├── io.py # NIfTI / bvals / bvecs I/O utilities
│   │   ├── gpu.py # GPU/CUDA detection (check_gpu, GPUStatus)
│   │   ├── registry.py # WorkflowRegistry, WorkflowEntry, WORKFLOW_REGISTRY
│   │   ├── config/
│   │   │   ├── manager.py # ConfigManager: hierarchical YAML loading
│   │   │   ├── loaders.py # YAML load/save/deep-merge helpers
│   │   │   └── validators.py # Pydantic v2 config models (PipelineConfig + 20+ sub-models)
│   │   ├── logging/
│   │   │   ├── __init__.py # setup_logging, get_logger, reset_logging,
│   │   │   │               # set_console_level, suppress_nipype_native_logging,
│   │   │   │               # suspend_console_logging
│   │   │   ├── formatters.py # Log format strings
│   │   │   └── handlers.py # InterceptHandler, PatientLogHandler, PerformanceLogger
│   │   ├── output/
│   │   │   ├── __init__.py # Public exports
│   │   │   ├── events.py # EventLevel, Event, EventBus (thread-safe singleton)
│   │   │   ├── modes.py # OutputMode, SummaryDetail, OutputFormat, OutputConfig
│   │   │   ├── summary.py # RunStatus, RunResult, RunSummary, BatchSummary
│   │   │   ├── progress.py # ProgressTracker (spinner), BatchProgress (tqdm)
│   │   │   └── renderer.py # OutputRenderer (filters events, renders summaries)
│   │   ├── nipype/
│   │   │   ├── executor.py # NipypeExecutor, run_workflow,
│   │   │   │               # apply_nipype_execution_config,
│   │   │   │               # build_nipype_status_callback, count_workflow_nodes
│   │   │   └── interfaces/
│   │   │       ├── fsl.py # ProbTrackX2 (stderr fix), ProbTrackX2GPU, factory
│   │   │       ├── freesurfer.py # SynthSeg CommandLine wrapper for mri_synthseg
│   │   └── utils/
│   │       └── utils.py # to_path (~/$VAR expansion), resolve_path, config helpers
│   └── workflows/ # 17 self-registering workflows
│       ├── minimal.py # Example/demo
│       ├── qc/ # QC overlays, track density, stats, batch outliers (post-workflow hook)
│       ├── hcp/ # FSL ProbTrackX2 (HCP-preprocessed inputs)
│       ├── tract_synthseg/ # SynthSeg + tractography meta-workflow; backend per tractography.method
│       ├── mrtrix3/ # 5ttgen → dhollander → msmt_csd → mtnormalise → tckgen → tcksift2
│       ├── full_pipeline/ # preprocess → registration → tractography → tract_similarity
│       │                  # Backend: tractography.method=probtrackx2 (default) or mrtrix3
│       ├── preprocess/ # Raw → HCP layout (TOPUP, Eddy, BedpostX, N4, SynthStrip/SynthSeg)
│       ├── registration/ # ANTs + FireANTs registration, swappable viewer
│       ├── synthseg/ # Standalone SynthSeg
│       ├── atlas/ # Cohort statistical atlas (NumPy)
│       ├── learned_atlas/ # Learned deformable cohort atlas (VoxelMorph/AtlasMorph; optional ml extra)
│       ├── atlas_to_patient/ # Warp cohort atlas into patient space
│       ├── transforms/ # Apply ANTs transforms per transforms.jobs (registers as `transform`)
│       └── tract_similarity/ # Per-patient + cohort + hcp_loo + sweep (registers 4 entries)
├── tests/
│   ├── unit/ # Unit tests (no external tools required)
│   ├── integration/ # Integration tests (may require FSL/ANTs)
│   └── conftest.py # Shared pytest fixtures
├── config/ # YAML config (5-level merge: default → hardware → protocol → patient → CLI)
├── data/ # Input data (masks, transforms, HCP subjects)
├── scripts/ # Utility scripts (build_label_map.py, ...)
├── docs/ # Sphinx documentation (this directory)
└── logs/ # Runtime logs (auto-generated, gitignored)
Design Patterns#
1. Module Organisation#
- Each cross-cutting concern (config, logging, Nipype execution, output) lives in its own core/ sub-package.
- Workflows depend on core/ only, with no cross-workflow imports; reuse goes through shared helpers in core/. The one exception is meta-workflows, which import the build_workflow factories of the sub-workflows they compose (see Pipeline Architecture).
- Public APIs are exposed through __init__.py; implementation details stay private.
- Larger workflows split private implementation details into focused helper modules (e.g. atlas: workflow.py + compute.py + _params.py + _io.py + _statistics.py).
2. Pipeline Architecture#
- Workflows are factory functions decorated with @workflow(name=…, description=…) from thesis.core.decorators. The decorator is the outermost of the stack; it reads the metadata the inner decorators attached, synthesises a (config, context) -> Workflow adapter, builds a composite verifier, and self-registers the workflow with WORKFLOW_REGISTRY at import time. It returns the original function unchanged. The body builds and returns a raw nipype.Workflow. There is no workflow base class (core/base.py does not exist; BaseModule / BasePipelineStep / BasePipeline / NipypeStep are gone).
- @requires(...) and @produces(...) attach declarative input/output PathDeclaration types (see core/path_declarations.py). They are pure metadata attachers and may appear in any order. The adapter resolves each declaration against the active ProcessingContext at workflow-build time and injects the resulting pathlib.Path (or None / list) as a keyword-only argument named after the declaration key. config / context are injected only when the body's signature names them.
- @verify(check_fn, ...) attaches optional preflight checks. The historical signature is (config, context) -> list[str] (an empty list means all clear); a verifier may also opt into the resolved declared kwargs via a **kwargs catch-all or a parameter named after a declared key. Every non-optional @requires(...) declaration also generates an implicit existence check that runs before the explicit @verify checks.
- Meta-workflows (e.g. tract_synthseg, full_pipeline) compose multiple sub-workflows in a single Nipype graph by importing their build_workflow factories and connecting them contract-to-contract (upstream.outputnode.<field> -> downstream.inputnode.<field>) via the inputnode / outputnode boundary nodes published in thesis.core.contracts, never via internal node names.
For the complete, worked guide to building a workflow with this decorator API, see custom_workflows.md; to port an existing standalone Nipype script, see nipype_to_framework.md.
3. Configuration Management#
- Five-level hierarchy (default → hardware → protocol → patient → CLI overrides).
- Deep-merge semantics: later levels override earlier ones, nested keys preserved.
- ConfigManager.load_config(config_name, patient_id, protocol, overrides) performs all merging.
- Configuration models are validated with Pydantic v2 and reject unknown fields (extra="forbid").
- See configuration/index.md for the per-section reference.
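A minimal sketch of the deep-merge semantics, assuming plain dictionaries loaded from YAML. The `deep_merge` function below is illustrative rather than the helper in core/config/loaders.py, and the keys `n_samples` and `hardware.gpu` are hypothetical; only `tractography.method` and its two values come from this page. The sketch also assumes that non-dict values, lists included, are replaced wholesale rather than concatenated.

```python
from copy import deepcopy
from typing import Any


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict in which `override` wins, recursing into nested dicts."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)  # scalars and lists are replaced outright
    return merged


# Hypothetical layers, listed in merge order.
layers = [
    {"tractography": {"method": "probtrackx2", "n_samples": 5000},
     "hardware": {"gpu": False}},                 # default
    {"hardware": {"gpu": True}},                  # hardware
    {"tractography": {"n_samples": 1000}},        # protocol
    {},                                           # patient (nothing overridden)
    {"tractography": {"method": "mrtrix3"}},      # CLI overrides
]

config: dict[str, Any] = {}
for layer in layers:
    config = deep_merge(config, layer)

print(config)
```

The protocol layer changes `n_samples` without erasing `method`, and the CLI layer changes `method` without erasing `n_samples`: that is what "nested keys preserved" means in practice.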
4. Logging#
- All modules use from thesis.core.logging import get_logger, then logger = get_logger(__name__).
- Do not use stdlib logging directly; use loguru via get_logger().
- Exception: inside Nipype Function nodes (which run in separate processes and cannot pickle the loguru logger), use print().
- InterceptHandler bridges third-party loggers (nipype, nibabel) into loguru.
- File logs rotate at 10 MB and are retained for 7 days.
5. Dependency Injection#
- ProcessingContext carries patient_id, config, data_dir, input_dir, output_dir, working_dir, results, metadata.
- It is passed explicitly through the call chain, with no global state.
- This facilitates testing: swap the context for a mock to test any function in isolation.
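The testing benefit can be sketched as follows. The dataclass here is a simplified stand-in that only mirrors the attribute names listed above (the field types are assumptions), and `tract_output_path` is a hypothetical helper invented for the example.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class ProcessingContext:  # illustrative stand-in, not the real class in core/context.py
    patient_id: str
    config: dict[str, Any]
    data_dir: Path
    input_dir: Path
    output_dir: Path
    working_dir: Path
    results: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)


def tract_output_path(context: ProcessingContext) -> Path:
    """Hypothetical helper: everything it needs arrives through the context."""
    return context.output_dir / context.patient_id / "tracts.nii.gz"


# In a test, a throwaway context replaces the real one; no globals to patch.
fake_context = ProcessingContext(
    patient_id="sub-01",
    config={},
    data_dir=Path("data"),
    input_dir=Path("in"),
    output_dir=Path("out"),
    working_dir=Path("work"),
)
path = tract_output_path(fake_context)
print(path)
```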
6. Workflow Registry#
- Each workflow self-registers at module import time via the @workflow(...) decorator, which builds a WorkflowEntry and calls WORKFLOW_REGISTRY.register(entry) internally.
- WORKFLOW_REGISTRY.get("name") retrieves a WorkflowEntry (factory, verifier, description, default_protocol, default_config, is_cohort_level) by its short name.
- The CLI discovers workflows via WORKFLOW_REGISTRY.all_entries() and triggers per-workflow imports lazily via _ensure_workflow_imported() in cli.py.
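A toy registry with the same three operations (register, get, all_entries) might look like the sketch below. The class name `ToyRegistry`, the `name` field on the entry, and the exception types are assumptions; the duplicate handling follows the THESIS_STRICT_REGISTRY behaviour described later on this page (warn by default, raise when the variable is set to 1).

```python
import os
import warnings
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class WorkflowEntry:  # field list follows the text; `name` is an assumed addition
    name: str
    factory: Callable[..., Any]
    verifier: Callable[..., list]
    description: str = ""
    default_protocol: Optional[str] = None
    default_config: Optional[str] = None
    is_cohort_level: bool = False


class ToyRegistry:
    def __init__(self) -> None:
        self._entries: dict[str, WorkflowEntry] = {}

    def register(self, entry: WorkflowEntry) -> None:
        if entry.name in self._entries:
            msg = f"workflow {entry.name!r} registered twice"
            if os.environ.get("THESIS_STRICT_REGISTRY") == "1":
                raise ValueError(msg)  # assumed exception type
            warnings.warn(msg, stacklevel=2)
        self._entries[entry.name] = entry

    def get(self, name: str) -> WorkflowEntry:
        try:
            return self._entries[name]
        except KeyError:
            known = ", ".join(sorted(self._entries))
            raise KeyError(f"unknown workflow {name!r}; known: {known}") from None

    def all_entries(self) -> list[WorkflowEntry]:
        return list(self._entries.values())


registry = ToyRegistry()
registry.register(
    WorkflowEntry(
        name="minimal",
        factory=lambda config, context: "workflow object",
        verifier=lambda config, context: [],
        description="Example/demo",
    )
)
entry = registry.get("minimal")
print(entry.description, [e.name for e in registry.all_entries()])
```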
7. Structured Output System#
- EventBus (thread-safe singleton) decouples event emission from rendering: pipeline code emits structured Event objects, and the OutputRenderer subscribes to present them.
- Events carry an EventLevel (ERROR, WARNING, IMPORTANT, INFO, DEBUG); the active OutputMode (quiet / normal / verbose) determines the minimum level shown.
- End-of-run summaries are Pydantic models (RunSummary, BatchSummary) built from execution metadata, not freeform text. They include headline, duration, status bullets, failure details, and next-step suggestions.
- Progress UI (ProgressTracker for spinners, BatchProgress for tqdm bars; ClickNodeProgress in cli.py for Nipype node progress) auto-detects TTY vs CI and disables animation for non-interactive contexts.
- Colour is additive; text labels ([OK], [FAIL], [WARN]) ensure readability without colour; NO_COLOR is respected.
- CLI flags (-v, -q, --summary, --no-progress) override YAML-level output: defaults.
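The emit/subscribe split can be sketched with a small publish/subscribe bus. This is an illustration only: the numeric level values, the mode-to-threshold mapping in `MIN_LEVEL`, and the `make_renderer` helper are assumptions, and the singleton aspect of the real EventBus is left out.

```python
import threading
from dataclasses import dataclass
from enum import IntEnum
from typing import Callable


class EventLevel(IntEnum):
    # Numeric values are assumed; only the relative order follows the text.
    DEBUG = 10
    INFO = 20
    IMPORTANT = 30
    WARNING = 40
    ERROR = 50


@dataclass(frozen=True)
class Event:
    level: EventLevel
    message: str


# Assumed mapping from output mode to the minimum level that is shown.
MIN_LEVEL = {
    "quiet": EventLevel.WARNING,
    "normal": EventLevel.INFO,
    "verbose": EventLevel.DEBUG,
}


class EventBus:
    """Minimal thread-safe publish/subscribe bus."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: list[Callable[[Event], None]] = []

    def subscribe(self, callback: Callable[[Event], None]) -> None:
        with self._lock:
            self._subscribers.append(callback)

    def emit(self, event: Event) -> None:
        with self._lock:
            subscribers = list(self._subscribers)  # snapshot, then call outside the lock
        for callback in subscribers:
            callback(event)


def make_renderer(mode: str, sink: list[str]) -> Callable[[Event], None]:
    """Return a subscriber that keeps only events at or above the mode's threshold."""
    threshold = MIN_LEVEL[mode]

    def render(event: Event) -> None:
        if event.level >= threshold:
            sink.append(f"[{event.level.name}] {event.message}")

    return render


bus = EventBus()
lines: list[str] = []
bus.subscribe(make_renderer("quiet", lines))
bus.emit(Event(EventLevel.INFO, "loading config"))
bus.emit(Event(EventLevel.ERROR, "probtrackx2 failed"))
print(lines)
```

Pipeline code only ever calls `emit`; whether anything is printed is decided entirely by the subscriber, so verbosity can change without touching the emitting code.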
8. Cohort Workflows#
- Workflows with is_cohort_level=True (atlas, learned_atlas, tract_similarity_cohort, tract_similarity_hcp_loo, tract_similarity_sweep) ignore -p/--patient-id and --all.
- The CLI dispatches them to a single _run_single_patient_with_retries invocation with patient_id="cohort".
- learned_atlas is the learned alternative to the statistical atlas workflow: instead of voxel-wise averaging, it trains a deformable tract-density template (a sharp learnable template plus a deformation-only diffeomorphic network) over the cohort, but emits the same five atlas maps as atlas so atlas_to_patient and tract_similarity keep working unchanged. It requires the optional ml extra (pip install -e '.[ml]'; torch is lazy-imported only inside the training node) and owns the learned_atlas: config namespace (schema LearnedAtlasConfig).
Core Module Responsibilities#
core/config/#
Hierarchical YAML configuration loading, Pydantic validation, and deep-merge helpers. PipelineConfig is the root model; sub-models include PathConfig, HardwareConfig, S3Config, PreprocessingConfig, RegistrationConfig (+ FireantsRegistrationConfig, RegistrationViewerConfig), SegmentationConfig, SynthSegConfig, TractographyConfig, HCPConfig, TransformsConfig (+ TransformJobConfig, AtlasTransformConfig, AtlasSourceConfig), ValidationConfig, QCConfig, AtlasConfig, AtlasQCConfig, TractSimilarityConfig (+ SideThresholdConfig), TractSimilaritySweepConfig (+ ThresholdGridConfig), NipypeConfig, OutputSettingsConfig.
core/logging/#
Loguru-based logging with console + rotating file output. Bridges third-party logging frameworks (InterceptHandler).
core/output/#
Structured output system for CLI runs. EventBus, OutputRenderer, RunResult/RunSummary/BatchSummary Pydantic models, ProgressTracker/BatchProgress for animated spinners and tqdm bars, and configuration via OutputSettingsConfig (YAML defaults overridden by CLI flags).
core/nipype/#
- NipypeExecutor: applies execution config (plugin, crash dir, profiler), handles Windows compatibility, runs the workflow.
- apply_nipype_execution_config: sets nipype.config globals required for content-based hashing across batch retries.
- build_nipype_status_callback: translates node start/finish/failure into structured CLI events.
- count_workflow_nodes: recursive node count (used to size progress bars for nested workflows).
- interfaces/fsl.py: ProbTrackX2, ProbTrackX2GPU, and a factory that selects the right binary at runtime.
- interfaces/freesurfer.py: SynthSeg CommandLine wrapper for mri_synthseg.
core/context.py#
ProcessingContext dataclass — the primary state carrier through the pipeline. Key attributes: patient_id, config, data_dir, input_dir, output_dir, working_dir, results, metadata.
core/decorators.py#
@workflow(name=…, description=…, protocol=…, scope=…), @requires(**path_decls), @produces(**path_decls), and @verify(*checks) decorators that wrap a workflow factory so it self-registers with WORKFLOW_REGISTRY at import time. The outer @workflow synthesises a (config, context) -> Workflow adapter that resolves every declared path and injects the resolved pathlib.Path (or list) as a keyword argument to the body. The composite verifier runs any implicit existence checks from @requires followed by every @verify callable.
core/path_declarations.py#
Declarative path types resolved by the @workflow adapter:
- Per-patient inputs: PatientFile, PatientDir (with optional config_paths, fallback_dirs, and {patient_id} substitution).
- Outputs / working state: OutputDir, WorkingFile, CohortDir.
- Cross-stage handoffs: PriorOutput (single file or glob discovery under context.output_dir).
- Cohort-shared assets: DataFile, DataDir (under context.data_dir, with path-traversal safety).
- Glob discovery: GlobMatch (single pattern), GlobGroup (related patterns sharing a search directory).
- Structured iteration: ConfigList (per-item file/dir/str resolution), CohortPatients (iterate per-patient subdirs with optional file-pattern filtering).
See docs/api/core.path_declarations for the full reference.
core/io.py#
NIfTI / bvals / bvecs I/O: load_nifti, save_nifti, load_bvals, load_bvecs, save_bvals, save_bvecs, check_file_exists, ensure_directory, find_files, copy_nifti_metadata, get_file_info.
core/gpu.py#
check_gpu() → GPUStatus(available, reason) — detects GPU probtrackx2 binary and CUDA runtime. Used by the CLI’s startup GPU check.
core/utils/#
to_path() (with ~ and $VAR expansion) and resolve_path() (resolve against a base directory).
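A minimal sketch of what these two helpers do, built on the standard library. The signatures are assumptions (the real ones in core/utils/utils.py may differ), as is the choice to leave absolute paths untouched in `resolve_path`; `THESIS_DEMO_DIR` is a made-up variable for the example.

```python
import os
from pathlib import Path
from typing import Optional, Union

PathLike = Union[str, "os.PathLike[str]"]


def to_path(value: PathLike) -> Path:
    """Expand $VAR references first, then a leading ~, and return a Path."""
    expanded = os.path.expanduser(os.path.expandvars(os.fspath(value)))
    return Path(expanded)


def resolve_path(value: PathLike, base: Optional[PathLike] = None) -> Path:
    """Resolve a relative path against `base`; absolute paths are returned as-is."""
    path = to_path(value)
    if base is not None and not path.is_absolute():
        path = to_path(base) / path
    return path


os.environ["THESIS_DEMO_DIR"] = "cohort_data"  # hypothetical variable
masks = to_path("$THESIS_DEMO_DIR/masks")
t1 = resolve_path("T1w/T1w.nii.gz", base="subjects/sub-01")
print(masks, t1)
```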
Exception Hierarchy#
ThesisError
├── ConfigurationError
├── ValidationError
├── ProcessingError
│ ├── RegistrationError
│ ├── SegmentationError
│ ├── TractographyError
│ └── PipelineError
├── FileIOError
└── DependencyError
For external tool failures (e.g. FSL commands), wrap with RuntimeError or FileNotFoundError providing the failed command in the message.
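Expressed as code, the tree above corresponds to the class definitions below. The class names and parent/child relations come from the tree; the empty bodies and the `run_tool` helper, which illustrates the external-tool convention, are assumptions rather than the contents of core/exceptions.py.

```python
import shlex
import subprocess
import sys


class ThesisError(Exception):
    """Root of the project-specific hierarchy."""


class ConfigurationError(ThesisError): ...
class ValidationError(ThesisError): ...
class ProcessingError(ThesisError): ...
class RegistrationError(ProcessingError): ...
class SegmentationError(ProcessingError): ...
class TractographyError(ProcessingError): ...
class PipelineError(ProcessingError): ...
class FileIOError(ThesisError): ...
class DependencyError(ThesisError): ...


def run_tool(cmd: list[str]) -> None:
    """Hypothetical helper: run an external command, naming it in any failure."""
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except FileNotFoundError as exc:
        raise FileNotFoundError(f"executable not found: {cmd[0]}") from exc
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(
            f"command failed with exit code {exc.returncode}: {shlex.join(cmd)}"
        ) from exc


# A deliberately failing command (the Python interpreter stands in for an FSL tool).
try:
    run_tool([sys.executable, "-c", "raise SystemExit(3)"])
except RuntimeError as exc:
    message = str(exc)
    print(message)
```

Catching ProcessingError handles any of its four subclasses, while catching ThesisError handles every project-specific failure but deliberately not the RuntimeError raised for external tools.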
Data Flow#
CLI command (thesis run)
↓
OutputRenderer subscribes to EventBus ← filters events by verbosity mode
↓
ConfigManager.load_config() ← deep-merge: default → hardware → protocol → patient → CLI overrides
↓
_resolve_gpu(cfg) ← CUDA check; may set gpu_enabled=False (one-shot per process)
↓
ProcessingContext.create_context() ← resolves input_dir, output_dir, working_dir
↓
composite_verifier(config, context) ← runs @requires existence checks + @verify callables
↓
@workflow adapter (config, context) ← resolves declared paths, calls build_workflow(**kwargs)
↓
NipypeExecutor (or meta_wf.run()) with status callback + cli progress
├── apply_nipype_execution_config() ← sets nipype.config globals (hash_method, crash_dir, etc.)
└── workflow.run(plugin, plugin_args)
↓
[optional] post-workflow QC overlays ← when qc.generate_overlays: true
↓
[optional] tract_similarity enrichment ← reads metrics.json (full_pipeline only)
↓
RunSummary / BatchSummary rendered ← structured end-of-run report (compact/full/JSON)
↓
[optional] _render_batch_tractography_stats + _persist_batch_stats
← collect_batch_stats + detect_batch_outliers,
writes <output>/batch_stats/stats_<ts>.json + latest.json
Adding a New Workflow#
Tip
This section is a high-level checklist. For the full decorator-API reference —
every decorator and path declaration, the config-schema mechanism, the
inputnode/outputnode contracts, and a common-gotchas list — read
custom_workflows.md. To adapt an existing standalone
Nipype script, follow nipype_to_framework.md.
- Create src/thesis/workflows/my_workflow/workflow.py.
- Add config fields either by claiming a top-level YAML key via @workflow(config_namespace=…, config_schema=…) (no edit to core/config/validators.py needed; see custom_workflows.md), or, for shared cross-workflow settings, by extending the Pydantic models in core/config/validators.py and updating the matching reference page under configuration/.
- Add tests in tests/unit/ (and tests/integration/ if external tools are involved).
- Define the workflow factory using the decorator API. Python applies stacked decorators from the bottom up, so the inner decorators (@requires, @produces, @verify) attach their metadata first, and the outermost @workflow(...) runs last and reads that metadata to build the adapter that resolves paths. The body returns a raw nipype.Workflow; there is no base class to inherit:
from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.utility import Function

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


def _check_t1_modality(config: PipelineConfig, context: ProcessingContext, **kwargs) -> list[str]:
    """Custom preflight check on top of the implicit existence check."""
    t1: Path | None = kwargs.get("t1")
    if t1 is not None and t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be a NIfTI file, got {t1.suffix!r}"]
    return []


@workflow(
    name="my_workflow",
    description="One-line description for `thesis list-workflows`.",
    protocol="my_workflow",  # default_protocol; falls back to None if omitted
    default_config="default",  # default config when `-c` is not passed
    scope="patient",  # or "cohort" for cohort-level workflows
)
@requires(
    t1=PatientFile(
        default="T1w/T1w_acpc_dc_restore.nii.gz",
        config_paths=["my_workflow.t1_image", "hcp.t1_image"],
    ),
)
@produces(out_dir=OutputDir("my_workflow"))
@verify(_check_t1_modality)
def build_workflow(
    *,
    t1: Path,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    wf = Workflow(name=f"my_workflow_{context.patient_id}")
    # ... add Nipype nodes and connections ...
    return wf
Notes:
- Resolved paths are passed in as keyword-only arguments named after the @requires / @produces keys.
- The wrapped factory may also accept config and/or context; the adapter inspects the signature and only forwards them when declared.
- Cohort-scope workflows (scope="cohort") must not declare PatientFile / PatientDir requirements; the decorator validates this at decoration time. Use CohortDir, CohortPatients, DataFile, etc. instead.
- Setting THESIS_STRICT_REGISTRY=1 makes duplicate registrations raise instead of warn, which is useful for catching accidental double-imports in tests.
Make it composable. A standalone single-patient workflow runs as-is, but to embed it in a meta-pipeline (e.g. full_pipeline, tract_synthseg) it must publish a stable I/O contract: call attach_inputnode(...) / attach_outputnode(...) from thesis.core.contracts to expose inputnode / outputnode boundary nodes with the published field names. Meta-workflows then wire upstream.outputnode.<field> -> downstream.inputnode.<field> instead of reaching into internal node names. See the contracts section in nipype_integration.md and src/thesis/workflows/full_pipeline/_core.py for the worked reference.
The CLI picks the new workflow up automatically via thesis run -w my_workflow. For cohort workflows, scope="cohort" makes the CLI skip patient discovery and dispatch as a single cohort run.
See src/thesis/workflows/minimal.py for the smallest possible example and src/thesis/workflows/atlas/workflow.py for a fully-fledged cohort workflow with custom @verify checks.
Key Principles#
- Modularity: workflows depend only on core/; core modules are independent of each other where possible.
- Clarity: code expresses intent; avoid magic; prefer explicit over implicit.
- Testability: dependency injection via ProcessingContext enables easy mocking.
- Configurability: all parameters come from YAML config, never hardcoded.
- Logging: every module logs key steps; use structured logging for machine parsing.
- Type safety: Python 3.11+ type hints on all public APIs; run mypy in CI.