__init__.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. import logging
  2. import pandas as pd
  3. from .stimuli_parser import StimuliParamsParser
  4. class BaseStimuliiHandler(object):
  5. def __init__(self):
  6. self.stimulus_frame = pd.DataFrame(columns=("Odor", "Concentration",
  7. "Pulse Start Time", "Pulse End Time",
  8. "Sampling Period"))
  9. self.stimulus_offset_td = pd.Timedelta(0)
  10. def initialize_stimulus_offset(self, mv_correctStimulusOnset, data_sampling_period):
  11. """
  12. Corrects all stimuli periods stored internally by adding a time interval specified by the flag
  13. "mv_correctStimulusOnset" to them.
  14. :param mv_correctStimulusOnset: float, flag mv_correct stimulus. Interpreted as frame number if it is less than
  15. 1000, else as a time interval in milliseconds
  16. :param data_sampling_period: pd.Timedelta, the inter-frame-interval
  17. :return: None
  18. """
  19. # interprets mv_correctStimulusOnset in ms and converts to a pd.Timedelta object representing an interval
  20. # changed Sept. 19: stimulus correction is always in ms!
  21. if mv_correctStimulusOnset == 0:
  22. self.stimulus_offset_td = pd.Timedelta(0)
  23. else:
  24. self.stimulus_offset_td = pd.Timedelta(f"{mv_correctStimulusOnset}ms")
  25. if self.stimulus_frame.shape[0]:
  26. self.stimulus_frame["Pulse Start Time"] += self.stimulus_offset_td
  27. self.stimulus_frame["Pulse End Time"] += self.stimulus_offset_td
  28. else:
  29. logging.getLogger("VIEW").warning("No stimulus found in object. Stimulus offset not initialized!")
  30. def add_odor_pulse(self, odor, concentration,
  31. on_frame: int = None, data_sampling_period: float = None,
  32. on_ms: float = None,
  33. off_frame: int = None, duration_ms: float = None):
  34. """
  35. Add an odor pulse stimlus, applying correction based on mv_correctOnsetStimulus.
  36. One of the following needs to specified to define stimulus pulse onset
  37. 1. on_frame and data_sampling_period
  38. 2. on_ms
  39. One of the following need to be specified to define stimulus pulse offset
  40. 1. off_frame and data_sampling_period
  41. 2. duration
  42. :param odor: string, odor applied
  43. :param concentration: float, logarithm to base 10 of the concentration of the odor applied
  44. :param on_frame: int, frame number of stimulus pulse onset
  45. :param float data_sampling_period: data sampling period in ms, i.e., 600 for 100 frames per minute
  46. :param on_ms: float, time of stimlus onset in milliseconds
  47. :param off_frame: int, frame number of stimulus pulse offset
  48. :param duration_ms: float, stimulus duration in milliseconds
  49. :return:
  50. """
  51. data_sampling_period_td = pd.Timedelta(f"{data_sampling_period}ms")
  52. if not pd.isnull(on_frame) and not pd.isnull(data_sampling_period) and pd.isnull(on_ms):
  53. # stimulus start, based on on_frame
  54. assert on_frame >= 0, f"on_frame must be >= 0. {on_frame} specified"
  55. on_time_from_frame = on_frame * data_sampling_period_td
  56. elif pd.isnull(on_frame) and not pd.isnull(on_ms):
  57. # stimulus start, based on on_ms
  58. assert on_ms >= 0, f"on_ms must be >= 0. {on_ms} specified"
  59. on_time_from_frame = pd.Timedelta(f"{on_ms}ms")
  60. elif not pd.isnull(on_frame) and not pd.isnull(on_ms) and not pd.isnull(data_sampling_period):
  61. # stimulus start, both on_ms and on_frame are given
  62. on_time_from_frame = on_frame * data_sampling_period_td
  63. on_time_from_time = pd.Timedelta(f"{on_ms}ms")
  64. assert on_time_from_time == on_time_from_frame, f"stimulus on_frame and on_time are contradictory: " \
  65. f"check stimulus time in .lst file"
  66. else:
  67. return 1
  68. if not pd.isnull(off_frame):
  69. off_time_from_frame = (off_frame + 1) * data_sampling_period_td
  70. if not pd.isnull(duration_ms):
  71. logging.getLogger("VIEW").warning(
  72. 'During stimulus parsing: stimulus length taken from Stim_off in frames, '
  73. 'stim_duration has been ignored! Check info in .lst file')
  74. elif not pd.isnull(duration_ms):
  75. off_time_from_frame = on_time_from_frame + pd.Timedelta(f"{duration_ms}ms")
  76. else:
  77. return 1
  78. temp_df = pd.DataFrame([[odor,
  79. concentration,
  80. on_time_from_frame,
  81. off_time_from_frame,
  82. data_sampling_period_td]],
  83. columns=self.stimulus_frame.columns)
  84. self.stimulus_frame = pd.concat([self.stimulus_frame, temp_df], ignore_index=True)
  85. return 0
  86. def get_number_of_stimuli(self):
  87. """
  88. returns the number of stimuli
  89. :return: int
  90. """
  91. return self.stimulus_frame.shape[0]
  92. def get_all_odors(self):
  93. """
  94. Returns an iterable of strings, containing all Odor stimuli applied
  95. :return: iterable
  96. """
  97. return self.stimulus_frame["Odour"].unique()
  98. def get_odor_info_at_times(self, times):
  99. """
  100. :param times: iterable of pandas.TimeDelta
  101. :returns odors, concs
  102. odors: list of str, containing odor information
  103. concs: list of str, containing concentration information
  104. """
  105. assert all(type(x) == pd.Timedelta for x in times), "times must be of type pandas.TimeDelta"
  106. odors = []
  107. concs = []
  108. for time in times:
  109. current_odors = []
  110. current_concs = []
  111. for row_index, row in self.stimulus_frame.iterrows():
  112. if (row["Pulse Start Time"] <= time) & (row["Pulse End Time"] >= time):
  113. current_odors.append(row["Odor"])
  114. current_concs.append(row["Concentration"])
  115. odors.append(current_odors)
  116. concs.append(current_concs)
  117. return odors, concs
  118. def get_pulse_start_times(self):
  119. """
  120. Returns the start times of all stimulus pulses
  121. :return: an Sequence, of pandas.Timedelta objects
  122. """
  123. return self.stimulus_frame["Pulse Start Time"].values
  124. def get_pulse_end_times(self):
  125. """
  126. Returns the end times of all stimulus pulses
  127. :return: an Sequence, of pandas.Timedelta objects
  128. """
  129. return self.stimulus_frame["Pulse End Time"].values
  130. def get_pulse_durations(self):
  131. """
  132. Returns the duration of all stimulus pulses
  133. :return: a Sequence of pandas.TimeDelta objects
  134. """
  135. return self.get_pulse_end_times() - self.get_pulse_start_times()
  136. def get_pulse_start_frames(self, allow_fractional_frames=False):
  137. """
  138. Return the array of pulse start times in frames.
  139. :param allow_fractional_frames: bool. If False, fractional frame numbers are replaced by None
  140. :return: an iterable, of floats or None
  141. """
  142. frame_numbers = [(x / y) for x, y in zip(self.stimulus_frame["Pulse Start Time"],
  143. self.stimulus_frame["Sampling Period"])]
  144. if not allow_fractional_frames:
  145. frame_numbers = [int(x) if int(x) == x else None for x in frame_numbers]
  146. return frame_numbers
  147. def get_pulse_end_frames(self, allow_fractional_frames=False):
  148. """
  149. Return the array of pulse end times in frames. Non-integral frame numbers are replaced by None
  150. :param allow_fractional_frames: bool. If False, fractional frame numbers are replaced by None
  151. :return: an iterable, of floats or None
  152. """
  153. frame_numbers = [(x / y) for x, y in zip(self.stimulus_frame["Pulse End Time"],
  154. self.stimulus_frame["Sampling Period"])]
  155. if not allow_fractional_frames:
  156. frame_numbers = [int(x) if int(x) == x else None for x in frame_numbers]
  157. return frame_numbers
  158. def get_pulse_start_end_frames(self, allow_fractional_frames=False):
  159. """
  160. Returns a list of tuples of start and end frames of stimulus pulses
  161. :param allow_fractional_frames: If False, fractional frame numbers are replaced by None
  162. :rtype: list
  163. """
  164. starts = self.get_pulse_start_frames(allow_fractional_frames)
  165. ends = self.get_pulse_end_frames(allow_fractional_frames)
  166. return list(zip(starts, ends))
  167. def get_first_stimulus_onset_frame(self):
  168. onset_frames = self.get_pulse_start_frames(allow_fractional_frames=True)
  169. if len(onset_frames):
  170. return int(onset_frames[0]) # round it if fractional
  171. else:
  172. return None
  173. def get_stimon_background_range(self, LE_StartBackground, LE_PrestimEndBackground, default_background):
  174. """
  175. Decides the start and end frames of background to use based on the onset of first stimulus,
  176. <LE_PrestimEndBackground>, <LE_StartBackground>
  177. :param int LE_StartBackground: see documentation of corresponding flag
  178. :param int LE_PrestimEndBackground: see documentation of corresponding flag
  179. :param tuple default_background: tuple of two ints, specifying the default background range, if
  180. the interpreted range is invalid
  181. :return: background_range, onset_frame_first_stimulus
  182. background_range: tuple of two ints, specifying background range to use in frame numbers
  183. onset_frame_first_stimulus: int, frame number of the onset of first stimulus.
  184. If the interpreted range is invalid, -1 will be returned
  185. """
  186. onset_frame_first_stimulus = self.get_first_stimulus_onset_frame()
  187. if onset_frame_first_stimulus is not None:
  188. end_background = onset_frame_first_stimulus - LE_PrestimEndBackground
  189. if end_background <= LE_StartBackground:
  190. logging.getLogger("VIEW").warning(
  191. f"Encountered end_background <= start_background, which is invalid. "
  192. f"Defaulting to the background range {default_background}")
  193. return default_background, -1
  194. else:
  195. logging.getLogger("VIEW").warning(
  196. f"No stimuli information specified in measurement list file. "
  197. f"Defaulting to the background range {default_background}")
  198. return default_background, -1
  199. return (LE_StartBackground, end_background), onset_frame_first_stimulus
  200. class PulsedStimuliiHandler(BaseStimuliiHandler):
  201. def __init__(self):
  202. super().__init__()
  203. @classmethod
  204. def create_from_row(cls, row):
  205. stimulus_params = StimuliParamsParser().parse_row(row)
  206. handler = cls()
  207. for stim_ind, stimulus in stimulus_params.iter_stimuli():
  208. handler.add_odor_pulse(data_sampling_period=row["Cycle"],
  209. on_frame=stimulus["StimON"],
  210. off_frame=stimulus["StimOFF"],
  211. on_ms=stimulus["StimONms"],
  212. duration_ms=stimulus["StimLen"],
  213. odor=stimulus["Odour"],
  214. concentration=stimulus["OConc"])
  215. return handler