import os
import math
import random
import logging
import numpy as np
import pandas as pd
import rlxnix as rlx
import itertools as it
from joblib import Parallel, delayed

from code.util import load_white_noise_stim, mutual_info

stimulus_name = "gwn300Hz10s0.3.dat"
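# Gaussian white-noise stimulus; the file name presumably encodes the 300 Hz
# cutoff frequency, the 10 s duration, and a contrast of 0.3.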


def mutual_info_homogenous_population(all_spikes, stimulus_indices, stimulus, kernel, repeats, dataset_name, contrast, cv, burstiness):
    """Workhorse of the analysis: estimates the coding performance of homogeneous
    populations assembled from repeated trials of the same cell.

    Parameters
    ----------
    all_spikes : list
        List of np.arrays holding the spike times.
    stimulus_indices : list
        List of stimulus ids.
    stimulus : np.array
        The stimulus waveform.
    kernel : float
        The sigma of the Gaussian kernel used to create firing rates.
    repeats : int
        The maximal number of repeats for each population size.
    dataset_name : str
        The name (id) of the analyzed dataset.
    contrast : float
        The stimulus contrast (i.e. intensity).
    cv : float
        The coefficient of variation of the interspike intervals of the baseline response.
    burstiness : float
        The burstiness of the baseline response.

    Returns
    -------
    list of dictionaries
        The analysis results.
    """
    results = []
    population_sizes = np.arange(1, len(all_spikes) + 1)
    for ps in population_sizes:
        possible_combinations = None
        shuf = None
        # number of distinct populations of size ps that can be drawn from the trials
        combination_count = math.comb(len(all_spikes), ps)
        if combination_count < 10000:
            # few enough combinations: enumerate them all and draw in shuffled order,
            # which guarantees that no population is analyzed twice
            possible_combinations = list(it.combinations(np.arange(len(all_spikes)), ps))
            shuf = np.arange(len(possible_combinations))
            np.random.shuffle(shuf)
            combination_count = len(possible_combinations)

        coherences = []
        count = 0
        while count < repeats and count < combination_count:
            print("\tpopulation size: %i, repeat: %i" % (ps, count), end="\r", flush=True)
            if possible_combinations is not None:
                combination = possible_combinations[shuf[count]]
            else:
                # too many combinations to enumerate: draw a random population
                combination = random.sample(range(len(all_spikes)), ps)
            results_dict = {"dataset_id": dataset_name, "contrast": contrast, "pop_size": ps,
                            "kernel": kernel, "snr": 0.0, "mi": 0.0, "mi_100": 0.0,
                            "mi_200": 0.0, "mi_300": 0.0, "result_file": "n.a."}

            spikes = [all_spikes[i] for i in combination]
            # unpack the frequency axis, the coherence spectrum, the mutual-information
            # estimates for the bands given by freq_bin_edges, and the firing rates
            f, coh, mis, rates, _ = mutual_info(spikes, 0.0, np.zeros(len(spikes)), stimulus,
                                                freq_bin_edges=[0, 100, 200, 300], kernel_sigma=kernel)
            coherences.append(coh)
            # smooth the coherence with a 19-bin boxcar before locating peak and cutoffs
            smoothing_kernel = np.ones(19) / 19
            sc = np.convolve(coh, smoothing_kernel, mode="same")
            low_freqs = f <= 300
            max_coh = np.max(sc[low_freqs])
            peak_f = f[low_freqs][np.argmax(sc[low_freqs])]
            # half-power cutoff frequencies, i.e. where the smoothed coherence drops
            # below peak / sqrt(2); fall back to the band edges if it never does
            above_peak = f[(sc <= max_coh / np.sqrt(2)) & (f > peak_f)]
            upper_cutoff = above_peak[0] if len(above_peak) > 0 else f[-1]
            below_peak = f[(sc <= max_coh / np.sqrt(2)) & (f < peak_f)]
            lower_cutoff = below_peak[-1] if len(below_peak) > 0 else 0.0
            avg_response = np.mean(rates, axis=1) if ps > 1 else rates
            avg_rate = np.mean(avg_response)
            rate_error = np.std(rates, axis=1) if ps > 1 else None
            variability = np.mean(rate_error, axis=0) if rate_error is not None else None
            # SNR: modulation of the population-averaged response relative to the
            # across-trial variability
            snr = np.std(avg_response) / variability if variability is not None else None
            results_dict["population_rate"] = avg_rate
            results_dict["rate_modulation"] = np.std(avg_response)
            results_dict["peak_coh"] = max_coh
            results_dict["upper_cutoff"] = upper_cutoff
            results_dict["lower_cutoff"] = lower_cutoff
            results_dict["peak_freq"] = peak_f
            results_dict["variability"] = variability if variability is not None else 0.0
            results_dict["snr"] = snr if snr is not None else 0.0
            results_dict["mi"] = mis[0]
            results_dict["mi_100"] = mis[1]
            results_dict["mi_200"] = mis[2]
            results_dict["mi_300"] = mis[3]
            # stimulus ids of the trials that make up this population
            results_dict["stimulus_indices"] = np.asarray(stimulus_indices)[np.asarray(combination)]
            results_dict["cv"] = cv
            results_dict["burstiness"] = burstiness
            results.append(results_dict)
            count += 1
        print()
    return results


def get_spikes(trials, dataset):
    """Get all the spike trains that belong to one stimulus condition.

    Parameters
    ----------
    trials : pd.DataFrame
        The trial information.
    dataset : rlx.Dataset
        The dataset instance that grants access to the spikes event trace.

    Returns
    -------
    list
        The spike times for each trial as np.ndarray, relative to trial onset in seconds.
    np.ndarray
        The trial indices.
    """
    trace_name = "spikes-1" if "spikes-1" in dataset.event_traces else "Spikes-1"
    all_spikes = dataset.event_traces[trace_name].data_array[:]
    spikes = []
    for s, e in zip(trials.start_time.values, trials.end_time.values):
        # keep only the spikes of this trial and make them relative to trial onset
        spikes.append(all_spikes[(all_spikes >= s) & (all_spikes < e)] - s)
    # one stimulus index per trial, aligned one-to-one with the spikes list
    trial_ids = trials.stimulus_index.values
    return spikes, trial_ids


def homogeneous_population_dataset(dataset_name, trial_information, data_location, stim_location, kernel_size=0.001, repeats=10):
    """Run the homogeneous population analysis for all contrasts recorded in one dataset.

    Parameters
    ----------
    dataset_name : str
        Name of the dataset.
    trial_information : pd.DataFrame
        DataFrame containing the information about the whitenoise trials recorded in that dataset/cell.
    data_location : str
        Location of the raw data.
    stim_location : str
        Location of the stimulus file.
    kernel_size : float, optional
        Sigma of the Gaussian kernel used to create the firing rate, by default 0.001.
    repeats : int, optional
        Maximal number of different populations created from the trials, by default 10.

    Returns
    -------
    list of dictionaries
        The dicts contain the analysis results for each repetition at each population size.
    """
    dataset = rlx.Dataset(os.path.join(data_location, dataset_name + ".nix"))
    _, stimulus = load_white_noise_stim(os.path.join(stim_location, stimulus_name))
    contrasts = trial_information.contrast.unique()
    results = []
    for c in contrasts:
        contrast_trials = trial_information[trial_information.contrast == c]
        cv = contrast_trials.cv.unique()[0]
        burstiness = contrast_trials.burstiness.unique()[0]
        trial_spikes, trial_indices = get_spikes(contrast_trials, dataset)
        print(f"\tcontrast {c}, stimulus: {stimulus_name}, number of trials: {len(trial_spikes)}")
        results.extend(mutual_info_homogenous_population(trial_spikes, trial_indices, stimulus, kernel_size,
                                                         repeats, dataset_name, c, cv, burstiness))
    return results


def process_cell(dataset_name, whitenoise_trials, data_location, stim_location, kernel_size=0.001, repeats=10):
    """Entry point for the homogeneous population analysis of a single cell.

    Parameters
    ----------
    dataset_name : str
        The name of the dataset.
    whitenoise_trials : pd.DataFrame
        The data frame containing the cell properties as analyzed with "WhiteNoiseOverview.py".
    data_location : str
        Path to the raw data.
    stim_location : str
        Path to the stimulus files.
    kernel_size : float, optional
        Std of the Gaussian kernel used to calculate the firing rates, by default 0.001.
    repeats : int, optional
        Number of populations drawn for each population size, by default 10.

    Returns
    -------
    list of dictionaries
        The dictionaries contain the analysis results for each repetition and population size.
    """
    logging.info(f"Homogeneous populations: processing {dataset_name} ...")
    trials = whitenoise_trials[(whitenoise_trials.dataset_id == dataset_name)]
    results_dicts = homogeneous_population_dataset(dataset_name, trials, data_location, stim_location, kernel_size, repeats)
    return results_dicts


def run_homogeneous_population_analysis(whitenoise_trial_file, data_location, stim_location, kernel_size=0.001, repeats=10, num_cores=1):
    """Analyses the coding performance in homogeneous populations constructed from trials of the same cell.

    Parameters
    ----------
    whitenoise_trial_file : str
        Path to the csv file containing information about all trials recorded with the same whitenoise stimulus.
    data_location : str
        Path to the raw data.
    stim_location : str
        Path to the stimulus files.
    kernel_size : float, optional
        Sigma of the Gaussian kernel used to calculate the firing rates, by default 0.001.
    repeats : int, optional
        Maximal number of populations drawn for each population size, by default 10.
    num_cores : int, optional
        Number of jobs run in parallel, by default 1.

    Returns
    -------
    pd.DataFrame
        The analysis results, one row per repetition and population size.
    """
    whitenoise_trials = pd.read_csv(whitenoise_trial_file, sep=";", index_col=0)
    whitenoise_trials = whitenoise_trials[(whitenoise_trials.stimfile == stimulus_name) & (whitenoise_trials.duration == 10)]
    datasets = whitenoise_trials.dataset_id.unique()
    processed_list = Parallel(n_jobs=num_cores)(delayed(process_cell)(dataset, whitenoise_trials, data_location,
                                                                      stim_location, kernel_size, repeats)
                                                for dataset in datasets)
    results = []
    for pr in processed_list:
        if pr is not None:
            results.extend(pr)
    df = pd.DataFrame(results)
    return df
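

# Example invocation sketch: the trial csv file name and the data/stimulus folder
# locations below are assumptions and need to be adapted to the actual project layout.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    results = run_homogeneous_population_analysis("whitenoise_trials.csv", data_location="data",
                                                  stim_location="stimuli", kernel_size=0.001,
                                                  repeats=10, num_cores=4)
    results.to_csv("homogeneous_population_results.csv", sep=";")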