gdm_data_classes.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. import logging
  2. from dataclasses import dataclass, field
  3. import pandas as pd
  4. from typing import Dict, Mapping, Sequence
  5. from neo import AnalogSignal
  6. import quantities as pq
  7. import numpy as np
  8. @dataclass
  9. class GDMRow:
  10. """Class for a gdm row, which a pixel trace with associated metadata"""
  11. metadata: pd.Series
  12. time_series: AnalogSignal
  13. def copy(self):
  14. return GDMRow(metadata=self.metadata.copy(), time_series=self.time_series.copy())
  15. @classmethod
  16. def from_data_and_metadata(
  17. cls, metadata_dict: Mapping, trace: Sequence=None, sampling_period_ms: float=None,
  18. starting_time_s: float = 0, units: str = "au"
  19. ):
  20. metadata = pd.Series(metadata_dict)
  21. if trace is not None:
  22. time_series = AnalogSignal(
  23. signal=trace, sampling_period=sampling_period_ms * pq.ms, t_start=starting_time_s * pq.s,
  24. units=units
  25. )
  26. else:
  27. time_series = None
  28. return cls(metadata=metadata, time_series=time_series)
  29. def get_ASCII_exportable_format(self):
  30. signal_metadata = self.metadata.copy()
  31. signal_metadata["TraceOffset"] = self.time_series.t_start.magnitude
  32. signal_metadata["Cycle"] = (self.time_series.sampling_period / pq.ms).simplified.magnitude
  33. signal_metadata["NumFrames"] = self.time_series.shape[0]
  34. return signal_metadata, self.time_series.magnitude.T[0]
  35. @classmethod
  36. def parse_from_csv_df_row(cls, csv_df_row):
  37. try:
  38. sampling_period = csv_df_row["Cycle"]
  39. t_start = csv_df_row["TraceOffset"]
  40. n_samples = csv_df_row["NumFrames"]
  41. trace_start_pos = next(iter(i for i, x in enumerate(csv_df_row.index.values) if x == "PlaceHolder")) + 1
  42. trace = [float(x) for x in csv_df_row.iloc[trace_start_pos: trace_start_pos + n_samples].values]
  43. except (KeyError, StopIteration) as e:
  44. sampling_period = None
  45. t_start = 0
  46. trace_start_pos = len(csv_df_row)
  47. trace = None
  48. return cls.from_data_and_metadata(
  49. metadata_dict=csv_df_row.iloc[:trace_start_pos],
  50. sampling_period_ms=sampling_period,
  51. starting_time_s=t_start,
  52. trace=trace
  53. )
  54. @dataclass
  55. class GDMFile:
  56. """Class for a collection fo GDMRow objects, with CSV (and other) IO interfaces"""
  57. metadata_df: pd.DataFrame = field(default_factory=pd.DataFrame)
  58. data_dict: Dict = field(default_factory=dict)
  59. def copy(self):
  60. return GDMFile(metadata_df=self.metadata_df.copy(), data_dict={k: v.copy() for k, v in self.data_dict.items()})
  61. def __getitem__(self, item):
  62. return GDMRow(time_series=self.data_dict[item], metadata=self.metadata_df.loc[item])
  63. def indices_iterator(self):
  64. return iter(self.data_dict.keys())
  65. def get_trace(self, index):
  66. return self.data_dict[index]
  67. def subset_based_on_indices(self, indices):
  68. """
  69. Return a new GDMFile object with only those metadata and time series whose index is in indices
  70. :param Sequence indices: sequence of indices
  71. :return: GDMFile
  72. """
  73. assert all(x in self.data_dict for x in indices), "Not all indices specified are in the current GDMFile"
  74. gdm_file = __class__()
  75. gdm_file.metadata_df = self.metadata_df.loc[indices, :]
  76. gdm_file.data_dict = {k: v for k, v in self.data_dict.items() if k in indices}
  77. return gdm_file
  78. def subset_based_on_callable(self, metadata_filter):
  79. """
  80. Return a new GDMFile object with only those metadata and time series whose index is in indices
  81. :param Callable metadata_filter: a callable that can be used to select rows of metadata df
  82. :return: GDMFile
  83. """
  84. gdm_file = __class__()
  85. gdm_file.metadata_df = self.metadata_df.loc[metadata_filter, :]
  86. gdm_file.data_dict = {k: v for k, v in self.data_dict.items() if k in gdm_file.metadata_df.index.values}
  87. return gdm_file
  88. def append_from_a_gdm_file(self, gdm_file):
  89. if self.metadata_df.shape[0] == 0:
  90. self.metadata_df = gdm_file.metadata_df
  91. self.data_dict = gdm_file.data_dict
  92. else:
  93. current_max_ind = self.metadata_df.index.values.max()
  94. for enum_ind, (ind, metadata_row) in enumerate(gdm_file.metadata_df.iterrows()):
  95. new_ind = current_max_ind + enum_ind + 1
  96. self.metadata_df.loc[new_ind] = metadata_row
  97. self.data_dict[new_ind] = gdm_file.data_dict[ind]
  98. def append_gdm_row(self, gdm_row):
  99. new_index = self.metadata_df.shape[0]
  100. if new_index == 0:
  101. self.metadata_df = pd.DataFrame(gdm_row.metadata).T
  102. self.metadata_df.index = [0] # set index to 0 for the first row
  103. else:
  104. self.metadata_df.loc[new_index] = gdm_row.metadata
  105. self.data_dict[new_index] = gdm_row.time_series
  106. @classmethod
  107. def load_from_csv(cls, csv_file, metadata_only=False):
  108. """
  109. Load data and metadata from a csv file
  110. :param csv_file: absolute path of CSV file on file system
  111. :param bool metadata_only: whether to only read metadata, i.e., skip reading data
  112. :return: object of class GDMFile
  113. """
  114. gdm_file = cls()
  115. csv_df = read_chunks_gdm_csv(csv_file, metadata_only=metadata_only)
  116. for i, row in csv_df.iterrows():
  117. gdm_file.append_gdm_row(GDMRow.parse_from_csv_df_row(row))
  118. return gdm_file
  119. def write_to_csv(self, filename):
  120. metadata_df = pd.DataFrame()
  121. trace_df = pd.DataFrame()
  122. for gdm_ind in self.data_dict:
  123. gdm_row = self.__getitem__(gdm_ind)
  124. metadata_row, trace = gdm_row.get_ASCII_exportable_format()
  125. frame_values_s = pd.Series({f"Frame{k}": v for k, v in enumerate(trace)})
  126. metadata_df = metadata_df.append(pd.DataFrame(metadata_row).T, ignore_index=True)
  127. trace_df = trace_df.append(pd.DataFrame(frame_values_s).T, ignore_index=True)
  128. metadata_df["PlaceHolder"] = "Trace begins->"
  129. columns_before_trace = \
  130. ["StimONms", "StimLen", "Odour", "Stimulus", "OConc", "Cycle", "GloTag", "Measu", "Animal", "PlaceHolder"]
  131. metadata_df = metadata_df[
  132. [x for x in metadata_df.columns if x not in columns_before_trace] +
  133. [x for x in columns_before_trace if x in metadata_df.columns]
  134. ]
  135. df = pd.concat([metadata_df, trace_df], axis=1, sort=False)
  136. df.to_csv(filename, sep=';', header=True, index=False)
  137. logging.getLogger("VIEW").info(f"Finished writing {filename}")
  138. def get_data_as_numpy2D(self):
  139. """
  140. If all data have the same length, return them as a 2D numpy array containing one time series per row
  141. :rtype: numpy.ndarray
  142. """
  143. if len(self.metadata_df["NumFrames"].unique()) == 1:
  144. return np.array([x.magnitude for x in self.data_dict.values()])[:, :, 0]
  145. else:
  146. raise ValueError("GDMFile has data of different lengths. Cannot create a numpy array")
  147. def read_chunks_gdm_csv(input_csv, metadata_only=False):
  148. """
  149. Read a csv containing gdm and FID chunks, parsing date and time columns properly
  150. :param str input_csv: path to the input csv
  151. :param bool metadata_only: whether to only read metadata, i.e., skip reading data
  152. :return: pandas.DataFrame
  153. """
  154. print(f"Reading {input_csv}")
  155. if metadata_only:
  156. gdm_df = pd.read_csv(input_csv, sep=";", nrows=1, header=0)
  157. columns2read = []
  158. for x in gdm_df.columns:
  159. if x == "PlaceHolder":
  160. break
  161. else:
  162. columns2read.append(x)
  163. gdm_df = pd.read_csv(input_csv, sep=";", usecols=columns2read)
  164. else:
  165. gdm_df = pd.read_csv(input_csv, sep=";")
  166. def revise_line(line):
  167. if "_" in line:
  168. return line.split("_")[0]
  169. else:
  170. return line
  171. if "line" in gdm_df.columns:
  172. gdm_df["line"] = gdm_df["line"].apply(revise_line)
  173. return gdm_df
  174. def parse_stim_info(gdm_row_metadata, sort=True):
  175. stimulus_components = eval(gdm_row_metadata["Odour"])
  176. if type(gdm_row_metadata["StimONms"]) is str:
  177. stimulus_times = eval(gdm_row_metadata["StimONms"])
  178. else:
  179. stimulus_times = gdm_row_metadata["StimONms"]
  180. if type(gdm_row_metadata["StimLen"]) is str:
  181. stimulus_durations = eval(gdm_row_metadata["StimLen"])
  182. else:
  183. stimulus_durations = gdm_row_metadata["StimLen"]
  184. if type(stimulus_components) is str:
  185. stimulus_times = stimulus_times,
  186. stimulus_components = stimulus_components,
  187. stimulus_durations = stimulus_durations,
  188. if sort:
  189. arg_sort = np.argsort(stimulus_times)
  190. stimulus_times = [stimulus_times[x] for x in arg_sort]
  191. stimulus_components = [stimulus_components[x] for x in arg_sort]
  192. stimulus_durations = [stimulus_durations[x] for x in arg_sort]
  193. return stimulus_components, stimulus_times, stimulus_durations