Project Architecture Overview

Package Structure

thesis/
├── src/thesis/
│   ├── __init__.py           # Package metadata (version, exports)
│   ├── __main__.py           # Enables `python -m thesis`
│   ├── cli.py                # Click CLI (run, list-*, show-config, info, stats collect)
│   ├── core/                 # Shared infrastructure (no workflow imports allowed)
│   │   ├── decorators.py     # @workflow / @requires / @produces / @verify decorators
│   │   ├── path_declarations.py  # PatientFile, PatientDir, OutputDir, WorkingFile, CohortDir,
│   │   │                         #   PriorOutput, DataFile, DataDir, GlobMatch, GlobGroup,
│   │   │                         #   ConfigList, CohortPatients
│   │   ├── context.py        # ProcessingContext dataclass (patient_id, config, paths, results)
│   │   ├── exceptions.py     # ThesisError hierarchy
│   │   ├── io.py             # NIfTI / bvals / bvecs I/O utilities
│   │   ├── gpu.py            # GPU/CUDA detection (check_gpu, GPUStatus)
│   │   ├── registry.py       # WorkflowRegistry, WorkflowEntry, WORKFLOW_REGISTRY
│   │   ├── config/
│   │   │   ├── manager.py    # ConfigManager — hierarchical YAML loading
│   │   │   ├── loaders.py    # YAML load/save/deep-merge helpers
│   │   │   └── validators.py # Pydantic v2 config models (PipelineConfig + 20+ sub-models)
│   │   ├── logging/
│   │   │   ├── __init__.py   # setup_logging, get_logger, reset_logging,
│   │   │   │                  #   set_console_level, suppress_nipype_native_logging,
│   │   │   │                  #   suspend_console_logging
│   │   │   ├── formatters.py # Log format strings
│   │   │   └── handlers.py   # InterceptHandler, PatientLogHandler, PerformanceLogger
│   │   ├── output/
│   │   │   ├── __init__.py   # Public exports
│   │   │   ├── events.py     # EventLevel, Event, EventBus (thread-safe singleton)
│   │   │   ├── modes.py      # OutputMode, SummaryDetail, OutputFormat, OutputConfig
│   │   │   ├── summary.py    # RunStatus, RunResult, RunSummary, BatchSummary
│   │   │   ├── progress.py   # ProgressTracker (spinner), BatchProgress (tqdm)
│   │   │   └── renderer.py   # OutputRenderer (filters events, renders summaries)
│   │   ├── nipype/
│   │   │   ├── executor.py    # NipypeExecutor, run_workflow,
│   │   │   │                   #   apply_nipype_execution_config,
│   │   │   │                   #   build_nipype_status_callback, count_workflow_nodes
│   │   │   └── interfaces/
│   │   │       ├── fsl.py         # ProbTrackX2 (stderr fix), ProbTrackX2GPU, factory
│   │   │       └── freesurfer.py  # SynthSeg CommandLine wrapper for mri_synthseg
│   │   └── utils/
│   │       └── utils.py      # to_path (~/$VAR expansion), resolve_path, config helpers
│   └── workflows/                # 17 self-registering workflows
│       ├── minimal.py            # Example/demo
│       ├── qc/                   # QC overlays, track density, stats, batch outliers (post-workflow hook)
│       ├── hcp/                  # FSL ProbTrackX2 (HCP-preprocessed inputs)
│       ├── tract_synthseg/       # SynthSeg + tractography meta-workflow; backend per tractography.method
│       ├── mrtrix3/              # 5ttgen → dhollander → msmt_csd → mtnormalise → tckgen → tcksift2
│       ├── full_pipeline/        # preprocess → registration → tractography → tract_similarity
│       │                         # Backend: tractography.method=probtrackx2 (default) or mrtrix3
│       ├── preprocess/           # Raw → HCP layout (TOPUP, Eddy, BedpostX, N4, SynthStrip/SynthSeg)
│       ├── registration/         # ANTs + FireANTs registration, swappable viewer
│       ├── synthseg/             # Standalone SynthSeg
│       ├── atlas/                # Cohort statistical atlas (NumPy)
│       ├── learned_atlas/        # Learned deformable cohort atlas (VoxelMorph/AtlasMorph; optional ml extra)
│       ├── atlas_to_patient/     # Warp cohort atlas into patient space
│       ├── transforms/           # Apply ANTs transforms per transforms.jobs (registers as `transform`)
│       └── tract_similarity/     # Per-patient + cohort + hcp_loo + sweep (registers 4 entries)
├── tests/
│   ├── unit/                 # Unit tests (no external tools required)
│   ├── integration/          # Integration tests (may require FSL/ANTs)
│   └── conftest.py           # Shared pytest fixtures
├── config/                   # YAML config (5-level merge: default → hardware → protocol → patient → CLI)
├── data/                     # Input data (masks, transforms, HCP subjects)
├── scripts/                  # Utility scripts (build_label_map.py, ...)
├── docs/                     # Sphinx documentation (this directory)
└── logs/                     # Runtime logs (auto-generated, gitignored)

Design Patterns

1. Module Organisation

  • Each cross-cutting concern (config, logging, Nipype execution, output) lives in its own core/ sub-package.

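  • Workflows depend on core/ only, with no cross-workflow imports; shared code is reused via helpers in core/. The exception is meta-workflows (e.g. tract_synthseg, full_pipeline), which import the build_workflow factories of the sub-workflows they compose.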

  • Public APIs are exposed through __init__.py; implementation details stay private.

  • Larger workflows split private implementation details into focused helper modules (e.g. atlas: workflow.py + compute.py + _params.py + _io.py + _statistics.py).

2. Pipeline Architecture

  • Workflows are factory functions decorated with @workflow(name=…, description=…) from thesis.core.decorators. The decorator is the outermost of the stack; it reads the metadata the inner decorators attached, synthesises a (config, context) -> Workflow adapter, builds a composite verifier, and self-registers the workflow with WORKFLOW_REGISTRY at import time. It returns the original function unchanged. The body builds and returns a raw nipype.Workflow — there is no workflow base class (core/base.py does not exist; BaseModule/BasePipelineStep/BasePipeline/NipypeStep are gone).

  • @requires(...) and @produces(...) attach declarative input/output PathDeclaration types (see core/path_declarations.py). They are pure metadata attachers and may appear in any order. The adapter resolves each declaration against the active ProcessingContext at workflow-build time and injects the resulting pathlib.Path (or None/list) as a keyword-only argument named after the declaration key. config/context are injected only when the body’s signature names them.

  • @verify(check_fn, ...) attaches optional preflight checks. The historical signature is (config, context) -> list[str] (an empty list means all clear); a verifier may also opt into the resolved declared kwargs via a **kwargs catch-all or a parameter named after a declared key. Every non-optional @requires(...) declaration also generates an implicit existence check that runs before the explicit @verify checks.

  • Meta-workflows (e.g. tract_synthseg, full_pipeline) compose multiple sub-workflows in a single Nipype graph by importing their build_workflow factories and connecting them contract-to-contract (upstream.outputnode.<field> -> downstream.inputnode.<field>) via the inputnode / outputnode boundary nodes published in thesis.core.contracts, never via internal node names.

For the complete, worked guide to building a workflow with this decorator API, see custom_workflows.md; to port an existing standalone Nipype script, see nipype_to_framework.md.
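The decorator mechanics described in the bullets above can be sketched in a few lines. This is a toy illustration, not the code in thesis.core.decorators: REGISTRY, the string-valued declarations, the context["root"] lookup, and the dict returned in place of a nipype Workflow are all assumptions made to keep the example dependency-free.

```python
import inspect
from pathlib import Path
from typing import Any, Callable

# Toy stand-in for WORKFLOW_REGISTRY (hypothetical; the real registry stores WorkflowEntry objects).
REGISTRY: dict[str, dict[str, Any]] = {}


def _attach(attr: str, decls: dict[str, str]) -> Callable:
    """Build a decorator that merges `decls` into fn.<attr> and returns fn as-is."""
    def decorator(fn: Callable) -> Callable:
        setattr(fn, attr, {**getattr(fn, attr, {}), **decls})
        return fn
    return decorator


def requires(**decls: str) -> Callable:
    return _attach("_requires", decls)


def produces(**decls: str) -> Callable:
    return _attach("_produces", decls)


def workflow(*, name: str, description: str) -> Callable:
    """Outermost decorator: applied last, so the inner metadata is already attached."""
    def decorator(fn: Callable) -> Callable:
        declared = {**getattr(fn, "_requires", {}), **getattr(fn, "_produces", {})}
        accepted = set(inspect.signature(fn).parameters)

        def adapter(config: Any, context: dict[str, Any]) -> Any:
            # Resolve every declaration against the context (assumed here: a plain path join).
            kwargs: dict[str, Any] = {
                key: Path(context["root"]) / rel for key, rel in declared.items()
            }
            # Forward config/context only when the body's signature names them.
            for extra, value in (("config", config), ("context", context)):
                if extra in accepted:
                    kwargs[extra] = value
            return fn(**kwargs)

        REGISTRY[name] = {"factory": adapter, "description": description}
        return fn  # the original function is returned unchanged

    return decorator


@workflow(name="demo", description="Toy workflow")
@requires(t1="T1w/T1w.nii.gz")
@produces(out_dir="demo")
def build_workflow(*, t1: Path, out_dir: Path, context: dict) -> dict:
    # A real factory would return a nipype Workflow; a dict keeps the sketch runnable anywhere.
    return {"t1": t1, "out_dir": out_dir, "patient": context["patient_id"]}


built = REGISTRY["demo"]["factory"]({}, {"root": "/data/sub-01", "patient_id": "sub-01"})
```

Because build_workflow does not name config in its signature, the adapter never passes it; adding a config parameter to the body would be enough to receive it.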

3. Configuration Management

  • Five-level hierarchy (default → hardware → protocol → patient → CLI overrides).

  • Deep-merge semantics: later levels override earlier ones, nested keys preserved.

  • ConfigManager.load_config(config_name, patient_id, protocol, overrides) performs all merging.

  • Configuration models are validated with Pydantic v2 and reject unknown fields (extra="forbid").

  • See configuration/index.md for the per-section reference.
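The deep-merge semantics can be made concrete with a small sketch. It is not the helper in core/config/loaders.py; deep_merge is a hypothetical name, and the n_samples key and all values are invented for illustration (tractography.method and gpu_enabled are the only names taken from this page).

```python
from copy import deepcopy
from typing import Any


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict where `override` wins but untouched nested keys of `base` survive."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are mappings: merge recursively instead of replacing wholesale.
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Five levels, lowest priority first (illustrative content only).
levels = [
    {"tractography": {"method": "probtrackx2", "n_samples": 5000},
     "hardware": {"gpu_enabled": True}},          # default
    {"hardware": {"gpu_enabled": False}},         # hardware
    {"tractography": {"n_samples": 1000}},        # protocol
    {},                                           # patient
    {"tractography": {"method": "mrtrix3"}},      # CLI overrides
]

config: dict[str, Any] = {}
for level in levels:
    config = deep_merge(config, level)
```

The protocol level changes only n_samples and the CLI level changes only method, yet both survive in the final tractography section, which is what "nested keys preserved" means in practice.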

4. Logging

  • All modules use from thesis.core.logging import get_logger then logger = get_logger(__name__).

  • Do not use stdlib logging directly — use loguru via get_logger().

  • Exception: inside Nipype Function nodes (which run in separate processes and cannot pickle the loguru logger), use print().

  • InterceptHandler bridges third-party loggers (nipype, nibabel) into loguru.

  • File logs rotate at 10 MB and are retained for 7 days.
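The print() exception can be illustrated with a function body written for a Nipype Function node. This is a sketch only: count_bvals is a hypothetical helper, and the Node wiring is left out so that the example runs without Nipype installed.

```python
import tempfile
from pathlib import Path


def count_bvals(bvals_file: str) -> int:
    """Body intended for a Nipype Function node (Node wiring omitted)."""
    # Imports live inside the body: Function nodes execute the function's source
    # on its own, without the globals of the module that defined it.
    from pathlib import Path

    values = Path(bvals_file).read_text().split()
    # print() rather than a logger, per the rule above for Function nodes.
    print(f"[count_bvals] {bvals_file}: {len(values)} b-values")
    return len(values)


# Calling the function directly, as a unit test would.
with tempfile.TemporaryDirectory() as tmp:
    bvals = Path(tmp) / "bvals"
    bvals.write_text("0 1000 1000 2000\n")
    n = count_bvals(str(bvals))
```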

5. Dependency Injection

  • ProcessingContext carries patient_id, config, data_dir, input_dir, output_dir, working_dir, results, metadata.

  • It is passed explicitly through the call chain — no global state.

  • This facilitates testing: swap the context for a mock to test any function in isolation.
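A minimal sketch of that testing pattern follows. FakeContext is a hypothetical stand-in that only mirrors the attribute names listed above, and qc_report_path is an invented function under test; neither is part of the package.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class FakeContext:
    """Hypothetical stand-in exposing the ProcessingContext attributes listed above."""
    patient_id: str
    config: Any
    data_dir: Path
    input_dir: Path
    output_dir: Path
    working_dir: Path
    results: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)


def qc_report_path(context: Any) -> Path:
    """Invented example: everything it needs arrives through the context argument."""
    return context.output_dir / "qc" / f"{context.patient_id}_report.json"


ctx = FakeContext(
    patient_id="sub-01",
    config=None,
    data_dir=Path("data"),
    input_dir=Path("in"),
    output_dir=Path("out"),
    working_dir=Path("work"),
)
report = qc_report_path(ctx)
```

Since nothing is read from global state, the test needs no patching, only a context with the right attributes.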

6. Workflow Registry

  • Each workflow self-registers at module import time via the @workflow(...) decorator, which builds a WorkflowEntry and calls WORKFLOW_REGISTRY.register(entry) internally.

  • WORKFLOW_REGISTRY.get("name") retrieves a WorkflowEntry (factory, verifier, description, default_protocol, default_config, is_cohort_level) by its short name.

  • The CLI discovers workflows via WORKFLOW_REGISTRY.all_entries() and triggers per-workflow imports lazily via _ensure_workflow_imported() in cli.py.
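The registry contract above can be sketched as follows. Entry and Registry are hypothetical stand-ins for WorkflowEntry and WorkflowRegistry; the name field, the default values, and the warn-then-overwrite handling of duplicates are assumptions, not confirmed behaviour of core/registry.py.

```python
import warnings
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for WorkflowEntry (field names as listed above, plus `name`)."""
    name: str
    factory: Callable
    verifier: Callable
    description: str = ""
    default_protocol: Optional[str] = None
    default_config: str = "default"
    is_cohort_level: bool = False


class Registry:
    def __init__(self) -> None:
        self._entries: dict[str, Entry] = {}

    def register(self, entry: Entry) -> None:
        if entry.name in self._entries:
            # Assumed policy: warn and let the later registration win.
            warnings.warn(f"workflow {entry.name!r} registered twice", stacklevel=2)
        self._entries[entry.name] = entry

    def get(self, name: str) -> Entry:
        try:
            return self._entries[name]
        except KeyError:
            known = ", ".join(sorted(self._entries))
            raise KeyError(f"unknown workflow {name!r}; known: {known}") from None

    def all_entries(self) -> list[Entry]:
        return list(self._entries.values())


registry = Registry()
registry.register(Entry("atlas", factory=lambda c, x: None, verifier=lambda c, x: [],
                        is_cohort_level=True))
registry.register(Entry("minimal", factory=lambda c, x: None, verifier=lambda c, x: []))

# What a CLI might do: list everything, then pick out the cohort-level workflows.
cohort_names = [e.name for e in registry.all_entries() if e.is_cohort_level]
```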

7. Structured Output System

  • EventBus (thread-safe singleton) decouples event emission from rendering — pipeline code emits structured Event objects, and the OutputRenderer subscribes to present them.

  • Events carry an EventLevel (ERROR, WARNING, IMPORTANT, INFO, DEBUG); the active OutputMode (quiet/normal/verbose) determines the minimum level shown.

  • End-of-run summaries are Pydantic models (RunSummary, BatchSummary) built from execution metadata — not freeform text. They include headline, duration, status bullets, failure details, and next-step suggestions.

  • Progress UI (ProgressTracker for spinners, BatchProgress for tqdm bars; ClickNodeProgress in cli.py for Nipype node progress) auto-detects TTY vs CI and disables animation for non-interactive contexts.

  • Colour is additive; text labels ([OK], [FAIL], [WARN]) ensure readability without colour; NO_COLOR is respected.

  • CLI flags (-v, -q, --summary, --no-progress) override YAML-level output: defaults.
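The emit/subscribe split can be sketched with a small thread-safe singleton. This is an illustration under assumptions, not the code in core/output/events.py: the numeric level values, the MIN_LEVEL mapping from output mode to threshold, and make_renderer are all invented here; only the level names and their ordering come from the bullets above.

```python
import threading
from dataclasses import dataclass
from enum import IntEnum
from typing import Callable


class EventLevel(IntEnum):
    # Numeric values are assumed; only the relative ordering matters.
    DEBUG = 10
    INFO = 20
    IMPORTANT = 25
    WARNING = 30
    ERROR = 40


@dataclass(frozen=True)
class Event:
    level: EventLevel
    message: str


class EventBus:
    """Process-wide singleton; locks guard creation and the subscriber list."""
    _instance = None
    _create_lock = threading.Lock()

    def __new__(cls) -> "EventBus":
        with cls._create_lock:
            if cls._instance is None:
                inst = super().__new__(cls)
                inst._subscribers = []
                inst._sub_lock = threading.Lock()
                cls._instance = inst
        return cls._instance

    def subscribe(self, callback: Callable[[Event], None]) -> None:
        with self._sub_lock:
            self._subscribers.append(callback)

    def emit(self, event: Event) -> None:
        with self._sub_lock:
            subscribers = list(self._subscribers)  # copy, then call outside the lock
        for callback in subscribers:
            callback(event)


# Assumed mapping from output mode to the minimum level shown.
MIN_LEVEL = {"quiet": EventLevel.WARNING, "normal": EventLevel.INFO, "verbose": EventLevel.DEBUG}


def make_renderer(mode: str, sink: list[str]) -> Callable[[Event], None]:
    threshold = MIN_LEVEL[mode]

    def render(event: Event) -> None:
        if event.level >= threshold:
            sink.append(f"[{event.level.name}] {event.message}")

    return render


shown: list[str] = []
bus = EventBus()
bus.subscribe(make_renderer("normal", shown))
bus.emit(Event(EventLevel.DEBUG, "node cache hit"))   # filtered out in normal mode
bus.emit(Event(EventLevel.ERROR, "eddy failed"))      # rendered
```

Pipeline code only ever calls emit; whether an event is visible is decided entirely on the subscriber side.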

8. Cohort Workflows

  • Workflows with is_cohort_level=True (atlas, learned_atlas, tract_similarity_cohort, tract_similarity_hcp_loo, tract_similarity_sweep) ignore -p/--patient-id and --all.

  • The CLI dispatches them to a single _run_single_patient_with_retries invocation with patient_id="cohort".

  • learned_atlas is the learned alternative to the statistical atlas workflow: instead of voxel-wise averaging, it trains a deformable tract-density template (a sharp learnable template plus a deformation-only diffeomorphic network) over the cohort, but emits the same five atlas maps as atlas so atlas_to_patient and tract_similarity keep working unchanged. It requires the optional ml extra (pip install -e '.[ml]'; torch is lazy-imported only inside the training node) and owns the learned_atlas: config namespace (schema LearnedAtlasConfig).


Core Module Responsibilities

core/config/

Hierarchical YAML configuration loading, Pydantic validation, and deep-merge helpers. PipelineConfig is the root model; sub-models include PathConfig, HardwareConfig, S3Config, PreprocessingConfig, RegistrationConfig (+ FireantsRegistrationConfig, RegistrationViewerConfig), SegmentationConfig, SynthSegConfig, TractographyConfig, HCPConfig, TransformsConfig (+ TransformJobConfig, AtlasTransformConfig, AtlasSourceConfig), ValidationConfig, QCConfig, AtlasConfig, AtlasQCConfig, TractSimilarityConfig (+ SideThresholdConfig), TractSimilaritySweepConfig (+ ThresholdGridConfig), NipypeConfig, OutputSettingsConfig.

core/logging/

Loguru-based logging with console + rotating file output. Bridges third-party logging frameworks (InterceptHandler).

core/output/

Structured output system for CLI runs. EventBus, OutputRenderer, RunResult/RunSummary/BatchSummary Pydantic models, ProgressTracker/BatchProgress for animated spinners and tqdm bars, and configuration via OutputSettingsConfig (YAML defaults overridden by CLI flags).

core/nipype/

  • NipypeExecutor — applies execution config (plugin, crash dir, profiler), handles Windows compatibility, runs the workflow.

  • apply_nipype_execution_config — sets nipype.config globals required for content-based hashing across batch retries.

  • build_nipype_status_callback — translates node start/finish/failure into structured CLI events.

  • count_workflow_nodes — recursive node count (used to size progress bars for nested workflows).

  • interfaces/fsl.py: ProbTrackX2, ProbTrackX2GPU, and a factory that selects the right binary at runtime.

  • interfaces/freesurfer.py: SynthSeg CommandLine wrapper for mri_synthseg.

core/context.py

ProcessingContext dataclass — the primary state carrier through the pipeline. Key attributes: patient_id, config, data_dir, input_dir, output_dir, working_dir, results, metadata.

core/decorators.py

@workflow(name=…, description=…, protocol=…, scope=…), @requires(**path_decls), @produces(**path_decls), and @verify(*checks) decorators that wrap a workflow factory so it self-registers with WORKFLOW_REGISTRY at import time. The outer @workflow synthesises a (config, context) -> Workflow adapter that resolves every declared path and injects the resolved pathlib.Path (or list) as a keyword argument to the body. The composite verifier runs any implicit existence checks from @requires followed by every @verify callable.
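The ordering inside the composite verifier (implicit existence checks first, explicit checks second) can be sketched as below. make_composite_verifier and check_patient_id are hypothetical names, and collecting every problem rather than stopping at the first is an assumption about the real behaviour.

```python
import tempfile
from pathlib import Path
from typing import Any, Callable

Check = Callable[[Any, Any], list[str]]


def make_composite_verifier(required: dict[str, Path], checks: list[Check]) -> Check:
    """Combine implicit existence checks with explicit (config, context) -> list[str] checks."""
    def composite(config: Any, context: Any) -> list[str]:
        # 1. Implicit: every required path must exist.
        problems = [
            f"required input '{key}' not found: {path}"
            for key, path in required.items()
            if not path.exists()
        ]
        # 2. Explicit: run each verify callable; an empty list means all clear.
        for check in checks:
            problems.extend(check(config, context))
        return problems

    return composite


def check_patient_id(config: Any, context: dict) -> list[str]:
    return [] if context.get("patient_id") else ["patient_id is empty"]


with tempfile.TemporaryDirectory() as tmp:
    present = Path(tmp) / "T1w.nii.gz"
    present.touch()
    missing = Path(tmp) / "bvals"
    verifier = make_composite_verifier({"t1": present, "bvals": missing}, [check_patient_id])
    problems = verifier({}, {"patient_id": ""})
```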

core/path_declarations.py

Declarative path types resolved by the @workflow adapter:

  • Per-patient inputs: PatientFile, PatientDir (with optional config_paths, fallback_dirs, and {patient_id} substitution).

  • Outputs / working state: OutputDir, WorkingFile, CohortDir.

  • Cross-stage handoffs: PriorOutput (single file or glob discovery under context.output_dir).

  • Cohort-shared assets: DataFile, DataDir (under context.data_dir, with path-traversal safety).

  • Glob discovery: GlobMatch (single pattern), GlobGroup (related patterns sharing a search directory).

  • Structured iteration: ConfigList (per-item file/dir/str resolution), CohortPatients (iterate per-patient subdirs with optional file-pattern filtering).

See docs/api/core.path_declarations for the full reference.
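The path-traversal safety mentioned for DataFile / DataDir is a generic technique that can be sketched independently of the package. resolve_under is a hypothetical helper, not the function used in core/path_declarations.py, and the file names are invented.

```python
from pathlib import Path


def resolve_under(base_dir: Path, relative: str) -> Path:
    """Resolve `relative` below `base_dir`, rejecting anything that escapes it."""
    base = base_dir.resolve()
    candidate = (base / relative).resolve()   # collapses ".." and follows symlinks
    if not candidate.is_relative_to(base):    # Path.is_relative_to needs Python 3.9+
        raise ValueError(f"{relative!r} escapes {base}")
    return candidate


data_dir = Path("data")
mask = resolve_under(data_dir, "masks/thalamus.nii.gz")

try:
    resolve_under(data_dir, "../secrets.txt")
    escaped = True
except ValueError:
    escaped = False
```

Resolving before comparing is the important step: a purely textual prefix check would accept data/../secrets.txt.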

core/io.py

NIfTI / bvals / bvecs I/O: load_nifti, save_nifti, load_bvals, load_bvecs, save_bvals, save_bvecs, check_file_exists, ensure_directory, find_files, copy_nifti_metadata, get_file_info.

core/gpu.py

check_gpu() -> GPUStatus(available, reason): detects the GPU probtrackx2 binary and the CUDA runtime. Used by the CLI’s startup GPU check.

core/utils/

to_path() (with ~ and $VAR expansion) and resolve_path() (resolve against a base directory).
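The two behaviours can be approximated with the standard library alone. expand_path and resolve_against are hypothetical names standing in for to_path() and resolve_path(); the real signatures are not shown on this page, and THESIS_DEMO_ROOT is an environment variable invented for the example.

```python
import os
from pathlib import Path
from typing import Union

PathLike = Union[str, Path]


def expand_path(value: PathLike) -> Path:
    """Expand $VAR references first, then a leading ~, and return a Path."""
    return Path(os.path.expanduser(os.path.expandvars(str(value))))


def resolve_against(value: PathLike, base: Path) -> Path:
    """Leave absolute paths alone; anchor relative ones at `base`."""
    path = expand_path(value)
    return path if path.is_absolute() else base / path


os.environ["THESIS_DEMO_ROOT"] = "demo_root"          # invented variable for the example
from_env = expand_path("$THESIS_DEMO_ROOT/masks")
from_home = expand_path("~/atlases")
anchored = resolve_against("out/sub-01", Path("base"))
```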


Exception Hierarchy

ThesisError
├── ConfigurationError
├── ValidationError
├── ProcessingError
│   ├── RegistrationError
│   ├── SegmentationError
│   ├── TractographyError
│   └── PipelineError
├── FileIOError
└── DependencyError

For external tool failures (e.g. FSL commands), raise RuntimeError or FileNotFoundError and include the failed command in the message.
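One way to follow that convention is a thin wrapper around subprocess. run_tool is a hypothetical helper, not a function from the package, and the message wording is an assumption; the example drives the Python interpreter itself so that it runs without FSL installed.

```python
import shlex
import subprocess
import sys
from typing import Sequence


def run_tool(cmd: Sequence[str]) -> str:
    """Run an external command and report failures together with the full command line."""
    printable = shlex.join(cmd)
    try:
        done = subprocess.run(cmd, check=True, capture_output=True, text=True)
    except FileNotFoundError as exc:
        # The executable itself is missing (e.g. the tool is not on PATH).
        raise FileNotFoundError(f"executable not found while running: {printable}") from exc
    except subprocess.CalledProcessError as exc:
        # The tool ran but exited non-zero; keep its stderr for diagnosis.
        raise RuntimeError(
            f"command failed (exit {exc.returncode}): {printable}\n{exc.stderr}"
        ) from exc
    return done.stdout


message = ""
try:
    run_tool([sys.executable, "-c", "import sys; sys.exit(3)"])
except RuntimeError as err:
    message = str(err)
```

Chaining with `from exc` keeps the original traceback available while the top-level message stays readable in a log file.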


Data Flow

CLI command (thesis run)
    ↓
OutputRenderer subscribes to EventBus   ← filters events by verbosity mode
    ↓
ConfigManager.load_config()              ← deep-merge: default → hardware → protocol → patient → CLI overrides
    ↓
_resolve_gpu(cfg)                        ← CUDA check; may set gpu_enabled=False (one-shot per process)
    ↓
ProcessingContext.create_context()       ← resolves input_dir, output_dir, working_dir
    ↓
composite_verifier(config, context)      ← runs @requires existence checks + @verify callables
    ↓
@workflow adapter (config, context)      ← resolves declared paths, calls build_workflow(**kwargs)
    ↓
NipypeExecutor (or meta_wf.run()) with status callback + CLI progress
    ├── apply_nipype_execution_config()  ← sets nipype.config globals (hash_method, crash_dir, etc.)
    └── workflow.run(plugin, plugin_args)
    ↓
[optional] post-workflow QC overlays     ← when qc.generate_overlays: true
    ↓
[optional] tract_similarity enrichment   ← reads metrics.json (full_pipeline only)
    ↓
RunSummary / BatchSummary rendered       ← structured end-of-run report (compact/full/JSON)
    ↓
[optional] _render_batch_tractography_stats + _persist_batch_stats
                                          ← collect_batch_stats + detect_batch_outliers,
                                            writes <output>/batch_stats/stats_<ts>.json + latest.json

Adding a New Workflow

Tip

This section is a high-level checklist. For the full decorator-API reference — every decorator and path declaration, the config-schema mechanism, the inputnode/outputnode contracts, and a common-gotchas list — read custom_workflows.md. To adapt an existing standalone Nipype script, follow nipype_to_framework.md.

  1. Create src/thesis/workflows/my_workflow/workflow.py.

  2. Add config fields either by claiming a top-level YAML key via @workflow(config_namespace=…, config_schema=…) (no edit to core/config/validators.py needed — see custom_workflows.md), or, for shared cross-workflow settings, by extending the Pydantic models in core/config/validators.py and updating the matching reference page under configuration/.

  3. Add tests in tests/unit/ (and tests/integration/ if external tools are involved).

  4. Define the workflow factory using the decorator API. Python applies stacked decorators bottom-up, so the inner decorators (@requires, @produces, @verify) run first and attach metadata; @workflow(...) must be outermost so that it runs last and its adapter can consume that metadata when it resolves paths. The body returns a raw nipype.Workflow; there is no base class to inherit:

from pathlib import Path

from nipype import Node, Workflow
from nipype.interfaces.utility import Function

from thesis.core.config import PipelineConfig
from thesis.core.context import ProcessingContext
from thesis.core.decorators import produces, requires, verify, workflow
from thesis.core.path_declarations import OutputDir, PatientFile


def _check_t1_modality(config: PipelineConfig, context: ProcessingContext, **kwargs) -> list[str]:
    """Custom preflight check on top of the implicit existence check."""
    t1: Path | None = kwargs.get("t1")
    if t1 is not None and t1.suffix not in {".gz", ".nii"}:
        return [f"t1 must be a NIfTI file, got {t1.suffix!r}"]
    return []


@workflow(
    name="my_workflow",
    description="One-line description for `thesis list-workflows`.",
    protocol="my_workflow",        # default_protocol — falls back to None if omitted
    default_config="default",      # default config when `-c` is not passed
    scope="patient",               # or "cohort" for cohort-level workflows
)
@requires(
    t1=PatientFile(
        default="T1w/T1w_acpc_dc_restore.nii.gz",
        config_paths=["my_workflow.t1_image", "hcp.t1_image"],
    ),
)
@produces(out_dir=OutputDir("my_workflow"))
@verify(_check_t1_modality)
def build_workflow(
    *,
    t1: Path,
    out_dir: Path,
    config: PipelineConfig,
    context: ProcessingContext,
) -> Workflow:
    wf = Workflow(name=f"my_workflow_{context.patient_id}")
    # ... add Nipype nodes and connections ...
    return wf

Notes:

  • Resolved paths are passed in as keyword-only arguments named after the @requires / @produces keys.

  • The wrapped factory may also accept config and/or context — the adapter inspects the signature and only forwards them when declared.

  • Cohort-scope workflows (scope="cohort") must not declare PatientFile / PatientDir requirements; the decorator validates this at decoration time. Use CohortDir, CohortPatients, DataFile, etc. instead.

  • Setting THESIS_STRICT_REGISTRY=1 makes duplicate registrations raise instead of warn — useful for catching accidental double-imports in tests.

Make it composable. A standalone single-patient workflow runs as-is, but to embed it in a meta-pipeline (e.g. full_pipeline, tract_synthseg) it must publish a stable I/O contract: call attach_inputnode(...) / attach_outputnode(...) from thesis.core.contracts to expose inputnode / outputnode boundary nodes with the published field names. Meta-workflows then wire upstream.outputnode.<field> -> downstream.inputnode.<field> instead of reaching into internal node names. See the contracts section in nipype_integration.md and src/thesis/workflows/full_pipeline/_core.py for the worked reference.

The CLI picks the new workflow up automatically via thesis run -w my_workflow. For cohort workflows, scope="cohort" makes the CLI skip patient discovery and dispatch as a single cohort run.

See src/thesis/workflows/minimal.py for the smallest possible example and src/thesis/workflows/atlas/workflow.py for a fully-fledged cohort workflow with custom @verify checks.


Key Principles

  • Modularity — workflows depend only on core/; core modules are independent of each other where possible.

  • Clarity — code expresses intent; avoid magic; prefer explicit over implicit.

  • Testability — dependency injection via ProcessingContext enables easy mocking.

  • Configurability — all parameters come from YAML config, never hardcoded.

  • Logging — every module logs key steps; use structured logging for machine parsing.

  • Type safety — Python 3.11+ type hints on all public APIs; run mypy in CI.