"""File manipulation utilities for preprocessing workflows.
This module provides utility functions for manipulating dMRI-related files
including bval/bvec files, index files, acquisition parameters, and creating
HCP-compatible directory structures.
"""
import re
from pathlib import Path
from typing import Dict
from thesis.core.exceptions import FileIOError
from thesis.core.logging import get_logger
__all__ = [
"modify_bval",
"create_index_file",
"create_acqparams_file",
"create_hcp_output_structure",
]
logger = get_logger(__name__)
[docs]
def modify_bval(input_bval: str, output_bval: str) -> str:
"""Modify bval file by replacing standalone "100" values with "101".
This function reads a bval file and replaces all standalone occurrences
of "100" with "101", which can be useful for preprocessing workflows that
require distinct b-values. Handles both space and tab separators.
Args:
input_bval: Path to input bval file
output_bval: Path to output bval file
Returns:
Absolute path to the output bval file as a string
Raises:
FileNotFoundError: If input_bval does not exist
FileIOError: If unable to read or write the file
Example:
>>> output = modify_bval("input.bval", "output.bval")
>>> # File containing "0 100 1000 100" becomes "0 101 1000 101"
"""
input_path = Path(input_bval)
output_path = Path(output_bval)
if not input_path.exists():
raise FileNotFoundError(f"Input bval file not found: {input_bval}")
try:
logger.debug(f"Reading bval file: {input_path}")
with open(input_path, "r", encoding="utf-8") as f:
content = f.read()
# Replace standalone "100" with "101" using word boundaries
# This matches "100" that is not part of a larger number
modified_content = re.sub(r"\b100\b", "101", content)
# Skip writing when an identical file already exists. Re-writing a
# file with the same content updates its mtime, which invalidates
# Nipype's input-hash check for Eddy (this file is wired into
# eddy.in_bval) and cascades re-runs through bedpostx, dtifit,
# probtrackx2, etc. on every pipeline invocation.
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
if output_path.exists() and output_path.read_text(encoding="utf-8") == modified_content:
logger.debug(f"Bval file unchanged, skipping write: {output_path}")
return str(output_path.absolute())
except IOError:
pass # fall through to (re)write below
logger.debug(f"Writing modified bval file: {output_path}")
with open(output_path, "w", encoding="utf-8") as f:
f.write(modified_content)
logger.info(f"Modified bval file created: {output_path}")
return str(output_path.absolute())
except IOError as e:
raise FileIOError(f"Failed to modify bval file: {e}") from e
[docs]
def create_index_file(bval_file: str, output_index: str) -> str:
"""Create an index file with "1" for each b-value in the bval file.
This function reads a bval file, counts the number of values, and creates
an index file with "1" for each value. The whitespace structure (spaces,
tabs, newlines) is preserved from the input bval file.
Args:
bval_file: Path to input bval file
output_index: Path to output index file
Returns:
Absolute path to the output index file as a string
Raises:
FileNotFoundError: If bval_file does not exist
FileIOError: If unable to read or write files
Example:
>>> output = create_index_file("input.bval", "index.txt")
>>> # If input.bval contains "0 100 1000", output contains "1 1 1"
"""
bval_path = Path(bval_file)
output_path = Path(output_index)
if not bval_path.exists():
raise FileNotFoundError(f"Input bval file not found: {bval_file}")
try:
logger.debug(f"Reading bval file: {bval_path}")
with open(bval_path, "r", encoding="utf-8") as f:
content = f.read()
# Replace each number with "1", preserving whitespace structure
# Match any sequence of digits and replace with "1"
index_content = re.sub(r"\d+", "1", content)
# Skip writing when an identical file already exists. Re-writing a
# file with the same content updates its mtime, which invalidates
# Nipype's input-hash check for Eddy (this file is wired into
# eddy.in_index) and cascades re-runs through the rest of the
# pipeline on every invocation.
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
if output_path.exists() and output_path.read_text(encoding="utf-8") == index_content:
logger.debug(f"Index file unchanged, skipping write: {output_path}")
return str(output_path.absolute())
except IOError:
pass # fall through to (re)write below
logger.debug(f"Writing index file: {output_path}")
with open(output_path, "w", encoding="utf-8") as f:
f.write(index_content)
logger.info(f"Index file created: {output_path}")
return str(output_path.absolute())
except IOError as e:
raise FileIOError(f"Failed to create index file: {e}") from e
[docs]
def create_acqparams_file(output_path: str, bandwidth: float, phase_encoding_dirs: int) -> str:
"""Create acquisition parameters file for FSL eddy/topup.
This function calculates the readout time from bandwidth and phase encoding
directions, then creates a two-line file specifying AP and PA phase encoding
directions with the calculated readout time.
Args:
output_path: Path to output acqparams file
bandwidth: Bandwidth in Hz
phase_encoding_dirs: Number of phase encoding directions
Returns:
Absolute path to the output acqparams file as a string
Raises:
FileIOError: If unable to write the file
ValueError: If bandwidth <= 0 or phase_encoding_dirs <= 1
Example:
>>> output = create_acqparams_file("acqparams.txt", 2300.0, 96)
>>> # Creates file with readout time = (96-1)/2300.0 = 0.0413
"""
if bandwidth <= 0:
raise ValueError(f"Bandwidth must be positive, got: {bandwidth}")
if phase_encoding_dirs <= 1:
raise ValueError(f"Phase encoding dirs must be > 1, got: {phase_encoding_dirs}")
output_file = Path(output_path)
# Calculate readout time
readout_time = (phase_encoding_dirs - 1) / bandwidth
# Format to 4 decimal places
readout_str = f"{readout_time:.4f}"
# Create content with AP and PA directions
content = f"0 -1 0 {readout_str}\n0 1 0 {readout_str}\n"
# Skip writing when an identical file already exists. Re-writing a file
# with the same content updates its mtime, which invalidates Nipype's
# default timestamp-based node cache. TOPUP and Eddy both use this file
# as a mandatory input, so an unnecessary mtime bump causes both nodes to
# report "Outdated cache found" and re-run on every subsequent invocation.
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
if output_file.exists() and output_file.read_text(encoding="utf-8") == content:
logger.debug(f"Acqparams file unchanged, skipping write: {output_file}")
return str(output_file.absolute())
except IOError:
pass # fall through to (re)write below
try:
logger.debug(f"Writing acqparams file: {output_file}")
with open(output_file, "w", encoding="utf-8") as f:
f.write(content)
logger.info(
f"Acqparams file created: {output_file} "
f"(readout_time={readout_str}, bandwidth={bandwidth}, "
f"phase_encoding_dirs={phase_encoding_dirs})"
)
return str(output_file.absolute())
except IOError as e:
raise FileIOError(f"Failed to create acqparams file: {e}") from e
[docs]
def create_hcp_output_structure(output_dir: Path, patient_id: str) -> Dict[str, Path]:
"""Create HCP-compatible directory structure for processing outputs.
This function creates the standard HCP directory structure used by tools
like BedpostX and ProbTrackX2. All directories are created relative to
the provided output_dir.
Args:
output_dir: Base output directory
patient_id: Patient identifier (for logging purposes)
Returns:
Dictionary mapping directory names to their Path objects:
- "base": output_dir
- "t1w": T1w/
- "diffusion": T1w/Diffusion/
- "bedpostx": T1w/Diffusion.bedpostX/
- "registration": registration/
- "synthseg": synthseg/
- "dti": dti/
- "dti_wls": dti/WLS/
Raises:
FileIOError: If unable to create directories
Example:
>>> dirs = create_hcp_output_structure(Path("/output"), "DTI_001")
>>> print(dirs["diffusion"])
/output/T1w/Diffusion
"""
output_dir = Path(output_dir)
directories = {
"base": output_dir,
"t1w": output_dir / "T1w",
"diffusion": output_dir / "T1w" / "Diffusion",
"bedpostx": output_dir / "T1w" / "Diffusion.bedpostX",
"registration": output_dir / "registration",
"synthseg": output_dir / "synthseg",
"dti": output_dir / "dti",
"dti_wls": output_dir / "dti" / "WLS",
}
try:
logger.debug(f"Creating HCP directory structure for patient {patient_id}")
for name, path in directories.items():
path.mkdir(parents=True, exist_ok=True)
logger.debug(f"Created directory: {path}")
logger.info(f"HCP directory structure created for {patient_id} at {output_dir}")
return directories
except OSError as e:
raise FileIOError(f"Failed to create HCP directory structure: {e}") from e