Source code for thesis.workflows.minimal
"""Minimal Nipype workflow example for CLI usage."""
from nipype import Node, Workflow
from nipype.interfaces.utility import Function
from thesis.core.decorators import produces, workflow
from thesis.core.path_declarations import OutputDir
def _write_marker(output_dir: str, name: str) -> str:
"""Write a marker file to prove workflow execution."""
from pathlib import Path
output_path = Path(output_dir) / f"{name}.txt"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text("workflow completed\n", encoding="utf-8")
return str(output_path)
[docs]
@workflow(name="minimal", description="Minimal example workflow.")
@produces(out_dir=OutputDir(""))
def build_workflow(*, out_dir, context):
"""Build a minimal workflow that writes a marker file."""
wf = Workflow(name=f"minimal_{context.patient_id}")
marker = Node(
Function(
input_names=["output_dir", "name"],
output_names=["out_file"],
function=_write_marker,
),
name="marker",
)
marker.inputs.output_dir = str(out_dir)
marker.inputs.name = context.patient_id
wf.add_nodes([marker])
return wf