import multiprocessing import numpy as np from brian2.units import Hz from pypet import Trajectory from pypet.brian2 import Brian2MonitorResult, Brian2Result from scripts.spatial_network.head_direction_index_over_noise_scale import calculate_rates from scripts.spatial_network.placement_jitter.run_entropy_maximisation_orientation_map_placement_jitter import DATA_FOLDER, TRAJ_NAME traj = None directions = None def get_spike_train_dictionary(number_of_neurons, spike_times, neuron_indices): spike_train_dict = {} for neuron_idx in range(number_of_neurons): spike_train_dict[neuron_idx] = [] for neuron_idx, t in zip(neuron_indices, spike_times): spike_train_dict[neuron_idx].append(t) return spike_train_dict def get_firing_rate_dict_per_cell_and_direction(traj, run_name): traj.f_set_crun(run_name) firing_rate_dict = {} direction_names = ["dir{:d}".format(idx) for idx in range(traj.input.number_of_directions)] for idx in range(traj.N_E): firing_rate_dict[idx] = [] for direction in direction_names: number_of_neurons = traj.N_E all_spike_times = traj.results.runs[run_name][direction].spikes.e.t neuron_indices = traj.results.runs[run_name][direction].spikes.e.i ex_spike_trains = get_spike_train_dictionary(number_of_neurons, all_spike_times, neuron_indices) ex_spike_rates = calculate_rates(ex_spike_trains.values()) for idx, spike_rate in enumerate(ex_spike_rates): firing_rate_dict[idx].append(spike_rate) traj.f_restore_default() return firing_rate_dict def get_firing_rate_array_per_cell_and_direction(traj, run_name): traj.f_set_crun(run_name) firing_rate_array = np.ndarray((traj.N_E, traj.input.number_of_directions)) direction_names = ["dir{:d}".format(idx) for idx in range(traj.input.number_of_directions)] for dir_idx, direction in enumerate(direction_names): number_of_neurons = traj.N_E all_spike_times = traj.results.runs[run_name][direction].spikes.e.t neuron_indices = traj.results.runs[run_name][direction].spikes.e.i ex_spike_trains = get_spike_train_dictionary(number_of_neurons, all_spike_times, neuron_indices) ex_spike_rates = calculate_rates(ex_spike_trains.values()) for n_idx, spike_rate in enumerate(ex_spike_rates): #TODO: Why on earth does the unit vanish? firing_rate_array[n_idx, dir_idx] = spike_rate traj.f_restore_default() return firing_rate_array def get_head_direction_indices(directions, firing_rate_array): n_exc_neurons = firing_rate_array.shape[0] n_directions = len(directions) tuning_vectors = np.zeros((n_exc_neurons,n_directions,2)) rate_sums = np.zeros((n_exc_neurons,)) for ex_id, ex_rates in enumerate(firing_rate_array): rate_sum = 0. for dir_id, dir in enumerate(directions): tuning_vectors[ex_id, dir_id] = np.array([np.cos(dir), np.sin(dir)]) * ex_rates[dir_id] rate_sum += ex_rates[dir_id] rate_sums[ex_id] = rate_sum tuning_vectors_return = tuning_vectors.copy() for ex_id in range(n_exc_neurons): if rate_sums[ex_id] != 0.: tuning_vectors[ex_id, :, :] /= rate_sums[ex_id] tuning_vectors_summed = np.sum(tuning_vectors, axis=1) head_direction_indices = np.array([np.linalg.norm(v) for v in tuning_vectors_summed]) return head_direction_indices, tuning_vectors_return def get_inhibitory_firing_rate_array_per_cell_and_direction(traj, run_name): number_of_neurons = traj.N_I traj.f_set_crun(run_name) firing_rate_array = np.ndarray((number_of_neurons, traj.input.number_of_directions)) direction_names = ["dir{:d}".format(idx) for idx in range(traj.input.number_of_directions)] try: traj.results.runs[run_name]['dir0'].spikes.i.t except: label = traj.derived_parameters.runs[run_name].morphology.morph_label print('Cant find t for run {} with label {}'.format(run_name,label)) return np.zeros((number_of_neurons, traj.input.number_of_directions)) for dir_idx, direction in enumerate(direction_names): all_spike_times = traj.results.runs[run_name][direction].spikes.i.t neuron_indices = traj.results.runs[run_name][direction].spikes.i.i inh_spike_trains = get_spike_train_dictionary(number_of_neurons, all_spike_times, neuron_indices) inh_spike_rates = calculate_rates(inh_spike_trains.values()) for n_idx, spike_rate in enumerate(inh_spike_rates): #TODO: Why on earth does the unit vanish? firing_rate_array[n_idx, dir_idx] = spike_rate traj.f_restore_default() return firing_rate_array def get_inhibitory_head_direction_indices(directions, firing_rate_array): n_inh_neurons = firing_rate_array.shape[0] n_directions = len(directions) tuning_vectors = np.zeros((n_inh_neurons,n_directions,2)) rate_sums = np.zeros((n_inh_neurons,)) for inh_id, inh_rates in enumerate(firing_rate_array): rate_sum = 0. for dir_id, dir in enumerate(directions): tuning_vectors[inh_id, dir_id] = np.array([np.cos(dir), np.sin(dir)]) * inh_rates[dir_id] rate_sum += inh_rates[dir_id] rate_sums[inh_id] = rate_sum tuning_vectors_return = tuning_vectors.copy() for inh_id in range(n_inh_neurons): if rate_sums[inh_id] != 0.: tuning_vectors[inh_id, :, :] /= rate_sums[inh_id] tuning_vectors_summed = np.sum(tuning_vectors, axis=1) head_direction_indices = np.array([np.linalg.norm(v) for v in tuning_vectors_summed]) return head_direction_indices, tuning_vectors_return def get_runs_with_circular_morphology(traj): filtered_indices = traj.f_find_idx(('parameters.long_axis', 'parameters.short_axis'), lambda r1, r2: r1 == r2) return filtered_indices def analyse_single_run(run_name): traj.f_set_crun(run_name) label = traj.derived_parameters.runs[run_name].morphology.morph_label print(run_name, label) if label != 'no conn': print(run_name,'inh') inh_firing_rate_array = get_inhibitory_firing_rate_array_per_cell_and_direction(traj, run_name) inh_head_direction_indices, inh_tuning_vectors = get_inhibitory_head_direction_indices(directions, inh_firing_rate_array) else: n_inh = traj.N_I n_dir = traj.input.number_of_directions inh_firing_rate_array = np.zeros((n_inh, n_dir)) inh_head_direction_indices = np.zeros(n_inh) inh_tuning_vectors = np.zeros((n_inh,n_dir,2)) exc_firing_rate_array = get_firing_rate_array_per_cell_and_direction(traj, run_name) exc_head_direction_indices, exc_tuning_vectors = get_head_direction_indices(directions, exc_firing_rate_array) return exc_firing_rate_array, exc_head_direction_indices, exc_tuning_vectors, inh_firing_rate_array, inh_head_direction_indices, inh_tuning_vectors def main(): global traj, directions traj = Trajectory(TRAJ_NAME, add_time=False, dynamic_imports=Brian2MonitorResult) NO_LOADING = 0 FULL_LOAD = 2 traj.f_load(filename=DATA_FOLDER + TRAJ_NAME + ".hdf5", load_parameters=FULL_LOAD, load_results=FULL_LOAD) # traj.v_auto_load = True correlation_lengths = traj.f_get('correlation_length').f_get_range() long_axis = traj.f_get('long_axis').f_get_range() short_axis = traj.f_get('short_axis').f_get_range() directions = np.linspace(-np.pi, np.pi, traj.input.number_of_directions, endpoint=False) run_names = traj.f_get_run_names()[::-1] # run_names = ['run_00000900'] pool = multiprocessing.Pool() # for idx, run_name in enumerate(tqdm(traj.f_get_run_names())): multi_proc_result = pool.map(analyse_single_run, run_names) print(type(multi_proc_result), len(multi_proc_result)) for idx, run_name in enumerate(run_names): traj.f_set_crun(run_name) traj.f_add_result('runs.$.firing_rate_array', multi_proc_result[idx][0], comment='The firing rates of the excitatory population') traj.f_add_result('runs.$.head_direction_indices', multi_proc_result[idx][1], comment='The HDIs of the excitatory population') traj.f_add_result('runs.$.tuning_vectors', multi_proc_result[idx][2], comment='The tuning vectors of the excitatory population') traj.f_add_result('runs.$.inh_firing_rate_array', multi_proc_result[idx][3], comment='The firing rates of the inhibitory population') traj.f_add_result('runs.$.inh_head_direction_indices', multi_proc_result[idx][4], comment='The HDIs of the inhibitory population') traj.f_add_result('runs.$.inh_tuning_vectors', multi_proc_result[idx][5], comment='The tuning vectors of the inhibitory population') traj.f_restore_default() traj.f_store() if __name__ == "__main__": main()