Source code for rtcog.gui.esam_gui

import os
import time
import os.path as osp
import csv
from playsound import playsound

from psychopy import core, event
from psychopy.visual import TextStim, ImageStim
from psychopy.visual.slider import Slider 
from psychopy import prefs

# Set PsychoPy's hardware 'keyboard' preference to 'pygame' at import time,
# except when the READTHEDOCS environment variable equals "True"
# (presumably a Read the Docs documentation build -- TODO confirm).
if os.environ.get("READTHEDOCS") != "True":
    prefs.hardware['keyboard'] = 'pygame'

from rtcog.utils.recorder import Recorder
from rtcog.utils.core import get_logger
from rtcog.paths import RESOURCES_DIR
from rtcog.gui.basic_gui import BasicGUI

log = get_logger()

class EsamGUI(BasicGUI):
    """
    GUI class for Experience Sampling (ESAM) fMRI experiments.

    Extends `BasicGUI` to provide oral recording and Likert-style
    questionnaires after a template "hit".

    Parameters
    ----------
    opts : Options
        Configuration options for the experiment run. Must provide
        ``likert_questions``.
    shared_responses : multiprocessing.Manager().dict
        Shared dictionary for returning participant responses.
    clock : SharedClock, optional
        Clock for timing events during latency testing.
    **kwargs
        Accepted for call-site compatibility; not used by this class.
    """

    def __init__(self, *, opts, clock=None, shared_responses, **kwargs):
        super().__init__(opts, clock=clock)

        # 1-based counter of hits; used to number the output files.
        self.hitID = 1

        # Response-box keys, read from the experiment info set up by the base class.
        self.key_left = self.exp_info['leftKey']
        self.key_right = self.exp_info['rightKey']
        self.key_select = self.exp_info['acceptKey']

        self.red_color = [0.4, -0.9, -0.9]
        # Optional presentation order for the Likert questions (None = sequential).
        self.likert_order = None
        self.recorder = Recorder(channels=1)
        # Maps output file path -> {question_name: (rating, rt)}; flushed to
        # disk by save_likert_files().
        self.responses = {}
        self.shared_responses = shared_responses

        # Recording Screen
        self.rec_inst = [
            TextStim(win=self.ewin, text='Describe aloud what you were', pos=(0.0, 0.54)),
            TextStim(win=self.ewin, text='when you heard the beep', pos=(0.0, 0.3)),
            TextStim(win=self.ewin, text='Press any key to stop recording', pos=(0.0, -0.3)),
            TextStim(win=self.ewin, text='when you finish.', pos=(0.0, -0.42)),
            ImageStim(win=self.ewin, image=osp.join(RESOURCES_DIR, 'microphone_pic.png'),
                      pos=(-0.5, 0.0), size=(.2, .2)),
        ]
        self.rec_chair = TextStim(win=self.ewin, text='[ RECORDING ]',
                                  color=self.red_color, pos=(0.0, 0.0), bold=True)

        # Post-recording screen
        self.mic_ack_rec = [
            TextStim(win=self.ewin, text='Recording Successful', pos=(0.0, 0.06),
                     color='green', bold=True),
            TextStim(win=self.ewin, text='Thank you!', pos=(0.0, -0.06),
                     color='green', bold=True),
        ]

        # Likert Instructions
        self.likert_inst = [
            TextStim(win=self.ewin, text='Now, please use the response box', pos=(0.0, 0.78)),
            TextStim(win=self.ewin, text='to answer additional questions', pos=(0.0, 0.66)),
            TextStim(win=self.ewin, text='about what you were experiencing', pos=(0.0, 0.54)),
            TextStim(win=self.ewin, text='right before the beep', pos=(0.0, 0.42)),
            TextStim(win=self.ewin, text='Press any key when ready', pos=(0.0, -0.42)),
            ImageStim(win=self.ewin, image=osp.join(RESOURCES_DIR, 'resp_box_form.png'),
                      pos=(0.0, 0.0), size=(0.6, 0.55)),
        ]

        # Likert Questions
        self.likert_questions = opts.likert_questions

        # Likert slider opts (shared by every question's Slider)
        self.slider_opts = {
            'win': self.ewin,
            'pos': (0, -0.1),
            'size': (1.2, 0.2),
            'labelColor': 'black',
            'markerColor': self.red_color,
            'lineColor': 'black',
            'granularity': 1,
            'labelHeight': 0.05,
            'font': 'Arial',
        }

    def record_oral_descr(self):
        """
        Record the participant's oral description after a hit.

        Displays the recording screen, plays a sound cue, then records to
        ``<out_dir>/<out_prefix>.hitNNN.wav`` until any key is pressed. While
        recording, the ``[ RECORDING ]`` label is toggled on and off.
        """
        event.clearEvents()

        # A previous recording may have ended while the label was blanked;
        # always start a new recording with the label visible.
        self.rec_chair.text = '[ RECORDING ]'
        self._draw_stims(self.rec_inst + [self.rec_chair])
        playsound(osp.join(RESOURCES_DIR, 'bike_bell.wav'))

        rec_path = osp.join(self.out_dir,
                            self.out_prefix + '.hit' + str(self.hitID).zfill(3) + '.wav')
        with self.recorder.open(rec_path, 'wb') as rec_file:
            rec_file.start_recording()
            try:
                clock = core.Clock()
                toggle_interval = 0.6
                last_toggle_time = 0
                while not event.getKeys():
                    # Toggle [ RECORDING ] text every 0.6sec
                    if clock.getTime() - last_toggle_time >= toggle_interval:
                        last_toggle_time = clock.getTime()
                        self.rec_chair.text = "[ RECORDING ]" if not self.rec_chair.text else ""
                        self._draw_stims(self.rec_inst + [self.rec_chair])
                    core.wait(0.1)
            finally:
                # Stop the recorder even if the wait loop is interrupted.
                rec_file.stop_recording()

    def draw_ack_recording_screen(self):
        """
        Display a confirmation screen after recording completes.

        The screen is held for one second.
        """
        self._draw_stims(self.mic_ack_rec)
        time.sleep(1)

    def draw_likert_instructions(self):
        """
        Show instructions before presenting the Likert questions.

        Blocks until any key is pressed.
        """
        self._draw_stims(self.likert_inst)
        event.waitKeys()

    def draw_likert_questions(self, order=None):
        """
        Display a sequence of Likert-style questions for participant response.

        The marker starts on the middle tick and is moved with the left/right
        keys; the accept key confirms the rating. Pressing ``escape`` or ``q``
        closes the window and quits.

        Parameters
        ----------
        order : list of int, optional
            Order in which to present the questions (indices into
            ``self.likert_questions``). Defaults to sequential.

        Returns
        -------
        dict
            Dictionary of {question_name: (rating, rt)} for each response,
            where ``rt`` is the time in seconds from question onset to the
            accept key being detected.
        """
        responses = {}
        if order is None:
            order = range(len(self.likert_questions))

        for q_idx in order:
            q = self.likert_questions[q_idx]
            q_text = TextStim(win=self.ewin, text=q['text'], pos=(0.0, 0.2),
                              color='black', height=0.1)
            # Fall back on default labels
            labels = q.get('labels', ['Strongly\ndisagree', 'Somewhat\ndisagree', 'Neutral',
                                      'Somewhat agree', 'Strongly agree'])
            ticks = list(range(1, len(labels) + 1))
            slider = Slider(
                **self.slider_opts,
                ticks=ticks,
                labels=labels,
                startValue=(len(labels) // 2) + 1
            )

            current_pos = slider.startValue
            slider.markerPos = current_pos
            rating = None
            rt = None

            event.clearEvents()
            self._draw_stims([q_text, slider])
            clock = core.Clock()

            while rating is None:
                for key in event.getKeys():
                    if key == self.key_left and current_pos > ticks[0]:
                        current_pos -= 1
                    elif key == self.key_right and current_pos < ticks[-1]:
                        current_pos += 1
                    elif key == self.key_select:
                        rating = current_pos
                        # Timestamp here, before the final redraw, so the
                        # redraw time is not counted in the reaction time.
                        rt = clock.getTime()
                        break
                    elif key in ['escape', 'q']:
                        self.ewin.close()
                        core.quit()

                slider.markerPos = current_pos
                self._draw_stims([q_text, slider])

            responses[q['name']] = (rating, rt)
            core.wait(0.5)
            self.ewin.flip()

        return responses

    def run_full_action(self):
        """
        Run the full QA block: record voice, show Likert questions, and store results.

        The responses are published through ``self.shared_responses`` and kept
        in ``self.responses`` keyed by their output path; nothing is written to
        disk here (see ``save_likert_files``). ``self.hitID`` is incremented.

        Returns
        -------
        dict
            Dictionary of Likert responses for this action.
        """
        # 1) Play beep and record oral description
        self.record_oral_descr()

        # 2) Acknowledge successful recording
        self.draw_ack_recording_screen()

        # 3) Show instructions for likert questions
        self.draw_likert_instructions()

        # 4) Do the Likert questionnaire
        resp_dict = self.draw_likert_questions(self.likert_order)

        # 5) Prepare results
        resp_timestr = time.strftime('%Y%m%d-%H%M%S')
        resp_path = osp.join(
            self.out_dir,
            f'{self.out_prefix}.{resp_timestr}.LikertResponses{str(self.hitID).zfill(3)}.txt'
        )

        self.shared_responses.clear()
        self.shared_responses.update(resp_dict)

        self.responses[resp_path] = resp_dict
        self.hitID += 1

        return resp_dict

    def save_likert_files(self):
        """
        Write all stored Likert response dictionaries to individual text files.

        Each file is CSV-formatted with a ``question,rating,rt`` header and one
        row per question; ``rt`` is rounded to two decimals.
        """
        for resp_path, resp_dict in self.responses.items():
            # newline='' lets the csv module control line endings (avoids
            # blank rows between records on Windows).
            with open(resp_path, 'w', newline='') as f:
                w = csv.writer(f)
                w.writerow(['question', 'rating', 'rt'])
                for key, (rating, rt) in resp_dict.items():
                    w.writerow([key, rating, round(rt, 2)])
            log.debug(f'Likert responses written to {resp_path}')

        if self.responses:
            log.info('All likert responses saved')