Source code for rtcog.utils.core

import os
import sys
import time
import os.path as osp
import shutil
from multiprocessing import Event, Value
from ctypes import c_int
from rtcog.utils.sync import SyncEvents
from rtcog.utils.log import get_logger

# Module-level logger shared by the helpers defined in this file.
log = get_logger()

def file_exists(path):
    """
    Validate that ``path`` refers to an existing regular file.

    Parameters
    ----------
    path : str or path-like
        Location to check with ``os.path.isfile``.

    Returns
    -------
    str or path-like
        The same ``path`` object that was passed in, unchanged.

    Raises
    ------
    FileNotFoundError
        If ``os.path.isfile(path)`` is false (missing path, or a path that
        is not a regular file, e.g. a directory).
    """
    if osp.isfile(path):
        return path
    raise FileNotFoundError(f"File not found: {path}")
def create_sync_events():
    """
    Build the ``SyncEvents`` bundle of multiprocessing primitives.

    Returns
    -------
    SyncEvents
        Populated with one fresh ``multiprocessing.Event`` for each of
        ``new_tr``, ``shm_ready``, ``action_end``, ``hit``, ``end`` and
        ``server_ready``, plus ``tr_index`` as a ``multiprocessing.Value``
        of type ``c_int`` initialised to ``-1``.
    """
    event_names = (
        "new_tr",
        "shm_ready",
        "action_end",
        "hit",
        "end",
        "server_ready",
    )
    # Events are created first, in this order, followed by the shared counter.
    events = {name: Event() for name in event_names}
    tr_counter = Value(c_int, -1)
    return SyncEvents(**events, tr_index=tr_counter)
class SharedClock:
    """
    Minimal clock reporting time elapsed since its own construction.

    The reference instant is captured once, when the object is created;
    :meth:`now` reports how many seconds have passed since then.

    Attributes
    ----------
    _start_time : float
        Value of ``time.perf_counter()`` recorded at instantiation.

    Notes
    -----
    NOTE(review): the clock is described as being shared across processes.
    ``time.perf_counter()`` has an undefined reference point, so readings
    taken in different processes are only comparable if they share the same
    underlying counter -- confirm for the platform / start method in use.
    """

    def __init__(self):
        """Record the current ``time.perf_counter()`` value as time zero."""
        self._start_time = time.perf_counter()

    def now(self):
        """
        Report the time elapsed since this clock was created.

        Returns
        -------
        float
            Seconds between instantiation and this call, measured with
            ``time.perf_counter()``.
        """
        elapsed = time.perf_counter() - self._start_time
        return elapsed
def setup_afni():
    """
    Locate AFNI on the system and import its realtime Python interfaces.

    The directory containing the ``afni`` executable (as found on ``PATH``)
    is inserted into ``sys.path`` at index 1 so that the AFNI-provided
    modules ``afnipy`` and ``realtime_receiver`` can be imported.

    Returns
    -------
    tuple
        ``(ReceiverInterface, RT)`` where ``ReceiverInterface`` comes from
        ``realtime_receiver`` and ``RT`` is ``afnipy.lib_realtime``.
        ``(None, None)`` when the ``READTHEDOCS`` environment variable equals
        ``"True"``, in which case nothing is imported.

    Raises
    ------
    RuntimeError
        If no ``afni`` executable is found on ``PATH``, or if
        ``module_test_lib.num_import_failures`` reports failures for the
        checked libraries.
    """
    building_docs = os.environ.get("READTHEDOCS") == "True"
    if building_docs:
        log.warning("AFNI not loaded. Mocking imports for documentation build")
        return None, None

    afni_exe = shutil.which('afni')
    if not afni_exe:
        log.error('++ ERROR: AFNI not found in the system PATH')
        raise RuntimeError("AFNI not found")

    # NOTE(review): every call performs this insert again, so repeated calls
    # add the same directory to sys.path more than once.
    afni_bin_dir = osp.dirname(afni_exe)
    sys.path.insert(1, afni_bin_dir)

    # Imported here (not at module top) because they only resolve once the
    # AFNI directory is on sys.path.
    from afnipy import module_test_lib

    required_libs = ['signal', 'time']
    failure_count = module_test_lib.num_import_failures(required_libs)
    if failure_count:
        raise RuntimeError("AFNI module import failures")

    from realtime_receiver import ReceiverInterface
    from afnipy import lib_realtime as RT

    return ReceiverInterface, RT