import os
import json
import h5py
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
def when_successful(traj, x_isl, y_isl, r_isl, t_sit):
    """
    traj -- Nx3 matrix of equally sampled position data, columns (t, x, y).

    Returns the time at which the subject has stayed inside the island
    (x_isl, y_isl, r_isl) for at least t_sit seconds, or None if it never did.
    """
    # sample indices at which the position falls inside the island
    splits = np.where((traj[:, 1] - x_isl)**2 + (traj[:, 2] - y_isl)**2 < r_isl**2)[0]
    df = np.where(np.diff(splits) > 5)[0]  # gaps in splits: boundaries between separate visits

    if len(splits) > 0:
        # build [start, end] index pairs (into splits) for each visit to the island
        periods = [[0, df[0] if len(df) > 0 else len(splits) - 1]]
        if len(df) > 1:
            for point in df[1:]:
                periods.append([periods[-1][1] + 1, point])
        if len(df) > 0:
            periods.append([periods[-1][1] + 1, len(splits) - 1])

        for period in periods:
            # -5 is a hack: look back a few samples so brief tracking dropouts do not cut the visit short;
            # clamp at 0 to avoid wrapping to the end of the trajectory
            start_idx = max(splits[period[0]] - 5, 0)
            if traj[splits[period[1]]][0] - traj[start_idx][0] > t_sit:
                return traj[splits[period[0]]][0] + t_sit
    return None
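
# Minimal usage sketch for when_successful (illustrative only): the trajectory, island
# geometry and sit time below are made-up values, not taken from any real session.
def _example_when_successful():
    t = np.arange(0, 10, 0.01)          # 10 s of positions sampled at 100 Hz
    x = np.zeros_like(t)                # the subject sits at the arena centre the whole time
    y = np.zeros_like(t)
    traj = np.column_stack([t, x, y])
    # island of radius 0.2 m centred at the origin, 3 s required sit time;
    # for this synthetic trajectory the function should return 3.0
    return when_successful(traj, 0.0, 0.0, 0.2, 3.0)
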
def calculate_performance(tl, trial_idxs, cfg):
    """
    Returns a matrix of time_bins x metrics, usually (12 x 7):
    performance_median, performance_lower_CI, performance_upper_CI,
    chance_median, chance_lower_CI, chance_upper_CI, time
    """
    arena_r = cfg['position']['floor_r_in_meters']
    target_r = cfg['experiment']['target_radius']
    t_sit = cfg['experiment']['target_duration']
    timepoints = cfg['experiment']['timepoints']
    s_duration = cfg['experiment']['session_duration']

    # trial durations and outcomes
    trial_time = tl[trial_idxs[:, 1].astype(np.int32)][:, 0] - tl[trial_idxs[:, 0].astype(np.int32)][:, 0]
    correct_trial = (trial_idxs[:, 5] == 1)

    time_bin_length = 5  # in seconds
    N_time_slot = int(cfg['experiment']['trial_duration'] / time_bin_length)  # usually 12 bins
    time_x_2plot = (np.arange(N_time_slot) + 1) * time_bin_length

    # cumulative count of trials finished before the end of each time bin
    amount_trials = len(trial_time)
    amount_correct = np.zeros(N_time_slot, dtype=np.int32)
    for i, t_bin in enumerate(time_x_2plot):
        amount_correct[i] = len(np.where(trial_time < t_bin)[0])
    proportion_correct = amount_correct / amount_trials

    # bootstrap the real trials (resample trials with replacement in every time bin)
    bs_count = 1000
    proportion_correct_bs = np.zeros((bs_count, N_time_slot))
    confidence_interval_real = np.zeros((2, N_time_slot))
    for i in range(N_time_slot):
        for bs in range(bs_count):
            temp_index = np.random.randint(0, amount_trials, amount_trials)
            temp_correct = np.zeros(amount_trials)
            temp_correct[:amount_correct[i]] = 1
            proportion_correct_bs[bs, i] = temp_correct[temp_index].sum() / float(amount_trials)
        # CI bounds are stored as offsets from the median (84.1 / 15.9 percentiles, roughly +/- 1 SD)
        confidence_interval_real[0, i] = np.percentile(proportion_correct_bs[:, i], 84.1) - np.median(proportion_correct_bs[:, i])
        confidence_interval_real[1, i] = np.percentile(proportion_correct_bs[:, i], 15.9) - np.median(proportion_correct_bs[:, i])
    # create fake (surrogate) islands that do not overlap with the target island of each trial
    no_fake_islands = 1000
    fake_island_centers_x = np.empty((no_fake_islands, amount_trials))
    fake_island_centers_y = np.empty((no_fake_islands, amount_trials))
    fake_island_centers_x[:] = np.nan
    fake_island_centers_y[:] = np.nan
    for i in range(amount_trials):
        X_target, Y_target = trial_idxs[i][2], trial_idxs[i][3]
        count = 0
        while np.isnan(fake_island_centers_x[:, i]).any():
            # rejection sampling: draw a point uniformly from the arena disc
            angle = 2 * np.pi * np.random.rand()
            r = arena_r * np.sqrt(np.random.rand())
            x_temp = r * np.cos(angle)  # add the arena center here if the arena is not at the origin
            y_temp = r * np.sin(angle)  # add the arena center here if the arena is not at the origin
            # keep the point if it is far enough from the target and fully inside the arena
            if np.sqrt((x_temp - X_target)**2 + (y_temp - Y_target)**2) > 2 * target_r and \
                    np.sqrt(x_temp**2 + y_temp**2) < arena_r - target_r:
                fake_island_centers_x[count, i] = x_temp
                fake_island_centers_y[count, i] = y_temp
                count += 1

    # surrogate islands are ready; now estimate chance performance
    surrogate_correct = np.zeros((no_fake_islands, amount_trials))
    pos_downsample = 10  # downsampling factor for the trajectory; consider reducing
    for trial in range(amount_trials):
        temp_index = np.arange(trial_idxs[trial][0], trial_idxs[trial][1], pos_downsample).astype(np.int32)
        temp_traj = tl[temp_index]
        temp_traj[:, 0] -= temp_traj[0][0]  # time relative to trial start
        for surr in range(no_fake_islands):
            x_fake, y_fake = fake_island_centers_x[surr, trial], fake_island_centers_y[surr, trial]
            fake_island_time_finish = when_successful(temp_traj, x_fake, y_fake, target_r, t_sit)
            if fake_island_time_finish is not None:
                surrogate_correct[surr, trial] = fake_island_time_finish

    # build the same cumulative curve as for the real trials, but from surrogate_correct
    surr_for_deleting = np.array(surrogate_correct)
    proportion_correct_bs_fake = np.zeros((bs_count, N_time_slot))
    confidence_interval_bs_fake = np.zeros((2, N_time_slot))
    for time_slot in range(N_time_slot):
        # discard surrogate "successes" that happen after the real trial had already ended
        fake_trials_to_remove = np.where(trial_time < (time_slot + 1) * time_bin_length)[0]
        for trial in fake_trials_to_remove:
            idxs = np.where((surrogate_correct[:, trial] == 0) | (surrogate_correct[:, trial] > trial_time[trial]))[0]
            for idx in idxs:
                surr_for_deleting[idx, trial] = np.nan
        scwr = surr_for_deleting.flatten()
        scwr = scwr[~np.isnan(scwr)]
        for bs in range(bs_count):
            temp_index = np.random.randint(0, len(scwr), amount_trials)
            temp_correct = np.logical_and(scwr[temp_index] < (time_slot + 1) * time_bin_length, scwr[temp_index] > 0)
            proportion_correct_bs_fake[bs, time_slot] = temp_correct.sum() / float(amount_trials)
        confidence_interval_bs_fake[0, time_slot] = np.percentile(proportion_correct_bs_fake[:, time_slot], 84.1) - np.median(proportion_correct_bs_fake[:, time_slot])
        confidence_interval_bs_fake[1, time_slot] = np.percentile(proportion_correct_bs_fake[:, time_slot], 15.9) - np.median(proportion_correct_bs_fake[:, time_slot])

    # assemble the performance metrics (as percentages)
    c_median = 100 * np.median(proportion_correct_bs_fake, axis=0)
    c_lower_CI = 100 * confidence_interval_bs_fake[1]
    c_upper_CI = 100 * confidence_interval_bs_fake[0]
    p_median = 100 * np.median(proportion_correct_bs, axis=0)
    p_lower_CI = 100 * confidence_interval_real[1]
    p_upper_CI = 100 * confidence_interval_real[0]

    return np.column_stack([p_median, p_lower_CI, p_upper_CI, c_median, c_lower_CI, c_upper_CI, time_x_2plot])
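
# Sketch of how the output of calculate_performance could be consumed; the CI columns are
# offsets relative to the median, so they are added to it here. `tl`, `trial_idxs` and `cfg`
# are assumed to be loaded from a session file as in get_finish_times below.
def _example_plot_performance(tl, trial_idxs, cfg):
    perf = calculate_performance(tl, trial_idxs, cfg)
    t = perf[:, 6]  # bin end times, s
    plt.plot(t, perf[:, 0], label='performance (median)')
    plt.fill_between(t, perf[:, 0] + perf[:, 1], perf[:, 0] + perf[:, 2], alpha=0.3)
    plt.plot(t, perf[:, 3], label='chance (median)')
    plt.fill_between(t, perf[:, 3] + perf[:, 4], perf[:, 3] + perf[:, 5], alpha=0.3)
    plt.xlabel('time in trial, s')
    plt.ylabel('% correct')
    plt.legend()
    plt.show()
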
def dump_performance_to_H5(h5name, ds_name, dataset):
    """Write (or overwrite) a dataset under the /analysis group of a session H5 file."""
    with h5py.File(h5name, 'a') as f:
        if 'analysis' not in f:
            f.create_group('analysis')
        anal_group = f['analysis']
        if ds_name in anal_group:
            del anal_group[ds_name]
        anal_group.create_dataset(ds_name, data=dataset)
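
# Illustrative end-to-end call, assuming the session layout used by get_finish_times below
# (<session>/<session>.h5 and <session>/<session>.json); the dataset name 'performance'
# is an arbitrary choice for this sketch, not something fixed by the pipeline.
def _example_store_performance(session_path):
    session = os.path.basename(os.path.normpath(session_path))
    h5name = os.path.join(session_path, session + '.h5')
    with open(os.path.join(session_path, session + '.json')) as f:
        cfg = json.load(f)
    with h5py.File(h5name, 'r') as f:
        tl = np.array(f['processed']['timeline'])
        trial_idxs = np.array(f['processed']['trial_idxs'])
    perf = calculate_performance(tl, trial_idxs, cfg)
    dump_performance_to_H5(h5name, 'performance', perf)
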
def get_finish_times(session_path):
    # load the session config and data
    session = os.path.basename(os.path.normpath(session_path))
    h5name = os.path.join(session_path, session + '.h5')
    jsname = os.path.join(session_path, session + '.json')
    with open(jsname, 'r') as f:
        cfg = json.load(f)
    with h5py.File(h5name, 'r') as f:
        tl = np.array(f['processed']['timeline'])             # time, X, Y, speed
        trial_idxs = np.array(f['processed']['trial_idxs'])   # idx start, idx end, X, Y, R, trial result (idxs into tl)
        islands = np.array(f['raw']['islands'])                # tgt_x, tgt_y, tgt_r, d1_x, ...

    # compute finish times: one row per trial, one column per island (0 if never finished)
    finish_times = np.zeros((islands.shape[0], int(islands.shape[1] / 3)))
    for i, trial in enumerate(trial_idxs):
        traj = tl[int(trial[0]):int(trial[1])]

        current = islands[i].reshape((3, 3))  # (x, y, r) triplets: target plus distractors
        for j, island in enumerate(current):
            finish_time = when_successful(traj, island[0], island[1], island[2], cfg['experiment']['target_duration'])
            if finish_time is not None:
                finish_times[i][j] = round(finish_time, 2)
    return finish_times
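
# Example call with a hypothetical session path; finish_times[i, j] is the time at which
# island j of trial i was completed, or 0 if it never was.
def _example_finish_times(session_path='/path/to/sessions/session_name'):
    finish_times = get_finish_times(session_path)
    print(finish_times.shape)  # (n_trials, n_islands), typically (N, 3)
    return finish_times
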
def get_finish_times_rates(finish_times):
    # for each island, count the trials in which it was the first island to be completed
    isl_no = finish_times.shape[1]
    rates = np.zeros((isl_no + 1,))  # the last entry counts trials in which no island was completed
    for i in range(isl_no):
        isl_idx = isl_no - i - 1
        successful = finish_times[finish_times[:, isl_idx] > 0]

        count = 0
        for succ_trial in successful:
            # islands completed before this one within the same trial
            finished_earlier = [x for x in succ_trial if 0 < x < succ_trial[isl_idx]]
            if len(finished_earlier) == 0:
                count += 1

        rates[isl_idx] = count

    # add fully unsuccessful trials
    rates[-1] = len([x for x in finish_times if x.sum() == 0])
    return rates
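
# Sketch with synthetic finish times: each island's rate counts the trials in which it was
# completed before any other island; the last entry counts fully unsuccessful trials.
def _example_rates():
    finish_times = np.array([
        [12.4, 0.0, 0.0],   # only island 0 completed
        [ 8.1, 5.3, 0.0],   # island 1 completed before island 0
        [ 0.0, 0.0, 0.0],   # no island completed
    ])
    return get_finish_times_rates(finish_times)  # expected: [1., 1., 0., 1.]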