# ensure directory is set first, e.g. cd 'Documents/Jack/PictureCloze/02 Experiment' # %% General Setup from psychopy import visual, core, monitors, event, data, gui import csv import os import codecs import datetime import numpy as np from matplotlib import pyplot as plt from pandas import read_csv from itertools import groupby from random import randint # %% Participant Info comp_name = os.environ['COMPUTERNAME'] print('Detected computer name: \'{}\''.format(comp_name)) print('Building dialog box. Enter the participant and experiment info into the GUI...') myDlg = gui.Dlg(title="PictureCloze Behavioural Experiment") myDlg.addText('Participant Info') myDlg.addField('Subject ID:') myDlg.addField('Sex:', choices=['', 'f', 'm', 'nb']) myDlg.addField('Age:', 25) myDlg.addField('Monolingual:', choices=['', 'yes', 'no']) myDlg.addField('Native English:', choices=['', 'yes', 'no']) myDlg.addField('Dominant Hand:', choices=['', 'right', 'left']) myDlg.addText('Experiment Info') myDlg.addField('Response Group:', choices=['', 1, 2]) myDlg.addField('Stimulus Group:', choices=['', 1, 2]) myDlg.addField('Session Number:', 1) participant_info_gui = myDlg.show() participant_info = { 'subj_id': int(participant_info_gui[0]), 'session_nr': participant_info_gui[8], 'stim_grp': int(participant_info_gui[7]), # 1/2 'resp_grp': int(participant_info_gui[6]), # 1/2 'monolingual': participant_info_gui[3], 'native_english': participant_info_gui[4], 'dom_hand': participant_info_gui[5], 'sex': participant_info_gui[1], # m/f/nb 'age': participant_info_gui[2] } # %% Check participant info makes sense def is_int(s): try: int(s) return True except ValueError: return False pinfo_check = { 'subj_id_is_int': isinstance(participant_info['subj_id'], int), 'stim_grp_1_or_2': participant_info['stim_grp'] in [1, 2], 'resp_grp_1_or_2': participant_info['resp_grp'] in [1, 2], 'monolingual': participant_info['monolingual'] in ['yes', 'no'], 'native_english': participant_info['native_english'] in ['yes', 'no'], 'dom_hand': participant_info['dom_hand'] in ['right', 'left'], 'sex_m_f_or_nb': participant_info['sex'] in ['m', 'f', 'nb'], 'age_is_int': isinstance(participant_info['age'], int) } if not all(value for value in pinfo_check.values()): print('Participant info error!') print(pinfo_check) exit() # set up monitor mon_Hz = 60 # refresh rate mon_framelen = 1/mon_Hz # length of one frame in seconds mon = monitors.Monitor(name='UpstairsBooth', notes='{0}Hz'.format(mon_Hz)) mon.setWidth(52) mon.setDistance(48) mon.setSizePix((1920, 1080)) mon.saveMon() # set up the Psychopy window win = visual.Window(fullscr=True, size=mon.getSizePix(), screen=0, monitor=mon, units='deg') # hide the mouse event.Mouse(visible=False) win.refreshThreshold = mon_framelen + 0.001 # tolerance of 1 ms (any refresh that takes more than the tolerance longer than expected = dropped frame) #win.recordFrameIntervals = True # record actual frame lengths # set up which button is affirmative, and which negative if participant_info['resp_grp'] == 1: resp_setup = {'yes': 'rctrl', 'no': 'lctrl'} elif participant_info['resp_grp'] == 2: resp_setup = {'yes': 'lctrl', 'no': 'rctrl'} resp_setup_names = {'lctrl': 'Left Control', 'rctrl': 'Right Control'} # %% Trial Setup # import trials data trls = read_csv('01-stim_tidy_long.csv') practice_trls = read_csv('01-practice_stim.csv') # filter depending on stimulus group # - if subject stim group is 1 all trials with order group of 1 will be condition A1 (picture-stimulus match), and order group of 2 will be condition A2 # - if subject stim group is 2 all trials with order group of 2 will be condition A1 (picture-stimulus match), and order group of 1 will be condition A2 trls = trls[ ( (trls.order_grp==participant_info['stim_grp']) & (trls.condition=='A1') ) | ( (trls.order_grp!=participant_info['stim_grp']) & (trls.condition=='A2') ) ] # reorder randomly, and ensure no more than five of the same trial type (cong/incong) follow each other shuffle_iter = 0 max_consec_trltypes = 0 while shuffle_iter==0 or max_consec_trltypes>5: # shuffle randomly trls = trls.sample(frac=1).reset_index(drop=True) # count maximum number of consecutive trial types trls['is_A1'] = np.where(trls['condition']=='A1', 1, 0).astype(str) trls['is_A2'] = np.where(trls['condition']=='A2', 1, 0).astype(str) A1s = ''.join(trls['is_A1'].tolist()) A2s = ''.join(trls['is_A2'].tolist()) max_consec_A1s = [int(len(x)) for x in A1s.split('0')] max_consec_A2s = [int(len(x)) for x in A2s.split('0')] max_consec_trltypes = max(max_consec_A1s + max_consec_A2s) shuffle_iter+=1 print('Reshuffled trials (iter {}), with a maximum of {} consecutive trial types'.format(shuffle_iter, max_consec_trltypes)) print('Successfully reshuffled trials\n') # %% Output setup subj_data_path = 'data{0}{1}_{2}_{3}.csv'.format(os.path.sep, participant_info['subj_id'], datetime.datetime.now().strftime('%d-%m-%y'), participant_info['session_nr']) # function to append a trial's data (in dictionary format) to csv def write_csv(fileName, thisTrial): full_path = os.path.abspath(fileName) directory = os.path.dirname(full_path) if not os.path.exists(directory): os.makedirs(directory) if not os.path.isfile(full_path): with codecs.open(full_path, 'ab+', encoding='utf8') as f: csv.writer(f, delimiter=',').writerow(thisTrial.keys()) csv.writer(f, delimiter=',').writerow(thisTrial.values()) else: with codecs.open(full_path, 'ab+', encoding='utf8') as f: csv.writer(f, delimiter=',').writerow(thisTrial.values()) # %% Instructions def instructions(reminder = False, practice = False, wait_time = 3): print('DISPLAYING INSTRUCTIONS') if practice: practice_txt = u'For the practice trials, you will be given feedback on your accuracy for each trial.' else: practice_txt = u'Unlike the practice trials, you will not be given feedback on your accuracy for each trial.' instr_txt = u'In each trial, the following things will happen:\n\n1) You will be shown a picture of an object for 2 seconds.\n2) There will be a short delay.\n3) You will then be shown a word:\n\n Press the {0} key if the word describes the object you saw.\n OR\n Press the {1} key if it does not.\n\n Try to be as fast and accurate as possible.\n\n {2}\n\n\nWhen you have read these instructions, press the space key to begin...'.format(resp_setup_names[resp_setup['yes']], resp_setup_names[resp_setup['no']], practice_txt) if reminder: instr_txt = u'{0}{1}'.format('Here\'s a reminder of the task instructions.\n\n\n', instr_txt) instr_screen = visual.TextStim( win, units='norm', height=0.07, wrapWidth=1.75, text=instr_txt ) instr_screen.draw() win.flip() core.wait(wait_time) # ensure that instructions have been read event.waitKeys(keyList='space') print('SPACE PRESSED; CONTINUING') # %% Trials setup # functions for precise frame-wise timing def ms_2_flips(ms = 100, Hz = 60, round_func = np.round): return int(round_func(ms / (1/Hz*1000))) def display_n_flips(stim, n_flips = 10): for frame_n in range(n_flips): stim.draw() win.flip() def display_x_ms(stim, ms = 100, Hz = 60, round_func = np.round): display_n_flips(stim, ms_2_flips(ms, Hz, round_func)) def display_jitter_ms(stim, min_ms = 100, max_ms = 500, Hz = 60, round_func = np.round): flips_min = ms_2_flips(min_ms, mon_Hz, round_func) flips_max = ms_2_flips(max_ms, mon_Hz, round_func) n_flips = randint(flips_min, flips_max) display_n_flips(stim, n_flips) return(n_flips, n_flips*(1/Hz*1000)) def display_list_n_flips(stim_list, n_flips = 10): for frame_n in range(n_flips): for stim_i in stim_list: stim_i.draw() win.flip() def display_list_ms(stim_list, ms = 100, Hz = 60, round_func = np.round): display_list_n_flips(stim_list, ms_2_flips(ms, Hz, round_func)) def display_list_jitter_ms(stim_list, min_ms = 100, max_ms = 500, Hz = 60, round_func = np.round): flips_min = ms_2_flips(min_ms, mon_Hz, round_func) flips_max = ms_2_flips(max_ms, mon_Hz, round_func) n_flips = randint(flips_min, flips_max) display_list_n_flips(stim_list, n_flips) return(n_flips, n_flips*(1/Hz*1000)) # function that returns a list containing the objects in the Thaler et al. fixation point # can then draw the list contents in a loop def thaler_list_fix(win, units='deg', circlesColor=[1,1,1], circlesColorSpace='rgb', outerCircleDiameter=0.6, innerCircleDiameter=0.2, pos = (0, 0)): crossColor=win.color.tolist() crossColorSpace=win.colorSpace outerCircle = visual.Circle(win, units=units, pos=pos, radius=outerCircleDiameter/2, lineColor=circlesColor, fillColor=circlesColor, fillColorSpace=circlesColorSpace, lineColorSpace=circlesColorSpace) crossVertical = visual.Rect(win, units=units, pos=pos, height=outerCircleDiameter*1.1, width=innerCircleDiameter/2, lineColor=None, fillColor=crossColor, fillColorSpace=crossColorSpace, lineColorSpace=crossColorSpace) crossHorizontal = visual.Rect(win, units=units, pos=pos, height=innerCircleDiameter/2, width=outerCircleDiameter*1.1, lineColor=None, fillColor=crossColor, fillColorSpace=crossColorSpace, lineColorSpace=crossColorSpace) innerCircle = visual.Circle(win, units=units, pos=pos, radius=innerCircleDiameter/2, lineColor=circlesColor, fillColor=circlesColor, fillColorSpace=circlesColorSpace, lineColorSpace=circlesColorSpace) return [outerCircle, crossVertical, crossHorizontal, innerCircle] # function that will be called for each trial def trial(image_path='boss{}8ball.jpg'.format(os.path.sep), string='ball', text_font='Courier New'): # a fixation point in style of Thaler et al. thaler_fix_stim = thaler_list_fix(win=win) # a blank stimulus blank_stim = visual.TextStim(win=win, text='') # the image to display img_stim = visual.ImageStim( win = win, image = image_path, pos = (0.0, 0.0), size = 15, # 7.0403 degrees at 48 inches distance units = 'cm' ) # the text stimulus text_stim = visual.TextStim( win = win, text = string, height = 1.5, units = 'deg', font = text_font ) # fixation 1 display_list_ms(thaler_fix_stim, 300) # note that the list version of the function is required, as thaler_list_fix returns a list object fix1_jitt = display_jitter_ms(blank_stim, 300, 1300) # image display_x_ms(img_stim, 2000) # fixation 2 display_list_ms(thaler_fix_stim, 300) # note that the list version of the function is required, as thaler_list_fix returns a list object fix2_jitt = display_jitter_ms(blank_stim, 300, 1300) # text text_stim.draw() win.flip() word_rt_clock = core.MonotonicClock() # response word_resp = event.waitKeys(keyList=['lctrl', 'rctrl'], timeStamped = word_rt_clock) trl_dat = { 'fix1_jitt': fix1_jitt, 'fix2_jitt': fix2_jitt, 'word_resp': word_resp } return(trl_dat) def block_break(): # read the subject's data subj_data_so_far = read_csv(subj_data_path) accuracies = subj_data_so_far['acc'].to_numpy() blocks = subj_data_so_far['block_nr'].to_numpy() block_acc_txt = '' for block_nr in np.unique(blocks): block_acc_txt = '{0}\n Block {1}: {2}%'.format(block_acc_txt, block_nr, np.round(np.mean(accuracies[blocks==block_nr] * 100), 1)) # give subject summary of their data break_msg = visual.TextStim( win, units='norm', height=0.07, wrapWidth=1.75, text=u'Block {0} complete. Time for a break!\n\n\nYour overall accuracy so far is: {1}%\n\nHere\'s a per-block summary of your experiment so far:{2}\n\nPress the space key when you\'re ready to continue...'.format(np.max(blocks), np.round(np.mean(accuracies * 100), 1), block_acc_txt) ) break_msg.draw() win.flip() print('BREAK STARTED') event.waitKeys(keyList='space') print('BREAK ENDED') # %% Give feedback after experiment def experiment_end(): # change background to black win.color = [-1,-1,-1] win.flip() # Tell the participant the experiment is over end_msg = visual.TextStim( win, units='norm', height=0.07, wrapWidth=1.75, text=u'Experiment complete! Please let the experimenter know...', pos=(0, 0.9) ) loading_msg = visual.TextStim( win, units='norm', height=0.07, wrapWidth=1.75, text=u'Loading summary statistics...', pos=(0, 0) ) end_msg.draw() loading_msg.draw() win.flip() # read the subject's data subj_data = read_csv(subj_data_path) # order by item number subj_data = subj_data.sort_values(by = ['item_nr']) # subject's data as arrays accuracies = subj_data['acc'].to_numpy() RTs = subj_data['rt'].to_numpy() blocks = subj_data['block_nr'].to_numpy() conditions = subj_data['condition'].to_numpy() # read item info to get predictability info items_data = read_csv('01-stim_tidy_long.csv') # only include items for the subject's response condition subj_stim_grp = np.unique(subj_data['stim_grp'].to_numpy()).item(0) items_data = items_data[ ( (items_data.order_grp==subj_stim_grp) & (items_data.condition=='A1') ) | ( (items_data.order_grp!=subj_stim_grp) & (items_data.condition=='A2') ) ] # get the data as numpy arrays perc_name_agree = items_data['perc_name_agree'].to_numpy() # plot summary of subject's data plt.style.use('dark_background') figsize_inch = [16, 6] fig = plt.figure(figsize=figsize_inch, dpi=300) plt.subplot(2, 5, 1) binwidth = 20 bins = np.arange(0, np.max(RTs) + binwidth, binwidth) plt.hist(RTs, bins = bins) plt.title('Your Response Times (RTs)') plt.xlabel('RT (bin width = 20ms)') plt.ylabel('Count') plt.subplot(2, 5, 2) plt.plot(range(1, len(RTs)+1), RTs) plt.title('Your RTs across Trials') plt.xlabel('Trial') plt.ylabel('RT (ms)') plt.subplot(2, 5, 3) # collect data in blocks per_block_RTs = [] for block_nr in np.unique(blocks): per_block_RTs.append(RTs[blocks == block_nr]) v_parts = plt.violinplot(per_block_RTs, showmedians=True, widths = 0.75) #print(v_parts['bodies'][0]._facecolors) plt.xticks(np.arange(np.min(np.unique(blocks)), np.max(np.unique(blocks))+1, 1)) plt.title('Your RTs across Blocks') plt.xlabel('Block') plt.ylabel('RT (ms)') plt.subplot(2, 5, 4) # collect data in blocks per_cond_RTs = [] for condition_nr in np.sort(np.unique(conditions)): per_cond_RTs.append(RTs[conditions == condition_nr]) plt.violinplot(per_cond_RTs, showmedians=True, widths = 0.6) plt.xticks([1, 2], ['Congruent', 'Incongruent']) plt.title('Your RTs across Conditions') plt.xlabel('Condition') plt.ylabel('RT (ms)') plt.subplot(2, 5, 5) plt.title('Your RTs across Predictability') x = perc_name_agree[conditions == "A1"] y = RTs[conditions == "A1"] plt.scatter(x, y, c = [[0.55294118 * 0.3, 0.82745098 * 0.3, 0.78039216 * 0.3]]) coef = np.polyfit(x, y, 1) poly1d_fn = np.poly1d(coef) plt.plot(x, poly1d_fn(x)) plt.xlabel('Predictability (%)') plt.ylabel('RT (ms)') plt.subplot(2, 5, 6) resp_type, resp_counts = np.unique(accuracies, return_counts=True) plt.bar(resp_type, resp_counts) plt.xticks([0, 1], ['Incorrect', 'Correct']) plt.title('Your Accuracy ({0}% Overall)'.format(np.round(np.mean(accuracies * 100), 1))) plt.xlabel('Accuarcy') plt.ylabel('Count') plt.subplot(2, 5, 7) plt.title('Your Incorrect Trials') plt.bar(range(1, len(RTs)+1), np.abs(accuracies-1)) plt.xlabel('Trial') plt.ylabel('Accuracy') plt.subplot(2, 5, 8) per_block_accs = [] for block_nr in np.unique(blocks): per_block_accs.append(np.mean(accuracies[blocks == block_nr]) * 100) plt.bar(np.unique(blocks), per_block_accs) plt.xticks(np.arange(np.min(np.unique(blocks)), np.max(np.unique(blocks))+1, 1)) plt.ylim(0, 100) plt.title('Your Acc. across Blocks') plt.xlabel('Block') plt.ylabel('Accuracy (%)') plt.subplot(2, 5, 9) per_cond_accs = [] for condition_nr in np.sort(np.unique(conditions)): per_cond_accs.append(np.mean(accuracies[conditions == condition_nr]) * 100) plt.bar(['Congruent', 'Incongruent'], per_cond_accs) plt.ylim(0, 100) plt.title('Your Acc. across Conditions') plt.xlabel('Condition') plt.ylabel('Accuracy (%)') def rand_jitter(arr): stdev = .025*(max(arr)-min(arr)) return arr + np.random.randn(len(arr)) * stdev def jitter_height(x, y, s=20, c='b', marker='o', cmap=None, norm=None, vmin=None, vmax=None, alpha=None, linewidths=None, verts=None, hold=None, **kwargs): return plt.scatter(x, rand_jitter(y), s=s, c=c, marker=marker, cmap=cmap, norm=norm, vmin=vmin, vmax=vmax, alpha=alpha, linewidths=linewidths, verts=verts, **kwargs) plt.subplot(2, 5, 10) plt.title('Your Acc. across Predictability') x = perc_name_agree[conditions == "A1"] y = accuracies[conditions == "A1"]*100 jitter_height(x, y, c = [[0.55294118 * 0.3, 0.82745098 * 0.3, 0.78039216 * 0.3]]) coef = np.polyfit(x, y, 1) poly1d_fn = np.poly1d(coef) plt.plot(x, poly1d_fn(x)) plt.xlabel('Predictability (%)') plt.ylabel('Accuracy (jittered %)') # adjust spacing plt.subplots_adjust(left = 0.125, right = 0.9, bottom = 0.1, top = 0.9, wspace = 0.5, hspace = 0.8) # save figure image_path = 'tmp{0}subj_summ.png'.format(os.path.sep) fig.savefig(image_path) end_plot = visual.ImageStim( win, image=image_path, size=[i * 2.54 * 1.5 for i in figsize_inch], units='cm' ) end_msg.draw() end_plot.draw() win.flip() print('PRESS \'SPACE\' KEY TO EXIT THE EXPERIMENT') event.waitKeys(keyList='space') print('EXITED') # %% Practice Trials practice = True this_is_a_repeat_practice_block = False while practice: instructions(reminder = this_is_a_repeat_practice_block, practice = True) practice_accuracies = [] practice_trls = practice_trls.sample(frac=1).reset_index(drop=True) for trl_i in range(len(practice_trls.index)): condition = practice_trls['condition'][trl_i] trl_dat = trial( image_path = 'boss{}{}.jpg'.format(os.path.sep, practice_trls['filename'][trl_i]), string = practice_trls['string'][trl_i] ) if condition == 'A1': corr_ans = resp_setup['yes'] elif condition == 'A2': corr_ans = resp_setup['no'] if corr_ans == trl_dat['word_resp'][0][0]: trl_corr = 1 else: trl_corr = 0 practice_accuracies.append(trl_corr) print('PRACTICE TRL', '{}/{}'.format(trl_i+1, len(practice_trls.index)), ', COND', condition, ', CORR_ANS', corr_ans, ', RESP', trl_dat['word_resp'][0][0], ', ACC', trl_corr, ', RT', round(trl_dat['word_resp'][0][1] * 1000, 1), ', IMAGE', practice_trls['filename'][trl_i], ', WORD', practice_trls['string'][trl_i], ', FIX1_JITT', round(trl_dat['fix1_jitt'][1], 1), ', FIX2_JITT', round(trl_dat['fix2_jitt'][1], 1)) if trl_corr == 1: feedback_txt = 'CORRECT!' feedback_col = [-1, 1, -1] else: feedback_txt = 'INCORRECT!' feedback_col = [1, -1, -1] feedback_msg = visual.TextStim( win = win, text = feedback_txt, height = 1.5, units = 'deg', color = feedback_col, colorSpace = 'rgb', font = 'Courier New' ) display_x_ms(feedback_msg, 1000) retry_msg = visual.TextStim( win, units='norm', height=0.07, wrapWidth=1.75, text='Practice trials complete! Your accuracy in the practice trials was {}%\n\nWould you like to retry the practice trials?\n Press \'y\' to try the practice trials again\n Press \'n\' to continue to the proper experiment'.format(np.round(np.sum(practice_accuracies)/len(practice_trls.index)*100, 1)) ) retry_msg.draw() win.flip() print('PRACTICE BLOCK ACCURACY: {0}/{1} ({2}%)'.format(np.sum(practice_accuracies), len(practice_trls.index), np.round(np.sum(practice_accuracies)/len(practice_trls.index)*100, 1))) print('REPEAT PRACTICE TRIALS? Y/N') repeat_yn = event.waitKeys(keyList=['y', 'n']) if repeat_yn == ['y']: print('REPEATING PRACTICE TRIALS') this_is_a_repeat_practice_block = True else: print('ENDING PRACTICE TRIALS') practice = repeat_yn == ['y'] # %% Experimental Trials instructions(reminder = True, wait_time = 2) block_nr = 1 for trl_i in range(len(trls.index)): # at start of trial, check if time for a break # (as trl_i is an index, will take a break if previous trial was the last before a break is due) if trl_i % 40 == 0 and trl_i != 0: block_break() instructions(reminder = True, wait_time = 2) block_nr += 1 condition = trls['condition'][trl_i] trl_dat = trial( image_path = 'boss{}{}.jpg'.format(os.path.sep, trls['filename'][trl_i]), string = trls['string'][trl_i] ) if condition == 'A1': corr_ans = resp_setup['yes'] elif condition == 'A2': corr_ans = resp_setup['no'] if corr_ans == trl_dat['word_resp'][0][0]: trl_corr = 1 else: trl_corr = 0 trl_dat_tidy = { 'date': datetime.datetime.now().strftime('%d-%m-%y'), 'computer_name': comp_name, 'trial_save_time': datetime.datetime.now().strftime('%H:%M:%S'), 'subj_id': participant_info['subj_id'], 'stim_grp': participant_info['stim_grp'], 'resp_grp': participant_info['resp_grp'], 'sex': participant_info['sex'], 'age': participant_info['age'], 'dom_hand': participant_info['dom_hand'], 'monolingual': participant_info['monolingual'], 'native_english': participant_info['native_english'], 'block_nr': block_nr, 'trl_nr': trl_i + 1, 'item_nr': trls['item_nr'][trl_i], 'condition': condition, 'image': trls['filename'][trl_i], 'string': trls['string'][trl_i], 'corr_ans': corr_ans, 'resp': trl_dat['word_resp'][0][0], 'acc': trl_corr, 'rt': trl_dat['word_resp'][0][1] * 1000, 'fix1_jitt_flip': trl_dat['fix1_jitt'][0], 'fix1_jitt_ms': trl_dat['fix1_jitt'][1], 'fix2_jitt_flip': trl_dat['fix2_jitt'][0], 'fix2_jitt_ms': trl_dat['fix2_jitt'][1] } write_csv(subj_data_path, trl_dat_tidy) print('TRL', '{}/{}'.format(trl_i+1, len(trls.index)), ', COND', condition, ', CORR_ANS', corr_ans, ', RESP', trl_dat['word_resp'][0][0], ', ACC', trl_corr, ', RT', round(trl_dat['word_resp'][0][1] * 1000, 1), ', IMAGE', trls['filename'][trl_i], ', WORD', trls['string'][trl_i], ', FIX1_JITT', round(trl_dat['fix1_jitt'][1], 1), ', FIX2_JITT', round(trl_dat['fix2_jitt'][1], 1)) # %% End experiment and print summary stats print('EXPERIMENT END. Overall, {0} frames were dropped.'.format(win.nDroppedFrames)) experiment_end()