# Source code for thesis.workflows.minimal

"""Minimal Nipype workflow example for CLI usage."""

from nipype import Node, Workflow
from nipype.interfaces.utility import Function

from thesis.core.decorators import produces, workflow
from thesis.core.path_declarations import OutputDir


def _write_marker(output_dir: str, name: str) -> str:
    """Create ``<output_dir>/<name>.txt`` as evidence that the workflow ran.

    Args:
        output_dir: Directory in which the marker file is placed.
        name: Stem of the marker file; ``.txt`` is appended.

    Returns:
        The path of the written marker file, as a string.
    """
    # Kept function-local on purpose.
    # NOTE(review): presumably required because this function is handed to
    # Nipype's ``Function`` interface, which runs it outside this module's
    # namespace — confirm before hoisting the import.
    from pathlib import Path

    marker_file = Path(output_dir).joinpath(f"{name}.txt")

    # Create the file's own parent (not just output_dir) so the write
    # cannot fail on a missing directory.
    marker_file.parent.mkdir(parents=True, exist_ok=True)

    with marker_file.open("w", encoding="utf-8") as handle:
        handle.write("workflow completed\n")

    return str(marker_file)


@workflow(name="minimal", description="Minimal example workflow.")
@produces(out_dir=OutputDir(""))
def build_workflow(*, out_dir, context):
    """Build a minimal workflow that writes a marker file.

    The workflow contains a single ``Function`` node named ``"marker"`` that
    wraps :func:`_write_marker`.

    Args:
        out_dir: Output location for the marker file; it is converted with
            ``str()`` and passed to the node as ``output_dir``.
        context: Object exposing a ``patient_id`` attribute, which is used
            both in the workflow name and as the marker file stem.

    Returns:
        A ``Workflow`` named ``minimal_<patient_id>`` holding the marker node.
    """
    wf = Workflow(name=f"minimal_{context.patient_id}")

    marker = Node(
        Function(
            input_names=["output_dir", "name"],
            output_names=["out_file"],
            function=_write_marker,
        ),
        name="marker",
    )
    marker.inputs.output_dir = str(out_dir)
    marker.inputs.name = context.patient_id

    # The node has no connections, so it must be registered explicitly.
    wf.add_nodes([marker])
    return wf