# -*- coding: utf-8 -*-

# %%
# import modules
import os.path as op

import pandas as pd
import numpy as np
import re
import mne
from mne_icalabel import label_components
import mne_bids
# import matplotlib as mpl
from string import ascii_lowercase

chars = [*ascii_lowercase, 'ä', 'ö', 'ü', 'ß']

# %matplotlib qt

# %%
# settings

# maximum number of mistakes (false positives + false negatives)
max_mistakes = 50

# max number of channels to interpolate
max_chs_interp = 8

# min number of trials in the cleaned dataset
min_total_epochs = 0

# minimum number of acceptable trials per character
min_trials_per_char = 10

# rejection criteria for trials and channels
reject_criteria = dict(eeg=150e-6)  # max peak-to-peak amplitude of 150 µV
flat_criteria = dict(eeg=5e-7)  # min peak-to-peak amplitude of 0.5 µV
prop_bad_to_remove_ch = 0.5  # proportion of trials in which a channel must be rejected by the above criteria for the whole channel to be interpolated instead

# %%
# import participant list
p_list = pd.read_csv(op.join('eeg', 'participants.tsv'), delimiter='\t')

# exclude the participant for whom the recording was restarted
p_list = p_list.loc[p_list.recording_restarted == 0]

# store all metadata from preprocessing
all_subj_metadata = []

# %%
# preprocess

for i, p_row in p_list.iterrows():

    subj_id = p_row.participant_id
    subj_nr = re.sub('^sub-', '', subj_id)  # the number as a string, including leading zeroes but without the 'sub-' prefix, as expected by mne_bids

    head_circum_cm = p_row.head_circum
    head_radius_m = (head_circum_cm / 2 / np.pi) / 100  # used in setting up the montage - relevant to spherical interpolation

    subj_eeg_path = mne_bids.BIDSPath(
        root='eeg',
        subject=subj_nr,
        task='alphabeticdecision',
        suffix='eeg',
        datatype='eeg',
        extension='vhdr'
    )

    subj_events_tsv_path = mne_bids.BIDSPath(
        root='eeg',
        subject=subj_nr,
        task='alphabeticdecision',
        suffix='events',
        datatype='eeg',
        extension='tsv'
    )

    # import the events TSV (the behavioural data is stored in there)
    subj_events = pd.read_csv(subj_events_tsv_path, delimiter='\t')

    # get just the trial information
    subj_beh = subj_events.loc[(subj_events.trial_type == 'target') | (subj_events.trial_type == 'nontarget')]

    n_attempts = 0
    do_restart = True
    new_chs_to_interp = []

    while do_restart:

        n_attempts += 1
        assert n_attempts <= 2  # should exclude if preprocessing fails on the second attempt
        print(f'\nPREPROCESSING {subj_id}, attempt {n_attempts}\n')

        # will contain metadata for this participant
        subj_metadata = {'subj_id': subj_id}

        # check the participant didn't make too many mistakes
        # (note: accuracy is stored in the TSV as a string representation of a float)
        subj_metadata['n_false_neg_resp'] = np.sum((subj_beh['accuracy'] == 0) & (subj_beh['target'] == 1))  # includes RT timeouts
        subj_metadata['n_false_pos_resp'] = np.sum((subj_beh['accuracy'] == 0) & (subj_beh['target'] == 0))
        assert subj_metadata['n_false_neg_resp'] + subj_metadata['n_false_pos_resp'] <= max_mistakes

        # import EEG
        raw = mne_bids.read_raw_bids(subj_eeg_path)
        raw.load_data()

        # check sampling frequency
        assert raw.info['sfreq'] == 1000

        # set electrode locations =============
        mon = mne.channels.read_custom_montage('AC-64.bvef', head_size=head_radius_m)
        raw.set_montage(mon)

        # get event triggers ==================
        events, event_dict = mne.events_from_annotations(raw)
        # mne.viz.plot_events(events, sfreq=raw.info['sfreq'], first_samp=raw.first_samp, event_id=event_dict)

        # handling of bad channels ============

        # if there are any extra bad channels, add these
        raw.info['bads'].extend(new_chs_to_interp)

        # check the number of interpolated channels is not too high
        assert len(raw.info['bads']) <= max_chs_interp
        # interpolate bad channels (spherical spline method) to avoid biasing the average reference
        print('Interpolating electrodes: ', raw.info['bads'])
        n_chs_interpolated = len(raw.info['bads'])
        raw = raw.interpolate_bads(method={'eeg': 'spline'})

        # raw.plot()

        # re-reference ========================

        # use an average EEG reference
        raw.set_eeg_reference('average', ch_type='eeg')

        # filter the data =====================
        raw_ica = raw.copy()  # create a copy for ICA

        iir_params = {'order': 1, 'ftype': 'butter'}

        # bandpass filter with an effective 4th-order Butterworth between 0.1 and 40 Hz
        raw.filter(0.1, 40, method='iir', phase='zero-double', iir_params=iir_params)

        # version of the data with a 1 Hz highpass to improve ICA performance
        # raw_ica.filter(1, 40, method='iir', phase='zero-double', iir_params=iir_params)
        raw_ica.filter(1, 100, method='iir', phase='zero-double', iir_params=iir_params)  # changed from the preregistration to match the ICLabel training dataset

        # fit ICA =============================
        ica = mne.preprocessing.ICA(
            method='infomax',
            fit_params=dict(extended=True),
            # n_components=64,
            n_components=32,  # changed from the preregistration, as 64 components led to problems
            max_iter='auto',
            random_state=97)
        ica.fit(raw_ica, picks='eeg')
        # ica.plot_components();
        # ica.plot_sources(raw);

        # ICLabel =============================
        # predict ICA component labels with ICLabel

        # extract the labels of each component
        ic_labels = label_components(raw_ica, ica, method='iclabel')
        print(ic_labels)

        # exclude components classified as something other than "brain" or "other" with a predicted probability greater than 85%
        exclude_idx = np.where(
            (~np.isin(np.array(ic_labels['labels']), ['brain', 'other'])) &
            (ic_labels['y_pred_proba'] > 0.85)
        )[0]
        print(f'Excluding these ICA components: {exclude_idx}')

        # apply ICA ===========================
        ica.apply(raw, exclude=exclude_idx)
        subj_metadata['n_ica_excluded'] = len(exclude_idx)

        # epoch the data ======================
        # note: nt stands for non-target

        beh_nt = subj_beh.drop(subj_beh[subj_beh.target == 1].index).reset_index()
        events_nt = events[events[:, 2] == event_dict['nontarget'], :]
        assert len(beh_nt) == events_nt.shape[0]

        # manually change the event IDs to have a unique ID for each character
        event_ids_char_ids = {c: i + 1 for i, c in enumerate(chars)}
        events_nt[:, 2] = [event_ids_char_ids[c_x] for c_x in beh_nt.stimulus]

        # adjust for the 8 ms delay between trigger onset and the stimulus appearing at the centre of the screen
        events_nt[:, 0] += np.round(8 * (raw.info['sfreq'] / 1000)).astype(int)  # will be 8 samples, as we recorded at 1000 Hz

        epochs_nt = mne.Epochs(
            raw,
            events=events_nt,
            # event_id={'nontarget': 1},
            event_id=event_ids_char_ids,
            tmin=-0.2, tmax=1.0,
            preload=True,
            reject_by_annotation=True,
            reject=None,
            flat=None
        )

        beh_nt['drop_log'] = [';'.join(x) for x in epochs_nt.drop_log]
        beh_nt_clean = beh_nt[beh_nt['drop_log'] == '']

        # for each trial, also store the stimulus IDs as metadata in the epochs
        epochs_nt.metadata = beh_nt_clean[['stimulus']]

        # now reject bad epochs
        epochs_nt.drop_bad(reject=reject_criteria, flat=flat_criteria)

        # look for consistently noisy channels
        # ...first, count from the drop log the number of trials for which each channel is marked as bad...
        bad_counts = {k: 0 for k in raw.info['ch_names']}
        for bad_i in epochs_nt.drop_log:
            for str_j in bad_i:
                if str_j in raw.info['ch_names']:
                    bad_counts[str_j] += 1

        # ...then convert this to a proportion of the retained non-target trials
        bad_props = {k: v / len(epochs_nt) for k, v in bad_counts.items()}

        # ...make a note of these bad channels to be interpolated in the second attempt
        new_chs_to_interp = [k for k, v in bad_props.items() if v > prop_bad_to_remove_ch]

        # ...and finally, restart if this list is not empty, with the new channels interpolated at the start of the pipeline
        if len(new_chs_to_interp) == 0:
            do_restart = False

    print(epochs_nt)

    # check the total number of trials retained, and the number retained per character
    assert len(epochs_nt) >= min_total_epochs
    stim_counts = epochs_nt.metadata.groupby(['stimulus']).size().reset_index(name='n')
    assert np.min(stim_counts.n) >= min_trials_per_char

    subj_metadata['min_trials_per_char'] = np.min(stim_counts.n)
    subj_metadata['mean_trials_per_char'] = np.mean(stim_counts.n)
    subj_metadata['max_trials_per_char'] = np.max(stim_counts.n)
    subj_metadata['n_epochs'] = len(epochs_nt)
    subj_metadata['n_epochs_excluded'] = 750 - len(epochs_nt)  # 750 non-target trials were presented in total
    subj_metadata['n_extra_chans_excluded'] = len(new_chs_to_interp)
    subj_metadata['n_total_chans_interpolated'] = n_chs_interpolated
    subj_metadata['n_preprocessing_attempts'] = n_attempts

    # save the epochs object
    epo_file = f'{subj_id}-epo.fif'
    epochs_nt.save(op.join('epo', epo_file), overwrite=True)

    all_subj_metadata.append(subj_metadata)

metadata_df = pd.DataFrame(all_subj_metadata)
metadata_df.to_csv('preprocessing_metadata.csv', index=False)
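
# %%
# a minimal sketch (not part of the pipeline above) of how the saved outputs
# could be loaded back for later analysis; 'sub-01' is a hypothetical example
# ID, and the paths assume the 'epo' directory and CSV written above
# example_epochs = mne.read_epochs(op.join('epo', 'sub-01-epo.fif'))
# example_metadata = pd.read_csv('preprocessing_metadata.csv')
# print(example_epochs.metadata.head())
# print(example_metadata)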