import os import numpy as np import pandas as pd import rlxnix as rlx from joblib import Parallel, delayed from IPython import embed from ..util import load_white_noise_stim, mutual_info, DelayType spike_buffer = {} stimulus_name = "gwn300Hz10s0.3.dat" def population_spikes(df, data_location, population_size, subtract_delay=True): """Assembles the spike times for a 'population' of neurons. Parameters ---------- df : pd.DataFrame The DataFrame containing the information about where to find the recorded white noise responses. population_size : int The size of the population, i.e. the number of randomly selected trials. subtract_delay : bool, optional Defines whether the neuronal delay should be subtracted from the spike times, by default True Returns ------- list: list of spike times of the randomly selected trials. list: list containing whether or not the response needs to be inverted. list: the respective cell ids list: the respective trial ids. """ cell_indices = np.arange(len(df.dataset_id.unique())) np.random.shuffle(cell_indices) cells = df.dataset_id.unique()[cell_indices[:population_size]] spikes = [] inversion_needed = [] cell_ids = [] trial_ids = [] for c in cells: if c in spike_buffer.keys(): # print(f"Found spikes of cell {c} in buffer!") all_spikes = spike_buffer[c] else: # print(f"Reading spikes of cell {c} from file!") dataset = rlx.Dataset(os.path.join(data_location, c + ".nix")) trace_name = "spikes-1" if "spikes-1" in dataset.event_traces else "Spikes-1" all_spikes = dataset.event_traces[trace_name].data_array[:] spike_buffer[c] = all_spikes cell_trials = df[df.dataset_id == c] cell_indexes = cell_trials.index.values trial_index = cell_indexes[np.random.randint(len(cell_indexes))] trial = cell_trials[cell_trials.index == trial_index] trial_spikes = all_spikes[(all_spikes >= trial.start_time.values[0]) & (all_spikes < trial.end_time.values[0])] - trial.start_time.values[0] if subtract_delay: trial_spikes -= trial.delay.values[0] # compensate for the delay between stimulus and response spikes.append(trial_spikes[trial_spikes > 0]) inversion_needed.append(trial.inverted.values[0]) cell_ids.append(c) trial_ids.append(trial.stimulus_index.values[0]) return spikes, inversion_needed, cell_ids, trial_ids def mutual_info_heterogeneous_population(df, data_location, stim_location, population_size=10, delay=0.01, repeats=10, kernel_sigma=0.00125, saving=False, result_location=".", delay_type=DelayType.Equal): """Calculates the mutual information between the stimulus and the population response assembled from random selections of neuronal responses. The population response is the firing rate (estimated by kernel convolution with a Gaussian kernel of the given kernel sigma) averaged across responses. Parameters ---------- df : pandas.DataFrame The DataFrame holding information about the white noise driven responses recorded in the neurons. data_location : _type_ The folder containing the raw data. stim_location : _type_ The folder in which the stimuli are stored. population_size : int, optional The population size, by default 10 delay : float, optional The desired average artificial delay between stimulus and response, if negative, the cell delay will not be subtracted, by default 0.01 repeats : int, optional Number of repetitions for the given combination of kernel and delay, by default 10 kernel_sigma : float, optional The standard deviation of the simulated 'synaptic' gaussian kernel, by default 0.00125 saving : bool, optional if True, the population responses are stored to disk, by default False result_location : str, optional The location the population responses should be stored to, by default "." delay_type : DelayType, optional The delay type to be used for the artificial delay, by default DelayType.Equal Returns ------- list of dictionaries containing the analysis results """ coherences = [] frequency = [] _, stimulus = load_white_noise_stim(os.path.join(stim_location, stimulus_name)) results = [] for i in range(repeats): r = {"pop_size": population_size, "delay": delay, "true_delay": 0.0, "kernel_sigma": kernel_sigma, "num_inversions": 0, "population_rate": 0.0, "rate_modulation": 0.0, "snr": 0.0, "mi":0.0, "mi_100": 0.0, "mi_200": 0.0, "mi_300": 0.0, "cell_ids":[], "mtag_ids":[], "trial_ids":[], "result_file":"", "delay_type": str(delay_type) } print(" "*200, end="\r") print(f"delay type: {delay_type}, population size: {population_size}, delay: {delay:.5f}s, kernel: {kernel_sigma:.5f}, repeat: {i}", end="\r" ) subtract_delay = delay >= 0.0 pop_spikes, inversion_needed, cell_ids, trial_ids = population_spikes(df, data_location, population_size, subtract_delay) f, c, mis, rates, true_delay = mutual_info(pop_spikes, delay, inversion_needed, stimulus, freq_bin_edges=[0, 100, 200, 300], kernel_sigma=kernel_sigma, delay_type=delay_type) coherences.append(c) frequency = f smoothing_kernel = np.ones(19)/19 sc = np.convolve(c, smoothing_kernel, mode="same") max_coh = np.max(sc[f <= 300]) peak_f = f[np.where(sc == max_coh)] upper_cutoff = f[(sc <= max_coh/np.sqrt(2)) & (f > peak_f)][0] lower_cutoff = f[(sc <= max_coh/np.sqrt(2)) & (f < peak_f)][-1] if len(f[(sc <= max_coh/np.sqrt(2)) & (f < peak_f)]) > 0 else 0.0 avg_response = np.mean(rates, axis=1) if population_size > 1 else rates avg_rate = np.mean(avg_response) rate_error = np.std(rates, axis=1) if population_size > 1 else None variability = np.mean(rate_error, axis=0) if rate_error is not None else None snr = np.std(avg_rate) / variability if variability is not None else None outfile_name = "pop_size%i_repeat%i_delay%.5f.npz" % (population_size, i, delay) outfile_full = os.path.join(result_location, outfile_name) if saving: np.savez_compressed(outfile_full, coherences=coherences, frequencies=frequency, avg_rate=avg_rate, rate_std=rate_error) r["true_delay"] = true_delay r["num_inversions"] = np.sum(inversion_needed) r["population_rate"] = avg_rate r["rate_modulation"] = np.std(avg_response) r["peak_coh"] = max_coh r["upper_cutoff"] = upper_cutoff r["lower_cutoff"] = lower_cutoff r["peak_freq"] = peak_f r["snr"] = snr if snr is not None else 0.0 r["variability"] = variability if variability is not None else 0.0 r["mi"] = mis[0] r["mi_100"] = mis[1] r["mi_200"] = mis[2] r["mi_300"] = mis[3] r["cell_ids"] = cell_ids r["trial_ids"] = trial_ids r["result_file"] = outfile_name if not saving else "" results.append(r) return results def run_heterogeneous_population_analysis(whitenoise_trial_file, data_location, stim_location, population_sizes=[2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30], delays=[0.0, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015], kernels=[0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025], repeats=50, saving=False, num_cores=1, delay_type=DelayType.Equal): """Runs the population analysis for heterogeneous populations. Parameters ---------- whitenoise_trial_file : str The dataframe containing the information of the white noise recordings. data_location : str The path to the raw data. stim_location : str The path to the folder containing the stimuli. population_sizes : list of ints, optional List of population sizes, by default [2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30] delays : list, optional list of artificial delays added to the spike times. -1 indicates that the original delay should not be changed, by default [0.0, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015] kernels : list, optional list of Gaussian kernel standard deviations used for firing rate estimation, by default [0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025] repeats : int, optional Maximum number of repetitions for each combination of population sizes, delays, kernels. For each repetition a new set of responses will be used. By default 50 saving : bool, optional Flag that indicates whether or not coherence spectra, firing rates etc should be saved, by default False num_cores : int, optional number of parallel jobs spawned to run the analyses, by default 1 delay_type : DelayType The type of delay added to the spike responses. By default DelayType.Equal Returns ------- pd.DataFrame The DataFrame containing the analysis results. """ result_dicts = [] whitenoise_trials = pd.read_csv(whitenoise_trial_file, sep=";", index_col=0) whitenoise_trials = whitenoise_trials[(whitenoise_trials.stimfile == stimulus_name) & (whitenoise_trials.duration == 10)] # & (whitenoise_trials.inverted == False)] conditions = [] for k in kernels: for ps in population_sizes: for d in delays: conditions.append((k, ps, d)) processed_list = Parallel(n_jobs=num_cores)(delayed(mutual_info_heterogeneous_population)(whitenoise_trials, data_location, stim_location, population_size=condition[1], delay=condition[2], repeats=repeats, kernel_sigma=condition[0], saving=saving, delay_type=delay_type) for condition in conditions) for pr in processed_list: if pr is not None: result_dicts.extend(pr) df = pd.DataFrame(result_dicts) return df