123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- import os
- import numpy as np
- import pandas as pd
- import itertools as it
- from .lif_simulation import whitenoise
- from .lif_simulation import generate_responses
- from ..util import mutual_info, default_job_count, DelayType
- from joblib import Parallel, delayed
- def create_population(spike_responses, pop_size, delay_type, additional_delay):
- """
- Creates a random population of the spike_responses from the set of passed responses. If cell_density is larger than 0 the responses will be delayed according to the conduction delay.
- The function assumes a constant distance between "cells".
- Parameters
- ----------
- spike_respones : list of lists
- Each entry is a list/array of spike times
- pop_size : int
- the population size.
- cell_density : float, optional
- cell density in m^-1. Defaults to 0, which indicates infinitely, i.e. no conduction delays.
- conduction_velocity :float, optional
- The conduction velocity. Defaults to 50 m/s
- Returns
- -------
- list of np.ndarray
- randomly selected and delayed responses.
- """
- indices = np.arange(len(spike_responses), dtype=int)
- rng = np.random.default_rng()
- rng.shuffle(indices)
- population = []
- for i in range(pop_size):
- spike_times = spike_responses[indices[i]]
- spike_times = np.asarray(spike_times)
- delay = 0.0
- if additional_delay > 0.0:
- if delay_type == DelayType.Equal:
- delay = (rng.random() - 0.5) * additional_delay * 2
- elif delay_type == DelayType.EqualPositive:
- delay = rng.random() * additional_delay * 2
- elif delay_type == DelayType.Gaussian:
- delay = rng.normal(0.0, additional_delay)
- elif delay_type == DelayType.GaussianPositive:
- delay = rng.normal(additional_delay * 4, additional_delay)
- population.append(spike_times + delay)
- return population
- def get_population_information(spike_responses, stimulus, population_size, delay_type, delay=0.0,
- kernel_sigma=0.0005, stepsize=0.0001, trial_duration=10., repeats=1,
- cutoff=(0., 300.)):
- """Estimate the amount of information carried by the population response.
- Parameters
- ----------
- spike_responses : list of np.ndarray
- list of spike times
- stimulus : np.ndarray
- the stimulus
- population_size : int
- population size
- density : float, optional
- cell density in m^-1. Defaults to 0/m, which indicates infinitely high density, i.e. no conduction delays.
- cutoff : tuple of float, optional
- the lower and upper cutoff frequency of the stimulus. Defaults to (0, 500)
- kernel_sigma : float, optional
- std of the Gaussian kernel used for firing rate estimation. Defaults to 0.0005.
- stepsize : float, optional
- Temporal resolution of the firing rate. Defaults to 0.0001.
- trial_duration : float, optional
- trial duration in seconds. Defaults to 10.
- repeats : int, optional
- number of random populations per population size. Defaults to 1.
- conduction_velocity : float, optional
- conduction velocity in m/s. Defaults to 50 m/s.
-
- Returns
- -------
- np.ndarray
- mutual information between stimulus and population response. Has the shape [num_population_sizes, repeats]
- """
- information = np.zeros(repeats)
- for i in range(repeats):
- population_spikes = create_population(spike_responses, population_size, delay_type, delay)
- _, _, mi, _, _ = mutual_info(population_spikes, 0.0, [False] * len(population_spikes),
- stimulus, cutoff, kernel_sigma, stepsize=stepsize, trial_duration=trial_duration)
- information[i] = mi[-1]
- info = {"delay_type": str(delay_type), "kernel_sigma": kernel_sigma, "pop_size": population_size, "cutoff": cutoff, "delay": delay, "mi": np.mean(information), "mi_err": np.std(information)}
- return info
- def process_populations(spike_responses, stimulus, population_size, kernels=[0.0005], delays=[0.0],
- delay_type=DelayType.Gaussian, stepsize=0.0001, trial_duration=10., repeats=1, num_cores=1):
- """_summary_
- Parameters
- ----------
- spike_responses : list of np.arrays
- containing the spike times in response to the stimulus presentations
- stimulus : np.array
- the stimulus waveform
- population_sizes : list
- list of population sizes that should be analysed
- density : float
- spatial density of model neurons in m^-1
- cutoff : tuple, optional
- lower and upper cutoff frequencies of the analyzed frequency bands by default (0., 500.)
- kernel_sigma : float, optional
- The sigma of the Gaussian kernel used for firing rate estimation, by default 0.0005
- stepsize : float, optional
- temporal stepsize of the simulation, by default 0.0001
- trial_duration : float, optional
- duration of the trials in seconds, by default 10.
- repeats : int, optional
- number of repetitions, by default 1
- conduction_velocity : float, optional
- simulated axonal conduction velocity, by default 50.
- num_cores : int, optional
- number of spawned parallel processes, by default 1
- Returns
- -------
- list
- the results
- """
- combinations = it.product(delays, kernels)
- processed_list = Parallel(n_jobs=num_cores)(delayed(get_population_information)(spike_responses, stimulus, population_size, delay_type, d, k, stepsize, trial_duration, repeats) for d, k in combinations)
- df = pd.DataFrame(processed_list)
- return df
- def run_simulations(args=None):
- delays = [0.0, 0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009, 0.01, 0.0125, 0.015]
- kernels=[0.000125, 0.00025, 0.0005, 0.001, 0.002, 0.003, 0.005, 0.01, 0.015, 0.02, 0.025]
- delay_types = [DelayType.Equal, DelayType.Gaussian, DelayType.EqualPositive, DelayType.GaussianPositive]
- lc = 0 # lower cutoff
- uc = 300 # upper cutoff
- dt = 0.00001 # s sr = 100 kHz
- duration = 2 # s
- amplitude = 0.0625
- num_responses = 30
- noise = 5
- tau_m = 0.0025
- repeats = 5
- num_cores = default_job_count() if args is None else args.jobs
- population_size = 20 if args is None else args.pop_size
- outfile = "test.csv" if args is None else args.outfile
- print("generating stimulus %i -- %i Hz..." % (lc, uc), end="\r")
- stimulus = whitenoise(lc, uc, dt, duration) * amplitude
- print("generating stimulus %i -- %i Hz... done" % (lc, uc))
- print("generating responses... ", end="\r")
- spikes = generate_responses(num_responses, stimulus, repeats, dt, noise, tau_m, num_cores)
- print("generating responses... done")
- results = None
- for i, delay_type in enumerate(delay_types):
- message = f"analysing populations [{i+1}/{len(delay_types)}], delay type: {str(delay_type).upper()}... "
- print(message, end="\r", flush=True)
- result = process_populations(spikes, stimulus, population_size, kernels, delays, delay_type, dt, duration, repeats, num_cores)
- if results is None:
- results = result
- else:
- results = results.append(result, ignore_index=True)
- print(message + "\t done!", end="\n")
- results.to_csv(outfile, sep=";")
- def command_line_parser(parent_parser):
- parser = parent_parser.add_parser("surface", help="Calculate the mutual information for a large surface that spans the synaptic kernel and the delays for different delay_types.")
- parser.add_argument("-j", "--jobs", type=int, default=default_job_count(), help=f"The number of parallel processes spawned for the analyses (defaults to {default_job_count()})")
- parser.add_argument("-p", "--pop_size", type=int, default=20, help=f"The population size (defaults to 20)")
- parser.add_argument("-dt", "--delay_type", type=str, default=str(DelayType.Gaussian), help="The type of distribution from which delays have been drawn. Usually a Gaussian normal distribution.")
- parser.add_argument("-o", "--outfile", type=str, default=os.path.join("derived_data", "lif_mi_surface.csv"), help="The destination file.")
- parser.set_defaults(func=run_simulations)
- if __name__ == "__main__":
- run_simulations()
|