123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import os
- import logging
- import numpy as np
- import pandas as pd
- import rlxnix as rlx
- from scipy.signal import csd
- from scipy.optimize import curve_fit
- from joblib import Parallel, delayed
- from ..util import read_file_list, firing_rate
- def gauss(x,a,x0,sigma):
- return a*np.exp(-(x-x0)**2/(2*sigma**2))
- def receptive_field_position(rr, dt=1./20000., dataset_name=""):
- logging.info(f"\tReceptive field position...")
- if len(rr.stimuli) == 0:
- return None
- durations = rr.stimulus_durations
- fish_length = rr.fish_length
- dfs = rr.stimulus_deltafs
- x_positions, y_positions, z_positions = rr.stimulus_positions
- stim_positions_relative = x_positions / fish_length
- power_at_df = []
- positions = []
- relative_positions = []
- for i, stim in enumerate(rr.stimuli):
- if "ReceptiveField" not in stim.name:
- continue
- if y_positions[i] != 0.0:
- logging.debug(f"skipping position ({x_positions[i]}, {y_positions[i]})")
- continue
- spike_times = rr.spikes(i)
- rate = firing_rate(spike_times, durations[i], sigma=0.001, dt=dt)
- freq, psd = csd(rate, rate, fs=1./dt, nperseg=2**14, noverlap=2**13)
- power_at_df.append(np.sum(psd[(freq > dfs[i] - 2) & (freq < dfs[i] + 2)]))
- positions.append(x_positions[i])
- relative_positions.append(stim_positions_relative[i])
- if len(np.unique(relative_positions)) < 3:
- logging.info(f"{dataset_name}.{rr.name}: less than 3 positions, skipping")
- return None
- relative_positions = np.array(relative_positions)
- power_at_df = np.array(power_at_df)
- rf_position = 0.0
- rf_sigma = 0.0
- position_uncertainty = 0.0
- try:
- popt, pcov = curve_fit(gauss, relative_positions, power_at_df,
- p0=[1, 0.5, 0.25])
- rf_position = popt[1]
- rf_sigma = popt[2]
- if np.any(np.isinf(pcov)):
- logging.info(f"{dataset_name}.{rr.name} fitting was not successful!")
- position_uncertainty = 1.0
- position_uncertainty = np.sqrt(np.diag(pcov))[1]
- except Exception as e:
- sorted_positions = np.sort(np.unique(relative_positions))
- means = np.zeros_like(sorted_positions)
- for i, pos in enumerate(sorted_positions):
- means[i] = np.mean(power_at_df[relative_positions == pos])
- rf_position = sorted_positions[means == np.max(means)][0]
- rf_sigma = 1.0
- position_uncertainty = 1.0
- logging.info(f"Receptive field: position fit failed for {dataset_name} RePro run {rr.name}, falling back to maximum power position...")
- results = {"total_length": fish_length, "trials":len(rr.stimuli),
- "receptor_pos_absolute": rf_position * fish_length,
- "receptor_pos_relative": rf_position,
- "receptive_field_sigma": rf_sigma,
- "position_uncertainty": position_uncertainty,
- }
- return results
- def receptive_field_estimation(dataset_name, data_folder):
- logging.info("Analyzing cell {dataset_name} ...")
- filename = os.path.join(data_folder, dataset_name + ".nix")
- if not os.path.exists(filename):
- logging.error(f"NIX file for dataset {dataset_name} not found {filename}!")
- return None
- d = rlx.Dataset(filename)
-
- dt = d.data_traces[0].sampling_interval
- results = []
- for rf in d.repro_runs("ReceptiveField"):
- r = receptive_field_position(rf, dt, dataset_name)
- if r is not None:
- results.append(r)
- if len(results) == 0:
- logging.warning(f"No receptive field results for {dataset_name}!")
- return None
- best_index = 0
- min_error = 100000
- for i, res in enumerate(results):
- if res["receptor_pos_absolute"] == 0.0:
- continue
- err = res["position_uncertainty"]
- if err < min_error:
- best_index = i
- min_error = err
- best_results = results[best_index]
- best_results["dataset_id"] = dataset_name
- return best_results
- def run_receptive_field_analysis(list_folder, data_folder, num_cores=1):
- datasets = read_file_list(os.path.join(list_folder, "baseline_datasets.dat"))
- processed_list = Parallel(n_jobs=num_cores)(delayed(receptive_field_estimation)(dataset, data_folder) for dataset in datasets[:])
- results = [r for r in processed_list if r is not None]
- # results = receptive_field_estimation("2018-08-14-ac-invivo-1", data_folder)
- df = pd.DataFrame(results)
- return df
|