import os
import traceback
import pickle
from rtcog.utils.core import setup_afni
from rtcog.utils.exceptions import VolumeOverflowError
import logging
log = logging.getLogger(__name__)
_ReceiverBase = object
ReceiverInterface = None
RT = None
# Do not load afni stuff on Read the Docs
if os.environ.get("READTHEDOCS") != "True":
ReceiverInterface, RT = setup_afni()
_ReceiverBase = ReceiverInterface
[docs]
class CustomReceiverInterface(_ReceiverBase):
"""Custom AFNI real-time receiver with rtcog extensions."""
def __init__(self, port=None, show_data=False, verb=0, auto_save=True, clock=None, out_path=None):
if ReceiverInterface is None or RT is None:
raise RuntimeError(
"CustomReceiverInterface cannot be used without AFNI"
)
super().__init__()
self.RTI = RT.RTInterface()
if port:
self.RTI.server_port = port
self.verb = verb
level = {
0: logging.ERROR,
1: logging.INFO,
2: logging.DEBUG
}.get(verb, logging.ERROR)
log.setLevel(level)
if not self.RTI:
return
# Show Some part of the data on every TR
self.RTI.show_data = show_data
# callbacks
self.compute_TR_data = None
self.final_steps = None
self.auto_save = auto_save
self.clock = clock
self.timing = {"recv": [], "proc": []}
self.out_path = out_path
[docs]
def process_one_TR(self):
"""return 0 to continue, 1 on valid termination, -1 on error"""
# TODO: clear old data
# del self.RTI.extras, self.RTI.motion
# will have to update this_tr_data logic...
log.debug("++ Entering process_one_TR()")
rv = self.RTI.read_TR_data()
if self.clock and not rv:
recv_time = self.clock.now()
self.timing["recv"].append(recv_time)
print(f"Recv @ {recv_time:.3f}", flush=True)
if rv:
log.error('** process 1 TR: read data failure')
return rv
# if callback is registered
data = None
if self.compute_TR_data:
data = self.compute_TR_data(self.RTI.motion, self.RTI.extras)
if self.clock:
proc_time = self.clock.now()
self.timing["proc"].append(proc_time)
print(f"Proc @ {proc_time:.3f}", flush=True)
if not data:
return 1
return 0
[docs]
def process_one_run(self):
"""repeatedly: process all incoming data for a single run
return 0 on success and 1 on error
"""
log.info("++ Entering process_one_run()")
# wait for the real-time plugin to talk to us
if self.RTI.wait_for_new_run():
return 1
# process one TR at a time until
log.info('-- incoming data')
try:
rv = self.process_one_TR()
while rv == 0:
rv = self.process_one_TR()
except VolumeOverflowError:
log.error(f'++ ERROR: Receiving more volumes from the scanner than expected.')
log.error(f'Exiting experiment...')
if self.final_steps:
self.final_steps()
return
except Exception as e:
log.error(f"++ ERROR: An unexpected error occurred: {e}")
log.error(traceback.format_exc())
if self.final_steps:
if self.auto_save:
self.final_steps()
else:
self.final_steps(save=False)
return
finally:
log.info("-- The life of this program is coming to an end....")
if self.final_steps:
self.final_steps()
if rv > 0:
tail = '(terminating on success)'
else:
tail = '(terminating on error)'
log.info('-- processed %d TRs of data %s' % (self.RTI.nread, tail))
log.info('-' * 60)
return rv
[docs]
def save_timing(self):
with open(self.out_path, 'wb') as f:
pickle.dump(self.timing, f)
log.info(f'Timing saved to {self.out_path}')
[docs]
class MinimalReceiverInterface(CustomReceiverInterface):
"""Receiver interface without any data computation. Used for latency testing."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.timing = {"recv": []}
self.t = 1
[docs]
def process_one_TR(self):
"""Overridden to only track recv time, not proc time."""
log.debug("++ Entering MinimalReceiverInterface.process_one_TR()")
rv = self.RTI.read_TR_data()
if self.clock and not rv:
recv_time = self.clock.now()
self.timing["recv"].append(recv_time)
print(f"Recv @ {recv_time:.3f} - Time point [t={self.t}]", flush=True)
self.t += 1
if rv:
log.error('** process 1 TR: read data failure')
return rv
return 0 # Always continue; no data computation