import numpy as np
from scipy.special import legendre
from scipy import ndimage
import logging
from rtcog.utils.fMRI import unmask_fMRI_img, mask_fMRI_img
log = logging.getLogger('online_preproc')
[docs]
def gen_polort_regressors(polort, nt):
"""
Generate Legendre polynomials of a given order for nuisance regression purposes.
Parameters
----------
polort : int
Maximum polynomial order to generate.
nt : int
Number of data points.
Returns
-------
out : np.ndarray, shape (nt, polort)
Legendre polynomial regressors.
"""
out = np.zeros((nt, polort))
for n in range(polort):
Pn = legendre(n)
x = np.linspace(-1.0, 1.0, nt)
out[:,n] = Pn(x).T
return(out)
# Kalman Filter Functions
# =======================
[docs]
def welford(k, x, M, S):
# inputs np.array(Nv,)
Mnext = M + (x-M)/k
Snext = S + (x-M)*(x-Mnext)
if k == 1:
std = np.zeros(x.shape[0])
else:
std = np.sqrt(Snext/(k-1))
return Mnext, Snext, std
# Smoothing Functions
# ===================
def _smooth_array(arr, affine, fwhm=None, ensure_finite=True, copy=True):
"""
Smooth images by applying a Gaussian filter.
Apply a Gaussian filter along the three first dimensions of arr.
Based on https://github.com/nilearn/nilearn/blob/main/nilearn/image/image.py
Parameters
----------
arr : numpy.ndarray
4D array, with image number as last dimension. 3D arrays are also
accepted.
affine : numpy.ndarray
(4, 4) matrix, giving affine transformation for image. (3, 3) matrices
are also accepted (only these coefficients are used).
If fwhm='fast', the affine is not used and can be None
fwhm : scalar, numpy.ndarray/tuple/list, 'fast' or None
Smoothing strength, as a full-width at half maximum, in millimeters.
If a nonzero scalar is given, width is identical in all 3 directions.
A numpy.ndarray/list/tuple must have 3 elements,
giving the FWHM along each axis.
If any of the elements is zero or None,
smoothing is not performed along that axis.
If fwhm == 'fast', a fast smoothing will be performed with
a filter [0.2, 1, 0.2] in each direction and a normalisation
to preserve the local average value.
If fwhm is None, no filtering is performed (useful when just removal
of non-finite values is needed).
ensure_finite : bool
if True, replace every non-finite values (like NaNs) by zero before
filtering.
copy : bool
if True, input array is not modified. True by default: the filtering
is not performed in-place.
Returns
-------
filtered_arr : numpy.ndarray
arr, filtered.
Notes
-----
This function is most efficient with arr in C order.
"""
# Here, we have to investigate use cases of fwhm. Particularly, if fwhm=0.
# See issue #1537
if isinstance(fwhm, (int, float)) and (fwhm == 0.0):
log.warning("The parameter 'fwhm' for smoothing is specified "
"as {0}. Setting it to None "
"(no smoothing will be performed)"
.format(fwhm))
fwhm = None
if arr.dtype.kind == 'i':
if arr.dtype == np.int64:
arr = arr.astype(np.float64)
else:
arr = arr.astype(np.float32) # We don't need crazy precision.
if copy:
arr = arr.copy()
if ensure_finite:
# SPM tends to put NaNs in the data outside the brain
arr[np.logical_not(np.isfinite(arr))] = 0
if isinstance(fwhm, str) and (fwhm == 'fast'):
arr = _fast_smooth_array(arr)
elif fwhm is not None:
fwhm = np.asarray(fwhm)
fwhm = np.where(fwhm == None, 0.0, fwhm) # noqa: E711
affine = affine[:3, :3] # Keep only the scale part.
fwhm_over_sigma_ratio = np.sqrt(8 * np.log(2)) # FWHM to sigma.
vox_size = np.sqrt(np.sum(affine ** 2, axis=0))
sigma = fwhm / (fwhm_over_sigma_ratio * vox_size)
for n, s in enumerate(sigma):
if s > 0.0:
ndimage.gaussian_filter1d(arr, s, output=arr, axis=n)
return arr
[docs]
def rt_smooth_vol(data_arr, mask_img, fwhm=4):
"""
Apply smoothing to fMRI data volumes.
Parameters
----------
data_arr : np.ndarray, shape (Nvoxels, 1)
The input fMRI data array to be processed.
mask_img : nibabel.Nifti1Image
A binary mask image.
fwhm : float, optional
The full width at half maximum (FWHM) value used to define the smoothing kernel.
The default is 4 mm.
Returns
-------
np.ndarray, shape (Nvoxels, 1)
The smoothed fMRI data.
"""
data_arr = data_arr[:, 0]
x = unmask_fMRI_img(data_arr, mask_img)
x_sm = _smooth_array(x, affine=mask_img.affine, fwhm=fwhm)
x_sm_v = mask_fMRI_img(x_sm, mask_img)
return x_sm_v[:, np.newaxis]
# Temporal Normalization
# ==========================
[docs]
def calculate_spc(current_signal, baseline_signal, mean_removed):
"""
Calculate signal percent change (SPC) for a given timepoint.
The SPC is computed voxel-wise as either:
- `100 * (s / s̄)` if the mean has already been removed (e.g. via EMA)
- `100 * ((s - s̄) / s̄)` otherwise
Parameters
----------
current_signal : np.ndarray, shape (Nvoxels,)
The current volume's voxel intensities after prior preprocessing.
baseline_signal : np.ndarray, shape (Nvoxels,)
The voxel-wise mean signal computed from the baseline period.
mean_removed : bool
Whether the signal mean has already been removed (e.g., via EMA).
If True, use multiplicative SPC; if False, use standard SPC.
Returns
-------
np.ndarray, shape (Nvoxels, 1)
The normalized signal.
"""
if mean_removed:
spc = (current_signal / baseline_signal) * 100
else:
spc = ((current_signal - baseline_signal) / baseline_signal) * 100
return spc[:, np.newaxis]
# Windowing utilities
# ==========================
[docs]
class CircularBuffer:
"""
Circular buffer for storing recent fMRI volumes for windowed operations.
Parameters
----------
Nv : int
Number of voxels in each volume.
size : int
Size of the circular buffer (number of volumes to store).
Attributes
----------
buffer : np.ndarray, shape (Nv, size)
The circular buffer storing the recent volumes.
insert_idx : int
Current insertion index in the buffer.
size : int
Size of the buffer.
full : bool
Whether the buffer has been filled at least once.
"""
def __init__(self, Nv, size):
self.buffer = np.zeros((Nv, size))
self.insert_idx = 0
self.size = size
self.full = False
[docs]
def update(self, data):
self.buffer[:, self.insert_idx] = data.squeeze()
self.insert_idx = (self.insert_idx + 1) % self.size
if self.insert_idx == 0 and not self.full:
self.full = True
if self.full:
return np.concatenate([self.buffer[:, self.insert_idx:], self.buffer[:, :self.insert_idx]], axis=1)
else:
return None