Source code for rtcog.viz.esam_streaming

import time
import threading
import os.path as osp
from rtcog.utils.shared_memory_manager import SharedMemoryManager
from multiprocessing.managers import ListProxy, DictProxy
import numpy as np
import holoviews as hv
import panel as pn

from rtcog.utils.sync import SyncEvents, ActionState
from rtcog.viz.score_plotter import ScorePlotter
from rtcog.viz.map_plotter import MapPlotter
from rtcog.viz.response_plotter import ResponsePlotter
from rtcog.viz.streaming_config import StreamingConfig
from rtcog.paths import RESOURCES_DIR
from rtcog.utils.log import get_logger

log = get_logger()

hv.extension('bokeh')
pn.extension()

[docs] def run_streamer(streamer_config, sync_events, action_onsets, action_offsets, responses) -> None: """Instantiate streamer object and start thread.""" streamer = ESAMStreamer(streamer_config, sync_events, action_onsets, action_offsets, responses) streamer.run()
[docs] class ESAMStreamer: """ Streamer for realtime fMRI data visualization. This class receives shared memory updates and streams the data live using a Panel server. It streams three plots: - ScorePlotter (match scores) - MapPlotter (activation maps) - ResponsePlotter (behavioral responses) Parameters ---------- config : StreamingConfig Configuration object containing information about the fMRI session. sync_events : SyncEvents Object that handles interprocess synchronization flags and signals. action_onsets : ListProxy Shared list storing the TR indices of action onsets (trial starts). action_offsets : ListProxy Shared list storing the TR indices of action offsets (trial ends). responses : DictProxy Shared dictionary containing responses collected during the experiment. """ def __init__(self, config: StreamingConfig, sync_events: SyncEvents, action_onsets: ListProxy, action_offsets: ListProxy, responses: DictProxy): self._sync = sync_events self._sync.shm_ready.wait() self._last_t = config.matching_opts.match_start - 1 Ntemplates = len(config.template_labels) self._Nt = config.Nt self._match_scores = SharedMemoryManager("match_scores") self._tr_data = SharedMemoryManager("tr_data") self._match_scores_shm = self._match_scores.open() self._tr_data_shm = self._tr_data.open() self._shared_arrs = {} self._shared_arrs["scores"] = np.ndarray((Ntemplates, self._Nt), dtype=np.float32, buffer=self._match_scores_shm.buf) self._shared_arrs["tr_data"] = np.ndarray((config.Nv, config.Nt), dtype=np.float32, buffer=self._tr_data_shm.buf) self._plotters = [ScorePlotter(config), MapPlotter(config)] if responses is not None: self._plotters.append(ResponsePlotter(config, responses)) self._vols_noaction = config.matching_opts.vols_noaction self._action_onsets = action_onsets self._action_offsets = action_offsets self._in_action = False self._cooldown_end = None self._hit = False @property def action_state(self) -> ActionState: """ Construct and return the current ActionState. Returns ------- ActionState An object encapsulating current trial timing state. """ return ActionState( list(self._action_onsets), list(self._action_offsets), self._in_action, self.in_cooldown, self._cooldown_end, self._hit ) @property def in_cooldown(self) -> bool: """ Check whether the streamer is currently in a post-trial cooldown period. Returns ------- bool True if in cooldown, False otherwise. """ if self._action_offsets: return self._last_t < self._action_offsets[-1] + self._vols_noaction return False
[docs] def update(self) -> None: """ Method run in a background thread to stream new data. Waits for new TRs and passes the appropriate data to each plotter. """ while not self._sync.end.is_set(): t = self._last_t + 1 # Wait for new data for next TR, or sleep briefly if t >= self._Nt or t > self._sync.tr_index.value: time.sleep(0.01) continue self._sync.new_tr.wait(timeout=0.05) # Now data for t is available, so increment and process self._last_t = t self._update_action_state(t) for plotter in self._plotters: if not plotter.should_update(t, self.action_state): continue data = None key = plotter.data_key if key in self._shared_arrs: data = self._shared_arrs[key][:, t] plotter.update(t, data, self.action_state)
[docs] def run(self) -> None: """ Start the background update thread and launch the Panel visualization server. """ try: threading.Thread(target=self.update, daemon=True).start() panels = [ getattr(p, 'dmap', getattr(p, 'pane', None)) for p in self._plotters if hasattr(p, 'dmap') or hasattr(p, 'pane') ] template = pn.template.FastListTemplate( title="Live Data", logo=osp.join(RESOURCES_DIR, "brain_white.png"), theme_toggle=False, ) template.main.extend(panels) self._server = pn.serve( template, start=True, show=True, threaded=True, port=5006 ) self._sync.server_ready.set() self._sync.end.wait() finally: self._shutdown()
def _update_action_state(self, t: int) -> None: """ Update internal action state based on the current TR. Parameters ---------- t : int Current TR index. """ self._hit = False if t in self._action_onsets and not self._in_action: self._hit = True self._in_action = True elif self._action_offsets and t in self._action_offsets: self._in_action = False self._cooldown_end = t + self._vols_noaction def _shutdown(self) -> None: """ Stop the server, close shared memory, and shut down all plotters. """ log.info("Shutting down Panel server...") if self._server: self._server.stop() self._close_shared_memory() for p in self._plotters: p.close() def _close_shared_memory(self) -> None: """ Close and unlink shared memory segments. """ if hasattr(self, '_match_scores'): self._match_scores.cleanup() if hasattr(self, '_tr_data'): self._tr_data.cleanup()