import multiprocessing import numpy as np from pypet import Trajectory from pypet.brian2 import Brian2MonitorResult from brian2.units import ms, khertz from scripts.spatial_network.perlin_map.run_simulation_perlin_map import DATA_FOLDER, TRAJ_NAME traj = None directions = None def calculate_rates(list_of_spike_times, min_time=0*ms): isis = [np.ediff1d(np.extract(spike_times / ms > min_time / ms, spike_times / ms)) * ms for spike_times in list_of_spike_times] rates = np.array([1.0 / np.mean(isi / ms) if isi.shape[0] != 0 else 0 for isi in isis]) * khertz return rates def get_spike_train_dictionary(number_of_neurons, spike_times, neuron_indices): spike_train_dict = {} for neuron_idx in range(number_of_neurons): spike_train_dict[neuron_idx] = [] for neuron_idx, t in zip(neuron_indices, spike_times): spike_train_dict[neuron_idx].append(t) return spike_train_dict def get_firing_rate_dict_per_cell_and_direction(traj, run_name): traj.f_set_crun(run_name) firing_rate_dict = {} direction_names = ["dir{:d}".format(idx) for idx in range(traj.input.number_of_directions)] for idx in range(traj.N_E): firing_rate_dict[idx] = [] for direction in direction_names: number_of_neurons = traj.N_E all_spike_times = traj.results.runs[run_name][direction].spikes.e.t neuron_indices = traj.results.runs[run_name][direction].spikes.e.i ex_spike_trains = get_spike_train_dictionary(number_of_neurons, all_spike_times, neuron_indices) ex_spike_rates = calculate_rates(ex_spike_trains.values()) for idx, spike_rate in enumerate(ex_spike_rates): firing_rate_dict[idx].append(spike_rate) traj.f_restore_default() return firing_rate_dict def get_firing_rate_array_per_cell_and_direction(traj, run_name): traj.f_set_crun(run_name) firing_rate_array = np.ndarray((traj.N_E, traj.input.number_of_directions)) direction_names = ["dir{:d}".format(idx) for idx in range(traj.input.number_of_directions)] for dir_idx, direction in enumerate(direction_names): number_of_neurons = traj.N_E all_spike_times = traj.results.runs[run_name][direction].spikes.e.t neuron_indices = traj.results.runs[run_name][direction].spikes.e.i ex_spike_trains = get_spike_train_dictionary(number_of_neurons, all_spike_times, neuron_indices) ex_spike_rates = calculate_rates(ex_spike_trains.values()) for n_idx, spike_rate in enumerate(ex_spike_rates): # TODO: Why on earth does the unit vanish? firing_rate_array[n_idx, dir_idx] = spike_rate traj.f_restore_default() return firing_rate_array def get_head_direction_indices(directions, firing_rate_array): n_exc_neurons = firing_rate_array.shape[0] n_directions = len(directions) tuning_vectors = np.zeros((n_exc_neurons, n_directions, 2)) rate_sums = np.zeros((n_exc_neurons,)) for ex_id, ex_rates in enumerate(firing_rate_array): rate_sum = 0. for dir_id, dir in enumerate(directions): tuning_vectors[ex_id, dir_id] = np.array([np.cos(dir), np.sin(dir)]) * ex_rates[dir_id] rate_sum += ex_rates[dir_id] rate_sums[ex_id] = rate_sum tuning_vectors_return = tuning_vectors.copy() for ex_id in range(n_exc_neurons): if rate_sums[ex_id] != 0.: tuning_vectors[ex_id, :, :] /= rate_sums[ex_id] tuning_vectors_summed = np.sum(tuning_vectors, axis=1) head_direction_indices = np.array([np.linalg.norm(v) for v in tuning_vectors_summed]) return head_direction_indices, tuning_vectors_return def get_inhibitory_firing_rate_array_per_cell_and_direction(traj, run_name): number_of_neurons = traj.N_I traj.f_set_crun(run_name) firing_rate_array = np.ndarray((number_of_neurons, traj.input.number_of_directions)) direction_names = ["dir{:d}".format(idx) for idx in range(traj.input.number_of_directions)] try: traj.results.runs[run_name]['dir0'].spikes.i.t except: label = traj.derived_parameters.runs[run_name].morphology.morph_label print('Cant find inhibitory spike times for run {}, probably because there where no spikes'.format(run_name, label)) return np.zeros((number_of_neurons, traj.input.number_of_directions)) for dir_idx, direction in enumerate(direction_names): all_spike_times = traj.results.runs[run_name][direction].spikes.i.t neuron_indices = traj.results.runs[run_name][direction].spikes.i.i inh_spike_trains = get_spike_train_dictionary(number_of_neurons, all_spike_times, neuron_indices) inh_spike_rates = calculate_rates(inh_spike_trains.values()) for n_idx, spike_rate in enumerate(inh_spike_rates): # TODO: Why on earth does the unit vanish? firing_rate_array[n_idx, dir_idx] = spike_rate traj.f_restore_default() return firing_rate_array def get_inhibitory_head_direction_indices(directions, firing_rate_array): n_inh_neurons = firing_rate_array.shape[0] n_directions = len(directions) tuning_vectors = np.zeros((n_inh_neurons, n_directions, 2)) rate_sums = np.zeros((n_inh_neurons,)) for inh_id, inh_rates in enumerate(firing_rate_array): rate_sum = 0. for dir_id, dir in enumerate(directions): tuning_vectors[inh_id, dir_id] = np.array([np.cos(dir), np.sin(dir)]) * inh_rates[dir_id] rate_sum += inh_rates[dir_id] rate_sums[inh_id] = rate_sum tuning_vectors_return = tuning_vectors.copy() for inh_id in range(n_inh_neurons): if rate_sums[inh_id] != 0.: tuning_vectors[inh_id, :, :] /= rate_sums[inh_id] tuning_vectors_summed = np.sum(tuning_vectors, axis=1) head_direction_indices = np.array([np.linalg.norm(v) for v in tuning_vectors_summed]) return head_direction_indices, tuning_vectors_return def analyse_single_run(run_name): traj.f_set_crun(run_name) label = traj.derived_parameters.runs[run_name].morphology.morph_label print('Starting analysis of run {}'.format(run_name)) if label != 'no conn': inh_firing_rate_array = get_inhibitory_firing_rate_array_per_cell_and_direction(traj, run_name) inh_head_direction_indices, inh_tuning_vectors = get_inhibitory_head_direction_indices(directions, inh_firing_rate_array) else: n_inh = traj.N_I n_dir = traj.input.number_of_directions inh_firing_rate_array = np.zeros((n_inh, n_dir)) inh_head_direction_indices = np.zeros(n_inh) inh_tuning_vectors = np.zeros((n_inh, n_dir, 2)) exc_firing_rate_array = get_firing_rate_array_per_cell_and_direction(traj, run_name) exc_head_direction_indices, exc_tuning_vectors = get_head_direction_indices(directions, exc_firing_rate_array) print('Finishing analysis of run {}'.format(run_name)) return exc_firing_rate_array, exc_head_direction_indices, exc_tuning_vectors, inh_firing_rate_array, inh_head_direction_indices, inh_tuning_vectors def main(): global traj, directions traj = Trajectory(TRAJ_NAME, add_time=False, dynamic_imports=Brian2MonitorResult) NO_LOADING = 0 FULL_LOAD = 2 traj.f_load(filename=DATA_FOLDER + TRAJ_NAME + ".hdf5", load_parameters=FULL_LOAD, load_results=FULL_LOAD) # Use in conjunction with NO_LOADING to save on memory. Beware, that it might not always work as intended. # traj.v_auto_load = True directions = np.linspace(-np.pi, np.pi, traj.input.number_of_directions, endpoint=False) run_names = traj.f_get_run_names()[::-1] pool = multiprocessing.Pool() multi_proc_result = pool.map(analyse_single_run, run_names) for idx, run_name in enumerate(run_names): traj.f_set_crun(run_name) traj.f_add_result('runs.$.firing_rate_array', multi_proc_result[idx][0], comment='The firing rates of the excitatory population') traj.f_add_result('runs.$.head_direction_indices', multi_proc_result[idx][1], comment='The HDIs of the excitatory population') traj.f_add_result('runs.$.tuning_vectors', multi_proc_result[idx][2], comment='The tuning vectors of the excitatory population') traj.f_add_result('runs.$.inh_firing_rate_array', multi_proc_result[idx][3], comment='The firing rates of the inhibitory population') traj.f_add_result('runs.$.inh_head_direction_indices', multi_proc_result[idx][4], comment='The HDIs of the inhibitory population') traj.f_add_result('runs.$.inh_tuning_vectors', multi_proc_result[idx][5], comment='The tuning vectors of the inhibitory population') traj.f_restore_default() traj.f_store() if __name__ == "__main__": main()