123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- import os
- import numpy as np
- import scipy.signal as signal
- from joblib import Parallel, delayed
- from ..util import LIF, mutual_info, default_job_count
- def whitenoise(cflow, cfup, dt, duration, rng=np.random):
- """Band-limited white noise.
- Generates white noise with a flat power spectrum between `cflow` and
- `cfup` Hertz, zero mean and unit standard deviation. Note, that in
- particular for short segments of the generated noise the mean and
- standard deviation can deviate from zero and one.
- Parameters
- ----------
- cflow: float
- Lower cutoff frequency in Hertz.
- cfup: float
- Upper cutoff frequency in Hertz.
- dt: float
- Time step of the resulting array in seconds.
- duration: float
- Total duration of the resulting array in seconds.
- Returns
- -------
- noise: 1-D array
- White noise.
- """
- # next power of two:
- n = int(duration//dt)
- nn = int(2**(np.ceil(np.log2(n))))
- # draw random numbers in Fourier domain:
- inx0 = int(np.round(dt*nn*cflow))
- inx1 = int(np.round(dt*nn*cfup))
- if inx0 < 0:
- inx0 = 0
- if inx1 >= nn/2:
- inx1 = nn/2
- sigma = 0.5 / np.sqrt(float(inx1 - inx0))
- whitef = np.zeros((nn//2+1), dtype=complex)
- if inx0 == 0:
- whitef[0] = rng.randn()
- inx0 = 1
- if inx1 >= nn//2:
- whitef[nn//2] = rng.randn()
- inx1 = nn//2-1
- m = inx1 - inx0 + 1
- whitef[inx0:inx1+1] = rng.randn(m) + 1j*rng.randn(m)
- # inverse FFT:
- noise = np.real(np.fft.irfft(whitef))[:n]*sigma*nn
- return noise
- def create_noise_stimulus(duration, cutoff, dt, amplitude):
- """Create band-limited Gaussian noise stimulus.
- Parameters
- ----------
- duration : float
- duration of the stimulus in seconds.
- cutoff : float
- the cutoff frequency in Hertz.
- dt : float
- the temporal resolution in seconds.
- amplitude : float
- the amplitude of the stimulus.
- Returns
- -------
- np.ndarray
- the stimulus
- """
- print("generating stimulus...", end="")
- nyquist = 1./dt/2 # Hz
- stimulus = np.random.randn(int(np.round(duration/dt))) * amplitude
- b, a = signal.butter(8, cutoff/nyquist)
- stimulus = signal.filtfilt(b, a, stimulus)
- stimulus[0] = np.mean(stimulus)
- print(" done")
- return stimulus
- def create_lif_responses(num_trials, stimulus, dt, noise=0., tau_m=0.00125):
- """Creates a set of model responses to the stimulus.
- Parameters
- ----------
- num_trials : int
- number of responses
- stimulus : np.ndarray
- the stimulus
- dt : float
- the stepsize of the stimulation. Given in s.
- noise : float
- noise in the model. Defaults tp 0.0
- tau_m : float, optional
- the membrane time-constant. Defaults to 0.005 s.
- Returns
- -------
- list of np.ndarray
- list of spike respones.
- """
- spikes = []
- lif_model = LIF(stepsize=dt, offset=2.5, tau_m=tau_m, D=noise)
- for i in range(num_trials):
- _, _ , spike_times = lif_model.run_stimulus(stimulus)
- spikes.append(spike_times)
- return spikes
- def create_population(spike_respones, pop_size, cell_density=0, conduction_velocity=50.):
- """
- Creates a random population of the spike_responses from the set of passed responses. If cell_density is larger than 0 the responses will be delayed according to the conduction delay.
- The function assumes a constant distance between "cells".
- Parameters
- ----------
- spike_respones : list of lists
- Each entry is a list/array of spike times
- pop_size : int
- the population size.
- cell_density : float, optional
- cell density in m^-1. Defaults to 0, which indicates infinitely, i.e. no conduction delays.
- conduction_velocity :float, optional
- The conduction velocity. Defaults to 50 m/s
- Returns
- -------
- list of np.ndarray
- randomly selected and delayed responses.
- """
- assert(conduction_velocity > 0.0)
- assert(pop_size <= len(spike_respones))
- indices = np.arange(len(spike_respones), dtype=int)
- np.random.shuffle(indices)
- population = []
- for i in range(pop_size):
- spike_times = spike_respones[indices[i]]
- spike_times = np.asarray(spike_times)
- delay = 0.0
- if cell_density > 0:
- delay = i / cell_density / conduction_velocity
- population.append(spike_times + delay)
- return population
- def get_population_information(spike_responses, stimulus, population_size, density=0., cutoff=(0., 500.),
- kernel_sigma=0.0005, stepsize=0.0001, trial_duration=10., repeats=1,
- conduction_velocity=50.):
- """Estimate the amount of information carried by the population response.
- Parameters
- ----------
- spike_responses : list of np.ndarray
- list of spike times
- stimulus : np.ndarray
- the stimulus
- population_size : int
- population size
- density : float, optional
- cell density in m^-1. Defaults to 0/m, which indicates infinitely high density, i.e. no conduction delays.
- cutoff : tuple of float, optional
- the lower and upper cutoff frequency of the stimulus. Defaults to (0, 500)
- kernel_sigma : float, optional
- std of the Gaussian kernel used for firing rate estimation. Defaults to 0.0005.
- stepsize : float, optional
- Temporal resolution of the firing rate. Defaults to 0.0001.
- trial_duration : float, optional
- trial duration in seconds. Defaults to 10.
- repeats : int, optional
- number of random populations per population size. Defaults to 1.
- conduction_velocity : float, optional
- conduction velocity in m/s. Defaults to 50 m/s.
-
- Returns
- -------
- np.ndarray
- mutual information between stimulus and population response. Has the shape [num_population_sizes, repeats]
- """
- information = np.zeros(repeats)
-
- for i in range(repeats):
- population_spikes = create_population(spike_responses, population_size, density, conduction_velocity)
- _, _, mi, _, _ = mutual_info(population_spikes, 0.0, [False] * len(population_spikes),
- stimulus, cutoff, kernel_sigma, stepsize=stepsize, trial_duration=trial_duration)
- information[i] = mi[-1]
- return information
- def process_populations(spike_responses, stimulus, population_sizes, density, cutoff=(0., 500.), kernel_sigma=0.0005,
- stepsize=0.0001, trial_duration=10., repeats=1, conduction_velocity=50., num_cores=1):
- """_summary_
- Parameters
- ----------
- spike_responses : list of np.arrays
- containing the spike times in response to the stimulus presentations
- stimulus : np.array
- the stimulus waveform
- population_sizes : list
- list of population sizes that should be analysed
- density : float
- spatial density of model neurons in m^-1
- cutoff : tuple, optional
- lower and upper cutoff frequencies of the analyzed frequency bands by default (0., 500.)
- kernel_sigma : float, optional
- The sigma of the Gaussian kernel used for firing rate estimation, by default 0.0005
- stepsize : float, optional
- temporal stepsize of the simulation, by default 0.0001
- trial_duration : float, optional
- duration of the trials in seconds, by default 10.
- repeats : int, optional
- number of repetitions, by default 1
- conduction_velocity : float, optional
- simulated axonal conduction velocity, by default 50.
- num_cores : int, optional
- number of spawned parallel processes, by default 1
- Returns
- -------
- list
- the results
- """
- processed_list = Parallel(n_jobs=num_cores)(delayed(get_population_information)(spike_responses, stimulus, ps, density, cutoff, kernel_sigma, stepsize, trial_duration, repeats, conduction_velocity) for ps in population_sizes)
- information = np.zeros((len(population_sizes), repeats))
- for i, pr in enumerate(processed_list):
- information[i, :] = pr[0]
- return information
- def generate_responses(num_responses, stimulus, repeats, dt, noise, tau_m, num_cores):
- """Generate spike responses to whitenoise stimulli.
- Parameters
- ----------
- num_responses : int
- number of responses
- stimulus : np.adarray
- The stimulus waveform.
- repeats : int
- number of stimulus repetitions
- dt : float
- time-step of the simulation
- noise : float
- the noise in the LIF model
- tau_m : float
- membrane time constant
- num_cores : int
- number of parallel processes that should be spawned
- Returns
- -------
- list
- spike responses, i.e. np.arrays of spike times
- """
- num_calls = int(num_responses / 5)
- processed_list = Parallel(n_jobs=num_cores)(delayed(create_lif_responses)(repeats, stimulus, dt, noise, tau_m) for i in range(num_calls))
- spikes = []
- for pr in processed_list:
- spikes.extend(pr)
- return spikes
- def main(args=None):
- """Run the simulation.
- """
- if args is None:
- num_cores = default_job_count()
- else:
- num_cores = args.jobs
- dt = 0.00001 # s sr = 100 kHz
- duration = 2 # s
- lower_cutoffs = [0, 100, 200] # Hz
- upper_cutoffs = [100, 200, 300] # Hz
- amplitude = 0.0625
- num_responses = 300
- noise = 5
- tau_m = 0.0025
- repeats = 5
- kernel_sigma = 0.00125 # s
- density = 2000
- vel_efish = 50.
- vel_squid = 25.
- vel_corp_callosum = 7.0
- for lc, uc in zip(lower_cutoffs, upper_cutoffs):
- print("generating stimulus %i -- %i Hz..." % (lc, uc), end="\r")
- stimulus = whitenoise(lc, uc, dt, duration) * amplitude
- print("generating stimulus %i -- %i Hz... done" % (lc, uc))
- print("generating responses... ", end="\r")
- spikes = generate_responses(num_responses, stimulus, repeats, dt, noise, tau_m, num_cores)
- print("generating responses... done")
- population_sizes = list(range(1, int(np.round(2 * num_responses / 3) + 1), 2))
- print("analysing populations, no delay... ", end="\r", flush=True)
- information = process_populations(spikes, stimulus, population_sizes, 0.0, (lc, uc), kernel_sigma, dt, duration, repeats, vel_efish, num_cores)
- print(r"analysing populations, density: %i m^-1, vel.: %.1f m s^-1 ..." % (density, vel_efish), end="\r", flush=True)
- information_efish = process_populations(spikes, stimulus, population_sizes, density, (lc, uc), kernel_sigma, dt, duration, repeats, vel_efish, num_cores)
- print(r"analysing populations, density: %i m^-1, vel.: %.1f m s^-1 ..." % (density, vel_squid), end="\r", flush=True)
- information_squid = process_populations(spikes, stimulus, population_sizes, density, (lc, uc), kernel_sigma, dt, duration, repeats, vel_squid, num_cores)
- print(r"analysing populations, density: %i m^-1, vel.: %.1f m s^-1 ..." % (density, vel_corp_callosum), end="\r", flush=True)
- information_cc = process_populations(spikes, stimulus, population_sizes, density, (lc, uc), kernel_sigma, dt, duration, repeats, vel_corp_callosum, num_cores)
- print(r"analysing populations ... done" + " " * 80)
- velocities = {"efish": vel_efish, "squid": vel_squid, "corpus callosum": vel_corp_callosum}
- np.savez_compressed(os.path.join("derived_data", f"lif_simulation_{int(lc)}_{int(uc)}_test.npz"), population_sizes=population_sizes,
- info_no_delays=information, info_delay_efish=information_efish,
- info_delay_squid=information_squid, info_delay_cc=information_cc,
- cutoffs=(lc, uc), density=density, velocities=velocities)
- def command_line_parser(parent_parser):
- parser = parent_parser.add_parser("population", help="Calculate the mutual information as a function of delay and population size.")
- parser.add_argument("-j", "--jobs", type=int, default=default_job_count(), help=f"The number of parallel processes spawned for the analyses (defaults to {default_job_count()})")
- parser.set_defaults(func=main)
|