__init__.py 10 KB


  1. """
  2. References:
  3. 1. Kato, S. et al., 2014.
  4. “Temporal Responses of C.elegans Chemosensory Neurons Are Preserved in Behavioral Dynamics.” Neuron, 81(3),
  5. pp.616–628. Available at: http://dx.doi.org/10.1016/j.neuron.2013.11.020.
  6. """
  7. import shutil
  8. import tempfile
  9. from typing import Sequence
  10. import numpy as np
  11. import math
  12. import pandas as pd
  13. from scipy.interpolate import interp1d
  14. import pathlib as pl
  15. from view.python_core.ctvs.cascade_model.gekko_related import fit_compare_models, GekkoSolver, ModelOneComp, \
  16. ModelTwoComp, ModelTwoCompNoDelay
  17. def estimate_delay_rising_phase_only(stimulus_trace: Sequence[float], output_trace: Sequence[float]):
  18. """
  19. Estimate delay between the signals <input> and <output> using a cross correlation and only those points with
  20. positive derivative
  21. :param Sequence stimulus_trace: sequence of floats
  22. :param Sequence output_trace: sequence of floats
  23. :return: delay (number of samples)
  24. :rtype: int
  25. """
  26. stimulus_trace_non_rising_mask = (np.diff(stimulus_trace) <= 0).tolist() + [False]
  27. stim_min, stim_max = min(stimulus_trace), max(stimulus_trace)
  28. stimulus_trace_copy = np.array(stimulus_trace, copy=True)
  29. stimulus_trace_copy[stimulus_trace_non_rising_mask] = np.median(stimulus_trace)
  30. stimulus_trace_copy[stimulus_trace_copy >= stim_min + 0.85 * (stim_max - stim_min)] = np.median(stimulus_trace)
  31. output_trace_non_rising_mask = (np.diff(output_trace) <= 0).tolist() + [False]
  32. output_min, output_max = min(output_trace), max(output_trace)
  33. output_trace_copy = np.array(output_trace, copy=True)
  34. output_trace_copy[output_trace_non_rising_mask] = np.median(output_trace)
  35. output_trace_copy[output_trace_copy >= output_min + 0.85 * (output_max - output_min)] = np.median(output_trace)
  36. corr = np.correlate(stimulus_trace_copy, output_trace_copy, "full")
  37. delay_estimated = np.argmax(corr) - stimulus_trace_copy.shape[0] + 1
  38. # # code for debugging starts here
  39. # from matplotlib import pyplot as plt
  40. # plt.ion()
  41. # fig, axs = plt.subplots(nrows=3, figsize=(10, 8), squeeze=True)
  42. # axs[0].plot(stimulus_trace, 'b-')
  43. # axs[0].plot(stimulus_trace_copy, "r-o")
  44. # axs[0].set_ylabel("Stimulus")
  45. #
  46. # axs[1].plot(output_trace, 'b-')
  47. # axs[1].plot(output_trace_copy, "r-o")
  48. # axs[1].set_ylabel("Response")
  49. #
  50. # axs[2].plot(corr, "b-")
  51. # axs[2].axvline(np.argmax(corr), color='r')
  52. # axs[2].set_ylabel("Correlation")
  53. # axs[2].set_title(f"Estimated delay in samples: {delay_estimated}")
  54. #
  55. # plt.draw()
  56. # input("Press any key to continue..")
  57. # plt.close()
  58. # # code for debugging ends here
  59. return delay_estimated
  60. def predict_into_future(fit_params, factor):
  61. output = factor * fit_params["output"]
  62. trace = np.array(output)
  63. n_pts = trace.shape[0]
  64. time_trace = fit_params["time_trace_fitted"]
  65. sampling_period = time_trace[1] - time_trace[0]
  66. stimulus_trace = np.array(fit_params["stimulus_trace_fitted"])
  67. stimulus_peak_pos = np.argmax(stimulus_trace)
  68. central_part_start = max(0, int(stimulus_peak_pos - 0.25 * n_pts))
  69. central_part_end = min(n_pts, int(stimulus_peak_pos + 0.25 * n_pts))
  70. central_part = trace[central_part_start: central_part_end + 1]
  71. max_pos = central_part.argmax() + central_part_start
  72. prediction_start = time_trace[-2]
  73. longest_time_constant = fit_params["kF"] + fit_params['kA']
  74. if "kS" in fit_params:
  75. longest_time_constant = max(fit_params["kS"], longest_time_constant)
  76. prediction_end = time_trace[max_pos] + 6 * longest_time_constant # units is seconds
  77. predicted_trace = None
  78. if prediction_end > prediction_start:
  79. prediction_time_trace = np.arange(prediction_start, prediction_end + sampling_period, sampling_period)
  80. gs = GekkoSolver.init_from_model(fit_params["model"])
  81. predicted_traces_dict = gs.solve(
  82. time_vec=prediction_time_trace,
  83. input_vec=np.zeros_like(prediction_time_trace),
  84. parameter_values={k: fit_params[k] for k in gs.parameters},
  85. state_variable_init_dict={k: fit_params[k][-2:] for k in gs.state_variables},
  86. output_init=output[-2:]
  87. )
  88. predicted_trace = predicted_traces_dict["output"]
  89. predicted_trace_no_overlap = None
  90. predicted_time_trace_no_overlap = None
  91. if predicted_trace is not None:
  92. if predicted_trace[-1] <= predicted_trace[-2]:
  93. predicted_trace_no_overlap = np.array(predicted_trace)[1:] * factor
  94. predicted_time_trace_no_overlap = np.array(prediction_time_trace)[1:]
  95. return predicted_trace_no_overlap, predicted_time_trace_no_overlap
  96. def fit_cascade_model(
  97. stimulus_trace, output_trace, time_trace, delays_to_test=np.arange(-15, 9, 1).astype(int)
  98. ):
  99. stimulus_trace = np.array(stimulus_trace)
  100. output_trace = np.array(output_trace)
  101. time_trace = np.array(time_trace)
  102. assert stimulus_trace.shape == output_trace.shape, "Stimulus and output traces have different shapes"
  103. assert time_trace.shape == output_trace.shape, "time trace and output trace have different shapes"
  104. sampling_period = time_trace[1] - time_trace[0]
  105. pcc = PrelimChunkClassifier(output_trace)
  106. factor = 1 if pcc.is_chunk_response_positive() else -1
  107. bics = []
  108. fit_params_all = []
  109. model_one_comp = ModelOneComp()
  110. for delay in delays_to_test:
  111. if delay > 0:
  112. output_trace_to_fit = output_trace[:-delay]
  113. stimulus_trace_to_fit = stimulus_trace[delay:]
  114. time_trace_to_fit = time_trace[delay:]
  115. else:
  116. output_trace_to_fit = output_trace[-delay:]
  117. stimulus_trace_to_fit = stimulus_trace[:stimulus_trace.shape[0] + delay]
  118. time_trace_to_fit = time_trace[:time_trace.shape[0] + delay]
  119. fit_params = fit_compare_models(
  120. time_vec=time_trace_to_fit,
  121. input_vec=stimulus_trace_to_fit, output_vec=factor * output_trace_to_fit,
  122. model2consider=model_one_comp
  123. )
  124. if fit_params is not None:
  125. fit_params["output"] = factor * fit_params["output"]
  126. fit_params["output_trace_expected"] = factor * fit_params["output_trace_expected"]
  127. fit_params_all.append(fit_params)
  128. bics.append(fit_params["bic"])
  129. if len(fit_params_all) == 0:
  130. return None
  131. else:
  132. best_ind = np.argmin(bics)
  133. fit_params_best_one_comp = fit_params_all[best_ind]
  134. delay_best = delays_to_test[best_ind]
  135. fit_params_best_one_comp["delay_input"] = delay_best * sampling_period
  136. cascade_model_output = dict(
  137. fit_params_one_comp=fit_params_best_one_comp
  138. )
  139. fit_params_all_second = []
  140. bics_second = []
  141. params_fixed = ["kA"]
  142. models_to_fit = []
  143. param_fixed_arg = {k: [fit_params_best_one_comp[k]] * 3 for k in params_fixed}
  144. param_fixed_arg["kF"] = [
  145. fit_params_best_one_comp["kF"] * 0.5,
  146. fit_params_best_one_comp["kF"],
  147. 1.5 * fit_params_best_one_comp["kF"]]
  148. param_inits = {
  149. k: fit_params_best_one_comp[k]
  150. for k in fit_params_best_one_comp["model_params"] if k not in param_fixed_arg}
  151. model_two_comp_no_delay = ModelTwoCompNoDelay(
  152. param_inits=param_inits,
  153. params_fixed=param_fixed_arg)
  154. models_to_fit.append(model_two_comp_no_delay)
  155. second_comp_delays = list(range(0, 20))
  156. for second_comp_delay in second_comp_delays[1:]:
  157. model_two_comp = ModelTwoComp(
  158. delay=second_comp_delay,
  159. param_inits=param_inits,
  160. params_fixed=param_fixed_arg
  161. )
  162. models_to_fit.append(model_two_comp)
  163. for model2fit, second_comp_delay in zip(models_to_fit, second_comp_delays):
  164. fit_params = fit_compare_models(
  165. time_vec=fit_params_best_one_comp["time_trace_fitted"],
  166. input_vec=fit_params_best_one_comp["stimulus_trace_fitted"],
  167. output_vec=factor * fit_params_best_one_comp["output_trace_expected"],
  168. model2consider=model2fit
  169. )
  170. if fit_params is not None:
  171. fit_params["output"] = factor * fit_params["output"]
  172. fit_params["output_trace_expected"] = factor * fit_params["output_trace_expected"]
  173. fit_params_all_second.append(fit_params)
  174. bics_second.append(fit_params["bic"])
  175. best_ind = None
  176. if len(fit_params_all):
  177. best_ind = np.argmin(bics_second)
  178. fit_params_best_two_comp = fit_params_all_second[best_ind]
  179. fit_params_best_two_comp["delay_second_comp"] = second_comp_delays[best_ind]
  180. cascade_model_output["fit_params_two_comp"] = fit_params_best_two_comp
  181. # https://imaging.mrc-cbu.cam.ac.uk/statswiki/FAQ/AICreg
  182. # difference of 50 chosen based fitting on a test set
  183. if fit_params_all_second[best_ind]["bic"] >= fit_params_best_one_comp["bic"] - 50:
  184. best_ind = None
  185. if best_ind is None:
  186. fit_params_best = fit_params_best_one_comp
  187. else:
  188. fit_params_best = fit_params_all_second[best_ind]
  189. predicted_trace, predicted_time_trace = predict_into_future(fit_params_best, factor)
  190. cascade_model_output.update(dict(
  191. sign_factor=factor, fit_params=fit_params_best,
  192. predicted_time_trace=predicted_time_trace, predicted_trace=predicted_trace
  193. ))
  194. tempdir = pl.Path(tempfile.gettempdir())
  195. for child in tempdir.iterdir():
  196. if child.is_dir() and child.name.startswith("tmp"):
  197. shutil.rmtree(child)
  198. return cascade_model_output
  199. class PrelimChunkClassifier(object):
  200. def __init__(self, chunk: Sequence[float]):
  201. self.chunk = np.asarray(chunk)
  202. self.chunk_abs = np.abs(chunk)
  203. self.chunk_abs_argmax = np.argmax(self.chunk_abs)
  204. def is_chunk_contaminated(self):
  205. # is the maximum of absolute values of chunk in its middle half?
  206. lower_limit = 0.25 * self.chunk.shape[0]
  207. upper_limit = 0.75 * self.chunk.shape[0]
  208. return not (lower_limit <= self.chunk_abs_argmax <= upper_limit)
  209. def is_chunk_response_positive(self):
  210. # is the maximum value of the chunk positive?
  211. return self.chunk[self.chunk_abs_argmax] > 0