import os
import logging

import numpy as np
import pandas as pd
import rlxnix as rlx
from scipy.stats import circmean
from scipy.signal import vectorstrength
from joblib import Parallel, delayed

from ..util import read_file_list, detect_eod_times


def burst_fraction(spike_times, eod_period, threshold=1.5):
    """Calculate the fraction of spontaneous spikes that are fired in bursts (0.0 to 1.0).

    A spike is counted as a burst spike if it follows its predecessor with an
    inter-spike interval of less than threshold times the EOD period.

    Parameters
    ----------
    spike_times : np.array
        the spike times in seconds
    eod_period : float
        the duration of the EOD period in seconds
    threshold : float, optional
        the multiplier of the EOD period that defines the burst threshold, by default 1.5

    Returns
    -------
    float
        the burst fraction
    """
    logging.info("\t\tCalculating burst fraction ...")
    isis = np.diff(spike_times)
    bf = np.sum(isis < threshold * eod_period) / len(isis)
    return bf

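# A quick sanity check with made-up numbers (not from any dataset): for an
# 800 Hz EOD the burst threshold is 1.5/800 s = 1.875 ms. The ISIs of the
# example spike train are 1 ms (burst spike) and 9 ms (no burst):
# >>> burst_fraction(np.array([0.000, 0.001, 0.010]), eod_period=1./800)
# 0.5
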
def vector_strength(spike_times, eod_times):
    """Calculate the vector strength.

    Characterizes the temporal locking of the spontaneous P-unit spikes to the
    EOD using scipy's implementation of the vector strength.

    Parameters
    ----------
    spike_times : np.array
        the spike times in seconds
    eod_times : np.array
        the EOD times, i.e. zero crossings, in seconds

    Returns
    -------
    np.array
        the phase of each spike in radians
    np.array
        the absolute delay between spike and preceding EOD time in seconds
    np.array
        the phase of each spike relative to the EOD period
    float
        the vector strength
    """
    logging.info("\t\tCalculating vector strength ...")
    # Consider only spikes that fall within the range covered by the EOD times.
    valid_spikes = spike_times[(spike_times >= eod_times[0]) & (spike_times < eod_times[-1])]
    # side="right" ensures that a spike falling exactly on an EOD time is
    # assigned to the cycle it starts (phase 0) rather than the previous one.
    indices = np.searchsorted(eod_times, valid_spikes, side="right")
    starts = eod_times[indices - 1]
    ends = eod_times[indices]
    delays_abs = valid_spikes - starts
    delays_rel = delays_abs / (ends - starts)
    phases = delays_rel * 2 * np.pi
    vs, _ = vectorstrength(phases, 2 * np.pi)
    return phases, delays_abs, delays_rel, vs

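# Illustration with synthetic data (not part of the analysis): spikes sitting
# at a fixed phase in every EOD cycle should give a vector strength close to 1.
# >>> eod_times = np.arange(0.0, 1.0, 1./800)           # zero crossings of an 800 Hz EOD
# >>> spikes = eod_times[1:-1] + 0.0001                 # perfectly phase-locked spikes
# >>> _, _, _, vs = vector_strength(spikes, eod_times)
# >>> round(vs, 5)
# 1.0
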
def baseline_properties(baseline_repro):
    """Analyze the spontaneous baseline activity and return its properties
    in a dictionary.

    Parameters
    ----------
    baseline_repro : rlxnix.plugins.efish.baseline.Baseline
        The run of the Baseline repro that contains the spontaneous activity.

    Returns
    -------
    dict
        dictionary of baseline properties
    """
    logging.info("\tBaseline properties ...")
    spike_times = baseline_repro.spikes()
    eod, time = baseline_repro.eod(trace_name='LocalEOD-1')
    bf = burst_fraction(spike_times, 1. / baseline_repro.eod_frequency)
    eod_times, _ = detect_eod_times(time, eod - np.mean(eod), threshold=0.5 * np.max(eod), running_average=3)
    eodf = len(eod_times) / baseline_repro.duration
    phi, da, _, vs = vector_strength(spike_times, eod_times)
    properties = {"eod_frequency": eodf,
                  "eod_period": 1. / eodf,
                  "firing_rate": baseline_repro.baseline_rate,
                  "cv": baseline_repro.baseline_cv,
                  "vector_strength": vs,
                  "burst_fraction": bf,
                  "phase": circmean(phi),
                  "phase_time": np.mean(da)}
    return properties

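# The returned dictionary maps property names to scalars, e.g. (illustrative
# values only, not results from any dataset):
# {"eod_frequency": 801.2, "eod_period": 0.00125, "firing_rate": 110.4,
#  "cv": 0.35, "vector_strength": 0.87, "burst_fraction": 0.22,
#  "phase": 1.9, "phase_time": 0.0004}
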
def analyze_cell(dataset_name, data_folder):
    """Run the baseline analysis for a single cell/dataset.

    Returns None if the NIX file is missing or no BaselineActivity run exists.
    """
    logging.info(f"Analyzing cell {dataset_name} ...")
    print(dataset_name)
    filename = os.path.join(data_folder, dataset_name + ".nix")
    if not os.path.exists(filename):
        logging.error(f"NIX file for dataset {dataset_name} not found: {filename}!")
        return None
    d = rlx.Dataset(filename)
    baseline_repros = d.repro_runs("BaselineActivity")
    if len(baseline_repros) == 0:
        return None
    baseline = baseline_repros[0]  # for now just analyze the first one
    baseline_results = baseline_properties(baseline)
    baseline_results["dataset_id"] = dataset_name
    return baseline_results

def run_baseline_analysis(list_folder, data_folder, num_cores=1):
    """Run the baseline analysis for all datasets listed in baseline_datasets.dat
    and collect the results in a pandas DataFrame."""
    logging.basicConfig(level=logging.WARNING, force=True)
    datasets = read_file_list(os.path.join(list_folder, "baseline_datasets.dat"))
    processed_list = Parallel(n_jobs=num_cores)(delayed(analyze_cell)(dataset, data_folder) for dataset in datasets)
    df = pd.DataFrame(processed_list)
    return df
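

# Minimal usage sketch; the folder layout and output file name are assumptions,
# adjust them to your setup:
if __name__ == "__main__":
    df = run_baseline_analysis("../lists", "../data", num_cores=4)
    df.to_csv("baseline_properties.csv", index=False)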