import os import numpy as np import scipy.signal as signal from joblib import Parallel, delayed from ..util import LIF, mutual_info, default_job_count def whitenoise(cflow, cfup, dt, duration, rng=np.random): """Band-limited white noise. Generates white noise with a flat power spectrum between `cflow` and `cfup` Hertz, zero mean and unit standard deviation. Note, that in particular for short segments of the generated noise the mean and standard deviation can deviate from zero and one. Parameters ---------- cflow: float Lower cutoff frequency in Hertz. cfup: float Upper cutoff frequency in Hertz. dt: float Time step of the resulting array in seconds. duration: float Total duration of the resulting array in seconds. Returns ------- noise: 1-D array White noise. """ # next power of two: n = int(duration//dt) nn = int(2**(np.ceil(np.log2(n)))) # draw random numbers in Fourier domain: inx0 = int(np.round(dt*nn*cflow)) inx1 = int(np.round(dt*nn*cfup)) if inx0 < 0: inx0 = 0 if inx1 >= nn/2: inx1 = nn/2 sigma = 0.5 / np.sqrt(float(inx1 - inx0)) whitef = np.zeros((nn//2+1), dtype=complex) if inx0 == 0: whitef[0] = rng.randn() inx0 = 1 if inx1 >= nn//2: whitef[nn//2] = rng.randn() inx1 = nn//2-1 m = inx1 - inx0 + 1 whitef[inx0:inx1+1] = rng.randn(m) + 1j*rng.randn(m) # inverse FFT: noise = np.real(np.fft.irfft(whitef))[:n]*sigma*nn return noise def create_noise_stimulus(duration, cutoff, dt, amplitude): """Create band-limited Gaussian noise stimulus. Parameters ---------- duration : float duration of the stimulus in seconds. cutoff : float the cutoff frequency in Hertz. dt : float the temporal resolution in seconds. amplitude : float the amplitude of the stimulus. Returns ------- np.ndarray the stimulus """ print("generating stimulus...", end="") nyquist = 1./dt/2 # Hz stimulus = np.random.randn(int(np.round(duration/dt))) * amplitude b, a = signal.butter(8, cutoff/nyquist) stimulus = signal.filtfilt(b, a, stimulus) stimulus[0] = np.mean(stimulus) print(" done") return stimulus def create_lif_responses(num_trials, stimulus, dt, noise=0., tau_m=0.00125): """Creates a set of model responses to the stimulus. Parameters ---------- num_trials : int number of responses stimulus : np.ndarray the stimulus dt : float the stepsize of the stimulation. Given in s. noise : float noise in the model. Defaults tp 0.0 tau_m : float, optional the membrane time-constant. Defaults to 0.005 s. Returns ------- list of np.ndarray list of spike respones. """ spikes = [] lif_model = LIF(stepsize=dt, offset=2.5, tau_m=tau_m, D=noise) for i in range(num_trials): _, _ , spike_times = lif_model.run_stimulus(stimulus) spikes.append(spike_times) return spikes def create_population(spike_respones, pop_size, cell_density=0, conduction_velocity=50.): """ Creates a random population of the spike_responses from the set of passed responses. If cell_density is larger than 0 the responses will be delayed according to the conduction delay. The function assumes a constant distance between "cells". Parameters ---------- spike_respones : list of lists Each entry is a list/array of spike times pop_size : int the population size. cell_density : float, optional cell density in m^-1. Defaults to 0, which indicates infinitely, i.e. no conduction delays. conduction_velocity :float, optional The conduction velocity. Defaults to 50 m/s Returns ------- list of np.ndarray randomly selected and delayed responses. """ assert(conduction_velocity > 0.0) assert(pop_size <= len(spike_respones)) indices = np.arange(len(spike_respones), dtype=int) np.random.shuffle(indices) population = [] for i in range(pop_size): spike_times = spike_respones[indices[i]] spike_times = np.asarray(spike_times) delay = 0.0 if cell_density > 0: delay = i / cell_density / conduction_velocity population.append(spike_times + delay) return population def get_population_information(spike_responses, stimulus, population_size, density=0., cutoff=(0., 500.), kernel_sigma=0.0005, stepsize=0.0001, trial_duration=10., repeats=1, conduction_velocity=50.): """Estimate the amount of information carried by the population response. Parameters ---------- spike_responses : list of np.ndarray list of spike times stimulus : np.ndarray the stimulus population_size : int population size density : float, optional cell density in m^-1. Defaults to 0/m, which indicates infinitely high density, i.e. no conduction delays. cutoff : tuple of float, optional the lower and upper cutoff frequency of the stimulus. Defaults to (0, 500) kernel_sigma : float, optional std of the Gaussian kernel used for firing rate estimation. Defaults to 0.0005. stepsize : float, optional Temporal resolution of the firing rate. Defaults to 0.0001. trial_duration : float, optional trial duration in seconds. Defaults to 10. repeats : int, optional number of random populations per population size. Defaults to 1. conduction_velocity : float, optional conduction velocity in m/s. Defaults to 50 m/s. Returns ------- np.ndarray mutual information between stimulus and population response. Has the shape [num_population_sizes, repeats] """ information = np.zeros(repeats) for i in range(repeats): population_spikes = create_population(spike_responses, population_size, density, conduction_velocity) _, _, mi, _, _ = mutual_info(population_spikes, 0.0, [False] * len(population_spikes), stimulus, cutoff, kernel_sigma, stepsize=stepsize, trial_duration=trial_duration) information[i] = mi[-1] return information def process_populations(spike_responses, stimulus, population_sizes, density, cutoff=(0., 500.), kernel_sigma=0.0005, stepsize=0.0001, trial_duration=10., repeats=1, conduction_velocity=50., num_cores=1): """_summary_ Parameters ---------- spike_responses : list of np.arrays containing the spike times in response to the stimulus presentations stimulus : np.array the stimulus waveform population_sizes : list list of population sizes that should be analysed density : float spatial density of model neurons in m^-1 cutoff : tuple, optional lower and upper cutoff frequencies of the analyzed frequency bands by default (0., 500.) kernel_sigma : float, optional The sigma of the Gaussian kernel used for firing rate estimation, by default 0.0005 stepsize : float, optional temporal stepsize of the simulation, by default 0.0001 trial_duration : float, optional duration of the trials in seconds, by default 10. repeats : int, optional number of repetitions, by default 1 conduction_velocity : float, optional simulated axonal conduction velocity, by default 50. num_cores : int, optional number of spawned parallel processes, by default 1 Returns ------- list the results """ processed_list = Parallel(n_jobs=num_cores)(delayed(get_population_information)(spike_responses, stimulus, ps, density, cutoff, kernel_sigma, stepsize, trial_duration, repeats, conduction_velocity) for ps in population_sizes) information = np.zeros((len(population_sizes), repeats)) for i, pr in enumerate(processed_list): information[i, :] = pr[0] return information def generate_responses(num_responses, stimulus, repeats, dt, noise, tau_m, num_cores): """Generate spike responses to whitenoise stimulli. Parameters ---------- num_responses : int number of responses stimulus : np.adarray The stimulus waveform. repeats : int number of stimulus repetitions dt : float time-step of the simulation noise : float the noise in the LIF model tau_m : float membrane time constant num_cores : int number of parallel processes that should be spawned Returns ------- list spike responses, i.e. np.arrays of spike times """ num_calls = int(num_responses / 5) processed_list = Parallel(n_jobs=num_cores)(delayed(create_lif_responses)(repeats, stimulus, dt, noise, tau_m) for i in range(num_calls)) spikes = [] for pr in processed_list: spikes.extend(pr) return spikes def main(args=None): """Run the simulation. """ if args is None: num_cores = default_job_count() else: num_cores = args.jobs dt = 0.00001 # s sr = 100 kHz duration = 2 # s lower_cutoffs = [0, 100, 200] # Hz upper_cutoffs = [100, 200, 300] # Hz amplitude = 0.0625 num_responses = 300 noise = 5 tau_m = 0.0025 repeats = 5 kernel_sigma = 0.00125 # s density = 2000 vel_efish = 50. vel_squid = 25. vel_corp_callosum = 7.0 for lc, uc in zip(lower_cutoffs, upper_cutoffs): print("generating stimulus %i -- %i Hz..." % (lc, uc), end="\r") stimulus = whitenoise(lc, uc, dt, duration) * amplitude print("generating stimulus %i -- %i Hz... done" % (lc, uc)) print("generating responses... ", end="\r") spikes = generate_responses(num_responses, stimulus, repeats, dt, noise, tau_m, num_cores) print("generating responses... done") population_sizes = list(range(1, int(np.round(2 * num_responses / 3) + 1), 2)) print("analysing populations, no delay... ", end="\r", flush=True) information = process_populations(spikes, stimulus, population_sizes, 0.0, (lc, uc), kernel_sigma, dt, duration, repeats, vel_efish, num_cores) print(r"analysing populations, density: %i m^-1, vel.: %.1f m s^-1 ..." % (density, vel_efish), end="\r", flush=True) information_efish = process_populations(spikes, stimulus, population_sizes, density, (lc, uc), kernel_sigma, dt, duration, repeats, vel_efish, num_cores) print(r"analysing populations, density: %i m^-1, vel.: %.1f m s^-1 ..." % (density, vel_squid), end="\r", flush=True) information_squid = process_populations(spikes, stimulus, population_sizes, density, (lc, uc), kernel_sigma, dt, duration, repeats, vel_squid, num_cores) print(r"analysing populations, density: %i m^-1, vel.: %.1f m s^-1 ..." % (density, vel_corp_callosum), end="\r", flush=True) information_cc = process_populations(spikes, stimulus, population_sizes, density, (lc, uc), kernel_sigma, dt, duration, repeats, vel_corp_callosum, num_cores) print(r"analysing populations ... done" + " " * 80) velocities = {"efish": vel_efish, "squid": vel_squid, "corpus callosum": vel_corp_callosum} np.savez_compressed(os.path.join("derived_data", f"lif_simulation_{int(lc)}_{int(uc)}_test.npz"), population_sizes=population_sizes, info_no_delays=information, info_delay_efish=information_efish, info_delay_squid=information_squid, info_delay_cc=information_cc, cutoffs=(lc, uc), density=density, velocities=velocities) def command_line_parser(parent_parser): parser = parent_parser.add_parser("population", help="Calculate the mutual information as a function of delay and population size.") parser.add_argument("-j", "--jobs", type=int, default=default_job_count(), help=f"The number of parallel processes spawned for the analyses (defaults to {default_job_count()})") parser.set_defaults(func=main)