import sys
import os
import os.path as osp
import numpy as np
from typing import Optional
from nibabel.nifti1 import Nifti1Image
from rtcog.preproc.preproc_steps import PreprocStep
from rtcog.preproc.step_types import StepType
from rtcog.utils.exceptions import VolumeOverflowError
from rtcog.utils.options import Options
from rtcog.utils.log import get_logger
from rtcog.utils.fMRI import unmask_fMRI_img
from rtcog.paths import OUTPUT_DIR
log = get_logger()
[docs]
class Pipeline:
"""
Realtime fMRI preprocessing pipeline.
This class implements a pipeline design pattern for modular preprocessing of fMRI data.
Data flows through a series of configurable steps (e.g., smoothing, normalization),
allowing flexible analysis workflows. Each step is a separate object that can be
enabled/disabled and configured independently.
Attributes
----------
Nt : int
Total number of expected volumes (TRs) in the session.
mask_Nv : int
Number of voxels in the brain mask.
mask_img : nibabel.Nifti1Image or None
Brain mask image used.
Nv : int
Number of voxels in the data. Equal to mask_Nv.
steps : list of PreprocStep
Initialized preprocessing step objects.
run_funcs : list of callable
Preprocessing functions to apply to each TR during processing.
motion_estimates : list
Flattened list of motion parameters across volumes.
Data_FromAFNI : np.ndarray or None
Raw input data from AFNI with shape (Nv, Nt).
Data_processed : np.ndarray or None
Fully processed data with shape (Nv, Nt).
processed_tr : np.ndarray
Last processed TR as a column vector (Nv, 1).
out_dir : str
Directory where outputs will be written.
out_prefix : str
Filename prefix for output files.
save_orig : bool
Whether to save the original, unprocessed incoming volumes.
snapshot : bool
Whether to save a debug snapshot of internal variables.
"""
def __init__(self, options: Options, Nt: int, mask_Nv: int, mask_img: Nifti1Image = None):
"""
Initialize the Pipeline object and prepare for real-time fMRI processing.
Parameters
----------
options : Options
Parsed configuration object with pipeline flags and settings.
Nt : int
Total number of TRs expected during the session.
mask_Nv : int
Number of voxels included in the brain mask.
mask_img : nibabel.Nifti1Image, optional
Brain mask used.
"""
self.t = None
self.n = None
self.Nt = Nt
self.mask_Nv = mask_Nv
self.mask_img = mask_img
self.Nv = 0
self._processed_tr = np.zeros((self.mask_Nv,1))
self.motion_estimates = []
self.save_orig = options.save_orig
self.out_dir = options.out_dir
self.out_prefix = options.out_prefix
self.snapshot = options.snapshot
self.snapshot_dir = getattr(options, "snapshot_dir", None) or OUTPUT_DIR
self.Data_FromAFNI = None # np.array (Nv,Nt) for incoming data
self.Data_processed = None
self.step_registry = PreprocStep.registry
self.step_opts = options.steps
self.build_steps()
self.run_funcs = [step.run for step in self.steps]
@property
def processed_tr(self) -> np.ndarray:
"""np.ndarray: Last processed TR as a column vector of shape (Nv, 1)."""
return self._processed_tr
@processed_tr.setter
def processed_tr(self, value: np.ndarray) -> None:
"""
Setter for processed_tr, ensuring correct shape and type.
Parameters
----------
value : np.ndarray
Column vector of shape (Nv, 1) representing the processed TR.
Raises
------
SystemExit
If the input is not a NumPy array or has the wrong shape.
"""
if not isinstance(value, np.ndarray):
raise ValueError(f"pipeline.processed_tr must be a numpy array, but is of type {type(value)}")
if value.shape != (self.mask_Nv, 1):
log.error(f'pipeline.processed_tr has incorrect shape. Expected: {self.mask_Nv, 1}. Actual: {value.shape}')
raise ValueError(f'pipeline.processed_tr has incorrect shape. Expected: {self.mask_Nv, 1}. Actual: {value.shape}')
self._processed_tr = value
[docs]
def build_steps(self) -> None:
"""
Construct the list of preprocessing steps based on user options.
This method uses the registered step types in `PreprocStep.registry`
and only instantiates steps that are marked as "enabled" in the config.
"""
self.steps = []
for step in self.step_opts:
name = step["name"].lower()
if name not in self.step_registry:
log.error(f'Unknown step: {name}')
sys.exit(-1)
StepClass = self.step_registry.get(name)
if not step.get("enabled", False):
continue
save = step.pop("save", False)
step.pop("enabled", None)
step.pop("name", None)
self.steps.append(StepClass(save=save, Nv=self.mask_Nv, Nt=self.Nt, **step))
log.info(f"Steps used: {', '.join([step.name for step in self.steps])}")
[docs]
def process_first_volume(self, this_t_data: np.ndarray) -> None:
"""
Initialize processing pipeline on the first volume.
Parameters
----------
this_t_data : np.ndarray
A 1D array of data for the first TR, with length equal to the number of voxels.
Raises
------
SystemExit
If the number of voxels in `this_t_data` does not match the expected mask size.
"""
self.Nv = len(this_t_data)
if self.mask_Nv != self.Nv:
raise ValueError(f'Discrepancy across masks [data Nv = {self.Nv}, mask Nv = {self.mask_Nv}]')
self.Data_FromAFNI = np.zeros((self.Nv, self.Nt))
self.Data_processed = np.zeros((self.Nv, self.Nt))
self._processed_tr = np.zeros((self.Nv, 1))
for step in self.steps:
step.start_step(self)
[docs]
def process(self, t: int, n: int, motion: list[float], this_t_data: np.ndarray) -> Optional[np.ndarray]:
"""
Run full processing pipeline on a single TR.
Parameters
----------
t : int
Time index for current volume.
n : int
Index used to determine whether this volume is discarded (0) or kept (non-zero).
motion : list or array-like
Motion parameters for the current volume.
this_t_data : np.ndarray
A 1D array of voxel data for the current time point.
Returns
-------
np.ndarray or None
The processed data for the current time point as a column vector,
or `None` if the volume is discarded.
Raises
------
VolumeOverflowError
If the time index exceeds the expected number of time points.
"""
self.t = t
self.n = n
self.motion = motion
if t == 0:
self.process_first_volume(this_t_data)
# Store raw data
try:
self.Data_FromAFNI[:, self.t] = this_t_data
except IndexError:
raise VolumeOverflowError()
# Skip processing during discard volumes
if self.n == 0:
return None
# Start with raw data
self.processed_tr = this_t_data[:, np.newaxis]
# Apply each preprocessing step in sequence
# Each step modifies the data in-place for efficiency
for func in self.run_funcs:
self.processed_tr[:] = func(self)
# Save output
self.Data_processed[:, self.t] = self.processed_tr[:, 0]
return self.processed_tr
[docs]
def final_steps(self, save=True) -> None:
"""
Run finalization steps after all volumes are processed.
This includes saving motion estimates, writing processed NIfTI files,
and optionally saving a snapshot of internal variables for testing/debugging.
"""
for step in self.steps:
step.end_step(self)
if not save:
return
self.save_motion_estimates()
if self.mask_img is None:
# TODO: decide if I should require mask_img or not, since I do upstream in Options, so would have to adjust that
log.warning(' final_steps = No additional outputs generated due to lack of mask.')
return
log.debug(' final_steps - About to write outputs to disk.')
self.save_nifti_files()
# If running snapshot test, save the variable states
if self.snapshot:
var_dict = {}
for step in self.steps:
var_dict.update(step.snapshot())
var_dict.update({
'Data_FromAFNI': self.Data_FromAFNI,
'Data_processed': self.Data_processed
})
os.makedirs(self.snapshot_dir, exist_ok=True)
snap_path = osp.join(self.snapshot_dir, f'new_snapshots.npz')
np.savez(snap_path, **var_dict)
log.info(f'Snapshot saved to snapshot_dir at {snap_path}')
[docs]
def save_motion_estimates(self) -> None:
"""
Flatten and save motion estimates to disk.
Output file is saved to: `self.out_dir/self.out_prefix + '.Motion.1D'`
"""
self.motion_estimates = [item for sublist in self.motion_estimates for item in sublist]
log.info(f'motion_estimates length is {len(self.motion_estimates)}')
self.motion_estimates = np.reshape(self.motion_estimates,newshape=(int(len(self.motion_estimates)/6),6))
motion_path = osp.join(self.out_dir,self.out_prefix+'.Motion.1D')
np.savetxt(
motion_path,
self.motion_estimates,
delimiter="\t"
)
log.info(f'Motion estimates saved to disk: [{motion_path}]')
[docs]
def save_nifti_files(self) -> None:
"""
Save processed and (optionally) original fMRI data to NIfTI format.
Files are saved using `unmask_fMRI_img`, reconstructing full brain volumes
from masked data and writing to: `self.out_dir/self.out_prefix + <suffix>`
"""
out_vars = [self.Data_processed]
out_labels = ['.pp_Final.nii']
if self.save_orig:
out_vars.append(self.Data_FromAFNI)
out_labels.append('.orig.nii')
for variable, file_suffix in zip(out_vars, out_labels):
unmask_fMRI_img(np.array(variable), self.mask_img, osp.join(self.out_dir,self.out_prefix+file_suffix))
for step in self.steps:
step.save_nifti(self)