123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- import os
- import numpy as np
- import pandas as pd
- import rlxnix as rlx
- from joblib import Parallel, delayed
- from IPython import embed
- from ..util import load_white_noise_stim, mutual_info, DelayType
- spike_buffer = {}
- stimulus_name = "gwn300Hz10s0.3.dat"
- def population_spikes(df, data_location, population_size, subtract_delay=True):
- """Assembles the spike times for a 'population' of neurons.
- Parameters
- ----------
- df : pd.DataFrame
- The DataFrame containing the information about where to find the recorded white noise responses.
- population_size : int
- The size of the population, i.e. the number of randomly selected trials.
- subtract_delay : bool, optional
- Defines whether the neuronal delay should be subtracted from the spike times, by default True
- Returns
- -------
- list:
- list of spike times of the randomly selected trials.
- list:
- list containing whether or not the response needs to be inverted.
- list:
- the respective cell ids
- list:
- the respective trial ids.
- """
- cell_indices = np.arange(len(df.dataset_id.unique()))
- np.random.shuffle(cell_indices)
- cells = df.dataset_id.unique()[cell_indices[:population_size]]
- spikes = []
- inversion_needed = []
- cell_ids = []
- trial_ids = []
- for c in cells:
- if c in spike_buffer.keys():
- # print(f"Found spikes of cell {c} in buffer!")
- all_spikes = spike_buffer[c]
- else:
- # print(f"Reading spikes of cell {c} from file!")
- dataset = rlx.Dataset(os.path.join(data_location, c + ".nix"))
- trace_name = "spikes-1" if "spikes-1" in dataset.event_traces else "Spikes-1"
- all_spikes = dataset.event_traces[trace_name].data_array[:]
- spike_buffer[c] = all_spikes
- cell_trials = df[df.dataset_id == c]
- cell_indexes = cell_trials.index.values
- trial_index = cell_indexes[np.random.randint(len(cell_indexes))]
- trial = cell_trials[cell_trials.index == trial_index]
- trial_spikes = all_spikes[(all_spikes >= trial.start_time.values[0]) & (all_spikes < trial.end_time.values[0])] - trial.start_time.values[0]
- if subtract_delay:
- trial_spikes -= trial.delay.values[0] # compensate for the delay between stimulus and response
- spikes.append(trial_spikes[trial_spikes > 0])
- inversion_needed.append(trial.inverted.values[0])
- cell_ids.append(c)
- trial_ids.append(trial.stimulus_index.values[0])
- return spikes, inversion_needed, cell_ids, trial_ids
- def mutual_info_heterogeneous_population(df, data_location, stim_location, population_size=10, delay=0.01, repeats=10,
- kernel_sigma=0.00125, saving=False, result_location=".", delay_type=DelayType.Equal):
- """Calculates the mutual information between the stimulus and the population response assembled from random selections of neuronal responses.
- The population response is the firing rate (estimated by kernel convolution with a Gaussian kernel of the given kernel sigma) averaged across responses.
- Parameters
- ----------
- df : pandas.DataFrame
- The DataFrame holding information about the white noise driven responses recorded in the neurons.
- data_location : _type_
- The folder containing the raw data.
- stim_location : _type_
- The folder in which the stimuli are stored.
- population_size : int, optional
- The population size, by default 10
- delay : float, optional
- The desired average artificial delay between stimulus and response, if negative, the cell delay will not be subtracted, by default 0.01
- repeats : int, optional
- Number of repetitions for the given combination of kernel and delay, by default 10
- kernel_sigma : float, optional
- The standard deviation of the simulated 'synaptic' gaussian kernel, by default 0.00125
- saving : bool, optional
- if True, the population responses are stored to disk, by default False
- result_location : str, optional
- The location the population responses should be stored to, by default "."
- delay_type : DelayType, optional
- The delay type to be used for the artificial delay, by default DelayType.Equal
- Returns
- -------
- list of dictionaries
- containing the analysis results
-
- """
- coherences = []
- frequency = []
- _, stimulus = load_white_noise_stim(os.path.join(stim_location, stimulus_name))
- results = []
- for i in range(repeats):
- r = {"pop_size": population_size, "delay": delay, "true_delay": 0.0, "kernel_sigma": kernel_sigma, "num_inversions": 0,
- "population_rate": 0.0, "rate_modulation": 0.0, "snr": 0.0, "mi":0.0, "mi_100": 0.0, "mi_200": 0.0, "mi_300": 0.0,
- "cell_ids":[], "mtag_ids":[], "trial_ids":[], "result_file":"", "delay_type": str(delay_type) }
- print(" "*200, end="\r")
- print(f"delay type: {delay_type}, population size: {population_size}, delay: {delay:.5f}s, kernel: {kernel_sigma:.5f}, repeat: {i}", end="\r" )
- subtract_delay = delay >= 0.0
- pop_spikes, inversion_needed, cell_ids, trial_ids = population_spikes(df, data_location, population_size, subtract_delay)
- f, c, mis, rates, true_delay = mutual_info(pop_spikes, delay, inversion_needed, stimulus, freq_bin_edges=[0, 100, 200, 300], kernel_sigma=kernel_sigma, delay_type=delay_type)
- coherences.append(c)
- frequency = f
- smoothing_kernel = np.ones(19)/19
- sc = np.convolve(c, smoothing_kernel, mode="same")
- max_coh = np.max(sc[f <= 300])
- peak_f = f[np.where(sc == max_coh)]
- upper_cutoff = f[(sc <= max_coh/np.sqrt(2)) & (f > peak_f)][0]
- lower_cutoff = f[(sc <= max_coh/np.sqrt(2)) & (f < peak_f)][-1] if len(f[(sc <= max_coh/np.sqrt(2)) & (f < peak_f)]) > 0 else 0.0
- avg_response = np.mean(rates, axis=1) if population_size > 1 else rates
- avg_rate = np.mean(avg_response)
- rate_error = np.std(rates, axis=1) if population_size > 1 else None
- variability = np.mean(rate_error, axis=0) if rate_error is not None else None
- snr = np.std(avg_rate) / variability if variability is not None else None
- outfile_name = "pop_size%i_repeat%i_delay%.5f.npz" % (population_size, i, delay)
- outfile_full = os.path.join(result_location, outfile_name)
- if saving:
- np.savez_compressed(outfile_full, coherences=coherences, frequencies=frequency, avg_rate=avg_rate, rate_std=rate_error)
- r["true_delay"] = true_delay
- r["num_inversions"] = np.sum(inversion_needed)
- r["population_rate"] = avg_rate
- r["rate_modulation"] = np.std(avg_response)
- r["peak_coh"] = max_coh
- r["upper_cutoff"] = upper_cutoff
- r["lower_cutoff"] = lower_cutoff
- r["peak_freq"] = peak_f
- r["snr"] = snr if snr is not None else 0.0
- r["variability"] = variability if variability is not None else 0.0
- r["mi"] = mis[0]
- r["mi_100"] = mis[1]
- r["mi_200"] = mis[2]
- r["mi_300"] = mis[3]
- r["cell_ids"] = cell_ids
- r["trial_ids"] = trial_ids
- r["result_file"] = outfile_name if not saving else ""
- results.append(r)
- return results
- def run_heterogeneous_population_analysis(whitenoise_trial_file, data_location, stim_location,
- population_sizes=[2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30],
- delays=[0.0, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015],
- kernels=[0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025],
- repeats=50, saving=False, num_cores=1, delay_type=DelayType.Equal):
- """Runs the population analysis for heterogeneous populations.
- Parameters
- ----------
- whitenoise_trial_file : str
- The dataframe containing the information of the white noise recordings.
- data_location : str
- The path to the raw data.
- stim_location : str
- The path to the folder containing the stimuli.
- population_sizes : list of ints, optional
- List of population sizes, by default [2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30]
- delays : list, optional
- list of artificial delays added to the spike times. -1 indicates that the original delay should not be changed, by default [0.0, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015]
- kernels : list, optional
- list of Gaussian kernel standard deviations used for firing rate estimation, by default [0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025]
- repeats : int, optional
- Maximum number of repetitions for each combination of population sizes, delays, kernels. For each repetition a new set of responses will be used. By default 50
- saving : bool, optional
- Flag that indicates whether or not coherence spectra, firing rates etc should be saved, by default False
- num_cores : int, optional
- number of parallel jobs spawned to run the analyses, by default 1
- delay_type : DelayType
- The type of delay added to the spike responses. By default DelayType.Equal
- Returns
- -------
- pd.DataFrame
- The DataFrame containing the analysis results.
- """
- result_dicts = []
- whitenoise_trials = pd.read_csv(whitenoise_trial_file, sep=";", index_col=0)
- whitenoise_trials = whitenoise_trials[(whitenoise_trials.stimfile == stimulus_name) & (whitenoise_trials.duration == 10)] # & (whitenoise_trials.inverted == False)]
- conditions = []
- for k in kernels:
- for ps in population_sizes:
- for d in delays:
- conditions.append((k, ps, d))
- processed_list = Parallel(n_jobs=num_cores)(delayed(mutual_info_heterogeneous_population)(whitenoise_trials, data_location, stim_location,
- population_size=condition[1],
- delay=condition[2],
- repeats=repeats,
- kernel_sigma=condition[0],
- saving=saving,
- delay_type=delay_type) for condition in conditions)
- for pr in processed_list:
- if pr is not None:
- result_dicts.extend(pr)
- df = pd.DataFrame(result_dicts)
- return df
|