rtcog.preproc package

Subpackages

Submodules

rtcog.preproc.pipeline module

class rtcog.preproc.pipeline.Pipeline(options: Options, Nt: int, mask_Nv: int, mask_img: nibabel.nifti1.Nifti1Image | None = None)

Bases: object

Real-time fMRI preprocessing pipeline.

This class implements a pipeline design pattern for modular preprocessing of fMRI data. Data flows through a series of configurable steps (e.g., smoothing, normalization), allowing flexible analysis workflows. Each step is a separate object that can be enabled/disabled and configured independently.
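The pipeline pattern described above can be illustrated with a minimal, self-contained sketch. The names `ToyPipeline`, `demean`, and `scale` are hypothetical and are not part of rtcog; volumes are modeled as plain lists rather than NumPy arrays to keep the example dependency-free.

```python
from typing import Callable, List

Volume = List[float]


# Hypothetical stand-ins for preprocessing steps: each takes the voxel
# values of one TR and returns a transformed copy.
def demean(volume: Volume) -> Volume:
    mean = sum(volume) / len(volume)
    return [v - mean for v in volume]


def scale(volume: Volume) -> Volume:
    return [v * 2.0 for v in volume]


class ToyPipeline:
    """Applies the configured steps, in order, to each incoming volume."""

    def __init__(self, steps: List[Callable[[Volume], Volume]]):
        # Only the steps passed in are run; leaving one out "disables" it.
        self.run_funcs = list(steps)

    def process(self, volume: Volume) -> Volume:
        for func in self.run_funcs:
            volume = func(volume)
        return volume


pipeline = ToyPipeline([demean, scale])
result = pipeline.process([1.0, 2.0, 3.0])
```

Because each step only sees the output of the previous one, steps can be reordered or removed without touching the others.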

Nt

Total number of expected volumes (TRs) in the session.

Type:

int

mask_Nv

Number of voxels in the brain mask.

Type:

int

mask_img

Brain mask image used.

Type:

nibabel.Nifti1Image or None

Nv

Number of voxels in the data. Equal to mask_Nv.

Type:

int

steps

Initialized preprocessing step objects.

Type:

list of PreprocStep

run_funcs

Preprocessing functions to apply to each TR during processing.

Type:

list of callable

motion_estimates

Flattened list of motion parameters across volumes.

Type:

list

Data_FromAFNI

Raw input data from AFNI with shape (Nv, Nt).

Type:

np.ndarray or None

Data_processed

Fully processed data with shape (Nv, Nt).

Type:

np.ndarray or None

processed_tr

Last processed TR as a column vector (Nv, 1).

Type:

np.ndarray

out_dir

Directory where outputs will be written.

Type:

str

out_prefix

Filename prefix for output files.

Type:

str

save_orig

Whether to save the original, unprocessed incoming volumes.

Type:

bool

snapshot

Whether to save a debug snapshot of internal variables.

Type:

bool

build_steps() → None

Construct the list of preprocessing steps based on user options.

This method uses the registered step types in PreprocStep.registry and only instantiates steps that are marked as “enabled” in the config.
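As a rough sketch of this registry-driven construction: the config layout (a list of dicts with `name` and `enabled` keys) and the `Toy*` classes below are assumptions for illustration only; the actual structure of `Options` is not shown in this reference.

```python
class ToyStep:
    """Hypothetical step base class that just records its keyword arguments."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


class ToySmooth(ToyStep):
    pass


class ToyEMA(ToyStep):
    pass


# Assumed shapes: a name -> class registry and a per-step config entry.
registry = {"smooth": ToySmooth, "ema": ToyEMA}
config = [
    {"name": "smooth", "enabled": True, "fwhm": 4},
    {"name": "ema", "enabled": False, "alpha": 0.98},
]


def build_steps(config, registry):
    steps = []
    for entry in config:
        # Skip anything not explicitly marked as enabled.
        if not entry.get("enabled", False):
            continue
        cls = registry[entry["name"].lower()]
        # Everything except the bookkeeping keys is passed to the step.
        kwargs = {k: v for k, v in entry.items() if k not in ("name", "enabled")}
        steps.append(cls(**kwargs))
    return steps


steps = build_steps(config, registry)
```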

final_steps(save=True) → None

Run finalization steps after all volumes are processed.

This includes saving motion estimates, writing processed NIfTI files, and optionally saving a snapshot of internal variables for testing/debugging.

process(t: int, n: int, motion: list[float], this_t_data: numpy.ndarray) → numpy.ndarray | None

Run full processing pipeline on a single TR.

Parameters:
  • t (int) – Time index for current volume.

  • n (int) – Index used to determine whether this volume is discarded (0) or kept (non-zero).

  • motion (list or array-like) – Motion parameters for the current volume.

  • this_t_data (np.ndarray) – A 1D array of voxel data for the current time point.

Returns:

The processed data for the current time point as a column vector, or None if the volume is discarded.

Return type:

np.ndarray or None

Raises:

VolumeOverflowError – If the time index exceeds the expected number of time points.
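The return and error contract above can be sketched as follows. This is a toy model, not the rtcog implementation: `process_sketch` is a hypothetical function, the exception class is a local stand-in, and the zero-based bound check (`t >= Nt`) is an assumption, since the reference only says the index "exceeds" the expected count.

```python
class VolumeOverflowError(Exception):
    """Local stand-in for the exception named in the Raises section."""


def process_sketch(t, n, this_t_data, Nt):
    # Assumption: t is zero-based, so valid indices are 0 .. Nt - 1.
    if t >= Nt:
        raise VolumeOverflowError(f"volume index {t} exceeds {Nt} expected volumes")
    # n == 0 marks a discarded volume: nothing is returned for it.
    if n == 0:
        return None
    # An (Nv, 1) column vector, modeled here as a list of one-element rows.
    return [[float(v)] for v in this_t_data]


kept = process_sketch(t=1, n=1, this_t_data=[3, 4], Nt=5)
dropped = process_sketch(t=0, n=0, this_t_data=[3, 4], Nt=5)
```

Callers therefore need to check for None before using the result of a call.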

process_first_volume(this_t_data: numpy.ndarray) → None

Initialize processing pipeline on the first volume.

Parameters:

this_t_data (np.ndarray) – A 1D array of data for the first TR, with length equal to the number of voxels.

Raises:

SystemExit – If the number of voxels in this_t_data does not match the expected mask size.

property processed_tr: numpy.ndarray

Last processed TR as a column vector of shape (Nv, 1).

Type:

np.ndarray

save_motion_estimates() → None

Flatten and save motion estimates to disk.

Output file is saved to: self.out_dir/self.out_prefix + '.Motion.1D'
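A small sketch of how such an output path and a flattened parameter list might be built. The helper names `motion_file_path` and `flatten` are hypothetical, and the on-disk layout of the .1D file is not specified here, so the sketch stops short of writing anything.

```python
import os


def motion_file_path(out_dir, out_prefix):
    # Mirrors the documented pattern: out_dir / (out_prefix + '.Motion.1D').
    return os.path.join(out_dir, out_prefix + ".Motion.1D")


def flatten(per_volume_params):
    # Collapse a list of per-volume parameter lists into one flat list.
    return [p for volume in per_volume_params for p in volume]


path = motion_file_path("results", "sub01")
flat = flatten([[0.1, 0.2], [0.3, 0.4]])
```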

save_nifti_files() → None

Save processed and (optionally) original fMRI data to NIfTI format.

Files are saved using unmask_fMRI_img, reconstructing full brain volumes from masked data and writing to: self.out_dir/self.out_prefix + <suffix>
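The idea of reconstructing a full volume from masked data can be shown with a toy, one-dimensional example. The function `unmask` below is hypothetical and is not the signature of unmask_fMRI_img; it only illustrates scattering in-mask values back to their positions and filling everything else with zero.

```python
def unmask(masked_values, mask):
    """Place in-mask voxel values back into a full-size flat volume."""
    if sum(1 for inside in mask if inside) != len(masked_values):
        raise ValueError("number of values does not match the mask size")
    values = iter(masked_values)
    # Voxels outside the mask are filled with 0.0.
    return [next(values) if inside else 0.0 for inside in mask]


mask = [0, 1, 1, 0, 1]          # 5-voxel volume, 3 voxels inside the mask
full = unmask([5.0, 6.0, 7.0], mask)
```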

rtcog.preproc.preproc_steps module

class rtcog.preproc.preproc_steps.EMAStep(*, save=False, suffix='.pp_EMA.nii', Nv, Nt, alpha=0.98)

Bases: PreprocStep

Exponential moving average
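For orientation, a minimal sketch of the standard exponential moving average recursion for a single voxel's time series follows. It assumes the common form y[t] = alpha * y[t-1] + (1 - alpha) * x[t], seeded with the first sample. Whether EMAStep outputs this average directly or uses it in another way (for example, subtracting it as a high-pass filter) is not stated in this reference.

```python
def ema(series, alpha):
    """Exponential moving average, seeded with the first sample."""
    averaged = [series[0]]
    for x in series[1:]:
        # Larger alpha gives more weight to history (slower response).
        averaged.append(alpha * averaged[-1] + (1.0 - alpha) * x)
    return averaged


smoothed = ema([2.0, 4.0, 8.0], alpha=0.5)
```

With the documented default of alpha=0.98, each new sample contributes only 2% to the running average.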

class rtcog.preproc.preproc_steps.KalmanStep(*, save=False, suffix='.pp_Kalman_LPfilter.nii', Nv, Nt, n_cores=10)

Bases: PreprocStep

Kalman filter for low pass filtering, spike removal, and signal smoothing

end_step(pipeline)

class rtcog.preproc.preproc_steps.PreprocStep(*, save=False, suffix=None, Nv, Nt, **kwargs)

Bases: object

Base class for a preprocessing step in the real-time fMRI pipeline.

Subclasses must implement the _run(pipeline) method, which is called on each TR and receives access to the pipeline’s state. Optionally, subclasses may also implement _start(pipeline) to initialize state at the first TR, and _save(pipeline) to perform any custom saving logic at the end of the run.
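The division of labor between the public methods and the `_run`/`_start` hooks can be sketched with a toy base class. Everything here is hypothetical: `ToyStepBase`, `DoubleStep`, and the pipeline attributes `t` and `volume` are illustrative assumptions, not the rtcog API.

```python
from types import SimpleNamespace


class ToyStepBase:
    def __init__(self, *, save=False, Nv, Nt):
        self.save = save
        self.Nv = Nv
        self.Nt = Nt
        # Output cache of shape (Nv, Nt), allocated only when saving.
        self.data_out = [[0.0] * Nt for _ in range(Nv)] if save else None

    def start_step(self, pipeline):
        self._start(pipeline)

    def run(self, pipeline):
        out = self._run(pipeline)
        if self.save:
            # Store this TR's output in column t of the cache.
            for voxel, value in enumerate(out):
                self.data_out[voxel][pipeline.t] = value
        return out

    def _start(self, pipeline):
        pass  # optional hook

    def _run(self, pipeline):
        raise NotImplementedError  # required hook


class DoubleStep(ToyStepBase):
    def _run(self, pipeline):
        return [2.0 * v for v in pipeline.volume]


step = DoubleStep(save=True, Nv=2, Nt=3)
toy_pipeline = SimpleNamespace(t=1, volume=[1.0, 2.0])
out = step.run(toy_pipeline)
```

A subclass written this way only supplies the per-TR computation; the caching logic stays in the base class.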

save

Whether to save the output from this step to disk.

Type:

bool

suffix

Filename suffix to use for saving a NIfTI file (if save=True).

Type:

str or None

Nv

Number of voxels.

Type:

int

Nt

Number of time points.

Type:

int

data_out

Cached 2D array of shape (Nv, Nt) storing the output for each TR; populated only if save=True.

Type:

np.ndarray or None

Class attributes:

registry

Mapping of registered step names (e.g., “ema”, “iglm”) to class objects. Automatically populated via __init_subclass__.

Type:

dict

start_step(pipeline):

Optional setup logic to initialize internal state before processing begins.

run(pipeline):

Executes the step’s logic on the current TR. Saves result to data_out if saving is enabled.

end_step(pipeline):

Optional cleanup logic to perform after processing has completed.

save_nifti(pipeline):

Saves the accumulated data to disk as a NIfTI file using the provided mask and filename.

get_class(name):

Class method that retrieves a registered step class by name (case-insensitive).
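Automatic registration through `__init_subclass__` combined with a case-insensitive lookup can be sketched as below. The key derivation (class name minus a trailing "Step", lowercased) is an assumption inferred from the registry keys listed in this reference, and `ToyStep` is a hypothetical stand-in for the base class.

```python
class ToyStep:
    registry = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Assumed naming rule: "EMAStep" -> "ema", "iGLMStep" -> "iglm".
        name = cls.__name__
        if name.endswith("Step"):
            name = name[: -len("Step")]
        ToyStep.registry[name.lower()] = cls

    @classmethod
    def get_class(cls, name):
        # Lowercasing the query makes the lookup case-insensitive.
        return cls.registry[name.lower()]


class EMAStep(ToyStep):
    pass


class iGLMStep(ToyStep):
    pass
```

Defining a subclass is enough to register it; no separate registration call is needed.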

end_step(pipeline)

classmethod get_class(name)

property name

registry = {'ema': <class 'rtcog.preproc.preproc_steps.EMAStep'>, 'iglm': <class 'rtcog.preproc.preproc_steps.iGLMStep'>, 'kalman': <class 'rtcog.preproc.preproc_steps.KalmanStep'>, 'smooth': <class 'rtcog.preproc.preproc_steps.SmoothStep'>, 'snorm': <class 'rtcog.preproc.preproc_steps.SnormStep'>, 'tnorm': <class 'rtcog.preproc.preproc_steps.TnormStep'>, 'windowing': <class 'rtcog.preproc.preproc_steps.WindowingStep'>}

run(pipeline)

save_nifti(pipeline)

snapshot()

Return data_out for testing purposes.

start_step(pipeline)

class rtcog.preproc.preproc_steps.SmoothStep(*, save=False, suffix='.pp_Smooth.nii', Nv, Nt, fwhm=4)

Bases: PreprocStep

Smoothing with Gaussian filter
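Gaussian smoothing kernels are usually specified by their full width at half maximum (FWHM), while filter routines typically take a standard deviation. The conversion is sigma = FWHM / (2 * sqrt(2 * ln 2)). The sketch below shows only this conversion; the units of `fwhm` and how SmoothStep applies the filter are not stated in this reference.

```python
import math


def fwhm_to_sigma(fwhm):
    """Convert a Gaussian FWHM to its standard deviation (same units)."""
    return fwhm / (2.0 * math.sqrt(2.0 * math.log(2.0)))


sigma = fwhm_to_sigma(4)  # the documented default fwhm=4
```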

class rtcog.preproc.preproc_steps.SnormStep(*, save=False, Nv, Nt, suffix='.pp_Zscore.nii')

Bases: PreprocStep

Spatial normalization
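The default suffix '.pp_Zscore.nii' suggests a z-score across voxels within each volume. A minimal sketch under that assumption follows. The function `spatial_zscore` is hypothetical, and the use of the population standard deviation is a further assumption.

```python
from statistics import mean, pstdev


def spatial_zscore(volume):
    """Z-score one volume across its voxels (zero mean, unit variance)."""
    mu = mean(volume)
    sigma = pstdev(volume)  # population standard deviation (assumed)
    # Note: a constant volume has sigma == 0 and would need special handling.
    return [(v - mu) / sigma for v in volume]


z = spatial_zscore([1.0, 2.0, 3.0, 4.0])
```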

class rtcog.preproc.preproc_steps.TnormStep(*, save=False, Nv, Nt, suffix='.pp_Tnorm.nii', nvols_to_compute=50)

Bases: PreprocStep

Temporal normalization

class rtcog.preproc.preproc_steps.WindowingStep(*, save=False, suffix='.pp_Windowed.nii', Nv, Nt, win_length=4)

Bases: PreprocStep

class rtcog.preproc.preproc_steps.iGLMStep(*, save=False, suffix='.pp_iGLM.nii', Nv, Nt, num_polorts=2, iGLM_motion=True)

Bases: PreprocStep

Incremental general linear model

rtcog.preproc.step_types module

Enum of possible PreprocSteps.

class rtcog.preproc.step_types.StepType(value)

Bases: Enum

An enumeration.

EMA = 'ema'
IGLM = 'iglm'
KALMAN = 'kalman'
SMOOTH = 'smooth'
SNORM = 'snorm'
WINDOWING = 'windowing'
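Since StepType is a standard Enum with string values, members can be looked up by value. The sketch below redefines a local copy with exactly the members listed above so that it runs on its own; in real code the class would be imported from rtcog.preproc.step_types.

```python
from enum import Enum


class StepType(Enum):
    """Local copy mirroring only the members listed in this reference."""

    EMA = "ema"
    IGLM = "iglm"
    KALMAN = "kalman"
    SMOOTH = "smooth"
    SNORM = "snorm"
    WINDOWING = "windowing"


step = StepType("ema")               # look up a member by its value
values = [member.value for member in StepType]
```

Looking up a value that is not a member raises ValueError, which gives a simple way to validate step names from a config file.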

Module contents