Source code for rtcog.processor.basic_processor

import sys
import numpy as np

from rtcog.preproc.pipeline import Pipeline
from rtcog.utils.log import set_logger
from rtcog.utils.fMRI import load_fMRI_file

class BasicProcessor:
    """
    Class representing a real-time fMRI processor.

    This class handles the setup of the data processing stream,
    initialization of the preprocessing pipeline, and the management of
    incoming data from the scanner.

    Parameters
    ----------
    options : Options
        Configuration object containing experiment parameters. This class
        reads ``debug``, ``silent``, ``exp_type``, ``nvols``, ``discard`` and
        ``mask_path`` from it and forwards the whole object to ``Pipeline``.
    sync : SyncEvents
        Container for multiprocessing synchronization primitives used in
        experiment. Its ``end`` event is set whenever setup fails and when
        the run is finalized.
    **kwargs
        Accepted for call-site compatibility; not used by this class.

    Raises
    ------
    ValueError
        If ``options.nvols`` is zero, no mask path was provided, or the mask
        sums to zero voxels.
    RuntimeError
        If the mask file cannot be loaded.
    """

    def __init__(self, options, sync, **kwargs):
        self.log = set_logger(options.debug, options.silent)
        self.sync = sync
        self.exp_type = options.exp_type

        self.n = 0        # Counter for number of volumes pre-processed (Start = 1)
        self.t = -1       # Counter for number of received volumes (Start = 0)
        self.Nv = None    # Number of voxels per volume; fixed by the first volume received
        self.Nt = options.nvols  # Number acquisitions
        # Number of volumes to discard from any analysis (won't enter pre-processing)
        self.nvols_discard = options.discard
        self.this_motion = None

        # Every failure path sets sync.end before raising so that anything
        # waiting on that event is released.
        if self.Nt == 0:
            self.sync.end.set()
            raise ValueError('Number of expected volumes (--nvols) is zero.')

        if options.mask_path is None:
            self.sync.end.set()
            raise ValueError('No mask was provided!')

        try:
            self.mask_img = load_fMRI_file(options.mask_path)
        except Exception as e:
            self.sync.end.set()
            # Chain the original exception so its traceback is preserved.
            raise RuntimeError(f'Error loading mask file: {e}') from e

        # NOTE(review): summing the mask equals the voxel count only for a
        # binary (0/1) mask -- confirm masks are always binarized upstream.
        self.mask_Nv = int(np.sum(self.mask_img.get_fdata()))
        self.log.debug(f'Number of Voxels in user-provided mask: {self.mask_Nv}')
        if self.mask_Nv == 0:
            self.sync.end.set()
            raise ValueError('Provided mask file is empty.')

        self.pipe = Pipeline(options, self.Nt, self.mask_Nv, self.mask_img)

    def _compute_TR_data_impl(self, motion, extra):
        """
        Process data for the current TR by passing it to the pipeline.

        Parameters
        ----------
        motion : list of list[float]
            Motion parameter series; element ``self.t`` of each series is
            read, and exactly 6 series are expected.
        extra : list of list[float]
            Per-voxel series; element ``self.t`` of each sublist is the
            value of that voxel for the current TR.

        Returns
        -------
        np.array
            Whatever ``self.pipe.process`` returns for this TR.

        Raises
        ------
        ValueError
            If the number of motion parameters is not 6, or if the number of
            voxels differs from the number seen on the first volume.
        """
        self.t += 1

        # Keep a record of motion estimates
        self.this_motion = [i[self.t] for i in motion]
        self.pipe.motion_estimates.append(self.this_motion)

        if len(self.this_motion) != 6:
            raise ValueError(
                f'Motion not read in correctly. Expected length: 6 | Actual length: {len(self.this_motion)}'
            )

        this_t_data = np.array([e[self.t] for e in extra])

        if self.Nv is None:
            # First volume: remember the voxel count as the reference. It must
            # not be overwritten afterwards, otherwise the consistency check
            # below would compare the current volume against itself.
            self.Nv = len(this_t_data)
        elif len(this_t_data) != self.Nv:
            self.log.error('Extra data not read in correctly.')
            self.log.error(f'Expected length: {self.Nv} | Actual length: {len(this_t_data)}')
            raise ValueError('Extra data not read in correctly.')

        # Update n (only if not longer a discard volume)
        if self.t > self.nvols_discard - 1:
            self.n += 1

        return self.pipe.process(self.t, self.n, self.this_motion, this_t_data)

    def compute_TR_data(self, motion, extra):
        """
        Public wrapper for TR processing with logging.

        Parameters
        ----------
        motion : list of list[float]
            6 motion parameters per TR.
        extra : list of list[float]
            Voxel-wise time series.

        Returns
        -------
        int
            Always returns 1 (for compatibility with callback interface).
        """
        self._compute_TR_data_impl(motion, extra)
        self.log.info(f' - Time point [t={self.t}, n={self.n}]')
        return 1

    def end_run(self, save=True):
        """
        Finalize experiment and optionally save outputs.

        Parameters
        ----------
        save : bool
            Whether to save final outputs (default: True). Forwarded to
            ``self.pipe.final_steps``.
        """
        self.pipe.final_steps(save=save)
        self.sync.end.set()